source: trunk/third/rpm/db/rep/rep_util.c @ 19079

Revision 19079, 19.6 KB checked in by ghudson, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r19078, which included commits to RCS files with non-trunk default branches.
Line 
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001-2002
5 *      Sleepycat Software.  All rights reserved.
6 */
7
8#include "db_config.h"
9
10#ifndef lint
11static const char revid[] = "Id: rep_util.c,v 1.51 2002/09/05 02:30:00 margo Exp ";
12#endif /* not lint */
13
14#ifndef NO_SYSTEM_INCLUDES
15#include <stdlib.h>
16#include <string.h>
17#endif
18
19#include "db_int.h"
20#include "dbinc/db_page.h"
21#include "dbinc/btree.h"
22#include "dbinc/fop.h"
23#include "dbinc/hash.h"
24#include "dbinc/log.h"
25#include "dbinc/qam.h"
26#include "dbinc/rep.h"
27#include "dbinc/txn.h"
28
29/*
30 * rep_util.c:
31 *      Miscellaneous replication-related utility functions, including
32 *      those called by other subsystems.
33 */
34static int __rep_cmp_bylsn __P((const void *, const void *));
35static int __rep_cmp_bypage __P((const void *, const void *));
36
37#ifdef REP_DIAGNOSTIC
38static void __rep_print_logmsg __P((DB_ENV *, const DBT *, DB_LSN *));
39#endif
40
41/*
42 * __rep_check_alloc --
43 *      Make sure the array of TXN_REC entries is of at least size n.
44 *      (This function is called by the __*_getpgnos() functions in
45 *      *.src.)
46 *
47 * PUBLIC: int __rep_check_alloc __P((DB_ENV *, TXN_RECS *, int));
48 */
49int
50__rep_check_alloc(dbenv, r, n)
51        DB_ENV *dbenv;
52        TXN_RECS *r;
53        int n;
54{
55        int nalloc, ret;
56
57        while (r->nalloc < r->npages + n) {
58                nalloc = r->nalloc == 0 ? 20 : r->nalloc * 2;
59
60                if ((ret = __os_realloc(dbenv, nalloc * sizeof(LSN_PAGE),
61                    &r->array)) != 0)
62                        return (ret);
63
64                r->nalloc = nalloc;
65        }
66
67        return (0);
68}
69
70/*
71 * __rep_send_message --
72 *      This is a wrapper for sending a message.  It takes care of constructing
73 * the REP_CONTROL structure and calling the user's specified send function.
74 *
75 * PUBLIC: int __rep_send_message __P((DB_ENV *, int,
76 * PUBLIC:     u_int32_t, DB_LSN *, const DBT *, u_int32_t));
77 */
78int
79__rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags)
80        DB_ENV *dbenv;
81        int eid;
82        u_int32_t rtype;
83        DB_LSN *lsnp;
84        const DBT *dbtp;
85        u_int32_t flags;
86{
87        DB_REP *db_rep;
88        REP *rep;
89        DBT cdbt, scrap_dbt;
90        REP_CONTROL cntrl;
91        u_int32_t send_flags;
92        int ret;
93
94        db_rep = dbenv->rep_handle;
95        rep = db_rep->region;
96
97        /* Set up control structure. */
98        memset(&cntrl, 0, sizeof(cntrl));
99        if (lsnp == NULL)
100                ZERO_LSN(cntrl.lsn);
101        else
102                cntrl.lsn = *lsnp;
103        cntrl.rectype = rtype;
104        cntrl.flags = flags;
105        cntrl.rep_version = DB_REPVERSION;
106        cntrl.log_version = DB_LOGVERSION;
107        MUTEX_LOCK(dbenv, db_rep->mutexp);
108        cntrl.gen = rep->gen;
109        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
110
111        memset(&cdbt, 0, sizeof(cdbt));
112        cdbt.data = &cntrl;
113        cdbt.size = sizeof(cntrl);
114
115        /* Don't assume the send function will be tolerant of NULL records. */
116        if (dbtp == NULL) {
117                memset(&scrap_dbt, 0, sizeof(DBT));
118                dbtp = &scrap_dbt;
119        }
120
121        send_flags = (LF_ISSET(DB_PERMANENT) ? DB_REP_PERMANENT : 0);
122
123#if 0
124        __rep_print_message(dbenv, eid, &cntrl, "rep_send_message");
125#endif
126#ifdef REP_DIAGNOSTIC
127        if (rtype == REP_LOG)
128                __rep_print_logmsg(dbenv, dbtp, lsnp);
129#endif
130        ret = db_rep->rep_send(dbenv, &cdbt, dbtp, eid, send_flags);
131
132        /*
133         * We don't hold the rep lock, so this could miscount if we race.
134         * I don't think it's worth grabbing the mutex for that bit of
135         * extra accuracy.
136         */
137        if (ret == 0)
138                rep->stat.st_msgs_sent++;
139        else
140                rep->stat.st_msgs_send_failures++;
141
142        return (ret);
143}
144
145#ifdef REP_DIAGNOSTIC
146
147/*
148 * __rep_print_logmsg --
149 *      This is a debugging routine for printing out log records that
150 * we are about to transmit to a client.
151 */
152
153static void
154__rep_print_logmsg(dbenv, logdbt, lsnp)
155        DB_ENV *dbenv;
156        const DBT *logdbt;
157        DB_LSN *lsnp;
158{
159        /* Static structures to hold the printing functions. */
160        static int (**ptab)__P((DB_ENV *,
161            DBT *, DB_LSN *, db_recops, void *)) = NULL;
162        size_t ptabsize = 0;
163
164        if (ptabsize == 0) {
165                /* Initialize the table. */
166                (void)__bam_init_print(dbenv, &ptab, &ptabsize);
167                (void)__crdel_init_print(dbenv, &ptab, &ptabsize);
168                (void)__db_init_print(dbenv, &ptab, &ptabsize);
169                (void)__dbreg_init_print(dbenv, &ptab, &ptabsize);
170                (void)__fop_init_print(dbenv, &ptab, &ptabsize);
171                (void)__qam_init_print(dbenv, &ptab, &ptabsize);
172                (void)__ham_init_print(dbenv, &ptab, &ptabsize);
173                (void)__txn_init_print(dbenv, &ptab, &ptabsize);
174        }
175
176        (void)__db_dispatch(dbenv,
177            ptab, ptabsize, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
178}
179
180#endif
181/*
182 * __rep_new_master --
183 *      Called after a master election to sync back up with a new master.
184 * It's possible that we already know of this new master in which case
185 * we don't need to do anything.
186 *
187 * This is written assuming that this message came from the master; we
188 * need to enforce that in __rep_process_record, but right now, we have
189 * no way to identify the master.
190 *
191 * PUBLIC: int __rep_new_master __P((DB_ENV *, REP_CONTROL *, int));
192 */
193int
194__rep_new_master(dbenv, cntrl, eid)
195        DB_ENV *dbenv;
196        REP_CONTROL *cntrl;
197        int eid;
198{
199        DB_LOG *dblp;
200        DB_LOGC *logc;
201        DB_LSN last_lsn, lsn;
202        DB_REP *db_rep;
203        DBT dbt;
204        LOG *lp;
205        REP *rep;
206        int change, ret, t_ret;
207
208        db_rep = dbenv->rep_handle;
209        rep = db_rep->region;
210        MUTEX_LOCK(dbenv, db_rep->mutexp);
211        ELECTION_DONE(rep);
212        change = rep->gen != cntrl->gen || rep->master_id != eid;
213        if (change) {
214                rep->gen = cntrl->gen;
215                rep->master_id = eid;
216                F_SET(rep, REP_F_RECOVER);
217                rep->stat.st_master_changes++;
218        }
219        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
220
221        if (!change)
222                return (0);
223
224        /*
225         * If the master changed, we need to start the process of
226         * figuring out what our last valid log record is.  However,
227         * if both the master and we agree that the max LSN is 0,0,
228         * then there is no recovery to be done.  If we are at 0 and
229         * the master is not, then we just need to request all the log
230         * records from the master.
231         */
232        dblp = dbenv->lg_handle;
233        lp = dblp->reginfo.primary;
234        R_LOCK(dbenv, &dblp->reginfo);
235        last_lsn = lsn = lp->lsn;
236        if (last_lsn.offset > sizeof(LOGP))
237                last_lsn.offset -= lp->len;
238        R_UNLOCK(dbenv, &dblp->reginfo);
239        if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
240empty:          MUTEX_LOCK(dbenv, db_rep->mutexp);
241                F_CLR(rep, REP_F_RECOVER);
242                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
243
244                if (IS_INIT_LSN(cntrl->lsn))
245                        ret = 0;
246                else
247                        ret = __rep_send_message(dbenv, rep->master_id,
248                            REP_ALL_REQ, &lsn, NULL, 0);
249
250                if (ret == 0)
251                        ret = DB_REP_NEWMASTER;
252                return (ret);
253        } else if (last_lsn.offset <= sizeof(LOGP)) {
254                /*
255                 * We have just changed log files and need to set lastlsn
256                 * to the last record in the previous log files.
257                 */
258                if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
259                        return (ret);
260                memset(&dbt, 0, sizeof(dbt));
261                ret = logc->get(logc, &last_lsn, &dbt, DB_LAST);
262                if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
263                        ret = t_ret;
264                if (ret == DB_NOTFOUND)
265                        goto empty;
266                if (ret != 0)
267                        return (ret);
268        }
269
270        R_LOCK(dbenv, &dblp->reginfo);
271        lp->verify_lsn = last_lsn;
272        R_UNLOCK(dbenv, &dblp->reginfo);
273        if ((ret = __rep_send_message(dbenv,
274            eid, REP_VERIFY_REQ, &last_lsn, NULL, 0)) != 0)
275                return (ret);
276
277        return (DB_REP_NEWMASTER);
278}
279
280/*
281 * __rep_lockpgno_init
282 *      Create a dispatch table for acquiring locks on each log record.
283 *
284 * PUBLIC: int __rep_lockpgno_init __P((DB_ENV *,
285 * PUBLIC:     int (***)(DB_ENV *, DBT *, DB_LSN *, db_recops, void *),
286 * PUBLIC:     size_t *));
287 */
288int
289__rep_lockpgno_init(dbenv, dtabp, dtabsizep)
290        DB_ENV *dbenv;
291        int (***dtabp)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *));
292        size_t *dtabsizep;
293{
294        int ret;
295
296        /* Initialize dispatch table. */
297        *dtabsizep = 0;
298        *dtabp = NULL;
299        if ((ret = __bam_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
300            (ret = __crdel_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
301            (ret = __db_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
302            (ret = __dbreg_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
303            (ret = __fop_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
304            (ret = __qam_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
305            (ret = __ham_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
306            (ret = __txn_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0)
307                return (ret);
308
309        return (0);
310}
311
312/*
313 * __rep_unlockpages --
314 *      Unlock the pages locked in __rep_lockpages.
315 *
316 * PUBLIC: int __rep_unlockpages __P((DB_ENV *, u_int32_t));
317 */
318int
319__rep_unlockpages(dbenv, lid)
320        DB_ENV *dbenv;
321        u_int32_t lid;
322{
323        DB_LOCKREQ req, *lvp;
324
325        req.op = DB_LOCK_PUT_ALL;
326        return (dbenv->lock_vec(dbenv, lid, 0, &req, 1, &lvp));
327}
328
329/*
330 * __rep_lockpages --
331 *      Called to gather and lock pages in preparation for both
332 *      single transaction apply as well as client synchronization
333 *      with a new master.  A non-NULL key_lsn means that we're locking
334 *      in order to apply a single log record during client recovery
335 *      to the joint LSN.  A non-NULL max_lsn means that we are applying
336 *      a transaction whose commit is at max_lsn.
337 *
338 * PUBLIC: int __rep_lockpages __P((DB_ENV *,
339 * PUBLIC:     int (**)(DB_ENV *, DBT *, DB_LSN *, db_recops, void *),
340 * PUBLIC:     size_t, DB_LSN *, DB_LSN *, TXN_RECS *, u_int32_t));
341 */
342int
343__rep_lockpages(dbenv, dtab, dtabsize, key_lsn, max_lsn, recs, lid)
344        DB_ENV *dbenv;
345        int (**dtab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *));
346        size_t dtabsize;
347        DB_LSN *key_lsn, *max_lsn;
348        TXN_RECS *recs;
349        u_int32_t lid;
350{
351        DBT data_dbt, lo;
352        DB_LOCK l;
353        DB_LOCKREQ *lvp;
354        DB_LOGC *logc;
355        DB_LSN tmp_lsn;
356        TXN_RECS tmp, *t;
357        db_pgno_t cur_pgno;
358        linfo_t locks;
359        int i, ret, t_ret, unique;
360        u_int32_t cur_fid;
361
362        /*
363         * There are two phases:  First, we have to traverse backwards through
364         * the log records gathering the list of all the pages accessed.  Once
365         * we have this information we can acquire all the locks we need.
366         */
367
368        /* Initialization */
369        memset(&locks, 0, sizeof(locks));
370        ret = 0;
371
372        t = recs != NULL ? recs : &tmp;
373        t->npages = t->nalloc = 0;
374        t->array = NULL;
375
376        /*
377         * We've got to be in one mode or the other; else life will either
378         * be excessively boring or overly exciting.
379         */
380        DB_ASSERT(key_lsn != NULL || max_lsn != NULL);
381        DB_ASSERT(key_lsn == NULL || max_lsn == NULL);
382
383        /*
384         * Phase 1:  Fill in the pgno array.
385         */
386        memset(&data_dbt, 0, sizeof(data_dbt));
387        if (F_ISSET(dbenv, DB_ENV_THREAD))
388                F_SET(&data_dbt, DB_DBT_REALLOC);
389
390        /* Single transaction apply. */
391        if (max_lsn != NULL) {
392                DB_ASSERT(0); /* XXX */
393                /*
394                tmp_lsn = *max_lsn;
395                if ((ret = __rep_apply_thread(dbenv, dtab, dtabsize,
396                    &data_dbt, &tmp_lsn, t)) != 0)
397                        goto err;
398                        */
399        }
400
401        /* In recovery. */
402        if (key_lsn != NULL) {
403                if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
404                        goto err;
405                ret = logc->get(logc, key_lsn, &data_dbt, DB_SET);
406
407                /* Save lsn values, since dispatch functions can change them. */
408                tmp_lsn = *key_lsn;
409                ret = __db_dispatch(dbenv,
410                    dtab, dtabsize, &data_dbt, &tmp_lsn, DB_TXN_GETPGNOS, t);
411
412                if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
413                        ret = t_ret;
414
415                /*
416                 * If ret == DB_DELETED, this record refers to a temporary
417                 * file and there's nothing to apply.
418                 */
419                if (ret == DB_DELETED) {
420                        ret = 0;
421                        goto out;
422                } else if (ret != 0)
423                        goto err;
424        }
425
426        if (t->npages == 0)
427                goto out;
428
429        /* Phase 2: Write lock all the pages. */
430
431        /* Sort the entries in the array by page number. */
432        qsort(t->array, t->npages, sizeof(LSN_PAGE), __rep_cmp_bypage);
433
434        /* Count the number of unique pages. */
435        cur_fid = DB_LOGFILEID_INVALID;
436        cur_pgno = PGNO_INVALID;
437        unique = 0;
438        for (i = 0; i < t->npages; i++) {
439                if (F_ISSET(&t->array[i], LSN_PAGE_NOLOCK))
440                        continue;
441                if (t->array[i].pgdesc.pgno != cur_pgno ||
442                    t->array[i].fid != cur_fid) {
443                        cur_pgno = t->array[i].pgdesc.pgno;
444                        cur_fid = t->array[i].fid;
445                        unique++;
446                }
447        }
448
449        if (unique == 0)
450                goto out;
451
452        /* Handle single lock case specially, else allocate space for locks. */
453        if (unique == 1) {
454                memset(&lo, 0, sizeof(lo));
455                lo.data = &t->array[0].pgdesc;
456                lo.size = sizeof(t->array[0].pgdesc);
457                ret = dbenv->lock_get(dbenv, lid, 0, &lo, DB_LOCK_WRITE, &l);
458                goto out2;
459        }
460
461        /* Multi-lock case. */
462        locks.n = unique;
463        if ((ret = __os_calloc(dbenv,
464            unique, sizeof(DB_LOCKREQ), &locks.reqs)) != 0)
465                goto err;
466        if ((ret = __os_calloc(dbenv, unique, sizeof(DBT), &locks.objs)) != 0)
467                goto err;
468
469        unique = 0;
470        cur_fid = DB_LOGFILEID_INVALID;
471        cur_pgno = PGNO_INVALID;
472        for (i = 0; i < t->npages; i++) {
473                if (F_ISSET(&t->array[i], LSN_PAGE_NOLOCK))
474                        continue;
475                if (t->array[i].pgdesc.pgno != cur_pgno ||
476                    t->array[i].fid != cur_fid) {
477                        cur_pgno = t->array[i].pgdesc.pgno;
478                        cur_fid = t->array[i].fid;
479                        locks.reqs[unique].op = DB_LOCK_GET;
480                        locks.reqs[unique].mode = DB_LOCK_WRITE;
481                        locks.reqs[unique].obj = &locks.objs[unique];
482                        locks.objs[unique].data = &t->array[i].pgdesc;
483                        locks.objs[unique].size = sizeof(t->array[i].pgdesc);
484                        unique++;
485                }
486        }
487
488        /* Finally, get the locks. */
489        if ((ret =
490            dbenv->lock_vec(dbenv, lid, 0, locks.reqs, unique, &lvp)) != 0) {
491                /*
492                 * If we were unsuccessful, unlock any locks we acquired before
493                 * the error and return the original error value.
494                 */
495                (void)__rep_unlockpages(dbenv, lid);
496        }
497
498err:
499out:    if (locks.objs != NULL)
500                __os_free(dbenv, locks.objs);
501        if (locks.reqs != NULL)
502                __os_free(dbenv, locks.reqs);
503
504        /*
505         * Before we return, sort by LSN so that we apply records in the
506         * right order.
507         */
508        qsort(t->array, t->npages, sizeof(LSN_PAGE), __rep_cmp_bylsn);
509
510out2:   if ((ret != 0 || recs == NULL) && t->nalloc != 0) {
511                __os_free(dbenv, t->array);
512                t->array = NULL;
513                t->npages = t->nalloc = 0;
514        }
515
516        if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
517                __os_ufree(dbenv, data_dbt.data);
518
519        return (ret);
520}
521
522/*
523 * __rep_cmp_bypage and __rep_cmp_bylsn --
524 *      Sort functions for qsort.  "bypage" sorts first by page numbers and
525 *      then by the LSN.  "bylsn" sorts first by the LSN, then by page numbers.
526 */
527static int
528__rep_cmp_bypage(a, b)
529        const void *a, *b;
530{
531        LSN_PAGE *ap, *bp;
532
533        ap = (LSN_PAGE *)a;
534        bp = (LSN_PAGE *)b;
535
536        if (ap->fid < bp->fid)
537                return (-1);
538
539        if (ap->fid > bp->fid)
540                return (1);
541
542        if (ap->pgdesc.pgno < bp->pgdesc.pgno)
543                return (-1);
544
545        if (ap->pgdesc.pgno > bp->pgdesc.pgno)
546                return (1);
547
548        if (ap->lsn.file < bp->lsn.file)
549                return (-1);
550
551        if (ap->lsn.file > bp->lsn.file)
552                return (1);
553
554        if (ap->lsn.offset < bp->lsn.offset)
555                return (-1);
556
557        if (ap->lsn.offset > bp->lsn.offset)
558                return (1);
559
560        return (0);
561}
562
563static int
564__rep_cmp_bylsn(a, b)
565        const void *a, *b;
566{
567        LSN_PAGE *ap, *bp;
568
569        ap = (LSN_PAGE *)a;
570        bp = (LSN_PAGE *)b;
571
572        if (ap->lsn.file < bp->lsn.file)
573                return (-1);
574
575        if (ap->lsn.file > bp->lsn.file)
576                return (1);
577
578        if (ap->lsn.offset < bp->lsn.offset)
579                return (-1);
580
581        if (ap->lsn.offset > bp->lsn.offset)
582                return (1);
583
584        if (ap->fid < bp->fid)
585                return (-1);
586
587        if (ap->fid > bp->fid)
588                return (1);
589
590        if (ap->pgdesc.pgno < bp->pgdesc.pgno)
591                return (-1);
592
593        if (ap->pgdesc.pgno > bp->pgdesc.pgno)
594                return (1);
595
596        return (0);
597}
598
599/*
600 * __rep_is_client
601 *      Used by other subsystems to figure out if this is a replication
602 * client sites.
603 *
604 * PUBLIC: int __rep_is_client __P((DB_ENV *));
605 */
606int
607__rep_is_client(dbenv)
608        DB_ENV *dbenv;
609{
610        DB_REP *db_rep;
611        REP *rep;
612        int ret;
613
614        if ((db_rep = dbenv->rep_handle) == NULL)
615                return (0);
616        rep = db_rep->region;
617
618        MUTEX_LOCK(dbenv, db_rep->mutexp);
619        ret = F_ISSET(rep, REP_F_UPGRADE | REP_F_LOGSONLY);
620        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
621        return (ret);
622}
623
624/*
625 * __rep_send_vote
626 *      Send this site's vote for the election.
627 *
628 * PUBLIC: int __rep_send_vote __P((DB_ENV *, DB_LSN *, int, int, int));
629 */
630int
631__rep_send_vote(dbenv, lsnp, nsites, pri, tiebreaker)
632        DB_ENV *dbenv;
633        DB_LSN *lsnp;
634        int nsites, pri, tiebreaker;
635{
636        DBT vote_dbt;
637        REP_VOTE_INFO vi;
638
639        memset(&vi, 0, sizeof(vi));
640
641        vi.priority = pri;
642        vi.nsites = nsites;
643        vi.tiebreaker = tiebreaker;
644
645        memset(&vote_dbt, 0, sizeof(vote_dbt));
646        vote_dbt.data = &vi;
647        vote_dbt.size = sizeof(vi);
648
649        return (__rep_send_message(dbenv,
650            DB_EID_BROADCAST, REP_VOTE1, lsnp, &vote_dbt, 0));
651}
652
653/*
654 * __rep_grow_sites --
655 *      Called to allocate more space in the election tally information.
656 * Called with the rep mutex held.  We need to call the region mutex, so
657 * we need to make sure that we *never* acquire those mutexes in the
658 * opposite order.
659 *
660 * PUBLIC: int __rep_grow_sites __P((DB_ENV *dbenv, int nsites));
661 */
662int
663__rep_grow_sites(dbenv, nsites)
664        DB_ENV *dbenv;
665        int nsites;
666{
667        REGENV *renv;
668        REGINFO *infop;
669        REP *rep;
670        int nalloc, ret, *tally;
671
672        rep = ((DB_REP *)dbenv->rep_handle)->region;
673
674        /*
675         * Allocate either twice the current allocation or nsites,
676         * whichever is more.
677         */
678
679        nalloc = 2 * rep->asites;
680        if (nalloc < nsites)
681                nalloc = nsites;
682
683        infop = dbenv->reginfo;
684        renv = infop->primary;
685        MUTEX_LOCK(dbenv, &renv->mutex);
686        if ((ret = __db_shalloc(infop->addr,
687            sizeof(nalloc * sizeof(int)), sizeof(int), &tally)) == 0) {
688                if (rep->tally_off != INVALID_ROFF)
689                         __db_shalloc_free(infop->addr,
690                            R_ADDR(infop, rep->tally_off));
691                rep->asites = nalloc;
692                rep->nsites = nsites;
693                rep->tally_off = R_OFFSET(infop, tally);
694        }
695        MUTEX_UNLOCK(dbenv, &renv->mutex);
696        return (ret);
697}
698
699#ifdef NOTYET
700static int __rep_send_file __P((DB_ENV *, DBT *, u_int32_t));
701/*
702 * __rep_send_file --
703 *      Send an entire file, one block at a time.
704 */
705static int
706__rep_send_file(dbenv, rec, eid)
707        DB_ENV *dbenv;
708        DBT *rec;
709        u_int32_t eid;
710{
711        DB *dbp;
712        DB_LOCK lk;
713        DB_MPOOLFILE *mpf;
714        DBC *dbc;
715        DBT rec_dbt;
716        PAGE *pagep;
717        db_pgno_t last_pgno, pgno;
718        int ret, t_ret;
719
720        dbp = NULL;
721        dbc = NULL;
722        pagep = NULL;
723        mpf = NULL;
724        LOCK_INIT(lk);
725
726        if ((ret = db_create(&dbp, dbenv, 0)) != 0)
727                goto err;
728
729        if ((ret = dbp->open(dbp, rec->data, NULL, DB_UNKNOWN, 0, 0)) != 0)
730                goto err;
731
732        if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
733                goto err;
734        /*
735         * Force last_pgno to some value that will let us read the meta-dat
736         * page in the following loop.
737         */
738        memset(&rec_dbt, 0, sizeof(rec_dbt));
739        last_pgno = 1;
740        for (pgno = 0; pgno <= last_pgno; pgno++) {
741                if ((ret = __db_lget(dbc, 0, pgno, DB_LOCK_READ, 0, &lk)) != 0)
742                        goto err;
743
744                if ((ret = mpf->get(mpf, &pgno, 0, &pagep)) != 0)
745                        goto err;
746
747                if (pgno == 0)
748                        last_pgno = ((DBMETA *)pagep)->last_pgno;
749
750                rec_dbt.data = pagep;
751                rec_dbt.size = dbp->pgsize;
752                if ((ret = __rep_send_message(dbenv, eid,
753                    REP_FILE, NULL, &rec_dbt, pgno == last_pgno)) != 0)
754                        goto err;
755                ret = mpf->put(mpf, pagep, 0);
756                pagep = NULL;
757                if (ret != 0)
758                        goto err;
759                ret = __LPUT(dbc, lk);
760                LOCK_INIT(lk);
761                if (ret != 0)
762                        goto err;
763        }
764
765err:    if (LOCK_ISSET(lk) && (t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
766                ret = t_ret;
767        if (dbc != NULL && (t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
768                ret = t_ret;
769        if (pagep != NULL && (t_ret = mpf->put(mpf, pagep, 0)) != 0 && ret == 0)
770                ret = t_ret;
771        if (dbp != NULL && (t_ret = dbp->close(dbp, 0)) != 0 && ret == 0)
772                ret = t_ret;
773        return (ret);
774}
775#endif
776
777#if 0
778/*
779 * PUBLIC: void __rep_print_message __P((DB_ENV *, int, REP_CONTROL *, char *));
780 */
781void
782__rep_print_message(dbenv, eid, rp, str)
783        DB_ENV *dbenv;
784        int eid;
785        REP_CONTROL *rp;
786        char *str;
787{
788        char *type;
789        switch (rp->rectype) {
790        case REP_ALIVE:
791                type = "alive";
792                break;
793        case REP_ALIVE_REQ:
794                type = "alive_req";
795                break;
796        case REP_ALL_REQ:
797                type = "all_req";
798                break;
799        case REP_ELECT:
800                type = "elect";
801                break;
802        case REP_FILE:
803                type = "file";
804                break;
805        case REP_FILE_REQ:
806                type = "file_req";
807                break;
808        case REP_LOG:
809                type = "log";
810                break;
811        case REP_LOG_MORE:
812                type = "log_more";
813                break;
814        case REP_LOG_REQ:
815                type = "log_req";
816                break;
817        case REP_MASTER_REQ:
818                type = "master_req";
819                break;
820        case REP_NEWCLIENT:
821                type = "newclient";
822                break;
823        case REP_NEWFILE:
824                type = "newfile";
825                break;
826        case REP_NEWMASTER:
827                type = "newmaster";
828                break;
829        case REP_NEWSITE:
830                type = "newsite";
831                break;
832        case REP_PAGE:
833                type = "page";
834                break;
835        case REP_PAGE_REQ:
836                type = "page_req";
837                break;
838        case REP_PLIST:
839                type = "plist";
840                break;
841        case REP_PLIST_REQ:
842                type = "plist_req";
843                break;
844        case REP_VERIFY:
845                type = "verify";
846                break;
847        case REP_VERIFY_FAIL:
848                type = "verify_fail";
849                break;
850        case REP_VERIFY_REQ:
851                type = "verify_req";
852                break;
853        case REP_VOTE1:
854                type = "vote1";
855                break;
856        case REP_VOTE2:
857                type = "vote2";
858                break;
859        default:
860                type = "NOTYPE";
861                break;
862        }
863        printf("%s %s: gen = %d eid %d, type %s, LSN [%u][%u]\n",
864            dbenv->db_home, str, rp->gen, eid, type, rp->lsn.file,
865            rp->lsn.offset);
866}
867#endif
Note: See TracBrowser for help on using the repository browser.