source: trunk/third/moira/server/increment.pc @ 24250

Revision 24250, 16.7 KB checked in by broder, 15 years ago (diff)
New Moira snapshot from subversion. Sorry for the large diff - looks like all of the keywords changed in the SVN import process.
Line 
1/* $Id: increment.pc,v 2.23 2009-12-29 17:29:33 zacheiss Exp $
2 *
3 * Deal with incremental updates
4 *
5 * Copyright (C) 1989-1998 by the Massachusetts Institute of Technology
6 * For copying and distribution information, please see the file
7 * <mit-copyright.h>.
8 */
9
10#include <mit-copyright.h>
11#include "mr_server.h"
12#include "query.h"
13#include "qrtn.h"
14
15#include <signal.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <unistd.h>
20#include <sys/param.h>
21
22EXEC SQL INCLUDE sqlca;
23
24RCSID("$Header: /afs/.athena.mit.edu/astaff/project/moiradev/repository/moira/server/increment.pc,v 2.23 2009-12-29 17:29:33 zacheiss Exp $");
25
26extern char *whoami;
27extern char *table_name[];
28extern int num_tables;
29
30int inc_pid = 0;        /* pid returned by vfork() for the most recently started incremental */
31int inc_running = 0;    /* set to 1 by next_incremental() once a child has been started */
32time_t inc_started;     /* value of `now' when that child was started; compared against INC_TIMEOUT */
33
34#define MAXARGC 16
35
36EXEC SQL WHENEVER SQLERROR DO dbmserr();
37
38EXEC SQL BEGIN DECLARE SECTION;
39/* structures to save before args */
40static char *before[MAXARGC];      /* field buffers, each MAX_FIELD_WIDTH bytes (allocated in incremental_init) */
41static int beforec;                /* number of before[] entries currently in use */
42static enum tables beforetable;    /* table passed to the last incremental_before() call */
43
44/* structures to save after args */
45static char *after[MAXARGC];       /* field buffers, each MAX_FIELD_WIDTH bytes (allocated in incremental_init) */
46static int afterc;                 /* number of after[] entries currently in use */
47EXEC SQL END DECLARE SECTION;
48
49/* structures to save entire sets of incremental changes */
50struct save_queue *incremental_sq = NULL;    /* filled by incremental_after(); drained by incremental_update() or discarded by incremental_flush() */
51struct save_queue *incremental_exec = NULL;  /* updates matched to a service, waiting for next_incremental() */
52struct iupdate {
53  char *table;      /* entry of table_name[]; not owned by this record */
54  int beforec;      /* number of strings in before */
55  char **before;    /* copy_argv() copy of the before values; released with free_argv() */
56  int afterc;       /* number of strings in after */
57  char **after;     /* copy_argv() copy of the after values; released with free_argv() */
58  char *service;    /* filled in by incremental_update(); names the "<service>.incr" program to run */
59};
60
61void next_incremental(void);
62char **copy_argv(char **argv, int argc);
63void free_argv(char **argv, int argc);
64int table_num(char *table);
65
66void incremental_init(void)
67{
68  int i;
69
70  if (!incremental_sq)
71    incremental_sq = sq_create();
72  if (!incremental_exec)
73    incremental_exec = sq_create();
74
75  for (i = 0; i < MAXARGC; i++)
76    {
77      before[i] = xmalloc(MAX_FIELD_WIDTH);
78      after[i] = xmalloc(MAX_FIELD_WIDTH);
79    }
80}
81
82
83/* record the state of a table row before it is changed */
84
85void incremental_before(enum tables table, char *qual, char **argv)
86{
87  EXEC SQL BEGIN DECLARE SECTION;
88  int id;
89  EXEC SQL END DECLARE SECTION;
90
91  char *name, *name2;
92
93  beforetable = table;
94
95  switch (table)
96    {
97    case USERS_TABLE:
98      sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, "
99              "u.winconsoleshell, u.last, u.first, u.middle, u.status, "
100              "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, "
101              "u.potype FROM users u WHERE %s", qual);
102      dosql(before);
103      beforec = 14;
104      break;
105    case MACHINE_TABLE:
106      sprintf(stmt_buf, "SELECT m.name, m.vendor, m.mach_id FROM machine m "
107              "WHERE %s", qual);
108      dosql(before);
109      beforec = 3;
110      break;
111    case CLUSTERS_TABLE:
112      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, "
113              "c.clu_id FROM clusters c WHERE %s", qual);
114      dosql(before);
115      beforec = 4;
116      break;
117    case CONTAINERS_TABLE:
118      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, "
119              "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c "
120              "WHERE %s", qual);
121      dosql(before);
122      beforec = 8;
123      name = xmalloc(0);
124      id = atoi(before[5]);
125      if (!strncmp(before[4], "USER", 4))
126        {
127          id_to_name(id, USERS_TABLE, &name);
128          strcpy(before[5], name);
129        }
130      else if (!strncmp(before[4], "LIST", 4))
131        {
132          id_to_name(id, LIST_TABLE, &name);
133          strcpy(before[5], name);
134        }
135      else if (!strncmp(before[4], "KERBEROS", 8))
136        {
137          id_to_name(id, STRINGS_TABLE, &name);
138          strcpy(before[5], name);
139        }
140      id = atoi(before[7]);
141      id_to_name(id, LIST_TABLE, &name);
142      strcpy(before[7], name);
143      break;
144    case MCMAP_TABLE:
145      strcpy(before[0], argv[0]);
146      strcpy(before[1], argv[1]);
147      beforec = 2;
148      break;
149    case MCNTMAP_TABLE:
150      strcpy(before[0], argv[0]);
151      strcpy(before[1], argv[1]);
152      name_to_id(before[0], MACHINE_TABLE, &id);
153      sprintf(before[2], "%d", id);
154      name_to_id(before[1], CONTAINERS_TABLE, &id);
155      sprintf(before[3], "%d", id);
156      name = xmalloc(0);
157      EXEC SQL SELECT list_id INTO :before[4] FROM containers
158        WHERE cnt_id = :id;
159      id = atoi(before[4]);
160      id_to_name(id, LIST_TABLE, &name);
161      strcpy(before[4], name);
162      beforec = 5;
163      break;
164    case SVC_TABLE:
165      strcpy(before[0], argv[0]);
166      strcpy(before[1], argv[1]);
167      strcpy(before[2], argv[2]);
168      beforec = 3;
169      break;
170    case FILESYS_TABLE:
171      sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, "
172              "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, "
173              "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs "
174              "WHERE %s", qual);
175      dosql(before);
176      name = xmalloc(0);
177      id = atoi(before[2]);
178      id_to_name(id, MACHINE_TABLE, &name);
179      strcpy(before[2], name);
180      id = atoi(before[7]);
181      id_to_name(id, USERS_TABLE, &name);
182      strcpy(before[7], name);
183      id = atoi(before[8]);
184      id_to_name(id, LIST_TABLE, &name);
185      strcpy(before[8], name);
186      free(name);
187      beforec = 12;
188      break;
189    case QUOTA_TABLE:
190      strcpy(before[0], "?");
191      strcpy(before[1], argv[1]);
192      strcpy(before[2], "?");
193      sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs "
194              "WHERE %s AND fs.filsys_id = q.filsys_id", qual);
195      dosql(&(before[3]));
196      strcpy(before[2], argv[1]);
197      beforec = 5;
198      break;
199    case LIST_TABLE:
200      sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, "
201              "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, "
202              "l.description, l.list_id, l.nfsgroup FROM list l WHERE %s", qual);
203      dosql(before);
204      beforec = 12;
205      break;
206    case IMEMBERS_TABLE:
207      id = (int)(long)argv[0];
208      sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, "
209              "grouplist, gid, nfsgroup FROM list WHERE list_id = %d", id);
210      dosql(&(before[3]));
211      name = xmalloc(0);
212      id_to_name(id, LIST_TABLE, &name);
213      name2 = xmalloc(0);
214      strcpy(before[0], name);
215      strcpy(before[1], argv[1]);
216      id = (int)(long)argv[2];
217      beforec = 11;
218      if (!strcmp(before[1], "USER"))
219        {
220          id_to_name(id, USERS_TABLE, &name2);
221          EXEC SQL SELECT status, users_id INTO :before[10], :before[12]
222            FROM users WHERE users_id = :id;
223          EXEC SQL SELECT list_id INTO :before[11] FROM list
224            WHERE name = :name;
225          beforec = 13;
226      }
227      else if (!strcmp(before[1], "LIST"))
228        {
229          id_to_name(id, LIST_TABLE, &name2);
230          EXEC SQL SELECT list_id INTO :before[10] FROM list
231            WHERE name = :name;
232          sprintf(before[11], "%d", id);
233          beforec = 12;
234        }
235      else if (!strcmp(before[1], "STRING") || !strcmp(before[1], "KERBEROS"))
236        {
237          id_to_name(id, STRINGS_TABLE, &name2);
238          EXEC SQL SELECT list_id INTO :before[10] FROM list
239            WHERE name = :name;
240        }
241      else if (!strcmp(before[1], "MACHINE"))
242        {
243          id_to_name(id, MACHINE_TABLE, &name2);
244          EXEC SQL SELECT list_id INTO :before[10] FROM list
245            WHERE name = :name;
246          sprintf(before[11], "%d", id);
247          beforec = 12;
248        }
249      strcpy(before[2], name2);
250      free(name);
251      free(name2);
252      break;
253    default:
254        /*
255        com_err(whoami, 0, "requested incremental on unexpected table `%s'",
256                table_name[table]);
257        */
258      break;
259    }
260}
261
262
263void incremental_clear_before(void)
264{
265  beforec = 0;
266}
267
268
269/* add an element to the incremental queue for the changed row */
270
271void incremental_after(enum tables table, char *qual, char **argv)
272{
273  char *name, *name2;
274  EXEC SQL BEGIN DECLARE SECTION;
275  int id;
276  EXEC SQL END DECLARE SECTION;
277  struct iupdate *iu;
278
279  switch (table)
280    {
281    case USERS_TABLE:
282      sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, "
283              "u.winconsoleshell, u.last, u.first, u.middle, u.status, "
284              "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, "
285              "u.potype FROM users u WHERE %s", qual);
286      dosql(after);
287      afterc = 14;
288      break;
289    case MACHINE_TABLE:
290      sprintf(stmt_buf, "SELECT m.name, m.vendor, m.mach_id FROM machine m "
291              "WHERE %s", qual);
292      dosql(after);
293      afterc = 3;
294      break;
295    case CLUSTERS_TABLE:
296      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, "
297              "c.clu_id FROM clusters c WHERE %s", qual);
298      dosql(after);
299      afterc = 4;
300      break;
301    case CONTAINERS_TABLE:
302      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, "
303              "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c "
304              "WHERE %s", qual);
305      dosql(after);
306      afterc = 8;
307      name = xmalloc(0);
308      id = atoi(after[5]);
309      if (!strncmp(after[4], "USER", 4))
310        {
311          id_to_name(id, USERS_TABLE, &name);
312          strcpy(after[5], name);
313        }
314      else if (!strncmp(after[4], "LIST", 4))
315        {
316          id_to_name(id, LIST_TABLE, &name);
317          strcpy(after[5], name);
318        }
319      else if (!strncmp(after[4], "KERBEROS", 8))
320        {
321          id_to_name(id, STRINGS_TABLE, &name);
322          strcpy(after[5], name);
323        }
324      id = atoi(after[7]);
325      id_to_name(id, LIST_TABLE, &name);
326      strcpy(after[7], name);
327      break;
328    case MCMAP_TABLE:
329      strcpy(after[0], argv[0]);
330      strcpy(after[1], argv[1]);
331      afterc = 2;
332      break;
333    case MCNTMAP_TABLE:
334      strcpy(after[0], argv[0]);
335      strcpy(after[1], argv[1]);
336      name_to_id(after[0], MACHINE_TABLE, &id);
337      sprintf(after[2], "%d", id);
338      name_to_id(after[1], CONTAINERS_TABLE, &id);
339      sprintf(after[3], "%d", id);
340      name = xmalloc(0);
341      EXEC SQL SELECT list_id INTO :after[4] FROM containers
342        WHERE cnt_id = :id;
343      id = atoi(after[4]);
344      id_to_name(id, LIST_TABLE, &name);
345      strcpy(after[4], name);
346      afterc = 5;
347      break;
348    case SVC_TABLE:
349      strcpy(after[0], argv[0]);
350      strcpy(after[1], argv[1]);
351      strcpy(after[2], argv[2]);
352      afterc = 3;
353      break;
354    case FILESYS_TABLE:
355      sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, "
356              "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, "
357              "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs "
358              "WHERE %s", qual);
359      dosql(after);
360      name = xmalloc(0);
361      id = atoi(after[2]);
362      id_to_name(id, MACHINE_TABLE, &name);
363      strcpy(after[2], name);
364      id = atoi(after[7]);
365      id_to_name(id, USERS_TABLE, &name);
366      strcpy(after[7], name);
367      id = atoi(after[8]);
368      id_to_name(id, LIST_TABLE, &name);
369      strcpy(after[8], name);
370      free(name);
371      afterc = 12;
372      break;
373    case QUOTA_TABLE:
374      strcpy(after[0], "?");
375      strcpy(after[1], argv[1]);
376      strcpy(after[2], "?");
377      sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs "
378              "WHERE %s and fs.filsys_id = q.filsys_id and q.type = '%s'",
379              qual, argv[1]);
380      dosql(&(after[3]));
381      afterc = 5;
382      break;
383    case LIST_TABLE:
384      sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, "
385              "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, "
386              "l.description, l.list_id, l.nfsgroup FROM list l WHERE %s", qual);
387      dosql(after);
388      afterc = 12;
389      break;
390    case IMEMBERS_TABLE:
391      id = (int)(long)argv[0];
392      sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, "
393              "grouplist, gid, nfsgroup FROM list WHERE list_id = %d", id);
394      dosql(&(after[3]));
395      name = xmalloc(0);
396      id_to_name(id, LIST_TABLE, &name);
397      name2 = xmalloc(0);
398      strcpy(after[0], name);
399      strcpy(after[1], argv[1]);
400      id = (int)(long)argv[2];
401      afterc = 11;
402      if (!strcmp(after[1], "USER"))
403        {
404          id_to_name(id, USERS_TABLE, &name2);
405          EXEC SQL SELECT status, users_id INTO :after[10], :after[12]
406            FROM users WHERE users_id = :id;
407          EXEC SQL SELECT list_id INTO :after[11] FROM list
408            WHERE name = :name;
409          afterc = 13;
410        }
411      else if (!strcmp(after[1], "LIST"))
412        {
413          id_to_name(id, LIST_TABLE, &name2);
414          EXEC SQL SELECT list_id INTO :after[10] FROM list
415            WHERE name = :name;
416          sprintf(after[11], "%d", id);
417          afterc = 12;
418        }
419      else if (!strcmp(after[1], "STRING") || !strcmp(after[1], "KERBEROS"))
420        {
421          id_to_name(id, STRINGS_TABLE, &name2);
422          EXEC SQL SELECT list_id INTO :after[10] FROM list
423            WHERE name = :name;
424        }
425      else if (!strcmp(after[1], "MACHINE"))
426        {
427          id_to_name(id, MACHINE_TABLE, &name2);
428          EXEC SQL SELECT list_id INTO :after[10] FROM list
429            WHERE name = :name;
430          sprintf(after[11], "%d", id);
431          afterc = 12;
432        }
433      strcpy(after[2], name2);
434      free(name);
435      free(name2);
436      break;
437    case NO_TABLE:
438      afterc = 0;
439      table = beforetable;
440      break;
441    default:
442        /*
443        com_err(whoami, 0, "requested incremental on unexpected table `%s'",
444                table_name[table]);
445        */
446      break;
447    }
448
449  iu = xmalloc(sizeof(struct iupdate));
450  iu->table = table_name[table];
451  iu->beforec = beforec;
452  iu->before = copy_argv(before, beforec);
453  iu->afterc = afterc;
454  iu->after = copy_argv(after, afterc);
455  sq_save_data(incremental_sq, iu);
456}
457
458void incremental_clear_after(void)
459{
460  incremental_after(NO_TABLE, NULL, NULL);
461}
462
463
464/* Called when the current transaction is committed to start any queued
465 * incremental updates.  This caches the update table the first time it
466 * is called.
467 */
468
469struct inc_cache {
470  struct inc_cache *next;
471  char *table, *service;
472};
473
474
475void incremental_update(void)
476{
477  static int inited = 0;
478  static struct inc_cache *cache;
479  struct inc_cache *c;
480  EXEC SQL BEGIN DECLARE SECTION;
481  char tab[INCREMENTAL_TABLE_NAME_SIZE], serv[INCREMENTAL_SERVICE_SIZE];
482  EXEC SQL END DECLARE SECTION;
483  struct iupdate *iu, *iu_save;
484
485  if (!inited)
486    {
487      inited++;
488
489      EXEC SQL DECLARE inc CURSOR FOR SELECT table_name, service
490        FROM incremental;
491      EXEC SQL OPEN inc;
492      while (1)
493        {
494          EXEC SQL FETCH inc INTO :tab, :serv;
495          if (sqlca.sqlcode)
496            break;
497          c = xmalloc(sizeof(struct inc_cache));
498          c->next = cache;
499          c->table = xstrdup(strtrim(tab));
500          c->service = xstrdup(strtrim(serv));
501          cache = c;
502        }
503      EXEC SQL CLOSE inc;
504      EXEC SQL COMMIT WORK;
505    }
506
507  while (sq_remove_data(incremental_sq, &iu))
508    {
509      for (c = cache; c; c = c->next)
510        {
511          if (!strcmp(c->table, iu->table))
512            {
513              iu->service = c->service;
514              iu_save = xmalloc(sizeof(struct iupdate));
515              iu_save->service = iu->service;
516              iu_save->table = iu->table;
517              iu_save->beforec = iu->beforec;
518              iu_save->afterc = iu->afterc;
519              iu_save->before = copy_argv(iu->before, iu->beforec);
520              iu_save->after = copy_argv(iu->after, iu->afterc);
521              sq_save_data(incremental_exec, iu_save);
522            }
523        }
524      if (!c)
525        {
526          free_argv(iu->before, iu->beforec);
527          free_argv(iu->after, iu->afterc);
528          free(iu);
529        }
530    }
531  if (inc_running == 0)
532    next_incremental();
533}
534
535/* Pro*C 2.2.4 can't cope with the sigset_t below, at least in Solaris 2.6.
536   We add DEFINE=_PROC_ to the proc invocation and then #ifndef that around
537   this function so proc will pass it through without reading it. */
538
539#ifndef _PROC_
540void next_incremental(void)
541{
542  struct iupdate *iu;
543  char *argv[MAXARGC * 2 + 4], cafter[3], cbefore[3], prog[MAXPATHLEN];
544  int i;
545  sigset_t sigs;
546
547  if (!incremental_exec)
548    incremental_init();
549
550  if (sq_empty(incremental_exec) ||
551      (inc_running && now - inc_started < INC_TIMEOUT))
552    return;
553
554  if (inc_running)
555    com_err(whoami, 0, "incremental timeout on pid %d", inc_pid);
556
557  sq_remove_data(incremental_exec, &iu);
558  argv[1] = iu->table;
559  sprintf(cbefore, "%d", iu->beforec);
560  argv[2] = cbefore;
561  sprintf(cafter, "%d", iu->afterc);
562  argv[3] = cafter;
563  for (i = 0; i < iu->beforec; i++)
564    argv[4 + i] = iu->before[i];
565  for (i = 0; i < iu->afterc; i++)
566    argv[4 + iu->beforec + i] = iu->after[i];
567
568  sprintf(prog, "%s/%s.incr", BIN_DIR, iu->service);
569  argv[0] = prog;
570  argv[4 + iu->beforec + iu->afterc] = 0;
571
572  sigemptyset(&sigs);
573  sigaddset(&sigs, SIGCHLD);
574  sigprocmask(SIG_BLOCK, &sigs, NULL);
575  inc_pid = vfork();
576  switch (inc_pid)
577    {
578    case 0:
579      execv(prog, argv);
580      _exit(1);
581    case -1:
582      com_err(whoami, 0, "Failed to start incremental update");
583      break;
584    default:
585      inc_running = 1;
586      inc_started = now;
587    }
588  sigprocmask(SIG_UNBLOCK, &sigs, NULL);
589
590  free_argv(iu->before, iu->beforec);
591  free_argv(iu->after, iu->afterc);
592  free(iu);
593}
594#endif
595
596/* Called when the current transaction is aborted to throw away any queued
597 * incremental updates
598 */
599
600void incremental_flush(void)
601{
602  struct iupdate *iu;
603
604  while (sq_get_data(incremental_sq, &iu))
605    {
606      free_argv(iu->before, iu->beforec);
607      free_argv(iu->after, iu->afterc);
608      free(iu);
609    }
610  sq_destroy(incremental_sq);
611  incremental_sq = sq_create();
612}
613
614
/* Return a newly allocated array holding a duplicate of each of the first
 * argc strings of argv.  Each source string is passed through strtrim()
 * before it is duplicated.  Release the result with free_argv().
 */
char **copy_argv(char **argv, int argc)
{
  char **copy = xmalloc(sizeof(char *) * argc);
  int i;

  /* walk from the last element down to the first */
  for (i = argc - 1; i >= 0; i--)
    copy[i] = xstrdup(strtrim(argv[i]));

  return copy;
}
622
/* Release an array of argc strings together with the array itself. */
void free_argv(char **argv, int argc)
{
  int i;

  /* free the strings from the last element down to the first */
  for (i = argc - 1; i >= 0; i--)
    free(argv[i]);

  free(argv);
}
629
630int table_num(char *name)
631{
632  int i;
633
634  for (i = num_tables - 1; i; i--)
635    {
636      if (!strcmp(table_name[i], name))
637        break;
638    }
639
640  return i; /* 0 = "none" if no match */
641}
Note: See TracBrowser for help on using the repository browser.