source: trunk/third/moira/server/increment.pc @ 23740

Revision 23740, 16.6 KB checked in by broder, 16 years ago (diff)
In moira: * New CVS snapshot (Trac: #195) * Drop patches that have been incorporated upstream. * Update to build without krb4 on systems that no longer have it. This doesn't build yet on squeeze, which lacks a krb4 library, but I'm committing now before I start hacking away at a patch to fix that.
Line 
1/* $Id: increment.pc,v 2.21 2008-08-22 17:49:12 zacheiss Exp $
2 *
3 * Deal with incremental updates
4 *
5 * Copyright (C) 1989-1998 by the Massachusetts Institute of Technology
6 * For copying and distribution information, please see the file
7 * <mit-copyright.h>.
8 */
9
10#include <mit-copyright.h>
11#include "mr_server.h"
12#include "query.h"
13#include "qrtn.h"
14
15#include <signal.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <unistd.h>
20
21EXEC SQL INCLUDE sqlca;
22
23RCSID("$Header: /afs/athena.mit.edu/astaff/project/moiradev/repository/moira/server/increment.pc,v 2.21 2008-08-22 17:49:12 zacheiss Exp $");
24
25extern char *whoami;
26extern char *table_name[];
27extern int num_tables;
28
29int inc_pid = 0;
30int inc_running = 0;
31time_t inc_started;
32
33#define MAXARGC 16
34
35EXEC SQL WHENEVER SQLERROR DO dbmserr();
36
37EXEC SQL BEGIN DECLARE SECTION;
38/* structures to save before args */
39static char *before[MAXARGC];
40static int beforec;
41static enum tables beforetable;
42
43/* structures to save after args */
44static char *after[MAXARGC];
45static int afterc;
46EXEC SQL END DECLARE SECTION;
47
48/* structures to save entire sets of incremental changes */
49struct save_queue *incremental_sq = NULL;
50struct save_queue *incremental_exec = NULL;
51struct iupdate {
52  char *table;
53  int beforec;
54  char **before;
55  int afterc;
56  char **after;
57  char *service;
58};
59
60void next_incremental(void);
61char **copy_argv(char **argv, int argc);
62void free_argv(char **argv, int argc);
63int table_num(char *table);
64
65void incremental_init(void)
66{
67  int i;
68
69  if (!incremental_sq)
70    incremental_sq = sq_create();
71  if (!incremental_exec)
72    incremental_exec = sq_create();
73
74  for (i = 0; i < MAXARGC; i++)
75    {
76      before[i] = xmalloc(MAX_FIELD_WIDTH);
77      after[i] = xmalloc(MAX_FIELD_WIDTH);
78    }
79}
80
81
82/* record the state of a table row before it is changed */
83
84void incremental_before(enum tables table, char *qual, char **argv)
85{
86  EXEC SQL BEGIN DECLARE SECTION;
87  int id;
88  EXEC SQL END DECLARE SECTION;
89
90  char *name, *name2;
91
92  beforetable = table;
93
94  switch (table)
95    {
96    case USERS_TABLE:
97      sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, "
98              "u.winconsoleshell, u.last, u.first, u.middle, u.status, "
99              "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, "
100              "u.potype FROM users u WHERE %s", qual);
101      dosql(before);
102      beforec = 14;
103      break;
104    case MACHINE_TABLE:
105      sprintf(stmt_buf, "SELECT m.name, m.vendor, m.mach_id FROM machine m "
106              "WHERE %s", qual);
107      dosql(before);
108      beforec = 3;
109      break;
110    case CLUSTERS_TABLE:
111      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, "
112              "c.clu_id FROM clusters c WHERE %s", qual);
113      dosql(before);
114      beforec = 4;
115      break;
116    case CONTAINERS_TABLE:
117      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, "
118              "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c "
119              "WHERE %s", qual);
120      dosql(before);
121      beforec = 8;
122      name = xmalloc(0);
123      id = atoi(before[5]);
124      if (!strncmp(before[4], "USER", 4))
125        {
126          id_to_name(id, USERS_TABLE, &name);
127          strcpy(before[5], name);
128        }
129      else if (!strncmp(before[4], "LIST", 4))
130        {
131          id_to_name(id, LIST_TABLE, &name);
132          strcpy(before[5], name);
133        }
134      else if (!strncmp(before[4], "KERBEROS", 8))
135        {
136          id_to_name(id, STRINGS_TABLE, &name);
137          strcpy(before[5], name);
138        }
139      id = atoi(before[7]);
140      id_to_name(id, LIST_TABLE, &name);
141      strcpy(before[7], name);
142      break;
143    case MCMAP_TABLE:
144      strcpy(before[0], argv[0]);
145      strcpy(before[1], argv[1]);
146      beforec = 2;
147      break;
148    case MCNTMAP_TABLE:
149      strcpy(before[0], argv[0]);
150      strcpy(before[1], argv[1]);
151      name_to_id(before[0], MACHINE_TABLE, &id);
152      sprintf(before[2], "%d", id);
153      name_to_id(before[1], CONTAINERS_TABLE, &id);
154      sprintf(before[3], "%d", id);
155      name = xmalloc(0);
156      EXEC SQL SELECT list_id INTO :before[4] FROM containers
157        WHERE cnt_id = :id;
158      id = atoi(before[4]);
159      id_to_name(id, LIST_TABLE, &name);
160      strcpy(before[4], name);
161      beforec = 5;
162      break;
163    case SVC_TABLE:
164      strcpy(before[0], argv[0]);
165      strcpy(before[1], argv[1]);
166      strcpy(before[2], argv[2]);
167      beforec = 3;
168      break;
169    case FILESYS_TABLE:
170      sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, "
171              "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, "
172              "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs "
173              "WHERE %s", qual);
174      dosql(before);
175      name = xmalloc(0);
176      id = atoi(before[2]);
177      id_to_name(id, MACHINE_TABLE, &name);
178      strcpy(before[2], name);
179      id = atoi(before[7]);
180      id_to_name(id, USERS_TABLE, &name);
181      strcpy(before[7], name);
182      id = atoi(before[8]);
183      id_to_name(id, LIST_TABLE, &name);
184      strcpy(before[8], name);
185      free(name);
186      beforec = 12;
187      break;
188    case QUOTA_TABLE:
189      strcpy(before[0], "?");
190      strcpy(before[1], argv[1]);
191      strcpy(before[2], "?");
192      sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs "
193              "WHERE %s AND fs.filsys_id = q.filsys_id", qual);
194      dosql(&(before[3]));
195      strcpy(before[2], argv[1]);
196      beforec = 5;
197      break;
198    case LIST_TABLE:
199      sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, "
200              "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, "
201              "l.description, l.list_id FROM list l WHERE %s", qual);
202      dosql(before);
203      beforec = 11;
204      break;
205    case IMEMBERS_TABLE:
206      id = (int) argv[0];
207      sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, "
208              "grouplist, gid FROM list WHERE list_id = %d", id);
209      dosql(&(before[3]));
210      name = xmalloc(0);
211      id_to_name(id, LIST_TABLE, &name);
212      name2 = xmalloc(0);
213      strcpy(before[0], name);
214      strcpy(before[1], argv[1]);
215      id = (int) argv[2];
216      beforec = 10;
217      if (!strcmp(before[1], "USER"))
218        {
219          id_to_name(id, USERS_TABLE, &name2);
220          EXEC SQL SELECT status, users_id INTO :before[9], :before[11]
221            FROM users WHERE users_id = :id;
222          EXEC SQL SELECT list_id INTO :before[10] FROM list
223            WHERE name = :name;
224          beforec = 12;
225      }
226      else if (!strcmp(before[1], "LIST"))
227        {
228          id_to_name(id, LIST_TABLE, &name2);
229          EXEC SQL SELECT list_id INTO :before[9] FROM list
230            WHERE name = :name;
231          sprintf(before[10], "%d", id);
232          beforec = 11;
233        }
234      else if (!strcmp(before[1], "STRING") || !strcmp(before[1], "KERBEROS"))
235        {
236          id_to_name(id, STRINGS_TABLE, &name2);
237          EXEC SQL SELECT list_id INTO :before[9] FROM list
238            WHERE name = :name;
239        }
240      else if (!strcmp(before[1], "MACHINE"))
241        {
242          id_to_name(id, MACHINE_TABLE, &name2);
243          EXEC SQL SELECT list_id INTO :before[9] FROM list
244            WHERE name = :name;
245          sprintf(before[10], "%d", id);
246          beforec = 11;
247        }
248      strcpy(before[2], name2);
249      free(name);
250      free(name2);
251      break;
252    default:
253        /*
254        com_err(whoami, 0, "requested incremental on unexpected table `%s'",
255                table_name[table]);
256        */
257      break;
258    }
259}
260
261
262void incremental_clear_before(void)
263{
264  beforec = 0;
265}
266
267
268/* add an element to the incremental queue for the changed row */
269
270void incremental_after(enum tables table, char *qual, char **argv)
271{
272  char *name, *name2;
273  EXEC SQL BEGIN DECLARE SECTION;
274  int id;
275  EXEC SQL END DECLARE SECTION;
276  struct iupdate *iu;
277
278  switch (table)
279    {
280    case USERS_TABLE:
281      sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, "
282              "u.winconsoleshell, u.last, u.first, u.middle, u.status, "
283              "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, "
284              "u.potype FROM users u WHERE %s", qual);
285      dosql(after);
286      afterc = 14;
287      break;
288    case MACHINE_TABLE:
289      sprintf(stmt_buf, "SELECT m.name, m.vendor, m.mach_id FROM machine m "
290              "WHERE %s", qual);
291      dosql(after);
292      afterc = 3;
293      break;
294    case CLUSTERS_TABLE:
295      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, "
296              "c.clu_id FROM clusters c WHERE %s", qual);
297      dosql(after);
298      afterc = 4;
299      break;
300    case CONTAINERS_TABLE:
301      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, "
302              "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c "
303              "WHERE %s", qual);
304      dosql(after);
305      afterc = 8;
306      name = xmalloc(0);
307      id = atoi(after[5]);
308      if (!strncmp(after[4], "USER", 4))
309        {
310          id_to_name(id, USERS_TABLE, &name);
311          strcpy(after[5], name);
312        }
313      else if (!strncmp(after[4], "LIST", 4))
314        {
315          id_to_name(id, LIST_TABLE, &name);
316          strcpy(after[5], name);
317        }
318      else if (!strncmp(after[4], "KERBEROS", 8))
319        {
320          id_to_name(id, STRINGS_TABLE, &name);
321          strcpy(after[5], name);
322        }
323      id = atoi(after[7]);
324      id_to_name(id, LIST_TABLE, &name);
325      strcpy(after[7], name);
326      break;
327    case MCMAP_TABLE:
328      strcpy(after[0], argv[0]);
329      strcpy(after[1], argv[1]);
330      afterc = 2;
331      break;
332    case MCNTMAP_TABLE:
333      strcpy(after[0], argv[0]);
334      strcpy(after[1], argv[1]);
335      name_to_id(after[0], MACHINE_TABLE, &id);
336      sprintf(after[2], "%d", id);
337      name_to_id(after[1], CONTAINERS_TABLE, &id);
338      sprintf(after[3], "%d", id);
339      name = xmalloc(0);
340      EXEC SQL SELECT list_id INTO :after[4] FROM containers
341        WHERE cnt_id = :id;
342      id = atoi(after[4]);
343      id_to_name(id, LIST_TABLE, &name);
344      strcpy(after[4], name);
345      afterc = 5;
346      break;
347    case SVC_TABLE:
348      strcpy(after[0], argv[0]);
349      strcpy(after[1], argv[1]);
350      strcpy(after[2], argv[2]);
351      afterc = 3;
352      break;
353    case FILESYS_TABLE:
354      sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, "
355              "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, "
356              "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs "
357              "WHERE %s", qual);
358      dosql(after);
359      name = xmalloc(0);
360      id = atoi(after[2]);
361      id_to_name(id, MACHINE_TABLE, &name);
362      strcpy(after[2], name);
363      id = atoi(after[7]);
364      id_to_name(id, USERS_TABLE, &name);
365      strcpy(after[7], name);
366      id = atoi(after[8]);
367      id_to_name(id, LIST_TABLE, &name);
368      strcpy(after[8], name);
369      free(name);
370      afterc = 12;
371      break;
372    case QUOTA_TABLE:
373      strcpy(after[0], "?");
374      strcpy(after[1], argv[1]);
375      strcpy(after[2], "?");
376      sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs "
377              "WHERE %s and fs.filsys_id = q.filsys_id and q.type = '%s'",
378              qual, argv[1]);
379      dosql(&(after[3]));
380      afterc = 5;
381      break;
382    case LIST_TABLE:
383      sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, "
384              "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, "
385              "l.description, l.list_id FROM list l WHERE %s", qual);
386      dosql(after);
387      afterc = 11;
388      break;
389    case IMEMBERS_TABLE:
390      id = (int) argv[0];
391      sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, "
392              "grouplist, gid FROM list WHERE list_id = %d", id);
393      dosql(&(after[3]));
394      name = xmalloc(0);
395      id_to_name(id, LIST_TABLE, &name);
396      name2 = xmalloc(0);
397      strcpy(after[0], name);
398      strcpy(after[1], argv[1]);
399      id = (int) argv[2];
400      afterc = 10;
401      if (!strcmp(after[1], "USER"))
402        {
403          id_to_name(id, USERS_TABLE, &name2);
404          EXEC SQL SELECT status, users_id INTO :after[9], :after[11]
405            FROM users WHERE users_id = :id;
406          EXEC SQL SELECT list_id INTO :after[10] FROM list
407            WHERE name = :name;
408          afterc = 12;
409        }
410      else if (!strcmp(after[1], "LIST"))
411        {
412          id_to_name(id, LIST_TABLE, &name2);
413          EXEC SQL SELECT list_id INTO :after[9] FROM list
414            WHERE name = :name;
415          sprintf(after[10], "%d", id);
416          afterc = 11;
417        }
418      else if (!strcmp(after[1], "STRING") || !strcmp(after[1], "KERBEROS"))
419        {
420          id_to_name(id, STRINGS_TABLE, &name2);
421          EXEC SQL SELECT list_id INTO :after[9] FROM list
422            WHERE name = :name;
423        }
424      else if (!strcmp(after[1], "MACHINE"))
425        {
426          id_to_name(id, MACHINE_TABLE, &name2);
427          EXEC SQL SELECT list_id INTO :after[9] FROM list
428            WHERE name = :name;
429          sprintf(after[10], "%d", id);
430          afterc = 11;
431        }
432      strcpy(after[2], name2);
433      free(name);
434      free(name2);
435      break;
436    case NO_TABLE:
437      afterc = 0;
438      table = beforetable;
439      break;
440    default:
441        /*
442        com_err(whoami, 0, "requested incremental on unexpected table `%s'",
443                table_name[table]);
444        */
445      break;
446    }
447
448  iu = xmalloc(sizeof(struct iupdate));
449  iu->table = table_name[table];
450  iu->beforec = beforec;
451  iu->before = copy_argv(before, beforec);
452  iu->afterc = afterc;
453  iu->after = copy_argv(after, afterc);
454  sq_save_data(incremental_sq, iu);
455}
456
457void incremental_clear_after(void)
458{
459  incremental_after(NO_TABLE, NULL, NULL);
460}
461
462
463/* Called when the current transaction is committed to start any queued
464 * incremental updates.  This caches the update table the first time it
465 * is called.
466 */
467
468struct inc_cache {
469  struct inc_cache *next;
470  char *table, *service;
471};
472
473
474void incremental_update(void)
475{
476  static int inited = 0;
477  static struct inc_cache *cache;
478  struct inc_cache *c;
479  EXEC SQL BEGIN DECLARE SECTION;
480  char tab[INCREMENTAL_TABLE_NAME_SIZE], serv[INCREMENTAL_SERVICE_SIZE];
481  EXEC SQL END DECLARE SECTION;
482  struct iupdate *iu, *iu_save;
483
484  if (!inited)
485    {
486      inited++;
487
488      EXEC SQL DECLARE inc CURSOR FOR SELECT table_name, service
489        FROM incremental;
490      EXEC SQL OPEN inc;
491      while (1)
492        {
493          EXEC SQL FETCH inc INTO :tab, :serv;
494          if (sqlca.sqlcode)
495            break;
496          c = xmalloc(sizeof(struct inc_cache));
497          c->next = cache;
498          c->table = xstrdup(strtrim(tab));
499          c->service = xstrdup(strtrim(serv));
500          cache = c;
501        }
502      EXEC SQL CLOSE inc;
503      EXEC SQL COMMIT WORK;
504    }
505
506  while (sq_remove_data(incremental_sq, &iu))
507    {
508      for (c = cache; c; c = c->next)
509        {
510          if (!strcmp(c->table, iu->table))
511            {
512              iu->service = c->service;
513              iu_save = xmalloc(sizeof(struct iupdate));
514              iu_save->service = iu->service;
515              iu_save->table = iu->table;
516              iu_save->beforec = iu->beforec;
517              iu_save->afterc = iu->afterc;
518              iu_save->before = copy_argv(iu->before, iu->beforec);
519              iu_save->after = copy_argv(iu->after, iu->afterc);
520              sq_save_data(incremental_exec, iu_save);
521            }
522        }
523      if (!c)
524        {
525          free_argv(iu->before, iu->beforec);
526          free_argv(iu->after, iu->afterc);
527          free(iu);
528        }
529    }
530  if (inc_running == 0)
531    next_incremental();
532}
533
534/* Pro*C 2.2.4 can't cope with the sigset_t below, at least in Solaris 2.6.
535   We add DEFINE=_PROC_ to the proc invocation and then #ifndef that around
536   this function so proc will pass it through without reading it. */
537
538#ifndef _PROC_
539void next_incremental(void)
540{
541  struct iupdate *iu;
542  char *argv[MAXARGC * 2 + 4], cafter[3], cbefore[3], prog[MAXPATHLEN];
543  int i;
544  sigset_t sigs;
545
546  if (!incremental_exec)
547    incremental_init();
548
549  if (sq_empty(incremental_exec) ||
550      (inc_running && now - inc_started < INC_TIMEOUT))
551    return;
552
553  if (inc_running)
554    com_err(whoami, 0, "incremental timeout on pid %d", inc_pid);
555
556  sq_remove_data(incremental_exec, &iu);
557  argv[1] = iu->table;
558  sprintf(cbefore, "%d", iu->beforec);
559  argv[2] = cbefore;
560  sprintf(cafter, "%d", iu->afterc);
561  argv[3] = cafter;
562  for (i = 0; i < iu->beforec; i++)
563    argv[4 + i] = iu->before[i];
564  for (i = 0; i < iu->afterc; i++)
565    argv[4 + iu->beforec + i] = iu->after[i];
566
567  sprintf(prog, "%s/%s.incr", BIN_DIR, iu->service);
568  argv[0] = prog;
569  argv[4 + iu->beforec + iu->afterc] = 0;
570
571  sigemptyset(&sigs);
572  sigaddset(&sigs, SIGCHLD);
573  sigprocmask(SIG_BLOCK, &sigs, NULL);
574  inc_pid = vfork();
575  switch (inc_pid)
576    {
577    case 0:
578      execv(prog, argv);
579      _exit(1);
580    case -1:
581      com_err(whoami, 0, "Failed to start incremental update");
582      break;
583    default:
584      inc_running = 1;
585      inc_started = now;
586    }
587  sigprocmask(SIG_UNBLOCK, &sigs, NULL);
588
589  free_argv(iu->before, iu->beforec);
590  free_argv(iu->after, iu->afterc);
591  free(iu);
592}
593#endif
594
595/* Called when the current transaction is aborted to throw away any queued
596 * incremental updates
597 */
598
599void incremental_flush(void)
600{
601  struct iupdate *iu;
602
603  while (sq_get_data(incremental_sq, &iu))
604    {
605      free_argv(iu->before, iu->beforec);
606      free_argv(iu->after, iu->afterc);
607      free(iu);
608    }
609  sq_destroy(incremental_sq);
610  incremental_sq = sq_create();
611}
612
613
614char **copy_argv(char **argv, int argc)
615{
616  char **ret = xmalloc(sizeof(char *) * argc);
617  while (--argc >= 0)
618    ret[argc] = xstrdup(strtrim(argv[argc]));
619  return ret;
620}
621
622void free_argv(char **argv, int argc)
623{
624  while (--argc >= 0)
625    free(argv[argc]);
626  free(argv);
627}
628
629int table_num(char *name)
630{
631  int i;
632
633  for (i = num_tables - 1; i; i--)
634    {
635      if (!strcmp(table_name[i], name))
636        break;
637    }
638
639  return i; /* 0 = "none" if no match */
640}
Note: See TracBrowser for help on using the repository browser.