/* $Id: increment.pc 4068 2012-02-02 21:31:10Z zacheiss $ * * Deal with incremental updates * * Copyright (C) 1989-1998 by the Massachusetts Institute of Technology * For copying and distribution information, please see the file * . */ #include #include "mr_server.h" #include "query.h" #include "qrtn.h" #include #include #include #include #include #include EXEC SQL INCLUDE sqlca; RCSID("$HeadURL: svn+ssh://svn.mit.edu/moira/trunk/moira/server/increment.pc $ $Id: increment.pc 4068 2012-02-02 21:31:10Z zacheiss $"); extern char *whoami; extern char *table_name[]; extern int num_tables; int inc_pid = 0; int inc_running = 0; time_t inc_started; #define MAXARGC 20 EXEC SQL WHENEVER SQLERROR DO dbmserr(); EXEC SQL BEGIN DECLARE SECTION; /* structures to save before args */ static char *before[MAXARGC]; static int beforec; static enum tables beforetable; /* structures to save after args */ static char *after[MAXARGC]; static int afterc; EXEC SQL END DECLARE SECTION; /* structures to save entire sets of incremental changes */ struct save_queue *incremental_sq = NULL; struct save_queue *incremental_exec = NULL; struct iupdate { char *table; int beforec; char **before; int afterc; char **after; char *service; }; void next_incremental(void); char **copy_argv(char **argv, int argc); void free_argv(char **argv, int argc); int table_num(char *table); void incremental_init(void) { int i; if (!incremental_sq) incremental_sq = sq_create(); if (!incremental_exec) incremental_exec = sq_create(); for (i = 0; i < MAXARGC; i++) { before[i] = xmalloc(MAX_FIELD_WIDTH); after[i] = xmalloc(MAX_FIELD_WIDTH); } } /* record the state of a table row before it is changed */ void incremental_before(enum tables table, char *qual, char **argv) { EXEC SQL BEGIN DECLARE SECTION; int id; EXEC SQL END DECLARE SECTION; char *name, *name2; beforetable = table; switch (table) { case USERS_TABLE: sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, " "u.winconsoleshell, u.last, u.first, u.middle, u.status, " "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, " "u.potype FROM users u WHERE %s", qual); dosql(before); beforec = 14; break; case MACHINE_TABLE: sprintf(stmt_buf, "SELECT m.name, m.mach_id, m.vendor, m.model, m.os, m.location, " "m.contact, m.billing_contact, m.account_number, m.status, m.address," "m.owner_type, m.owner_id, m.acomment, m.ocomment, m.snet_id FROM machine m " "WHERE %s", qual); dosql(before); beforec = 16; name = xmalloc(0); id = atoi(before[12]); if (!strncmp(before[11], "USER", 4)) { id_to_name(id, USERS_TABLE, &name); strcpy(before[12], name); } else if (!strncmp(before[11], "LIST", 4)) { id_to_name(id, LIST_TABLE, &name); strcpy(before[12], name); } else if (!strncmp(before[11], "KERBEROS", 8)) { id_to_name(id, STRINGS_TABLE, &name); strcpy(before[12], name); } id = atoi(before[13]); id_to_name(id, STRINGS_TABLE, &name); strcpy(before[13], name); id = atoi(before[14]); id_to_name(id, STRINGS_TABLE, &name); strcpy(before[14], name); id = atoi(before[15]); id_to_name(id, SUBNET_TABLE, &name); strcpy(before[15], name); break; case HOSTALIAS_TABLE: strcpy(before[0], argv[0]); strcpy(before[1], argv[1]); beforec = 2; break; case HWADDRMAP_TABLE: strcpy(before[0], argv[0]); strcpy(before[1], argv[1]); beforec = 2; break; case CLUSTERS_TABLE: sprintf(stmt_buf, "SELECT c.name, c.description, c.location, " "c.clu_id FROM clusters c WHERE %s", qual); dosql(before); beforec = 4; break; case CONTAINERS_TABLE: sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, " "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c " "WHERE %s", qual); dosql(before); beforec = 8; name = xmalloc(0); id = atoi(before[5]); if (!strncmp(before[4], "USER", 4)) { id_to_name(id, USERS_TABLE, &name); strcpy(before[5], name); } else if (!strncmp(before[4], "LIST", 4)) { id_to_name(id, LIST_TABLE, &name); strcpy(before[5], name); } else if (!strncmp(before[4], "KERBEROS", 8)) { id_to_name(id, STRINGS_TABLE, &name); strcpy(before[5], name); } id = atoi(before[7]); id_to_name(id, LIST_TABLE, &name); strcpy(before[7], name); break; case MCMAP_TABLE: strcpy(before[0], argv[0]); strcpy(before[1], argv[1]); beforec = 2; break; case MCNTMAP_TABLE: strcpy(before[0], argv[0]); strcpy(before[1], argv[1]); name_to_id(before[0], MACHINE_TABLE, &id); sprintf(before[2], "%d", id); name_to_id(before[1], CONTAINERS_TABLE, &id); sprintf(before[3], "%d", id); name = xmalloc(0); EXEC SQL SELECT list_id INTO :before[4] FROM containers WHERE cnt_id = :id; id = atoi(before[4]); id_to_name(id, LIST_TABLE, &name); strcpy(before[4], name); beforec = 5; break; case SVC_TABLE: strcpy(before[0], argv[0]); strcpy(before[1], argv[1]); strcpy(before[2], argv[2]); beforec = 3; break; case FILESYS_TABLE: sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, " "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, " "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs " "WHERE %s", qual); dosql(before); name = xmalloc(0); id = atoi(before[2]); id_to_name(id, MACHINE_TABLE, &name); strcpy(before[2], name); id = atoi(before[7]); id_to_name(id, USERS_TABLE, &name); strcpy(before[7], name); id = atoi(before[8]); id_to_name(id, LIST_TABLE, &name); strcpy(before[8], name); free(name); beforec = 12; break; case QUOTA_TABLE: strcpy(before[0], "?"); strcpy(before[1], argv[1]); strcpy(before[2], "?"); sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs " "WHERE %s AND fs.filsys_id = q.filsys_id", qual); dosql(&(before[3])); strcpy(before[2], argv[1]); beforec = 5; break; case LIST_TABLE: sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, " "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, " "l.description, l.list_id, l.nfsgroup FROM list l WHERE %s", qual); dosql(before); beforec = 12; break; case IMEMBERS_TABLE: id = (int)(long)argv[0]; sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, " "grouplist, gid, nfsgroup FROM list WHERE list_id = %d", id); dosql(&(before[3])); name = xmalloc(0); id_to_name(id, LIST_TABLE, &name); name2 = xmalloc(0); strcpy(before[0], name); strcpy(before[1], argv[1]); id = (int)(long)argv[2]; beforec = 11; if (!strcmp(before[1], "USER")) { id_to_name(id, USERS_TABLE, &name2); EXEC SQL SELECT status, users_id INTO :before[10], :before[12] FROM users WHERE users_id = :id; EXEC SQL SELECT list_id INTO :before[11] FROM list WHERE name = :name; beforec = 13; } else if (!strcmp(before[1], "LIST")) { id_to_name(id, LIST_TABLE, &name2); EXEC SQL SELECT list_id INTO :before[10] FROM list WHERE name = :name; sprintf(before[11], "%d", id); beforec = 12; } else if (!strcmp(before[1], "STRING") || !strcmp(before[1], "KERBEROS")) { id_to_name(id, STRINGS_TABLE, &name2); EXEC SQL SELECT list_id INTO :before[10] FROM list WHERE name = :name; } else if (!strcmp(before[1], "MACHINE")) { id_to_name(id, MACHINE_TABLE, &name2); EXEC SQL SELECT list_id INTO :before[10] FROM list WHERE name = :name; sprintf(before[11], "%d", id); beforec = 12; } strcpy(before[2], name2); free(name); free(name2); break; default: /* com_err(whoami, 0, "requested incremental on unexpected table `%s'", table_name[table]); */ break; } } void incremental_clear_before(void) { beforec = 0; } /* add an element to the incremental queue for the changed row */ void incremental_after(enum tables table, char *qual, char **argv) { char *name, *name2; EXEC SQL BEGIN DECLARE SECTION; int id; EXEC SQL END DECLARE SECTION; struct iupdate *iu; switch (table) { case USERS_TABLE: sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, " "u.winconsoleshell, u.last, u.first, u.middle, u.status, " "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, " "u.potype FROM users u WHERE %s", qual); dosql(after); afterc = 14; break; case MACHINE_TABLE: sprintf(stmt_buf, "SELECT m.name, m.mach_id, m.vendor, m.model, m.os, m.location, " "m.contact, m.billing_contact, m.account_number, m.status, m.address," "m.owner_type, m.owner_id, m.acomment, m.ocomment, m.snet_id FROM machine m " "WHERE %s", qual); dosql(after); afterc = 16; name = xmalloc(0); id = atoi(after[12]); if (!strncmp(after[11], "USER", 4)) { id_to_name(id, USERS_TABLE, &name); strcpy(after[12], name); } else if (!strncmp(after[11], "LIST", 4)) { id_to_name(id, LIST_TABLE, &name); strcpy(after[12], name); } else if (!strncmp(after[11], "KERBEROS", 8)) { id_to_name(id, STRINGS_TABLE, &name); strcpy(after[12], name); } id = atoi(after[13]); id_to_name(id, STRINGS_TABLE, &name); strcpy(after[13], name); id = atoi(after[14]); id_to_name(id, STRINGS_TABLE, &name); strcpy(after[14], name); id = atoi(after[15]); id_to_name(id, SUBNET_TABLE, &name); strcpy(after[15], name); break; case HOSTALIAS_TABLE: strcpy(after[0], argv[0]); strcpy(after[1], argv[1]); afterc = 2; break; case HWADDRMAP_TABLE: strcpy(after[0], argv[0]); strcpy(after[1], argv[1]); afterc = 2; break; case CLUSTERS_TABLE: sprintf(stmt_buf, "SELECT c.name, c.description, c.location, " "c.clu_id FROM clusters c WHERE %s", qual); dosql(after); afterc = 4; break; case CONTAINERS_TABLE: sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, " "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c " "WHERE %s", qual); dosql(after); afterc = 8; name = xmalloc(0); id = atoi(after[5]); if (!strncmp(after[4], "USER", 4)) { id_to_name(id, USERS_TABLE, &name); strcpy(after[5], name); } else if (!strncmp(after[4], "LIST", 4)) { id_to_name(id, LIST_TABLE, &name); strcpy(after[5], name); } else if (!strncmp(after[4], "KERBEROS", 8)) { id_to_name(id, STRINGS_TABLE, &name); strcpy(after[5], name); } id = atoi(after[7]); id_to_name(id, LIST_TABLE, &name); strcpy(after[7], name); break; case MCMAP_TABLE: strcpy(after[0], argv[0]); strcpy(after[1], argv[1]); afterc = 2; break; case MCNTMAP_TABLE: strcpy(after[0], argv[0]); strcpy(after[1], argv[1]); name_to_id(after[0], MACHINE_TABLE, &id); sprintf(after[2], "%d", id); name_to_id(after[1], CONTAINERS_TABLE, &id); sprintf(after[3], "%d", id); name = xmalloc(0); EXEC SQL SELECT list_id INTO :after[4] FROM containers WHERE cnt_id = :id; id = atoi(after[4]); id_to_name(id, LIST_TABLE, &name); strcpy(after[4], name); afterc = 5; break; case SVC_TABLE: strcpy(after[0], argv[0]); strcpy(after[1], argv[1]); strcpy(after[2], argv[2]); afterc = 3; break; case FILESYS_TABLE: sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, " "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, " "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs " "WHERE %s", qual); dosql(after); name = xmalloc(0); id = atoi(after[2]); id_to_name(id, MACHINE_TABLE, &name); strcpy(after[2], name); id = atoi(after[7]); id_to_name(id, USERS_TABLE, &name); strcpy(after[7], name); id = atoi(after[8]); id_to_name(id, LIST_TABLE, &name); strcpy(after[8], name); free(name); afterc = 12; break; case QUOTA_TABLE: strcpy(after[0], "?"); strcpy(after[1], argv[1]); strcpy(after[2], "?"); sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs " "WHERE %s and fs.filsys_id = q.filsys_id and q.type = '%s'", qual, argv[1]); dosql(&(after[3])); afterc = 5; break; case LIST_TABLE: sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, " "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, " "l.description, l.list_id, l.nfsgroup FROM list l WHERE %s", qual); dosql(after); afterc = 12; break; case IMEMBERS_TABLE: id = (int)(long)argv[0]; sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, " "grouplist, gid, nfsgroup FROM list WHERE list_id = %d", id); dosql(&(after[3])); name = xmalloc(0); id_to_name(id, LIST_TABLE, &name); name2 = xmalloc(0); strcpy(after[0], name); strcpy(after[1], argv[1]); id = (int)(long)argv[2]; afterc = 11; if (!strcmp(after[1], "USER")) { id_to_name(id, USERS_TABLE, &name2); EXEC SQL SELECT status, users_id INTO :after[10], :after[12] FROM users WHERE users_id = :id; EXEC SQL SELECT list_id INTO :after[11] FROM list WHERE name = :name; afterc = 13; } else if (!strcmp(after[1], "LIST")) { id_to_name(id, LIST_TABLE, &name2); EXEC SQL SELECT list_id INTO :after[10] FROM list WHERE name = :name; sprintf(after[11], "%d", id); afterc = 12; } else if (!strcmp(after[1], "STRING") || !strcmp(after[1], "KERBEROS")) { id_to_name(id, STRINGS_TABLE, &name2); EXEC SQL SELECT list_id INTO :after[10] FROM list WHERE name = :name; } else if (!strcmp(after[1], "MACHINE")) { id_to_name(id, MACHINE_TABLE, &name2); EXEC SQL SELECT list_id INTO :after[10] FROM list WHERE name = :name; sprintf(after[11], "%d", id); afterc = 12; } strcpy(after[2], name2); free(name); free(name2); break; case NO_TABLE: afterc = 0; table = beforetable; break; default: /* com_err(whoami, 0, "requested incremental on unexpected table `%s'", table_name[table]); */ break; } iu = xmalloc(sizeof(struct iupdate)); iu->table = table_name[table]; iu->beforec = beforec; iu->before = copy_argv(before, beforec); iu->afterc = afterc; iu->after = copy_argv(after, afterc); sq_save_data(incremental_sq, iu); } void incremental_clear_after(void) { incremental_after(NO_TABLE, NULL, NULL); } /* Called when the current transaction is committed to start any queued * incremental updates. This caches the update table the first time it * is called. */ struct inc_cache { struct inc_cache *next; char *table, *service; }; void incremental_update(void) { static int inited = 0; static struct inc_cache *cache; struct inc_cache *c; EXEC SQL BEGIN DECLARE SECTION; char tab[INCREMENTAL_TABLE_NAME_SIZE], serv[INCREMENTAL_SERVICE_SIZE]; EXEC SQL END DECLARE SECTION; struct iupdate *iu, *iu_save; if (!inited) { inited++; EXEC SQL DECLARE inc CURSOR FOR SELECT table_name, service FROM incremental; EXEC SQL OPEN inc; while (1) { EXEC SQL FETCH inc INTO :tab, :serv; if (sqlca.sqlcode) break; c = xmalloc(sizeof(struct inc_cache)); c->next = cache; c->table = xstrdup(strtrim(tab)); c->service = xstrdup(strtrim(serv)); cache = c; } EXEC SQL CLOSE inc; EXEC SQL COMMIT WORK; } while (sq_remove_data(incremental_sq, &iu)) { for (c = cache; c; c = c->next) { if (!strcmp(c->table, iu->table)) { iu->service = c->service; iu_save = xmalloc(sizeof(struct iupdate)); iu_save->service = iu->service; iu_save->table = iu->table; iu_save->beforec = iu->beforec; iu_save->afterc = iu->afterc; iu_save->before = copy_argv(iu->before, iu->beforec); iu_save->after = copy_argv(iu->after, iu->afterc); sq_save_data(incremental_exec, iu_save); } } if (!c) { free_argv(iu->before, iu->beforec); free_argv(iu->after, iu->afterc); free(iu); } } if (inc_running == 0) next_incremental(); } /* Pro*C 2.2.4 can't cope with the sigset_t below, at least in Solaris 2.6. We add DEFINE=_PROC_ to the proc invocation and then #ifndef that around this function so proc will pass it through without reading it. */ #ifndef _PROC_ void next_incremental(void) { struct iupdate *iu; char *argv[MAXARGC * 2 + 4], cafter[3], cbefore[3], prog[MAXPATHLEN]; int i; sigset_t sigs; if (!incremental_exec) incremental_init(); if (sq_empty(incremental_exec) || (inc_running && now - inc_started < INC_TIMEOUT)) return; if (inc_running) critical_alert(whoami, "moirad", "incremental timeout on pid %d", inc_pid); sq_remove_data(incremental_exec, &iu); argv[1] = iu->table; sprintf(cbefore, "%d", iu->beforec); argv[2] = cbefore; sprintf(cafter, "%d", iu->afterc); argv[3] = cafter; for (i = 0; i < iu->beforec; i++) argv[4 + i] = iu->before[i]; for (i = 0; i < iu->afterc; i++) argv[4 + iu->beforec + i] = iu->after[i]; sprintf(prog, "%s/%s.incr", BIN_DIR, iu->service); argv[0] = prog; argv[4 + iu->beforec + iu->afterc] = 0; sigemptyset(&sigs); sigaddset(&sigs, SIGCHLD); sigprocmask(SIG_BLOCK, &sigs, NULL); inc_pid = vfork(); switch (inc_pid) { case 0: execv(prog, argv); _exit(1); case -1: critical_alert(whoami, "moirad", "Failed to start incremental update %s", prog); break; default: inc_running = 1; inc_started = now; } sigprocmask(SIG_UNBLOCK, &sigs, NULL); free_argv(iu->before, iu->beforec); free_argv(iu->after, iu->afterc); free(iu); } #endif /* Called when the current transaction is aborted to throw away any queued * incremental updates */ void incremental_flush(void) { struct iupdate *iu; while (sq_get_data(incremental_sq, &iu)) { free_argv(iu->before, iu->beforec); free_argv(iu->after, iu->afterc); free(iu); } sq_destroy(incremental_sq); incremental_sq = sq_create(); } char **copy_argv(char **argv, int argc) { char **ret = xmalloc(sizeof(char *) * argc); while (--argc >= 0) ret[argc] = xstrdup(strtrim(argv[argc])); return ret; } void free_argv(char **argv, int argc) { while (--argc >= 0) free(argv[argc]); free(argv); } int table_num(char *name) { int i; for (i = num_tables - 1; i; i--) { if (!strcmp(table_name[i], name)) break; } return i; /* 0 = "none" if no match */ }