source: trunk/third/rpm/db/rep/rep_method.c @ 19079

Revision 19079, 28.6 KB checked in by ghudson, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r19078, which included commits to RCS files with non-trunk default branches.
Line 
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001-2002
5 *      Sleepycat Software.  All rights reserved.
6 */
7#include "db_config.h"
8
9#ifndef lint
10static const char revid[] = "Id: rep_method.c,v 1.78 2002/09/10 12:58:07 bostic Exp ";
11#endif /* not lint */
12
13#ifndef NO_SYSTEM_INCLUDES
14#include <sys/types.h>
15
16#ifdef HAVE_RPC
17#include <rpc/rpc.h>
18#endif
19
20#include <stdlib.h>
21#include <string.h>
22#include <unistd.h>
23#endif
24
25#include "db_int.h"
26#include "dbinc/db_page.h"
27#include "dbinc/db_am.h"
28#include "dbinc/log.h"
29#include "dbinc/rep.h"
30#include "dbinc/txn.h"
31
32#ifdef HAVE_RPC
33#include "dbinc_auto/db_server.h"
34#include "dbinc_auto/rpc_client_ext.h"
35#endif
36
37static int __rep_abort_prepared __P((DB_ENV *));
38static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
39static int __rep_client_dbinit __P((DB_ENV *, int));
40static int __rep_elect __P((DB_ENV *, int, int, u_int32_t, int *));
41static int __rep_elect_init __P((DB_ENV *, DB_LSN *, int, int, int, int *));
42static int __rep_flush __P((DB_ENV *));
43static int __rep_restore_prepared __P((DB_ENV *));
44static int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
45static int __rep_set_request __P((DB_ENV *, u_int32_t, u_int32_t));
46static int __rep_set_rep_transport __P((DB_ENV *, int,
47    int (*)(DB_ENV *, const DBT *, const DBT *, int, u_int32_t)));
48static int __rep_start __P((DB_ENV *, DBT *, u_int32_t));
49static int __rep_stat __P((DB_ENV *, DB_REP_STAT **, u_int32_t));
50static int __rep_wait __P((DB_ENV *, u_int32_t, int *, u_int32_t));
51
52/*
53 * __rep_dbenv_create --
54 *      Replication-specific initialization of the DB_ENV structure.
55 *
56 * PUBLIC: int __rep_dbenv_create __P((DB_ENV *));
57 */
58int
59__rep_dbenv_create(dbenv)
60        DB_ENV *dbenv;
61{
62        DB_REP *db_rep;
63        int ret;
64
65#ifdef HAVE_RPC
66        if (F_ISSET(dbenv, DB_ENV_RPCCLIENT)) {
67                COMPQUIET(db_rep, NULL);
68                COMPQUIET(ret, 0);
69                dbenv->rep_elect = __dbcl_rep_elect;
70                dbenv->rep_flush = __dbcl_rep_flush;
71                dbenv->rep_process_message = __dbcl_rep_process_message;
72                dbenv->rep_start = __dbcl_rep_start;
73                dbenv->rep_stat = __dbcl_rep_stat;
74                dbenv->set_rep_limit = __dbcl_rep_set_limit;
75                dbenv->set_rep_request = __dbcl_rep_set_request;
76                dbenv->set_rep_transport = __dbcl_rep_set_rep_transport;
77
78        } else
79#endif
80        {
81                dbenv->rep_elect = __rep_elect;
82                dbenv->rep_flush = __rep_flush;
83                dbenv->rep_process_message = __rep_process_message;
84                dbenv->rep_start = __rep_start;
85                dbenv->rep_stat = __rep_stat;
86                dbenv->set_rep_limit = __rep_set_limit;
87                dbenv->set_rep_request = __rep_set_request;
88                dbenv->set_rep_transport = __rep_set_rep_transport;
89                /*
90                 * !!!
91                 * Our caller has not yet had the opportunity to reset the panic
92                 * state or turn off mutex locking, and so we can neither check
93                 * the panic state or acquire a mutex in the DB_ENV create path.
94                 */
95
96                if ((ret = __os_calloc(dbenv, 1, sizeof(DB_REP), &db_rep)) != 0)
97                        return (ret);
98                dbenv->rep_handle = db_rep;
99
100                /* Initialize the per-process replication structure. */
101                db_rep->rep_send = NULL;
102        }
103
104        return (0);
105}
106
107/*
108 * __rep_start --
109 *      Become a master or client, and start sending messages to participate
110 * in the replication environment.  Must be called after the environment
111 * is open.
112 */
113static int
114__rep_start(dbenv, dbt, flags)
115        DB_ENV *dbenv;
116        DBT *dbt;
117        u_int32_t flags;
118{
119        DB_LOG *dblp;
120        DB_LSN lsn;
121        DB_REP *db_rep;
122        REP *rep;
123        int announce, init_db, redo_prepared, ret;
124
125        PANIC_CHECK(dbenv);
126        ENV_ILLEGAL_BEFORE_OPEN(dbenv, "rep_start");
127        ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
128
129        db_rep = dbenv->rep_handle;
130        rep = db_rep->region;
131
132        if ((ret = __db_fchk(dbenv, "DB_ENV->rep_start", flags,
133            DB_REP_CLIENT | DB_REP_LOGSONLY | DB_REP_MASTER)) != 0)
134                return (ret);
135
136        /* Exactly one of CLIENT and MASTER must be specified. */
137        if ((ret = __db_fcchk(dbenv,
138            "DB_ENV->rep_start", flags, DB_REP_CLIENT, DB_REP_MASTER)) != 0)
139                return (ret);
140        if (!LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER | DB_REP_LOGSONLY)) {
141                __db_err(dbenv,
142        "DB_ENV->rep_start: replication mode must be specified");
143                return (EINVAL);
144        }
145
146        /* Masters can't be logs-only. */
147        if ((ret = __db_fcchk(dbenv,
148            "DB_ENV->rep_start", flags, DB_REP_LOGSONLY, DB_REP_MASTER)) != 0)
149                return (ret);
150
151        /* We need a transport function. */
152        if (db_rep->rep_send == NULL) {
153                __db_err(dbenv,
154    "DB_ENV->set_rep_transport must be called before DB_ENV->rep_start");
155                return (EINVAL);
156        }
157       
158        /* We'd better not have any logged files open if we are a client. */
159        if (LF_ISSET(DB_REP_CLIENT) && (ret = __dbreg_nofiles(dbenv)) != 0) {
160                __db_err(dbenv, "DB_ENV->rep_start called with open files");
161                return (ret);
162        }
163
164        MUTEX_LOCK(dbenv, db_rep->mutexp);
165        if (rep->eid == DB_EID_INVALID)
166                rep->eid = dbenv->rep_eid;
167
168        if (LF_ISSET(DB_REP_MASTER)) {
169                if (F_ISSET(dbenv, DB_ENV_REP_CLIENT)) {
170                        /*
171                         * If we're upgrading from having been a client,
172                         * preclose, so that we close our temporary database.
173                         *
174                         * Do not close files that we may have opened while
175                         * doing a rep_apply;  they'll get closed when we
176                         * finally close the environment, but for now, leave
177                         * them open, as we don't want to recycle their
178                         * fileids, and we may need the handles again if
179                         * we become a client and the original master
180                         * that opened them becomes a master again.
181                         */
182                        if ((ret = __rep_preclose(dbenv, 0)) != 0)
183                                return (ret);
184
185                        /*
186                         * Now write a __txn_recycle record so that
187                         * clients don't get confused with our txnids
188                         * and txnids of previous masters.
189                         */
190                        F_CLR(dbenv, DB_ENV_REP_CLIENT);
191                        if ((ret = __txn_reset(dbenv)) != 0)
192                                return (ret);
193                }
194
195                redo_prepared = 0;
196                if (!F_ISSET(rep, REP_F_MASTER)) {
197                        /* Master is not yet set. */
198                        if (F_ISSET(rep, REP_ISCLIENT)) {
199                                F_CLR(rep, REP_ISCLIENT);
200                                rep->gen = ++rep->w_gen;
201                                redo_prepared = 1;
202                        } else if (rep->gen == 0)
203                                rep->gen = 1;
204                }
205
206                F_SET(rep, REP_F_MASTER);
207                F_SET(dbenv, DB_ENV_REP_MASTER);
208                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
209                dblp = (DB_LOG *)dbenv->lg_handle;
210                R_LOCK(dbenv, &dblp->reginfo);
211                lsn = ((LOG *)dblp->reginfo.primary)->lsn;
212                R_UNLOCK(dbenv, &dblp->reginfo);
213
214                /*
215                 * Send the NEWMASTER message, then restore prepared txns
216                 * if and only if we just upgraded from being a client.
217                 */
218                if ((ret = __rep_send_message(dbenv,
219                    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0)) == 0 &&
220                    redo_prepared)
221                        ret = __rep_restore_prepared(dbenv);
222        } else {
223                F_CLR(dbenv, DB_ENV_REP_MASTER);
224                F_SET(dbenv, DB_ENV_REP_CLIENT);
225                if (LF_ISSET(DB_REP_LOGSONLY))
226                        F_SET(dbenv, DB_ENV_REP_LOGSONLY);
227
228                announce = !F_ISSET(rep, REP_ISCLIENT) ||
229                    rep->master_id == DB_EID_INVALID;
230                init_db = 0;
231                if (!F_ISSET(rep, REP_ISCLIENT)) {
232                        F_CLR(rep, REP_F_MASTER);
233                        if (LF_ISSET(DB_REP_LOGSONLY))
234                                F_SET(rep, REP_F_LOGSONLY);
235                        else
236                                F_SET(rep, REP_F_UPGRADE);
237
238                        /*
239                         * We initialize the client's generation number to 0.
240                         * Upon startup, it looks for a master and updates the
241                         * generation number as necessary, exactly as it does
242                         * during normal operation and a master failure.
243                         */
244                        rep->gen = 0;
245                        rep->master_id = DB_EID_INVALID;
246                        init_db = 1;
247                }
248                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
249
250                /*
251                 * Abort any prepared transactions that were restored
252                 * by recovery.  We won't be able to create any txns of
253                 * our own until they're resolved, but we can't resolve
254                 * them ourselves;  the master has to.  If any get
255                 * resolved as commits, we'll redo them when commit
256                 * records come in.  Aborts will simply be ignored.
257                 */
258                if ((ret = __rep_abort_prepared(dbenv)) != 0)
259                        return (ret);
260
261                if ((ret = __rep_client_dbinit(dbenv, init_db)) != 0)
262                        return (ret);
263
264                /*
265                 * If this client created a newly replicated environment,
266                 * then announce the existence of this client.  The master
267                 * should respond with a message that will tell this client
268                 * the current generation number and the current LSN.  This
269                 * will allow the client to either perform recovery or
270                 * simply join in.
271                 */
272                if (announce)
273                        ret = __rep_send_message(dbenv,
274                            DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0);
275        }
276        return (ret);
277}
278
279/*
280 * __rep_client_dbinit --
281 *
282 * Initialize the LSN database on the client side.  This is called from the
283 * client initialization code.  The startup flag value indicates if
284 * this is the first thread/process starting up and therefore should create
285 * the LSN database.  This routine must be called once by each process acting
286 * as a client.
287 */
288static int
289__rep_client_dbinit(dbenv, startup)
290        DB_ENV *dbenv;
291        int startup;
292{
293        DB_REP *db_rep;
294        DB *dbp;
295        int ret, t_ret;
296        u_int32_t flags;
297
298        PANIC_CHECK(dbenv);
299        db_rep = dbenv->rep_handle;
300        dbp = NULL;
301
302#define REPDBNAME       "__db.rep.db"
303
304        /* Check if this has already been called on this environment. */
305        if (db_rep->rep_db != NULL)
306                return (0);
307
308        MUTEX_LOCK(dbenv, db_rep->db_mutexp);
309
310        if (startup) {
311                if ((ret = db_create(&dbp, dbenv, 0)) != 0)
312                        goto err;
313                /*
314                 * Ignore errors, because if the file doesn't exist, this
315                 * is perfectly OK.
316                 */
317                (void)dbp->remove(dbp, REPDBNAME, NULL, 0);
318        }
319
320        if ((ret = db_create(&dbp, dbenv, 0)) != 0)
321                goto err;
322        if ((ret = dbp->set_bt_compare(dbp, __rep_bt_cmp)) != 0)
323                goto err;
324
325        /* Allow writes to this database on a client. */
326        F_SET(dbp, DB_AM_CL_WRITER);
327
328        flags = (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0) |
329            (startup ? DB_CREATE : 0);
330        if ((ret = dbp->open(dbp, NULL,
331            "__db.rep.db", NULL, DB_BTREE, flags, 0)) != 0)
332                goto err;
333
334        db_rep->rep_db = dbp;
335
336        if (0) {
337err:            if (dbp != NULL &&
338                    (t_ret = dbp->close(dbp, DB_NOSYNC)) != 0 && ret == 0)
339                        ret = t_ret;
340                db_rep->rep_db = NULL;
341        }
342
343        MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
344
345        return (ret);
346}
347
348/*
349 * __rep_bt_cmp --
350 *
351 * Comparison function for the LSN table.  We use the entire control
352 * structure as a key (for simplicity, so we don't have to merge the
353 * other fields in the control with the data field), but really only
354 * care about the LSNs.
355 */
356static int
357__rep_bt_cmp(dbp, dbt1, dbt2)
358        DB *dbp;
359        const DBT *dbt1, *dbt2;
360{
361        DB_LSN lsn1, lsn2;
362        REP_CONTROL *rp1, *rp2;
363
364        COMPQUIET(dbp, NULL);
365
366        rp1 = dbt1->data;
367        rp2 = dbt2->data;
368
369        __ua_memcpy(&lsn1, &rp1->lsn, sizeof(DB_LSN));
370        __ua_memcpy(&lsn2, &rp2->lsn, sizeof(DB_LSN));
371
372        if (lsn1.file > lsn2.file)
373                return (1);
374
375        if (lsn1.file < lsn2.file)
376                return (-1);
377
378        if (lsn1.offset > lsn2.offset)
379                return (1);
380
381        if (lsn1.offset < lsn2.offset)
382                return (-1);
383
384        return (0);
385}
386
387/*
388 * __rep_abort_prepared --
389 *      Abort any prepared transactions that recovery restored.
390 *
391 *      This is used by clients that have just run recovery, since
392 * they cannot/should not call txn_recover and handle prepared transactions
393 * themselves.
394 */
395static int
396__rep_abort_prepared(dbenv)
397        DB_ENV *dbenv;
398{
399#define PREPLISTSIZE    50
400        DB_PREPLIST prep[PREPLISTSIZE], *p;
401        DB_TXNMGR *mgr;
402        DB_TXNREGION *region;
403        int do_aborts, ret;
404        long count, i;
405        u_int32_t op;
406
407        mgr = dbenv->tx_handle;
408        region = mgr->reginfo.primary;
409
410        do_aborts = 0;
411        R_LOCK(dbenv, &mgr->reginfo);
412        if (region->stat.st_nrestores != 0)
413                do_aborts = 1;
414        R_UNLOCK(dbenv, &mgr->reginfo);
415
416        if (do_aborts) {
417                op = DB_FIRST;
418                do {
419                        if ((ret = dbenv->txn_recover(dbenv,
420                            prep, PREPLISTSIZE, &count, op)) != 0)
421                                return (ret);
422                        for (i = 0; i < count; i++) {
423                                p = &prep[i];
424                                if ((ret = p->txn->abort(p->txn)) != 0)
425                                        return (ret);
426                        }
427                        op = DB_NEXT;
428                } while (count == PREPLISTSIZE);
429        }
430
431        return (0);
432}
433
434/*
435 * __rep_restore_prepared --
436 *      Restore to a prepared state any prepared but not yet committed
437 * transactions.
438 *
439 *      This performs, in effect, a "mini-recovery";  it is called from
440 * __rep_start by newly upgraded masters.  There may be transactions that an
441 * old master prepared but did not resolve, which we need to restore to an
442 * active state.
443 */
444static int
445__rep_restore_prepared(dbenv)
446        DB_ENV *dbenv;
447{
448        DB_LOGC *logc;
449        DB_LSN ckp_lsn, lsn;
450        DBT rec;
451        __txn_ckp_args *ckp_args;
452        __txn_regop_args *regop_args;
453        __txn_xa_regop_args *prep_args;
454        int ret, t_ret;
455        u_int32_t hi_txn, low_txn, rectype;
456        void *txninfo;
457
458        txninfo = NULL;
459        ckp_args = NULL;
460        prep_args = NULL;
461        regop_args = NULL;
462        ZERO_LSN(ckp_lsn);
463        ZERO_LSN(lsn);
464
465        if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
466                return (ret);
467
468        /*
469         * We need to consider the set of records between the most recent
470         * checkpoint LSN and the end of the log;  any txn in that
471         * range, and only txns in that range, could still have been
472         * active, and thus prepared but not yet committed (PBNYC),
473         * when the old master died.
474         *
475         * Find the most recent checkpoint LSN, and get the record there.
476         * If there is no checkpoint in the log, start off by getting
477         * the very first record in the log instead.
478         */
479        memset(&rec, 0, sizeof(DBT));
480        if ((ret = __txn_getckp(dbenv, &lsn)) == 0) {
481                if ((ret = logc->get(logc, &lsn, &rec, DB_SET)) != 0)  {
482                        __db_err(dbenv,
483                            "Checkpoint record at LSN [%lu][%lu] not found",
484                            (u_long)lsn.file, (u_long)lsn.offset);
485                        goto err;
486                }
487
488                if ((ret = __txn_ckp_read(dbenv, rec.data, &ckp_args)) != 0) {
489                        __db_err(dbenv,
490                            "Invalid checkpoint record at [%lu][%lu]",
491                            (u_long)lsn.file, (u_long)lsn.offset);
492                        goto err;
493                }
494
495                ckp_lsn = ckp_args->ckp_lsn;
496                __os_free(dbenv, ckp_args);
497
498                if ((ret = logc->get(logc, &ckp_lsn, &rec, DB_SET)) != 0) {
499                        __db_err(dbenv,
500                            "Checkpoint LSN record [%lu][%lu] not found",
501                            (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
502                        goto err;
503                }
504        } else if ((ret = logc->get(logc, &lsn, &rec, DB_FIRST)) != 0) {
505                if (ret == DB_NOTFOUND) {
506                        /* An empty log means no PBNYC txns. */
507                        ret = 0;
508                        goto done;
509                }
510                __db_err(dbenv, "Attempt to get first log record failed");
511                goto err;
512        }
513
514        /*
515         * We use the same txnlist infrastructure that recovery does;
516         * it demands an estimate of the high and low txnids for
517         * initialization.
518         *
519         * First, the low txnid.
520         */
521        do {
522                /* txnid is after rectype, which is a u_int32. */
523                memcpy(&low_txn,
524                    (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(low_txn));
525                if (low_txn != 0)
526                        break;
527        } while ((ret = logc->get(logc, &lsn, &rec, DB_NEXT)) == 0);
528
529        /* If there are no txns, there are no PBNYC txns. */
530        if (ret == DB_NOTFOUND) {
531                ret = 0;
532                goto done;
533        } else if (ret != 0)
534                goto err;
535
536        /* Now, the high txnid. */
537        if ((ret = logc->get(logc, &lsn, &rec, DB_LAST)) != 0) {
538                /*
539                 * Note that DB_NOTFOUND is unacceptable here because we
540                 * had to have looked at some log record to get this far.
541                 */
542                __db_err(dbenv, "Final log record not found");
543                goto err;
544        }
545        do {
546                /* txnid is after rectype, which is a u_int32. */
547                memcpy(&hi_txn,
548                    (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(hi_txn));
549                if (hi_txn != 0)
550                        break;
551        } while ((ret = logc->get(logc, &lsn, &rec, DB_PREV)) == 0);
552        if (ret == DB_NOTFOUND) {
553                ret = 0;
554                goto done;
555        } else if (ret != 0)
556                goto err;
557
558        /* We have a high and low txnid.  Initialise the txn list. */
559        if ((ret =
560            __db_txnlist_init(dbenv, low_txn, hi_txn, NULL, &txninfo)) != 0)
561                goto err;
562
563        /*
564         * Now, walk backward from the end of the log to ckp_lsn.  Any
565         * prepares that we hit without first hitting a commit or
566         * abort belong to PBNYC txns, and we need to apply them and
567         * restore them to a prepared state.
568         *
569         * Note that we wind up applying transactions out of order.
570         * Since all PBNYC txns still held locks on the old master and
571         * were isolated, this should be safe.
572         */
573        for (ret = logc->get(logc, &lsn, &rec, DB_LAST);
574            ret == 0 && log_compare(&lsn, &ckp_lsn) > 0;
575            ret = logc->get(logc, &lsn, &rec, DB_PREV)) {
576                memcpy(&rectype, rec.data, sizeof(rectype));
577                switch (rectype) {
578                case DB___txn_regop:
579                        /*
580                         * It's a commit or abort--but we don't care
581                         * which!  Just add it to the list of txns
582                         * that are resolved.
583                         */
584                        if ((ret = __txn_regop_read(dbenv, rec.data,
585                            &regop_args)) != 0)
586                                goto err;
587
588                        ret = __db_txnlist_find(dbenv,
589                            txninfo, regop_args->txnid->txnid);
590                        if (ret == DB_NOTFOUND)
591                                ret = __db_txnlist_add(dbenv, txninfo,
592                                    regop_args->txnid->txnid,
593                                    regop_args->opcode, &lsn);
594                        __os_free(dbenv, regop_args);
595                        break;
596                case DB___txn_xa_regop:
597                        /*
598                         * It's a prepare.  If we haven't put the
599                         * txn on our list yet, it hasn't been
600                         * resolved, so apply and restore it.
601                         */
602                        if ((ret = __txn_xa_regop_read(dbenv, rec.data,
603                            &prep_args)) != 0)
604                                goto err;
605                        ret = __db_txnlist_find(dbenv, txninfo,
606                            prep_args->txnid->txnid);
607                        if (ret == DB_NOTFOUND)
608                                if ((ret = __rep_process_txn(dbenv, &rec)) == 0)
609                                        ret = __txn_restore_txn(dbenv,
610                                            &lsn, prep_args);
611                        __os_free(dbenv, prep_args);
612                        break;
613                default:
614                        continue;
615                }
616        }
617
618        /* It's not an error to have hit the beginning of the log. */
619        if (ret == DB_NOTFOUND)
620                ret = 0;
621
622done:
623err:    t_ret = logc->close(logc, 0);
624
625        if (txninfo != NULL)
626                __db_txnlist_end(dbenv, txninfo);
627
628        return (ret == 0 ? t_ret : ret);
629}
630
631/*
632 * __rep_set_limit --
633 *      Set a limit on the amount of data that will be sent during a single
634 * invocation of __rep_process_message.
635 */
636static int
637__rep_set_limit(dbenv, gbytes, bytes)
638        DB_ENV *dbenv;
639        u_int32_t gbytes;
640        u_int32_t bytes;
641{
642        DB_REP *db_rep;
643        REP *rep;
644
645        PANIC_CHECK(dbenv);
646
647        if ((db_rep = dbenv->rep_handle) == NULL) {
648                __db_err(dbenv,
649    "DB_ENV->set_rep_limit: database environment not properly initialized");
650                return (__db_panic(dbenv, EINVAL));
651        }
652        rep = db_rep->region;
653        MUTEX_LOCK(dbenv, db_rep->mutexp);
654        if (bytes > GIGABYTE) {
655                gbytes += bytes / GIGABYTE;
656                bytes = bytes % GIGABYTE;
657        }
658        rep->gbytes = gbytes;
659        rep->bytes = bytes;
660        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
661
662        return (0);
663}
664
665/*
666 * __rep_set_request --
667 *      Set the minimum and maximum number of log records that we wait
668 * before retransmitting.
669 * UNDOCUMENTED.
670 */
671static int
672__rep_set_request(dbenv, min, max)
673        DB_ENV *dbenv;
674        u_int32_t min;
675        u_int32_t max;
676{
677        LOG *lp;
678        DB_LOG *dblp;
679        DB_REP *db_rep;
680        REP *rep;
681
682        PANIC_CHECK(dbenv);
683
684        if ((db_rep = dbenv->rep_handle) == NULL) {
685                __db_err(dbenv,
686    "DB_ENV->set_rep_request: database environment not properly initialized");
687                return (__db_panic(dbenv, EINVAL));
688        }
689        rep = db_rep->region;
690        MUTEX_LOCK(dbenv, db_rep->mutexp);
691        rep->request_gap = min;
692        rep->max_gap = max;
693        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
694        dblp = dbenv->lg_handle;
695        if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) {
696                R_LOCK(dbenv, &dblp->reginfo);
697                lp->wait_recs = 0;
698                lp->rcvd_recs = 0;
699                R_UNLOCK(dbenv, &dblp->reginfo);
700        }
701
702        return (0);
703}
704
705/*
706 * __rep_set_transport --
707 *      Set the transport function for replication.
708 */
709static int
710__rep_set_rep_transport(dbenv, eid, f_send)
711        DB_ENV *dbenv;
712        int eid;
713        int (*f_send) __P((DB_ENV *, const DBT *, const DBT *, int, u_int32_t));
714{
715        DB_REP *db_rep;
716
717        PANIC_CHECK(dbenv);
718
719        if ((db_rep = dbenv->rep_handle) == NULL) {
720                __db_err(dbenv,
721    "DB_ENV->set_rep_transport: database environment not properly initialized");
722                return (__db_panic(dbenv, EINVAL));
723        }
724
725        if (f_send == NULL) {
726                __db_err(dbenv,
727        "DB_ENV->set_rep_transport: no send function specified");
728                return (EINVAL);
729        }
730
731        if (eid < 0) {
732                __db_err(dbenv,
733        "DB_ENV->set_rep_transport: eid must be greater than or equal to 0");
734                return (EINVAL);
735        }
736
737        db_rep->rep_send = f_send;
738
739        dbenv->rep_eid = eid;
740        return (0);
741}
742
743/*
744 * __rep_elect --
745 *      Called after master failure to hold/participate in an election for
746 *      a new master.
747 */
748static int
749__rep_elect(dbenv, nsites, priority, timeout, eidp)
750        DB_ENV *dbenv;
751        int nsites, priority;
752        u_int32_t timeout;
753        int *eidp;
754{
755        DB_LOG *dblp;
756        DB_LSN lsn;
757        DB_REP *db_rep;
758        REP *rep;
759        int in_progress, ret, send_vote, tiebreaker;
760        u_int32_t pid, sec, usec;
761
762        PANIC_CHECK(dbenv);
763        ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_elect", DB_INIT_TXN);
764
765        /* Error checking. */
766        if (nsites <= 0) {
767                __db_err(dbenv,
768                    "DB_ENV->rep_elect: nsites must be greater than 0");
769                return (EINVAL);
770        }
771        if (priority < 0) {
772                __db_err(dbenv,
773                    "DB_ENV->rep_elect: priority may not be negative");
774                return (EINVAL);
775        }
776
777        db_rep = dbenv->rep_handle;
778        rep = db_rep->region;
779        dblp = dbenv->lg_handle;
780
781        R_LOCK(dbenv, &dblp->reginfo);
782        lsn = ((LOG *)dblp->reginfo.primary)->lsn;
783        R_UNLOCK(dbenv, &dblp->reginfo);
784
785        /* Generate a randomized tiebreaker value. */
786        __os_id(&pid);
787        if ((ret = __os_clock(dbenv, &sec, &usec)) != 0)
788                return (ret);
789        tiebreaker = pid ^ sec ^ usec ^ (u_int)rand() ^ P_TO_UINT32(&pid);
790
791        if ((ret = __rep_elect_init(dbenv,
792            &lsn, nsites, priority, tiebreaker, &in_progress)) != 0) {
793                if (ret == DB_REP_NEWMASTER) {
794                        ret = 0;
795                        *eidp = dbenv->rep_eid;
796                }
797                return (ret);
798        }
799
800        if (!in_progress) {
801#ifdef DIAGNOSTIC
802                if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
803                        __db_err(dbenv, "Beginning an election");
804#endif
805                if ((ret = __rep_send_message(dbenv,
806                    DB_EID_BROADCAST, REP_ELECT, NULL, NULL, 0)) != 0)
807                        goto err;
808                DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTSEND, ret, NULL);
809        }
810
811        /* Now send vote */
812        if ((ret =
813            __rep_send_vote(dbenv, &lsn, nsites, priority, tiebreaker)) != 0)
814                goto err;
815        DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTVOTE1, ret, NULL);
816
817        ret = __rep_wait(dbenv, timeout, eidp, REP_F_EPHASE1);
818        DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTWAIT1, ret, NULL);
819        switch (ret) {
820                case 0:
821                        /* Check if election complete or phase complete. */
822                        if (*eidp != DB_EID_INVALID)
823                                return (0);
824                        goto phase2;
825                case DB_TIMEOUT:
826                        break;
827                default:
828                        goto err;
829        }
830        /*
831         * If we got here, we haven't heard from everyone, but we've
832         * run out of time, so it's time to decide if we have enough
833         * votes to pick a winner and if so, to send out a vote to
834         * the winner.
835         */
836        MUTEX_LOCK(dbenv, db_rep->mutexp);
837        send_vote = DB_EID_INVALID;
838        if (rep->sites > rep->nsites / 2) {
839                /* We think we've seen enough to cast a vote. */
840                send_vote = rep->winner;
841                if (rep->winner == rep->eid)
842                        rep->votes++;
843                F_CLR(rep, REP_F_EPHASE1);
844                F_SET(rep, REP_F_EPHASE2);
845        }
846        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
847        if (send_vote == DB_EID_INVALID) {
848                /* We do not have enough votes to elect. */
849#ifdef DIAGNOSTIC
850                if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
851                        __db_err(dbenv,
852                            "Not enough votes to elect: received %d of %d",
853                            rep->sites, rep->nsites);
854#endif
855                ret = DB_REP_UNAVAIL;
856                goto err;
857
858        }
859#ifdef DIAGNOSTIC
860        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION) &&
861            send_vote != rep->eid)
862                __db_err(dbenv, "Sending vote");
863#endif
864
865        if (send_vote != rep->eid && (ret = __rep_send_message(dbenv,
866            send_vote, REP_VOTE2, NULL, NULL, 0)) != 0)
867                goto err;
868        DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTVOTE2, ret, NULL);
869
870phase2: ret = __rep_wait(dbenv, timeout, eidp, REP_F_EPHASE2);
871        DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTWAIT2, ret, NULL);
872        switch (ret) {
873                case 0:
874                        return (0);
875                case DB_TIMEOUT:
876                        ret = DB_REP_UNAVAIL;
877                        break;
878                default:
879                        goto err;
880        }
881
882DB_TEST_RECOVERY_LABEL
883err:    MUTEX_LOCK(dbenv, db_rep->mutexp);
884        ELECTION_DONE(rep);
885        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
886
887#ifdef DIAGNOSTIC
888        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
889                __db_err(dbenv, "Ended election with %d", ret);
890#endif
891        return (ret);
892}
893
894/*
895 * __rep_elect_init
896 *      Initialize an election.  Sets beginp non-zero if the election is
897 * already in progress; makes it 0 otherwise.
898 */
899static int
900__rep_elect_init(dbenv, lsnp, nsites, priority, tiebreaker, beginp)
901        DB_ENV *dbenv;
902        DB_LSN *lsnp;
903        int nsites, priority, tiebreaker, *beginp;
904{
905        DB_REP *db_rep;
906        REP *rep;
907        int ret, *tally;
908
909        db_rep = dbenv->rep_handle;
910        rep = db_rep->region;
911
912        ret = 0;
913
914        /* We may miscount, as we don't hold the replication mutex here. */
915        rep->stat.st_elections++;
916
917        /* If we are already a master; simply broadcast that fact and return. */
918        if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
919                (void)__rep_send_message(dbenv,
920                    DB_EID_BROADCAST, REP_NEWMASTER, lsnp, NULL, 0);
921                rep->stat.st_elections_won++;
922                return (DB_REP_NEWMASTER);
923        }
924
925        MUTEX_LOCK(dbenv, db_rep->mutexp);
926        *beginp = IN_ELECTION(rep);
927        if (!*beginp) {
928                /*
929                 * Make sure that we always initialize all the election fields
930                 * before putting ourselves in an election state.  That means
931                 * issuing calls that can fail (allocation) before setting all
932                 * the variables.
933                 */
934                if (nsites > rep->asites &&
935                    (ret = __rep_grow_sites(dbenv, nsites)) != 0)
936                        goto err;
937                DB_ENV_TEST_RECOVERY(dbenv, DB_TEST_ELECTINIT, ret, NULL);
938                rep->nsites = nsites;
939                rep->priority = priority;
940                rep->votes = 0;
941                rep->master_id = DB_EID_INVALID;
942                F_SET(rep, REP_F_EPHASE1);
943
944                /* We have always heard from ourselves. */
945                rep->sites = 1;
946                tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
947                tally[0] = rep->eid;
948
949                if (priority != 0) {
950                        /* Make ourselves the winner to start. */
951                        rep->winner = rep->eid;
952                        rep->w_priority = priority;
953                        rep->w_gen = rep->gen;
954                        rep->w_lsn = *lsnp;
955                        rep->w_tiebreaker = tiebreaker;
956                } else {
957                        rep->winner = DB_EID_INVALID;
958                        rep->w_priority = 0;
959                        rep->w_gen = 0;
960                        ZERO_LSN(rep->w_lsn);
961                        rep->w_tiebreaker = 0;
962                }
963        }
964DB_TEST_RECOVERY_LABEL
965err:    MUTEX_UNLOCK(dbenv, db_rep->mutexp);
966        return (ret);
967}
968
969static int
970__rep_wait(dbenv, timeout, eidp, flags)
971        DB_ENV *dbenv;
972        u_int32_t timeout;
973        int *eidp;
974        u_int32_t flags;
975{
976        DB_REP *db_rep;
977        REP *rep;
978        int done, ret;
979        u_int32_t sleeptime;
980
981        done = 0;
982        db_rep = dbenv->rep_handle;
983        rep = db_rep->region;
984
985        /*
986         * The user specifies an overall timeout function, but checking
987         * is cheap and the timeout may be a generous upper bound.
988         * Sleep repeatedly for the smaller of .5s and timeout/10.
989         */
990        sleeptime = (timeout > 5000000) ? 500000 : timeout / 10;
991        if (sleeptime == 0)
992                sleeptime++;
993        while (timeout > 0) {
994                if ((ret = __os_sleep(dbenv, 0, sleeptime)) != 0)
995                        return (ret);
996                MUTEX_LOCK(dbenv, db_rep->mutexp);
997                done = !F_ISSET(rep, flags) && rep->master_id != DB_EID_INVALID;
998
999                *eidp = rep->master_id;
1000                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
1001
1002                if (done)
1003                        return (0);
1004
1005                if (timeout > sleeptime)
1006                        timeout -= sleeptime;
1007                else
1008                        timeout = 0;
1009        }
1010        return (DB_TIMEOUT);
1011}
1012
1013/*
1014 * __rep_flush --
1015 *      Re-push the last log record to all clients, in case they've lost
1016 * messages and don't know it.
1017 */
1018static int
1019__rep_flush(dbenv)
1020        DB_ENV *dbenv;
1021{
1022        DBT rec;
1023        DB_LOGC *logc;
1024        DB_LSN lsn;
1025        int ret, t_ret;
1026
1027        PANIC_CHECK(dbenv);
1028        ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
1029
1030        if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
1031                return (ret);
1032
1033        memset(&rec, 0, sizeof(rec));
1034        memset(&lsn, 0, sizeof(lsn));
1035
1036        if ((ret = logc->get(logc, &lsn, &rec, DB_LAST)) != 0)
1037                goto err;
1038
1039        ret = __rep_send_message(dbenv,
1040            DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0);
1041
1042err:    if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
1043                ret = t_ret;
1044        return (ret);
1045}
1046
1047/*
1048 * __rep_stat --
1049 *      Fetch replication statistics.
1050 */
1051static int
1052__rep_stat(dbenv, statp, flags)
1053        DB_ENV *dbenv;
1054        DB_REP_STAT **statp;
1055        u_int32_t flags;
1056{
1057        DB_LOG *dblp;
1058        DB_REP *db_rep;
1059        DB_REP_STAT *stats;
1060        LOG *lp;
1061        REP *rep;
1062        u_int32_t queued;
1063        int ret;
1064
1065        PANIC_CHECK(dbenv);
1066        ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
1067
1068        db_rep = dbenv->rep_handle;
1069        rep = db_rep->region;
1070        dblp = dbenv->lg_handle;
1071        lp = dblp->reginfo.primary;
1072
1073        *statp = NULL;
1074        if ((ret = __db_fchk(dbenv,
1075            "DB_ENV->rep_stat", flags, DB_STAT_CLEAR)) != 0)
1076                return (ret);
1077
1078        /* Allocate a stat struct to return to the user. */
1079        if ((ret = __os_umalloc(dbenv, sizeof(DB_REP_STAT), &stats)) != 0)
1080                return (ret);
1081
1082        MUTEX_LOCK(dbenv, db_rep->mutexp);
1083        memcpy(stats, &rep->stat, sizeof(*stats));
1084
1085        /* Copy out election stats. */
1086        if (IN_ELECTION(rep)) {
1087                if (F_ISSET(rep, REP_F_EPHASE1))
1088                        stats->st_election_status = 1;
1089                else if (F_ISSET(rep, REP_F_EPHASE2))
1090                        stats->st_election_status = 2;
1091
1092                stats->st_election_nsites = rep->sites;
1093                stats->st_election_cur_winner = rep->winner;
1094                stats->st_election_priority = rep->w_priority;
1095                stats->st_election_gen = rep->w_gen;
1096                stats->st_election_lsn = rep->w_lsn;
1097                stats->st_election_votes = rep->votes;
1098                stats->st_election_tiebreaker = rep->w_tiebreaker;
1099        }
1100
1101        /* Copy out other info that's protected by the rep mutex. */
1102        stats->st_env_id = rep->eid;
1103        stats->st_env_priority = rep->priority;
1104        stats->st_nsites = rep->nsites;
1105        stats->st_master = rep->master_id;
1106        stats->st_gen = rep->gen;
1107
1108        if (F_ISSET(rep, REP_F_MASTER))
1109                stats->st_status = DB_REP_MASTER;
1110        else if (F_ISSET(rep, REP_F_LOGSONLY))
1111                stats->st_status = DB_REP_LOGSONLY;
1112        else if (F_ISSET(rep, REP_F_UPGRADE))
1113                stats->st_status = DB_REP_CLIENT;
1114        else
1115                stats->st_status = 0;
1116
1117        if (LF_ISSET(DB_STAT_CLEAR)) {
1118                queued = rep->stat.st_log_queued;
1119                memset(&rep->stat, 0, sizeof(rep->stat));
1120                rep->stat.st_log_queued = rep->stat.st_log_queued_total =
1121                    rep->stat.st_log_queued_max = queued;
1122        }
1123        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
1124
1125        /*
1126         * Log-related replication info is stored in the log system and
1127         * protected by the log region lock.
1128         */
1129        R_LOCK(dbenv, &dblp->reginfo);
1130        if (F_ISSET(rep, REP_ISCLIENT)) {
1131                stats->st_next_lsn = lp->ready_lsn;
1132                stats->st_waiting_lsn = lp->waiting_lsn;
1133        } else {
1134                if (F_ISSET(rep, REP_F_MASTER))
1135                        stats->st_next_lsn = lp->lsn;
1136                else
1137                        ZERO_LSN(stats->st_next_lsn);
1138                ZERO_LSN(stats->st_waiting_lsn);
1139        }
1140        R_UNLOCK(dbenv, &dblp->reginfo);
1141
1142        *statp = stats;
1143        return (0);
1144}
Note: See TracBrowser for help on using the repository browser.