source: trunk/third/linc/src/linc-connection.c @ 18552

Revision 18552, 21.9 KB checked in by ghudson, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r18551, which included commits to RCS files with non-trunk default branches.
Line 
1/*
2 * linc-connection.c: This file is part of the linc library.
3 *
4 * Authors:
5 *    Elliot Lee     (sopwith@redhat.com)
6 *    Michael Meeks  (michael@ximian.com)
7 *    Mark McLouglin (mark@skynet.ie) & others
8 *
9 * Copyright 2001, Red Hat, Inc., Ximian, Inc.,
10 *                 Sun Microsystems, Inc.
11 */
12#include <config.h>
13
14#include <unistd.h>
15#include <fcntl.h>
16#include <errno.h>
17#include <string.h>
18
19#ifdef LINC_SSL_SUPPORT
20#include <openssl/ssl.h>
21#endif
22
23#include "linc-debug.h"
24#include "linc-private.h"
25#include <linc/linc-config.h>
26#include <linc/linc-connection.h>
27
28static GObjectClass *parent_class = NULL;
29
30enum {
31        BROKEN,
32        BLOCKING,
33        LAST_SIGNAL
34};
35static guint linc_connection_signals [LAST_SIGNAL];
36
37static gboolean linc_connection_io_handler (GIOChannel  *gioc,
38                                            GIOCondition condition,
39                                            gpointer     data);
40
41static void
42linc_close_fd (LINCConnection *cnx)
43{
44        if (cnx->priv->fd >= 0) {
45                d_printf ("Close %d\n", cnx->priv->fd);
46
47                LINC_CLOSE (cnx->priv->fd);
48        }
49        cnx->priv->fd = -1;
50}
51
52typedef struct {
53        guchar       *data;
54
55        struct iovec *vecs;
56        int           nvecs;
57        struct iovec  single_vec;
58} QueuedWrite;
59
60static void
61queue_signal (LINCConnection *cnx,
62              glong           delta)
63{
64        gulong old_size;
65        gulong new_size;
66
67        d_printf ("Queue signal %ld bytes, delta %ld, max %ld\n",
68                  cnx->priv->write_queue_bytes, delta,
69                  cnx->priv->max_buffer_bytes);
70
71        old_size = cnx->priv->write_queue_bytes;
72        cnx->priv->write_queue_bytes += delta;
73        new_size = cnx->priv->write_queue_bytes;
74
75        g_object_ref (G_OBJECT (cnx));
76
77        if (cnx->options & LINC_CONNECTION_BLOCK_SIGNAL) {
78                if (new_size == 0 ||
79                    (old_size < (cnx->priv->max_buffer_bytes >> 1) &&
80                     new_size >= (cnx->priv->max_buffer_bytes >> 1)) ||
81                    new_size >= cnx->priv->max_buffer_bytes)
82                        g_signal_emit (G_OBJECT (cnx),
83                                       linc_connection_signals [BLOCKING],
84                                       0, new_size);
85        }
86
87        if (cnx->priv->max_buffer_bytes &&
88            cnx->priv->write_queue_bytes >= cnx->priv->max_buffer_bytes)
89                linc_connection_state_changed (cnx, LINC_DISCONNECTED);
90
91        g_object_unref (G_OBJECT (cnx));
92}
93
94static gulong
95calc_size (struct iovec *src_vecs,
96           int           nvecs)
97{
98        int i;
99        gulong total_size = 0;
100
101        for (i = 0; i < nvecs; i++)
102                total_size += src_vecs [i].iov_len;
103
104        return total_size;
105}
106
107static void
108queue_flattened (LINCConnection *cnx,
109                 struct iovec   *src_vecs,
110                 int             nvecs)
111{
112        int     i;
113        guchar *p;
114        gulong  total_size;
115        QueuedWrite *qw = g_new (QueuedWrite, 1);
116
117        total_size = calc_size (src_vecs, nvecs);
118
119        p = g_malloc (total_size);
120
121        qw->data  = p;
122        qw->vecs  = &qw->single_vec;
123        qw->nvecs = 1;
124
125        qw->vecs->iov_base = p;
126        qw->vecs->iov_len = total_size;
127
128        for (i = 0; i < nvecs; i++) {
129                memcpy (p, src_vecs [i].iov_base, src_vecs [i].iov_len);
130                p += src_vecs [i].iov_len;
131        }
132        g_assert (p == (qw->data + total_size));
133
134        d_printf ("Queueing write of %ld bytes on fd %d\n",
135                  total_size, cnx->priv->fd);
136
137        cnx->priv->write_queue = g_list_append (cnx->priv->write_queue, qw);
138        queue_signal (cnx, total_size);
139}
140
141static void
142queued_write_free (QueuedWrite *qw)
143{
144        g_free (qw->data);
145        g_free (qw);
146}
147
148static void
149queue_free (LINCConnection *cnx)
150{
151        GList *l;
152
153        for (l = cnx->priv->write_queue; l; l = l->next)
154                queued_write_free (l->data);
155
156        g_list_free (cnx->priv->write_queue);
157        cnx->priv->write_queue = NULL;
158}
159
160static void
161linc_source_remove (LINCConnection *cnx)
162{
163        if (cnx->priv->tag) {
164                LincWatch *thewatch = cnx->priv->tag;
165                cnx->priv->tag = NULL;
166                linc_io_remove_watch (thewatch);
167                d_printf ("Removed watch on %d\n", cnx->priv->fd);
168        }
169}
170
171static void
172linc_source_add (LINCConnection *cnx,
173                 GIOCondition    condition)
174{
175        g_assert (cnx->priv->tag == NULL);
176
177        cnx->priv->tag = linc_io_add_watch_fd (
178                cnx->priv->fd, condition,
179                linc_connection_io_handler, cnx);
180
181        d_printf ("Added watch on %d (0x%x)\n",
182                 cnx->priv->fd, condition);
183}
184
185/*
186 * linc_connection_class_state_changed:
187 * @cnx: a #LINCConnection
188 * @status: a #LINCConnectionStatus value.
189 *
190 * Set up linc's #GSources if the connection is in the #LINC_CONNECTED
191 * or #LINC_CONNECTING state.
192 *
193 * Remove the #GSources if the state has channged to #LINC_DISCONNECTED,
194 * close the socket and a gobject broken signal which may be caught by
195 * the application.
196 *
197 * Also perform SSL specific operations if the connection has move into
198 * the #LINC_CONNECTED state.
199 */
200
201static void
202linc_connection_class_state_changed (LINCConnection      *cnx,
203                                     LINCConnectionStatus status)
204{
205        gboolean changed;
206
207        d_printf ("State changing from '%s' to '%s' on fd %d\n",
208                 STATE_NAME (cnx->status), STATE_NAME (status),
209                 cnx->priv->fd);
210
211        changed = cnx->status != status;
212
213        cnx->status = status;
214
215        switch (status) {
216        case LINC_CONNECTED:
217#ifdef LINC_SSL_SUPPORT
218                if (cnx->options & LINC_CONNECTION_SSL) {
219                        if (cnx->was_initiated)
220                                SSL_connect (cnx->priv->ssl);
221                        else
222                                SSL_accept (cnx->priv->ssl);
223                }
224#endif
225                if (!cnx->priv->tag)
226                        linc_source_add (cnx, LINC_ERR_CONDS | LINC_IN_CONDS);
227                break;
228
229        case LINC_CONNECTING:
230
231                if (cnx->priv->tag) /* re-connecting */
232                        linc_watch_set_condition (
233                                cnx->priv->tag,
234                                G_IO_OUT | LINC_ERR_CONDS);
235                else
236                        linc_source_add (cnx, G_IO_OUT | LINC_ERR_CONDS);
237                break;
238
239        case LINC_DISCONNECTED:
240                linc_source_remove (cnx);
241                linc_close_fd (cnx);
242                /* don't free pending queue - we could get re-connected */
243                if (changed)
244                        g_signal_emit (G_OBJECT (cnx),
245                                       linc_connection_signals [BROKEN], 0);
246                break;
247        }
248}
249
250/*
251 * linc_connection_from_fd:
252 * @cnx: a #LINCConnection.
253 * @fd: a connected/connecting file descriptor.
254 * @proto: a #LINCProtocolInfo.
255 * @remote_host_info: protocol dependant host information; gallocation swallowed
256 * @remote_serv_info: protocol dependant service information(e.g. port number). gallocation swallowed
257 * @was_initiated: #TRUE if the connection was initiated by us.
258 * @status: a #LINCConnectionStatus value.
259 * @options: combination of #LINCConnectionOptions.
260 *
261 * Fill in @cnx, call protocol specific initialisation methonds and then
262 * call linc_connection_state_changed.
263 *
264 * Return Value: #TRUE if the function succeeds, #FALSE otherwise.
265 */
266gboolean
267linc_connection_from_fd (LINCConnection         *cnx,
268                         int                     fd,
269                         const LINCProtocolInfo *proto,
270                         gchar                  *remote_host_info,
271                         gchar                  *remote_serv_info,
272                         gboolean                was_initiated,
273                         LINCConnectionStatus    status,
274                         LINCConnectionOptions   options)
275{
276        cnx->was_initiated = was_initiated;
277        cnx->is_auth       = (proto->flags & LINC_PROTOCOL_SECURE);
278        cnx->proto         = proto;
279        cnx->options       = options;
280        cnx->priv->fd      = fd;
281
282        cnx->remote_host_info = remote_host_info;
283        cnx->remote_serv_info = remote_serv_info;
284
285        d_printf ("Cnx from fd (%d) '%s', '%s', '%s'\n",
286                 fd, proto->name,
287                 remote_host_info ? remote_host_info : "<Null>",
288                 remote_serv_info ? remote_serv_info : "<Null>");
289
290        if (proto->setup)
291                proto->setup (fd, options);
292
293#ifdef LINC_SSL_SUPPORT
294        if (options & LINC_CONNECTION_SSL) {
295                cnx->priv->ssl = SSL_new (linc_ssl_ctx);
296                SSL_set_fd (cnx->priv->ssl, fd);
297        }
298#endif
299
300        linc_connection_state_changed (cnx, status);
301
302        return TRUE;
303}
304
305/*
306 * linc_connection_initiate:
307 * @cnx: a #LINCConnection.
308 * @proto_name: the name of the protocol to use.
309 * @host: protocol dependant host information.
310 * @service: protocol dependant service information(e.g. port number).
311 * @options: combination of #LINCConnectionOptions.
312 *
313 * Initiate a connection to @service on @host using the @proto_name protocol.
314 *
315 * Note: this function may be successful without actually having connected
316 *       to @host - the connection handshake may not have completed.
317 *
318 * Return Value: #TRUE if the function succeeds, #FALSE otherwise.
319 */
320gboolean
321linc_connection_initiate (LINCConnection        *cnx,
322                          const char            *proto_name,
323                          const char            *host,
324                          const char            *service,
325                          LINCConnectionOptions  options)
326{
327        const LINCProtocolInfo *proto;
328        int                     rv;
329        int                     fd;
330        gboolean                retval = FALSE;
331        struct sockaddr        *saddr;
332        LincSockLen             saddr_len;
333
334        proto = linc_protocol_find (proto_name);
335
336        if (!proto)
337                return FALSE;
338
339
340        saddr = linc_protocol_get_sockaddr (
341                proto, host, service, &saddr_len);
342
343        if (!saddr)
344                return FALSE;
345
346        fd = socket (proto->family, SOCK_STREAM,
347                     proto->stream_proto_num);
348
349        if (fd < 0)
350                goto out;
351
352        if (options & LINC_CONNECTION_NONBLOCKING)
353                if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0)
354                        goto out;
355
356        if (fcntl (fd, F_SETFD, FD_CLOEXEC) < 0)
357                goto out;
358
359        rv = connect (fd, saddr, saddr_len);
360        if (rv && errno != EINPROGRESS)
361                goto out;
362
363        d_printf ("initiate 'connect' on new fd %d [ %d; %d ]\n",
364                 fd, rv, errno);
365
366        retval = linc_connection_from_fd (
367                cnx, fd, proto,
368                g_strdup (host), g_strdup (service),
369                TRUE, rv ? LINC_CONNECTING : LINC_CONNECTED,
370                options);
371
372 out:
373        if (!retval && fd >= 0) {
374                d_printf ("initiation failed\n");
375                LINC_CLOSE (fd);
376        }
377
378        g_free (saddr);
379
380        return retval;
381}
382
383/*
384 * linc_connection_state_changed:
385 * @cnx: a #LINCConnection.
386 * @status: a #LINCConnectionStatus.
387 *
388 * A wrapper for the #LINCConnectionClass's state change method.
389 */
390void
391linc_connection_state_changed (LINCConnection      *cnx,
392                               LINCConnectionStatus status)
393{
394        LINCConnectionClass *klass;
395
396        klass = (LINCConnectionClass *)G_OBJECT_GET_CLASS (cnx);
397
398        if (klass->state_changed)
399                klass->state_changed (cnx, status);
400}
401
402/**
403 * linc_connection_read:
404 * @cnx: the connection to write to
405 * @buf: a pointer to the start of an array of bytes to read data into
406 * @len: the length of the array in bytes to read ingo
407 * @block_for_full_read: whether to block for a full read
408 *
409 * Warning, block_for_full_read is of limited usefullness.
410 *
411 * Return value: number of bytes written on success; negative on error.
412 **/
413glong
414linc_connection_read (LINCConnection *cnx,
415                      guchar         *buf,
416                      int             len,
417                      gboolean        block_for_full_read)
418{
419        int bytes_read = 0;
420
421        d_printf ("Read up to %d bytes from fd %d\n", len, cnx->priv->fd);
422
423        if (!len)
424                return 0;
425
426        if (cnx->status != LINC_CONNECTED)
427                return LINC_IO_FATAL_ERROR;
428
429        do {
430                int n;
431
432#ifdef LINC_SSL_SUPPORT
433                if (cnx->options & LINC_CONNECTION_SSL)
434                        n = SSL_read (cnx->priv->ssl, buf, len);
435                else
436#endif
437                        n = read (cnx->priv->fd, buf, len);
438
439                g_assert (n <= len);
440
441                if (n < 0) {
442#ifdef LINC_SSL_SUPPORT
443                        if (cnx->options & LINC_CONNECTION_SSL) {
444                                gulong rv;
445
446                                rv = SSL_get_error (cnx->priv->ssl, n);
447
448                                if ((rv == SSL_ERROR_WANT_READ ||
449                                     rv == SSL_ERROR_WANT_WRITE) &&
450                                    (cnx->options & LINC_CONNECTION_NONBLOCKING))
451                                        return bytes_read;
452                                else
453                                        return LINC_IO_FATAL_ERROR;
454                        } else
455#endif
456                        {
457                                if (errno == EINTR)
458                                        continue;
459
460                                else if (errno == EAGAIN &&
461                                         (cnx->options & LINC_CONNECTION_NONBLOCKING))
462                                        return bytes_read;
463
464                                else if (errno == EBADF) {
465                                        g_warning ("Serious fd usage error %d", cnx->priv->fd);
466                                        return LINC_IO_FATAL_ERROR;
467
468                                } else
469                                        return LINC_IO_FATAL_ERROR;
470                        }
471
472                } else if (n == 0) {
473                        d_printf ("we got EOF on fd %d\n", cnx->priv->fd);
474                        return LINC_IO_FATAL_ERROR;
475                } else {
476                        buf += n;
477                        len -= n;
478                        bytes_read += n;
479                }
480        } while (len > 0 && block_for_full_read);
481
482        d_printf ("we read %d bytes\n", bytes_read);
483
484        return bytes_read;
485}
486
487/* Determine the maximum size of the iovec vector */
488
489#if defined (MAXIOV) /* HPUX */
490# define LINC_IOV_MAX (MAXIOV)
491#elif defined (IOV_MAX) /* AIX */
492# define LINC_IOV_MAX (IOV_MAX)
493#elif defined (_SC_IOV_MAX) /* SGI */
494# define LINC_IOV_MAX_INIT (sysconf (_SC_IOV_MAX))
495#elif defined (__APPLE__)
496/* Even though the write(2) man page mentions it, UIO_MAXIOV is only
497 * available if KERNEL is defined on MacOS X 10.1
498 */
499#  define LINC_IOV_MAX 1024
500#elif defined (UIO_MAXIOV) /* Glibc */
501# define LINC_IOV_MAX (UIO_MAXIOV)
502#else /* Safe Guess */
503# define LINC_IOV_MAX 16
504#endif
505
506/* If the value requires initialization, define the function here */
507#if defined (LINC_IOV_MAX_INIT)
508# define LINC_IOV_MAX linc_iov_max
509  static guint linc_iov_max = 0;
510  static inline void
511  linc_iov_max_init ()
512  {
513    if (linc_iov_max == 0)
514      {
515        gint max;
516        G_LOCK_DEFINE_STATIC (linc_iov_max);
517        G_LOCK (linc_iov_max);
518        if (linc_iov_max == 0)
519          {
520            max = LINC_IOV_MAX_INIT;
521            if (max <= 0)
522              max = 16;
523            linc_iov_max = max;
524          }
525        G_UNLOCK (linc_iov_max);
526      }
527  }
528#else
529# define linc_iov_max_init()
530#endif
531
532static glong
533write_data (LINCConnection *cnx, QueuedWrite *qw)
534{
535        glong bytes_written = 0;
536
537        g_return_val_if_fail (cnx->status == LINC_CONNECTED,
538                              LINC_IO_FATAL_ERROR);
539
540        linc_iov_max_init ();
541
542        while ((qw->nvecs > 0) && (qw->vecs->iov_len > 0)) {
543                int n;
544
545                d_printf ("write_data %ld bytes to fd %d - ",
546                          calc_size (qw->vecs, qw->nvecs), cnx->priv->fd);
547
548#ifdef LINC_SSL_SUPPORT
549                if (cnx->options & LINC_CONNECTION_SSL)
550                        n = SSL_write (cnx->priv->ssl, qw->vecs->iov_base,
551                                       qw->vecs->iov_len);
552                else
553#endif
554                        n = writev (cnx->priv->fd, qw->vecs,
555                                    MIN (qw->nvecs, LINC_IOV_MAX));
556
557                d_printf ("wrote %d bytes\n", n);
558
559                if (n < 0) {
560#ifdef LINC_SSL_SUPPORT
561                        if (cnx->options & LINC_CONNECTION_SSL) {
562                                gulong rv;
563                                       
564                                rv = SSL_get_error (cnx->priv->ssl, n);
565                                       
566                                if ((rv == SSL_ERROR_WANT_READ ||
567                                     rv == SSL_ERROR_WANT_WRITE) &&
568                                    cnx->options & LINC_CONNECTION_NONBLOCKING)
569                                        return LINC_IO_QUEUED_DATA;
570                                else
571                                        return LINC_IO_FATAL_ERROR;
572                        } else
573#endif
574                        {
575                                if (errno == EINTR)
576                                        continue;
577
578                                else if (errno == EAGAIN &&
579                                         (cnx->options & LINC_CONNECTION_NONBLOCKING))
580                                        return LINC_IO_QUEUED_DATA;
581
582                                else if (errno == EBADF)
583                                        g_warning ("Serious fd usage error %d", cnx->priv->fd);
584                               
585                                return LINC_IO_FATAL_ERROR; /* Unhandlable error */
586                        }
587
588                } else if (n == 0) /* CHECK: is this really an error condition */
589                        return LINC_IO_FATAL_ERROR;
590
591                else {
592                        bytes_written += n;
593
594                        while (qw->nvecs > 0 && n >= qw->vecs->iov_len) {
595                                n -= qw->vecs->iov_len;
596                                qw->nvecs--;
597                                qw->vecs++;
598                        }
599
600                        if (n) {
601                                qw->vecs->iov_len  -= n;
602                                qw->vecs->iov_base += n;
603                        }
604                }
605        }
606
607        return bytes_written;
608}
609
610static gboolean
611linc_connection_should_block (LINCConnection      *cnx,
612                              const LINCWriteOpts *opt_write_opts)
613{
614        if (!opt_write_opts)
615                return TRUE;
616
617        if (opt_write_opts->block_on_write)
618                return TRUE;
619
620        return FALSE;
621}
622
623/**
624 * linc_connection_writev:
625 * @cnx: the connection to write to
626 * @vecs: a structure of iovecs to write - this is altered.
627 * @nvecs: the number of populated iovecs
628 * @opt_write_opts: optional write options, or NULL
629 *
630 * This routine writes data to the abstract connection.
631 * FIXME: it allows re-enterancy via linc_connection_iterate
632 *        in certain cases.
633 * FIXME: on this basis, the connection can die underneath
634 *        our feet.
635 *
636 * Return value: 0 on success, non 0 on error.
637 **/
638LINCIOStatus
639linc_connection_writev (LINCConnection       *cnx,
640                        struct iovec         *vecs,
641                        int                   nvecs,
642                        const LINCWriteOpts  *opt_write_opts)
643{
644        QueuedWrite qw;
645        int         status;
646
647        /* FIXME: need an option to turn this off ? */
648        if (cnx->options & LINC_CONNECTION_NONBLOCKING) {
649                while (cnx->status == LINC_CONNECTING)
650                        linc_main_iteration (TRUE);
651        }
652
653        g_return_val_if_fail (cnx->status == LINC_CONNECTED,
654                              LINC_IO_FATAL_ERROR);
655
656        if (cnx->priv->write_queue) {
657                /* FIXME: we should really retry the write here, but we'll
658                 * get a POLLOUT for this lot at some stage anyway */
659                queue_flattened (cnx, vecs, nvecs);
660                return LINC_IO_QUEUED_DATA;
661        }
662
663        qw.vecs  = vecs;
664        qw.nvecs = nvecs;
665
666 continue_write:
667        status = write_data (cnx, &qw);
668
669        if (status == LINC_IO_QUEUED_DATA) {
670                /* Queue data & listen for buffer space */
671                linc_watch_set_condition (cnx->priv->tag,
672                                          LINC_ERR_CONDS | LINC_IN_CONDS | G_IO_OUT);
673
674                if (!linc_connection_should_block (cnx, opt_write_opts)) {
675                        queue_flattened (cnx, qw.vecs, qw.nvecs);
676                        return LINC_IO_QUEUED_DATA;
677
678                } else {
679                        linc_main_iteration (TRUE);
680                        goto continue_write;
681                }
682
683        } else if (status >= LINC_IO_OK)
684                status = LINC_IO_OK;
685
686        return status;
687}
688
689/**
690 * linc_connection_write:
691 * @cnx: the connection to write to
692 * @buf: a pointer to the start of an array of bytes
693 * @len: the length of the array in bytes
694 * @opt_write_opts: optional write options, or NULL
695 *
696 * Writes a contiguous block of data to the abstract connection.
697 *
698 * FIXME: it allows re-enterancy via linc_connection_iterate
699 *        in certain cases.
700 * FIXME: on this basis, the connection can die underneath
701 *        our feet eg. between the main_iteration and the
702 *        g_return_if_fail.
703 *
704 * Return value: 0 on success, non 0 on error.
705 **/
706LINCIOStatus
707linc_connection_write (LINCConnection       *cnx,
708                       const guchar         *buf,
709                       gulong                len,
710                       const LINCWriteOpts  *opt_write_opts)
711{
712        struct iovec vec;
713
714        vec.iov_base = (guchar *) buf;
715        vec.iov_len  = len;
716
717        return linc_connection_writev (cnx, &vec, 1, opt_write_opts);
718}
719
720static void
721linc_connection_dispose (GObject *obj)
722{
723        LINCConnection *cnx = (LINCConnection *)obj;
724
725        d_printf ("dispose connection %p\n", obj);
726
727        linc_source_remove (cnx);
728        queue_free (cnx);
729
730        parent_class->dispose (obj);
731}
732
733static void
734linc_connection_finalize (GObject *obj)
735{
736        LINCConnection *cnx = (LINCConnection *)obj;
737
738        linc_close_fd (cnx);
739
740        g_free (cnx->remote_host_info);
741        g_free (cnx->remote_serv_info);
742
743        g_free (cnx->priv);
744
745        parent_class->finalize (obj);
746}
747
748static void
749linc_connection_init (LINCConnection *cnx)
750{
751        d_printf ("create new connection %p\n", cnx);
752
753        cnx->priv = g_new0 (LINCConnectionPrivate, 1);
754        cnx->priv->fd = -1;
755}
756
757static void
758linc_connection_class_init (LINCConnectionClass *klass)
759{
760        GObjectClass *object_class = (GObjectClass *) klass;
761
762        object_class->dispose  = linc_connection_dispose;
763        object_class->finalize = linc_connection_finalize;
764
765        klass->state_changed  = linc_connection_class_state_changed;
766        klass->broken         = NULL;
767
768        linc_connection_signals [BROKEN] =
769                g_signal_new ("broken",
770                              G_TYPE_FROM_CLASS (object_class),
771                              G_SIGNAL_RUN_LAST,
772                              G_STRUCT_OFFSET (LINCConnectionClass, broken),
773                              NULL, NULL,
774                              g_cclosure_marshal_VOID__VOID,
775                              G_TYPE_NONE, 0);
776
777        linc_connection_signals [BLOCKING] =
778                g_signal_new ("blocking",
779                              G_TYPE_FROM_CLASS (object_class),
780                              G_SIGNAL_RUN_LAST,
781                              G_STRUCT_OFFSET (LINCConnectionClass, blocking),
782                              NULL, NULL,
783                              g_cclosure_marshal_VOID__ULONG,
784                              G_TYPE_NONE, 1, G_TYPE_ULONG);
785
786        parent_class = g_type_class_peek_parent (klass);
787}
788
789GType
790linc_connection_get_type (void)
791{
792        static GType object_type = 0;
793
794        if (!object_type) {
795                static const GTypeInfo object_info = {
796                        sizeof (LINCConnectionClass),
797                        (GBaseInitFunc) NULL,
798                        (GBaseFinalizeFunc) NULL,
799                        (GClassInitFunc) linc_connection_class_init,
800                        NULL,           /* class_finalize */
801                        NULL,           /* class_data */
802                        sizeof (LINCConnection),
803                        0,              /* n_preallocs */
804                        (GInstanceInitFunc) linc_connection_init,
805                };
806     
807                object_type = g_type_register_static (G_TYPE_OBJECT,
808                                                      "LINCConnection",
809                                                      &object_info,
810                                                      0);
811        } 
812
813        return object_type;
814}
815
816
817LINCWriteOpts *
818linc_write_options_new (gboolean block_on_write)
819{
820        LINCWriteOpts *write_opts = g_new0 (LINCWriteOpts, 1);
821
822        write_opts->block_on_write = block_on_write;
823
824        return write_opts;
825}
826
827void
828linc_write_options_free (LINCWriteOpts *write_opts)
829{
830        g_free (write_opts);
831}
832
833void
834linc_connection_set_max_buffer (LINCConnection *cnx,
835                                gulong          max_buffer_bytes)
836{
837        g_return_if_fail (cnx != NULL);
838
839        /* FIXME: we might want to check the current buffer size */
840        cnx->priv->max_buffer_bytes = max_buffer_bytes;
841}
842
843static gboolean
844linc_connection_io_handler (GIOChannel  *gioc,
845                            GIOCondition condition,
846                            gpointer     data)
847{
848        LINCConnection      *cnx = data;
849        LINCConnectionClass *klass;
850        int rv, n;
851        LincSockLen n_size = sizeof (n);
852
853        g_object_ref (G_OBJECT (cnx));
854
855        klass = (LINCConnectionClass *) G_TYPE_INSTANCE_GET_CLASS (
856                data, LINC_TYPE_CONNECTION, LINCConnection);
857
858        if (cnx->status == LINC_CONNECTED &&
859            condition & LINC_IN_CONDS && klass->handle_input) {
860               
861                d_printf ("Handle input on fd %d\n", cnx->priv->fd);
862                klass->handle_input (cnx);
863
864        }
865
866        if (cnx->status == LINC_CONNECTED && condition & G_IO_OUT) {
867                gboolean done_writes = TRUE;
868
869                d_printf ("IO Out - buffer space free ...\n");
870
871                if (cnx->priv->write_queue) {
872                        glong        status;
873                        QueuedWrite *qw = cnx->priv->write_queue->data;
874
875                        status = write_data (cnx, qw);
876
877                        d_printf ("Wrote queue %ld on fd %d\n", status, cnx->priv->fd);
878
879                        if (status >= LINC_IO_OK) {
880                                cnx->priv->write_queue = g_list_delete_link (
881                                        cnx->priv->write_queue, cnx->priv->write_queue);
882                                queued_write_free (qw);
883
884                                queue_signal (cnx, -status);
885
886                                done_writes = (cnx->priv->write_queue == NULL);
887
888                        } else if (status == LINC_IO_FATAL_ERROR) {
889                                d_printf ("Fatal error on queued write");
890                                linc_connection_state_changed (cnx, LINC_DISCONNECTED);
891
892                        } else {
893                                d_printf ("Write blocked\n");
894                                done_writes = FALSE;
895                        }
896                }
897
898                d_printf ("Blocked write queue %s\n", done_writes ?
899                          "flushed & empty" : "still active");
900
901                if (done_writes) /* drop G_IO_OUT */
902                        linc_watch_set_condition (cnx->priv->tag,
903                                                  LINC_ERR_CONDS | LINC_IN_CONDS);
904
905        }
906
907        if (condition & (LINC_ERR_CONDS | G_IO_OUT)) {
908                switch (cnx->status) {
909                case LINC_CONNECTING:
910                        n = 0;
911                        rv = getsockopt (cnx->priv->fd, SOL_SOCKET, SO_ERROR, &n, &n_size);
912                        if (!rv && !n && condition == G_IO_OUT) {
913                                d_printf ("State changed to connected on %d\n", cnx->priv->fd);
914
915                                linc_watch_set_condition (
916                                        cnx->priv->tag,
917                                        LINC_ERR_CONDS | LINC_IN_CONDS);
918
919                                linc_connection_state_changed (cnx, LINC_CONNECTED);
920                               
921                        } else {
922                                d_printf ("Error connecting %d %d %d on fd %d\n",
923                                           rv, n, errno, cnx->priv->fd);
924                                linc_connection_state_changed (cnx, LINC_DISCONNECTED);
925                        }
926                        break;
927                case LINC_CONNECTED: {
928                        if (condition & LINC_ERR_CONDS) {
929                                d_printf ("Disconnect on err: %d\n", cnx->priv->fd);
930                                linc_connection_state_changed (cnx, LINC_DISCONNECTED);
931                        }
932                        break;
933                }
934                default:
935                        break;
936                }
937        }
938
939        g_object_unref (G_OBJECT (cnx));
940
941        return TRUE;
942}
Note: See TracBrowser for help on using the repository browser.