source: trunk/third/openafs/src/rx/rx_pthread.c @ 18109

Revision 18109, 10.2 KB checked in by zacheiss, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r18108, which included commits to RCS files with non-trunk default branches.
Line 
1/*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License.  For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10/*
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
14 *
15 * rx_pthread.c is used for the thread safe RX package.
16 */
17
18#include <afsconfig.h>
19#include <afs/param.h>
20
21RCSID("$Header: /afs/dev.mit.edu/source/repository/third/openafs/src/rx/rx_pthread.c,v 1.1.1.2 2002-12-13 20:40:04 zacheiss Exp $");
22
23#include <sys/types.h>
24#include <errno.h>
25#include <signal.h>
26#ifndef AFS_NT40_ENV
27# include <sys/socket.h>
28# include <sys/file.h>
29# include <netdb.h>
30# include <netinet/in.h>
31# include <net/if.h>
32# include <sys/ioctl.h>
33# include <sys/time.h>
34#endif
35#include <sys/stat.h>
36#include <rx.h>
37#include <rx_globals.h>
38#include <assert.h>
39#include <rx/rx_pthread.h>
40
41/*
42 * Number of times the event handling thread was signalled because a new
43 * event was scheduled earlier than the lastest event.
44 *
45 * Protected by event_handler_mutex
46 */
47static long rx_pthread_n_event_wakeups;
48
49/* Set rx_pthread_event_rescheduled if event_handler should just try
50 * again instead of sleeping.
51 *
52 * Protected by event_handler_mutex
53 */
54static int rx_pthread_event_rescheduled = 0;
55
56static void rx_ListenerProc(void *);
57
58/*
59 * We supply an event handling thread for Rx's event processing.
60 * The condition variable is used to wakeup the thread whenever a new
61 * event is scheduled earlier than the previous earliest event.
62 * This thread is also responsible for keeping time.
63 */
64static pthread_t event_handler_thread;
65pthread_cond_t rx_event_handler_cond;
66pthread_mutex_t event_handler_mutex;
67pthread_cond_t rx_listener_cond;
68pthread_mutex_t listener_mutex;
69static int listeners_started = 0;
70pthread_mutex_t rx_clock_mutex;
71struct clock rxi_clockNow;
72
73/*
74 * Delay the current thread the specified number of seconds.
75 */
76void rxi_Delay(sec)
77    int sec;
78{
79    sleep(sec);
80}
81
82/*
83 * Called from rx_Init()
84 */
85void rxi_InitializeThreadSupport() {
86
87    listeners_started = 0;
88    gettimeofday((struct timeval *)&rxi_clockNow, NULL);
89}
90
91static void *server_entry(void * argp)
92{
93    void (*server_proc)() = (void (*)()) argp;
94    server_proc();
95    printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n");
96    exit(1);
97    return (void *) 0;
98}
99
100/*
101 * Start an Rx server process.
102 */
103void rxi_StartServerProc(proc, stacksize)
104    void (*proc)();
105    int stacksize;
106{
107    pthread_t thread;
108    pthread_attr_t tattr;
109    AFS_SIGSET_DECL;
110
111    if (pthread_attr_init
112        (&tattr) != 0) {
113        printf("Unable to Create Rx server thread (pthread_attr_init)\n");
114        exit(1);
115    }
116
117    if (pthread_attr_setdetachstate
118        (&tattr,
119         PTHREAD_CREATE_DETACHED) != 0) {
120        printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
121        exit(1);
122    }
123
124    /*
125     * NOTE: We are ignoring the stack size parameter, for now.
126     */
127    AFS_SIGSET_CLEAR();
128    if (pthread_create
129        (&thread,
130         &tattr,
131         server_entry,
132         (void *) proc) != 0) {
133        printf("Unable to Create Rx server thread\n");
134        exit(1);
135    }
136    AFS_SIGSET_RESTORE();
137}
138
139/*
140 * The event handling process.
141 */
142static void *event_handler(void *argp)
143{
144    struct clock rx_pthread_last_event_wait_time = {0,0};
145    unsigned long rx_pthread_n_event_expired = 0;
146    unsigned long rx_pthread_n_event_waits = 0;
147    long rx_pthread_n_event_woken = 0;
148    struct timespec rx_pthread_next_event_time = {0,0};
149
150    assert(pthread_mutex_lock(&event_handler_mutex)==0);
151
152    for (;;) {
153        struct clock cv;
154        struct clock next;
155
156        assert(pthread_mutex_unlock(&event_handler_mutex)==0);
157   
158        next.sec = 30; /* Time to sleep if there are no events scheduled */
159        next.usec = 0;
160        gettimeofday((struct timeval *)&cv, NULL);
161        rxevent_RaiseEvents(&next);
162
163        assert(pthread_mutex_lock(&event_handler_mutex)==0);
164        if (rx_pthread_event_rescheduled) {
165            rx_pthread_event_rescheduled = 0;
166            continue;
167        }
168
169        clock_Add(&cv, &next);
170        rx_pthread_next_event_time.tv_sec = cv.sec;
171        rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
172        rx_pthread_n_event_waits++;
173        if (pthread_cond_timedwait
174            (&rx_event_handler_cond,
175             &event_handler_mutex,
176             &rx_pthread_next_event_time) == -1) {
177#ifdef notdef
178            assert(errno == EAGAIN);
179#endif
180            rx_pthread_n_event_expired++;
181        } else {       
182            rx_pthread_n_event_woken++;
183        }
184        rx_pthread_event_rescheduled = 0;
185    }
186}
187
188
189/*
190 * This routine will get called by the event package whenever a new,
191 * earlier than others, event is posted. */
192void rxi_ReScheduleEvents() {
193    assert(pthread_mutex_lock(&event_handler_mutex)==0);
194    pthread_cond_signal(&rx_event_handler_cond);
195    rx_pthread_event_rescheduled = 1;
196    assert(pthread_mutex_unlock(&event_handler_mutex)==0);
197}
198
199
200/* Loop to listen on a socket. Return setting *newcallp if this
201 * thread should become a server thread.  */
202static void rxi_ListenerProc(sock, tnop, newcallp)
203int sock;
204int *tnop;
205struct rx_call **newcallp;
206{
207    u_long host;
208    u_short port;
209    register struct rx_packet *p = (struct rx_packet *)0;
210
211    assert(pthread_mutex_lock(&listener_mutex)==0);
212    while (!listeners_started) {
213        assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0);
214    }
215    assert(pthread_mutex_unlock(&listener_mutex)==0);
216
217    for (;;) {
218        /*
219         * Grab a new packet only if necessary (otherwise re-use the old one)
220         */
221        if (p) {
222            rxi_RestoreDataBufs(p);
223        }
224        else {
225            if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
226                /* Could this happen with multiple socket listeners? */
227                printf("rxi_Listener: no packets!"); /* Shouldn't happen */
228                exit(1);
229            }
230        }
231
232        if (rxi_ReadPacket(sock, p, &host, &port)) {
233            clock_NewTime();
234            p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
235            if (newcallp && *newcallp) {
236                if (p)
237                    rxi_FreePacket(p);
238                return;
239            }
240        }
241    }
242    /* NOTREACHED */
243}
244
245/* This is the listener process request loop. The listener process loop
246 * becomes a server thread when rxi_ListenerProc returns, and stays
247 * server thread until rxi_ServerProc returns. */
248static void rx_ListenerProc(void *argp)
249{
250    int threadID;
251    int sock = (int) argp;
252    struct rx_call *newcall;
253
254    while(1) {
255        newcall = NULL;
256        threadID = -1;
257        rxi_ListenerProc(sock, &threadID, &newcall);
258        /* assert(threadID != -1); */
259        /* assert(newcall != NULL); */
260        sock = OSI_NULLSOCKET;
261        assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
262        rxi_ServerProc(threadID, newcall, &sock);
263        /* assert(sock != OSI_NULLSOCKET); */
264    }
265    /* not reached */
266}
267
268/* This is the server process request loop. The server process loop
269 * becomes a listener thread when rxi_ServerProc returns, and stays
270 * listener thread until rxi_ListenerProc returns. */
271void rx_ServerProc()
272{
273    int sock;
274    int threadID;
275    struct rx_call *newcall = NULL;
276
277    rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */
278    MUTEX_ENTER(&rx_stats_mutex);
279    rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
280    /* threadID is used for making decisions in GetCall.  Get it by bumping
281     * number of threads handling incoming calls */
282    threadID = rxi_availProcs++;
283    MUTEX_EXIT(&rx_stats_mutex);
284
285    while(1) {
286        sock = OSI_NULLSOCKET;
287        assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0);
288        rxi_ServerProc(threadID, newcall, &sock);
289        /* assert(sock != OSI_NULLSOCKET); */
290        newcall = NULL;
291        rxi_ListenerProc(sock, &threadID, &newcall);
292        /* assert(threadID != -1); */
293        /* assert(newcall != NULL); */
294    }
295    /* not reached */
296}
297
298/*
299 * Historically used to start the listener process. We now have multiple
300 * listener processes (one for each socket); these are started by GetUdpSocket.
301 *
302 * The event handling process *is* started here (the old listener used
303 * to also handle events). The listener threads can't actually start
304 * listening until rxi_StartListener is called because most of R may not
305 * be initialized when rxi_Listen is called.
306 */
307void rxi_StartListener() {
308    pthread_attr_t tattr;
309    AFS_SIGSET_DECL;
310
311    if (pthread_attr_init
312        (&tattr) != 0) {
313        printf("Unable to create Rx event handling thread (pthread_attr_init)\n");
314        exit(1);
315    }
316
317    if (pthread_attr_setdetachstate
318        (&tattr,
319         PTHREAD_CREATE_DETACHED) != 0) {
320        printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
321        exit(1);
322    }
323
324    AFS_SIGSET_CLEAR();
325    if (pthread_create
326        (&event_handler_thread,
327         &tattr,
328         event_handler,
329         NULL) != 0) {
330        printf("Unable to create Rx event handling thread\n");
331        exit(1);
332    }
333    AFS_SIGSET_RESTORE();
334
335    assert(pthread_mutex_lock(&listener_mutex)==0);
336    assert(pthread_cond_broadcast(&rx_listener_cond)==0);
337    listeners_started = 1;
338    assert(pthread_mutex_unlock(&listener_mutex)==0);
339
340}
341
342/*
343 * Listen on the specified socket.
344 */
345rxi_Listen(sock)
346    osi_socket sock;
347{
348    pthread_t thread;
349    pthread_attr_t tattr;
350    AFS_SIGSET_DECL;
351
352    if (pthread_attr_init
353        (&tattr) != 0) {
354        printf("Unable to create socket listener thread (pthread_attr_init)\n");
355        exit(1);
356    }
357
358    if (pthread_attr_setdetachstate
359        (&tattr,
360         PTHREAD_CREATE_DETACHED) != 0) {
361        printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
362        exit(1);
363    }
364
365    AFS_SIGSET_CLEAR();
366    if (pthread_create
367        (&thread,
368         &tattr,
369         rx_ListenerProc,
370         (void *) sock) != 0) {
371        printf("Unable to create socket listener thread\n");
372        exit(1);
373    }
374    AFS_SIGSET_RESTORE();
375    return 0;
376}
377
378
379/*
380 * Recvmsg.
381 *
382 */
383int rxi_Recvmsg
384    (int socket,
385     struct msghdr *msg_p,
386     int flags)
387{
388    int ret;
389    ret = recvmsg(socket, msg_p, flags);
390    return ret;
391}
392
393/*
394 * Sendmsg.
395 */
396rxi_Sendmsg(socket, msg_p, flags)
397    osi_socket socket;
398    struct msghdr *msg_p;
399    int flags;
400{
401    int ret;
402    ret = sendmsg(socket, msg_p, flags);
403#ifdef AFS_LINUX22_ENV
404    /* linux unfortunately returns ECONNREFUSED if the target port
405     * is no longer in use */
406    /* and EAGAIN if a UDP checksum is incorrect */
407    if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
408#else
409    if (ret == -1) {
410#endif
411        printf("rxi_sendmsg failed, error %d\n", errno);
412        fflush(stdout);
413    }
414    return 0;
415}
Note: See TracBrowser for help on using the repository browser.