1 | /* |
---|
2 | * Copyright 2000, International Business Machines Corporation and others. |
---|
3 | * All Rights Reserved. |
---|
4 | * |
---|
5 | * This software has been released under the terms of the IBM Public |
---|
6 | * License. For details, see the LICENSE file in the top-level source |
---|
7 | * directory or online at http://www.openafs.org/dl/license10.html |
---|
8 | */ |
---|
9 | |
---|
10 | /* |
---|
11 | * An implementation of the rx socket listener for pthreads (not using select). |
---|
12 | * This assumes that multiple read system calls may be extant at any given |
---|
13 | * time. Also implements the pthread-specific event handling for rx. |
---|
14 | * |
---|
15 | * rx_pthread.c is used for the thread safe RX package. |
---|
16 | */ |
---|
17 | |
---|
18 | #include <afsconfig.h> |
---|
19 | #include <afs/param.h> |
---|
20 | |
---|
21 | RCSID("$Header: /afs/dev.mit.edu/source/repository/third/openafs/src/rx/rx_pthread.c,v 1.1.1.2 2002-12-13 20:40:04 zacheiss Exp $"); |
---|
22 | |
---|
23 | #include <sys/types.h> |
---|
24 | #include <errno.h> |
---|
25 | #include <signal.h> |
---|
26 | #ifndef AFS_NT40_ENV |
---|
27 | # include <sys/socket.h> |
---|
28 | # include <sys/file.h> |
---|
29 | # include <netdb.h> |
---|
30 | # include <netinet/in.h> |
---|
31 | # include <net/if.h> |
---|
32 | # include <sys/ioctl.h> |
---|
33 | # include <sys/time.h> |
---|
34 | #endif |
---|
35 | #include <sys/stat.h> |
---|
36 | #include <rx.h> |
---|
37 | #include <rx_globals.h> |
---|
38 | #include <assert.h> |
---|
39 | #include <rx/rx_pthread.h> |
---|
40 | |
---|
41 | /* |
---|
42 | * Number of times the event handling thread was signalled because a new |
---|
43 | * event was scheduled earlier than the lastest event. |
---|
44 | * |
---|
45 | * Protected by event_handler_mutex |
---|
46 | */ |
---|
47 | static long rx_pthread_n_event_wakeups; |
---|
48 | |
---|
49 | /* Set rx_pthread_event_rescheduled if event_handler should just try |
---|
50 | * again instead of sleeping. |
---|
51 | * |
---|
52 | * Protected by event_handler_mutex |
---|
53 | */ |
---|
54 | static int rx_pthread_event_rescheduled = 0; |
---|
55 | |
---|
56 | static void rx_ListenerProc(void *); |
---|
57 | |
---|
58 | /* |
---|
59 | * We supply an event handling thread for Rx's event processing. |
---|
60 | * The condition variable is used to wakeup the thread whenever a new |
---|
61 | * event is scheduled earlier than the previous earliest event. |
---|
62 | * This thread is also responsible for keeping time. |
---|
63 | */ |
---|
64 | static pthread_t event_handler_thread; |
---|
65 | pthread_cond_t rx_event_handler_cond; |
---|
66 | pthread_mutex_t event_handler_mutex; |
---|
67 | pthread_cond_t rx_listener_cond; |
---|
68 | pthread_mutex_t listener_mutex; |
---|
69 | static int listeners_started = 0; |
---|
70 | pthread_mutex_t rx_clock_mutex; |
---|
71 | struct clock rxi_clockNow; |
---|
72 | |
---|
73 | /* |
---|
74 | * Delay the current thread the specified number of seconds. |
---|
75 | */ |
---|
76 | void rxi_Delay(sec) |
---|
77 | int sec; |
---|
78 | { |
---|
79 | sleep(sec); |
---|
80 | } |
---|
81 | |
---|
82 | /* |
---|
83 | * Called from rx_Init() |
---|
84 | */ |
---|
85 | void rxi_InitializeThreadSupport() { |
---|
86 | |
---|
87 | listeners_started = 0; |
---|
88 | gettimeofday((struct timeval *)&rxi_clockNow, NULL); |
---|
89 | } |
---|
90 | |
---|
91 | static void *server_entry(void * argp) |
---|
92 | { |
---|
93 | void (*server_proc)() = (void (*)()) argp; |
---|
94 | server_proc(); |
---|
95 | printf("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"); |
---|
96 | exit(1); |
---|
97 | return (void *) 0; |
---|
98 | } |
---|
99 | |
---|
100 | /* |
---|
101 | * Start an Rx server process. |
---|
102 | */ |
---|
103 | void rxi_StartServerProc(proc, stacksize) |
---|
104 | void (*proc)(); |
---|
105 | int stacksize; |
---|
106 | { |
---|
107 | pthread_t thread; |
---|
108 | pthread_attr_t tattr; |
---|
109 | AFS_SIGSET_DECL; |
---|
110 | |
---|
111 | if (pthread_attr_init |
---|
112 | (&tattr) != 0) { |
---|
113 | printf("Unable to Create Rx server thread (pthread_attr_init)\n"); |
---|
114 | exit(1); |
---|
115 | } |
---|
116 | |
---|
117 | if (pthread_attr_setdetachstate |
---|
118 | (&tattr, |
---|
119 | PTHREAD_CREATE_DETACHED) != 0) { |
---|
120 | printf("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n"); |
---|
121 | exit(1); |
---|
122 | } |
---|
123 | |
---|
124 | /* |
---|
125 | * NOTE: We are ignoring the stack size parameter, for now. |
---|
126 | */ |
---|
127 | AFS_SIGSET_CLEAR(); |
---|
128 | if (pthread_create |
---|
129 | (&thread, |
---|
130 | &tattr, |
---|
131 | server_entry, |
---|
132 | (void *) proc) != 0) { |
---|
133 | printf("Unable to Create Rx server thread\n"); |
---|
134 | exit(1); |
---|
135 | } |
---|
136 | AFS_SIGSET_RESTORE(); |
---|
137 | } |
---|
138 | |
---|
139 | /* |
---|
140 | * The event handling process. |
---|
141 | */ |
---|
142 | static void *event_handler(void *argp) |
---|
143 | { |
---|
144 | struct clock rx_pthread_last_event_wait_time = {0,0}; |
---|
145 | unsigned long rx_pthread_n_event_expired = 0; |
---|
146 | unsigned long rx_pthread_n_event_waits = 0; |
---|
147 | long rx_pthread_n_event_woken = 0; |
---|
148 | struct timespec rx_pthread_next_event_time = {0,0}; |
---|
149 | |
---|
150 | assert(pthread_mutex_lock(&event_handler_mutex)==0); |
---|
151 | |
---|
152 | for (;;) { |
---|
153 | struct clock cv; |
---|
154 | struct clock next; |
---|
155 | |
---|
156 | assert(pthread_mutex_unlock(&event_handler_mutex)==0); |
---|
157 | |
---|
158 | next.sec = 30; /* Time to sleep if there are no events scheduled */ |
---|
159 | next.usec = 0; |
---|
160 | gettimeofday((struct timeval *)&cv, NULL); |
---|
161 | rxevent_RaiseEvents(&next); |
---|
162 | |
---|
163 | assert(pthread_mutex_lock(&event_handler_mutex)==0); |
---|
164 | if (rx_pthread_event_rescheduled) { |
---|
165 | rx_pthread_event_rescheduled = 0; |
---|
166 | continue; |
---|
167 | } |
---|
168 | |
---|
169 | clock_Add(&cv, &next); |
---|
170 | rx_pthread_next_event_time.tv_sec = cv.sec; |
---|
171 | rx_pthread_next_event_time.tv_nsec = cv.usec * 1000; |
---|
172 | rx_pthread_n_event_waits++; |
---|
173 | if (pthread_cond_timedwait |
---|
174 | (&rx_event_handler_cond, |
---|
175 | &event_handler_mutex, |
---|
176 | &rx_pthread_next_event_time) == -1) { |
---|
177 | #ifdef notdef |
---|
178 | assert(errno == EAGAIN); |
---|
179 | #endif |
---|
180 | rx_pthread_n_event_expired++; |
---|
181 | } else { |
---|
182 | rx_pthread_n_event_woken++; |
---|
183 | } |
---|
184 | rx_pthread_event_rescheduled = 0; |
---|
185 | } |
---|
186 | } |
---|
187 | |
---|
188 | |
---|
189 | /* |
---|
190 | * This routine will get called by the event package whenever a new, |
---|
191 | * earlier than others, event is posted. */ |
---|
192 | void rxi_ReScheduleEvents() { |
---|
193 | assert(pthread_mutex_lock(&event_handler_mutex)==0); |
---|
194 | pthread_cond_signal(&rx_event_handler_cond); |
---|
195 | rx_pthread_event_rescheduled = 1; |
---|
196 | assert(pthread_mutex_unlock(&event_handler_mutex)==0); |
---|
197 | } |
---|
198 | |
---|
199 | |
---|
200 | /* Loop to listen on a socket. Return setting *newcallp if this |
---|
201 | * thread should become a server thread. */ |
---|
202 | static void rxi_ListenerProc(sock, tnop, newcallp) |
---|
203 | int sock; |
---|
204 | int *tnop; |
---|
205 | struct rx_call **newcallp; |
---|
206 | { |
---|
207 | u_long host; |
---|
208 | u_short port; |
---|
209 | register struct rx_packet *p = (struct rx_packet *)0; |
---|
210 | |
---|
211 | assert(pthread_mutex_lock(&listener_mutex)==0); |
---|
212 | while (!listeners_started) { |
---|
213 | assert(pthread_cond_wait(&rx_listener_cond, &listener_mutex)==0); |
---|
214 | } |
---|
215 | assert(pthread_mutex_unlock(&listener_mutex)==0); |
---|
216 | |
---|
217 | for (;;) { |
---|
218 | /* |
---|
219 | * Grab a new packet only if necessary (otherwise re-use the old one) |
---|
220 | */ |
---|
221 | if (p) { |
---|
222 | rxi_RestoreDataBufs(p); |
---|
223 | } |
---|
224 | else { |
---|
225 | if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) { |
---|
226 | /* Could this happen with multiple socket listeners? */ |
---|
227 | printf("rxi_Listener: no packets!"); /* Shouldn't happen */ |
---|
228 | exit(1); |
---|
229 | } |
---|
230 | } |
---|
231 | |
---|
232 | if (rxi_ReadPacket(sock, p, &host, &port)) { |
---|
233 | clock_NewTime(); |
---|
234 | p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp); |
---|
235 | if (newcallp && *newcallp) { |
---|
236 | if (p) |
---|
237 | rxi_FreePacket(p); |
---|
238 | return; |
---|
239 | } |
---|
240 | } |
---|
241 | } |
---|
242 | /* NOTREACHED */ |
---|
243 | } |
---|
244 | |
---|
245 | /* This is the listener process request loop. The listener process loop |
---|
246 | * becomes a server thread when rxi_ListenerProc returns, and stays |
---|
247 | * server thread until rxi_ServerProc returns. */ |
---|
248 | static void rx_ListenerProc(void *argp) |
---|
249 | { |
---|
250 | int threadID; |
---|
251 | int sock = (int) argp; |
---|
252 | struct rx_call *newcall; |
---|
253 | |
---|
254 | while(1) { |
---|
255 | newcall = NULL; |
---|
256 | threadID = -1; |
---|
257 | rxi_ListenerProc(sock, &threadID, &newcall); |
---|
258 | /* assert(threadID != -1); */ |
---|
259 | /* assert(newcall != NULL); */ |
---|
260 | sock = OSI_NULLSOCKET; |
---|
261 | assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0); |
---|
262 | rxi_ServerProc(threadID, newcall, &sock); |
---|
263 | /* assert(sock != OSI_NULLSOCKET); */ |
---|
264 | } |
---|
265 | /* not reached */ |
---|
266 | } |
---|
267 | |
---|
268 | /* This is the server process request loop. The server process loop |
---|
269 | * becomes a listener thread when rxi_ServerProc returns, and stays |
---|
270 | * listener thread until rxi_ListenerProc returns. */ |
---|
271 | void rx_ServerProc() |
---|
272 | { |
---|
273 | int sock; |
---|
274 | int threadID; |
---|
275 | struct rx_call *newcall = NULL; |
---|
276 | |
---|
277 | rxi_MorePackets(rx_maxReceiveWindow+2); /* alloc more packets */ |
---|
278 | MUTEX_ENTER(&rx_stats_mutex); |
---|
279 | rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */ |
---|
280 | /* threadID is used for making decisions in GetCall. Get it by bumping |
---|
281 | * number of threads handling incoming calls */ |
---|
282 | threadID = rxi_availProcs++; |
---|
283 | MUTEX_EXIT(&rx_stats_mutex); |
---|
284 | |
---|
285 | while(1) { |
---|
286 | sock = OSI_NULLSOCKET; |
---|
287 | assert(pthread_setspecific(rx_thread_id_key, (void *)threadID) == 0); |
---|
288 | rxi_ServerProc(threadID, newcall, &sock); |
---|
289 | /* assert(sock != OSI_NULLSOCKET); */ |
---|
290 | newcall = NULL; |
---|
291 | rxi_ListenerProc(sock, &threadID, &newcall); |
---|
292 | /* assert(threadID != -1); */ |
---|
293 | /* assert(newcall != NULL); */ |
---|
294 | } |
---|
295 | /* not reached */ |
---|
296 | } |
---|
297 | |
---|
298 | /* |
---|
299 | * Historically used to start the listener process. We now have multiple |
---|
300 | * listener processes (one for each socket); these are started by GetUdpSocket. |
---|
301 | * |
---|
302 | * The event handling process *is* started here (the old listener used |
---|
303 | * to also handle events). The listener threads can't actually start |
---|
304 | * listening until rxi_StartListener is called because most of R may not |
---|
305 | * be initialized when rxi_Listen is called. |
---|
306 | */ |
---|
307 | void rxi_StartListener() { |
---|
308 | pthread_attr_t tattr; |
---|
309 | AFS_SIGSET_DECL; |
---|
310 | |
---|
311 | if (pthread_attr_init |
---|
312 | (&tattr) != 0) { |
---|
313 | printf("Unable to create Rx event handling thread (pthread_attr_init)\n"); |
---|
314 | exit(1); |
---|
315 | } |
---|
316 | |
---|
317 | if (pthread_attr_setdetachstate |
---|
318 | (&tattr, |
---|
319 | PTHREAD_CREATE_DETACHED) != 0) { |
---|
320 | printf("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n"); |
---|
321 | exit(1); |
---|
322 | } |
---|
323 | |
---|
324 | AFS_SIGSET_CLEAR(); |
---|
325 | if (pthread_create |
---|
326 | (&event_handler_thread, |
---|
327 | &tattr, |
---|
328 | event_handler, |
---|
329 | NULL) != 0) { |
---|
330 | printf("Unable to create Rx event handling thread\n"); |
---|
331 | exit(1); |
---|
332 | } |
---|
333 | AFS_SIGSET_RESTORE(); |
---|
334 | |
---|
335 | assert(pthread_mutex_lock(&listener_mutex)==0); |
---|
336 | assert(pthread_cond_broadcast(&rx_listener_cond)==0); |
---|
337 | listeners_started = 1; |
---|
338 | assert(pthread_mutex_unlock(&listener_mutex)==0); |
---|
339 | |
---|
340 | } |
---|
341 | |
---|
342 | /* |
---|
343 | * Listen on the specified socket. |
---|
344 | */ |
---|
345 | rxi_Listen(sock) |
---|
346 | osi_socket sock; |
---|
347 | { |
---|
348 | pthread_t thread; |
---|
349 | pthread_attr_t tattr; |
---|
350 | AFS_SIGSET_DECL; |
---|
351 | |
---|
352 | if (pthread_attr_init |
---|
353 | (&tattr) != 0) { |
---|
354 | printf("Unable to create socket listener thread (pthread_attr_init)\n"); |
---|
355 | exit(1); |
---|
356 | } |
---|
357 | |
---|
358 | if (pthread_attr_setdetachstate |
---|
359 | (&tattr, |
---|
360 | PTHREAD_CREATE_DETACHED) != 0) { |
---|
361 | printf("Unable to create socket listener thread (pthread_attr_setdetachstate)\n"); |
---|
362 | exit(1); |
---|
363 | } |
---|
364 | |
---|
365 | AFS_SIGSET_CLEAR(); |
---|
366 | if (pthread_create |
---|
367 | (&thread, |
---|
368 | &tattr, |
---|
369 | rx_ListenerProc, |
---|
370 | (void *) sock) != 0) { |
---|
371 | printf("Unable to create socket listener thread\n"); |
---|
372 | exit(1); |
---|
373 | } |
---|
374 | AFS_SIGSET_RESTORE(); |
---|
375 | return 0; |
---|
376 | } |
---|
377 | |
---|
378 | |
---|
379 | /* |
---|
380 | * Recvmsg. |
---|
381 | * |
---|
382 | */ |
---|
383 | int rxi_Recvmsg |
---|
384 | (int socket, |
---|
385 | struct msghdr *msg_p, |
---|
386 | int flags) |
---|
387 | { |
---|
388 | int ret; |
---|
389 | ret = recvmsg(socket, msg_p, flags); |
---|
390 | return ret; |
---|
391 | } |
---|
392 | |
---|
393 | /* |
---|
394 | * Sendmsg. |
---|
395 | */ |
---|
396 | rxi_Sendmsg(socket, msg_p, flags) |
---|
397 | osi_socket socket; |
---|
398 | struct msghdr *msg_p; |
---|
399 | int flags; |
---|
400 | { |
---|
401 | int ret; |
---|
402 | ret = sendmsg(socket, msg_p, flags); |
---|
403 | #ifdef AFS_LINUX22_ENV |
---|
404 | /* linux unfortunately returns ECONNREFUSED if the target port |
---|
405 | * is no longer in use */ |
---|
406 | /* and EAGAIN if a UDP checksum is incorrect */ |
---|
407 | if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) { |
---|
408 | #else |
---|
409 | if (ret == -1) { |
---|
410 | #endif |
---|
411 | printf("rxi_sendmsg failed, error %d\n", errno); |
---|
412 | fflush(stdout); |
---|
413 | } |
---|
414 | return 0; |
---|
415 | } |
---|