source: trunk/third/db/btree/bt_cursor.c @ 17055

Revision 17055, 55.2 KB checked in by ghudson, 23 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r17054, which included commits to RCS files with non-trunk default branches.
Line 
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996, 1997, 1998, 1999, 2000
5 *      Sleepycat Software.  All rights reserved.
6 */
7
8#include "db_config.h"
9
10#ifndef lint
11static const char revid[] = "$Id: bt_cursor.c,v 1.1.1.2 2002-02-11 16:30:10 ghudson Exp $";
12#endif /* not lint */
13
14#ifndef NO_SYSTEM_INCLUDES
15#include <sys/types.h>
16
17#include <errno.h>
18#include <stdlib.h>
19#include <string.h>
20#endif
21
22#include "db_int.h"
23#include "db_page.h"
24#include "db_shash.h"
25#include "btree.h"
26#include "lock.h"
27#include "qam.h"
28#include "common_ext.h"
29
30static int  __bam_c_close __P((DBC *, db_pgno_t, int *));
31static int  __bam_c_del __P((DBC *));
32static int  __bam_c_destroy __P((DBC *));
33static int  __bam_c_first __P((DBC *));
34static int  __bam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
35static int  __bam_c_getstack __P((DBC *));
36static int  __bam_c_last __P((DBC *));
37static int  __bam_c_next __P((DBC *, int));
38static int  __bam_c_physdel __P((DBC *));
39static int  __bam_c_prev __P((DBC *));
40static int  __bam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
41static void __bam_c_reset __P((BTREE_CURSOR *));
42static int  __bam_c_search __P((DBC *, const DBT *, u_int32_t, int *));
43static int  __bam_c_writelock __P((DBC *));
44static int  __bam_getboth_finddatum __P((DBC *, DBT *));
45static int  __bam_getbothc __P((DBC *, DBT *));
46static int  __bam_isopd __P((DBC *, db_pgno_t *));
47
/*
 * ACQUIRE --
 *      Acquire a new page/lock.  If we hold a page, it is returned to the
 *      memory pool first; if we hold a lock, the new lock is lock-coupled
 *      to it.
 *
 *      dbc     cursor on whose behalf the page/lock are acquired
 *      mode    lock mode handed to __db_lget
 *      lpgno   page number to lock
 *      lock    lock structure (must be an lvalue)
 *      fpgno   page number to fetch (must be an lvalue, its address is taken)
 *      pagep   page pointer (must be an lvalue); NULL means no page is held
 *      ret     lvalue that receives 0 or the first error encountered
 *
 * !!!
 * We have to handle both where we have a lock to lock-couple and where we
 * don't -- we don't duplicate locks when we duplicate cursors if we are
 * running in a transaction environment as there's no point if locks are
 * never discarded.  This means that the cursor may or may not hold a lock.
 *
 * !!!
 * The lock is only marked invalid when the old page was released without
 * error and standard locking is not in use.  If releasing the page fails,
 * the lock structure is left untouched so that the caller can still release
 * whatever lock it references.
 */
#undef  ACQUIRE
#define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, ret) {            \
        if ((pagep) != NULL) {                                          \
                ret = memp_fput((dbc)->dbp->mpf, pagep, 0);             \
                pagep = NULL;                                           \
        } else                                                          \
                ret = 0;                                                \
        if ((ret) == 0) {                                               \
                if (STD_LOCKING(dbc))                                   \
                        ret = __db_lget(dbc,                            \
                            (lock).off == LOCK_INVALID ? 0 : LCK_COUPLE,\
                            lpgno, mode, 0, &(lock));                   \
                else                                                    \
                        (lock).off = LOCK_INVALID;                      \
                if ((ret) == 0)                                         \
                        ret = memp_fget(                                \
                            (dbc)->dbp->mpf, &(fpgno), 0, &(pagep));    \
        }                                                               \
}
74
/*
 * ACQUIRE_CUR --
 *      Acquire a new page/lock for the page the cursor already references
 *      (its pgno field).  On success the requested lock mode is recorded
 *      in the cursor; on failure ret holds the error and the recorded
 *      lock mode is left alone.
 */
#undef  ACQUIRE_CUR
#define ACQUIRE_CUR(dbc, mode, ret) {                                   \
        BTREE_CURSOR *__bcp;                                            \
        __bcp = (BTREE_CURSOR *)(dbc)->internal;                        \
        ACQUIRE(dbc, mode,                                              \
            __bcp->pgno, __bcp->lock, __bcp->pgno, __bcp->page, ret);   \
        if ((ret) == 0)                                                 \
                __bcp->lock_mode = (mode);                              \
}
84
/*
 * ACQUIRE_CUR_SET --
 *      Acquire a new page/lock for a cursor, and move the cursor to page p
 *      (index 0) on success.
 *
 * The reason that this is a separate macro is because we don't want to
 * set the pgno/indx fields in the cursor until we actually have the lock,
 * otherwise the cursor adjust routines will adjust the cursor even though
 * we're not really on the page.
 *
 * The page number argument is evaluated exactly once, before the cursor's
 * current page is released, and is held in a local.  It may therefore be
 * any expression of page-number type; it need not be an lvalue.
 */
#undef  ACQUIRE_CUR_SET
#define ACQUIRE_CUR_SET(dbc, mode, p, ret) {                            \
        BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
        db_pgno_t __acs_pgno = (p);                                     \
        ACQUIRE(dbc, mode,                                              \
            __acs_pgno, __cp->lock, __acs_pgno, __cp->page, ret);       \
        if ((ret) == 0) {                                               \
                __cp->pgno = __acs_pgno;                                \
                __cp->indx = 0;                                         \
                __cp->lock_mode = (mode);                               \
        }                                                               \
}
102
/*
 * ACQUIRE_WRITE_LOCK --
 *      Make sure the cursor holds a write lock on its current page.
 *      Nothing is done unless standard locking is in use and the cursor's
 *      recorded lock mode is something other than DB_LOCK_WRITE.  ret is
 *      set to 0, or to the error returned by __db_lget.
 *
 * !!!
 * See ACQUIRE macro on why we handle cursors that don't have locks.
 */
#undef  ACQUIRE_WRITE_LOCK
#define ACQUIRE_WRITE_LOCK(dbc, ret) {                                  \
        BTREE_CURSOR *__wcp = (BTREE_CURSOR *)(dbc)->internal;          \
        ret = 0;                                                        \
        if (STD_LOCKING(dbc) && __wcp->lock_mode != DB_LOCK_WRITE) {    \
                ret = __db_lget(dbc,                                    \
                    __wcp->lock.off == LOCK_INVALID ? 0 : LCK_COUPLE,   \
                    __wcp->pgno, DB_LOCK_WRITE, 0, &__wcp->lock);       \
                if ((ret) == 0)                                         \
                        __wcp->lock_mode = DB_LOCK_WRITE;               \
        }                                                               \
}
120
/*
 * DISCARD --
 *      Discard the current page/lock.
 *
 *      dbc       cursor owning the page and lock
 *      ldiscard  selects how the lock is released: non-zero uses __LPUT,
 *                zero uses __TLPUT
 *      lock      lock structure (lvalue); always marked LOCK_INVALID
 *                afterwards if it referenced a lock
 *      pagep     page pointer (lvalue); always set to NULL afterwards
 *      ret       lvalue that receives 0 or the first error encountered
 *
 * A failure to release the page does not prevent the lock from being
 * released; the page error takes precedence in ret.
 */
#undef  DISCARD
#define DISCARD(dbc, ldiscard, lock, pagep, ret) {                      \
        int __t_ret;                                                    \
        if ((pagep) != NULL) {                                          \
                ret = memp_fput((dbc)->dbp->mpf, pagep, 0);             \
                pagep = NULL;                                           \
        } else                                                          \
                ret = 0;                                                \
        if ((lock).off != LOCK_INVALID) {                               \
                __t_ret = (ldiscard) ?                                  \
                    __LPUT((dbc), lock) : __TLPUT((dbc), lock);         \
                if (__t_ret != 0 && (ret) == 0)                         \
                        ret = __t_ret;                                  \
                (lock).off = LOCK_INVALID;                              \
        }                                                               \
}
138
/*
 * DISCARD_CUR --
 *      Discard the page/lock held by a cursor (the lock is released via
 *      the ldiscard == 0 path of DISCARD).  If that succeeds, the cursor's
 *      recorded lock mode is reset to DB_LOCK_NG.
 */
#undef  DISCARD_CUR
#define DISCARD_CUR(dbc, ret) {                                         \
        BTREE_CURSOR *__dcp;                                            \
        __dcp = (BTREE_CURSOR *)(dbc)->internal;                        \
        DISCARD(dbc, 0, __dcp->lock, __dcp->page, ret);                 \
        if ((ret) == 0)                                                 \
                __dcp->lock_mode = DB_LOCK_NG;                          \
}
147
/*
 * IS_DELETED --
 *      True if the on-page item is flagged as a deleted record.  On a
 *      P_LBTREE page the flag is read from the item O_INDX slots after
 *      indx; on any other page type it is read from the item at indx
 *      itself.
 */
#undef  IS_DELETED
#define IS_DELETED(page, indx)                                          \
        (TYPE(page) == P_LBTREE ?                                       \
            B_DISSET(GET_BKEYDATA(page, (indx) + O_INDX)->type) :       \
            B_DISSET(GET_BKEYDATA(page, (indx))->type))

/* IS_DELETED applied to the cursor's current page and index. */
#undef  IS_CUR_DELETED
#define IS_CUR_DELETED(dbc)                                             \
        IS_DELETED((dbc)->internal->page, (dbc)->internal->indx)
156
/*
 * IS_DUPLICATE --
 *      True if index slots i1 and i2 on the cursor's current page hold the
 *      same page offset, that is, they reference the same on-page key.
 *
 * IS_CUR_DUPLICATE --
 *      Test to see if two cursors could point to duplicates of the same key.
 *      In the case of off-page duplicates they are the same, as the cursors
 *      will be in the same off-page duplicate tree.  In the case of on-page
 *      duplicates, the key index offsets must be the same.  For the last
 *      test, as the original cursor may not have a valid page pointer, we
 *      use the current cursor's.
 */
#undef  IS_DUPLICATE
#define IS_DUPLICATE(dbc, i1, i2)                                       \
            (((PAGE *)(dbc)->internal->page)->inp[(i1)] ==              \
             ((PAGE *)(dbc)->internal->page)->inp[(i2)])
#undef  IS_CUR_DUPLICATE
#define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)                     \
        (F_ISSET(dbc, DBC_OPD) ||                                       \
            ((orig_pgno) == (dbc)->internal->pgno &&                    \
            IS_DUPLICATE(dbc, (dbc)->internal->indx, (orig_indx))))
174
175/*
176 * __bam_c_reset --
177 *      Initialize internal cursor structure.
178 */
179static void
180__bam_c_reset(cp)
181        BTREE_CURSOR *cp;
182{
183        cp->csp = cp->sp;
184        cp->lock.off = LOCK_INVALID;
185        cp->lock_mode = DB_LOCK_NG;
186        cp->recno = RECNO_OOB;
187        cp->flags = 0;
188}
189
190/*
191 * __bam_c_init --
192 *      Initialize the access private portion of a cursor
193 *
194 * PUBLIC: int __bam_c_init __P((DBC *, DBTYPE));
195 */
196int
197__bam_c_init(dbc, dbtype)
198        DBC *dbc;
199        DBTYPE dbtype;
200{
201        BTREE *t;
202        BTREE_CURSOR *cp;
203        DB *dbp;
204        u_int32_t minkey;
205        int ret;
206
207        dbp = dbc->dbp;
208
209        /* Allocate/initialize the internal structure. */
210        if (dbc->internal == NULL) {
211                if ((ret = __os_malloc(dbp->dbenv,
212                    sizeof(BTREE_CURSOR), NULL, &cp)) != 0)
213                        return (ret);
214                dbc->internal = (DBC_INTERNAL *)cp;
215
216                cp->sp = cp->csp = cp->stack;
217                cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
218        } else
219                cp = (BTREE_CURSOR *)dbc->internal;
220        __bam_c_reset(cp);
221
222        /* Initialize methods. */
223        dbc->c_close = __db_c_close;
224        dbc->c_count = __db_c_count;
225        dbc->c_del = __db_c_del;
226        dbc->c_dup = __db_c_dup;
227        dbc->c_get = __db_c_get;
228        dbc->c_put = __db_c_put;
229        if (dbtype == DB_BTREE) {
230                dbc->c_am_close = __bam_c_close;
231                dbc->c_am_del = __bam_c_del;
232                dbc->c_am_destroy = __bam_c_destroy;
233                dbc->c_am_get = __bam_c_get;
234                dbc->c_am_put = __bam_c_put;
235                dbc->c_am_writelock = __bam_c_writelock;
236        } else {
237                dbc->c_am_close = __bam_c_close;
238                dbc->c_am_del = __ram_c_del;
239                dbc->c_am_destroy = __bam_c_destroy;
240                dbc->c_am_get = __ram_c_get;
241                dbc->c_am_put = __ram_c_put;
242                dbc->c_am_writelock = __bam_c_writelock;
243        }
244
245        /*
246         * The btree leaf page data structures require that two key/data pairs
247         * (or four items) fit on a page, but other than that there's no fixed
248         * requirement.  The btree off-page duplicates only require two items,
249         * to be exact, but requiring four for them as well seems reasonable.
250         *
251         * Recno uses the btree bt_ovflsize value -- it's close enough.
252         */
253        t = dbp->bt_internal;
254        minkey = F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey;
255        cp->ovflsize = B_MINKEY_TO_OVFLSIZE(minkey, dbp->pgsize);
256
257        return (0);
258}
259
260/*
261 * __bam_c_refresh
262 *      Set things up properly for cursor re-use.
263 *
264 * PUBLIC: int __bam_c_refresh __P((DBC *));
265 */
266int
267__bam_c_refresh(dbc)
268        DBC *dbc;
269{
270        BTREE_CURSOR *cp;
271        DB *dbp;
272
273        dbp = dbc->dbp;
274        cp = (BTREE_CURSOR *)dbc->internal;
275        __bam_c_reset(cp);
276
277        /*
278         * If our caller set the root page number, it's because the root was
279         * known.  This is always the case for off page dup cursors.  Else,
280         * pull it out of our internal information.
281         */
282        if (cp->root == PGNO_INVALID)
283                cp->root = ((BTREE *)dbp->bt_internal)->bt_root;
284
285        /* Initialize for record numbers. */
286        if (F_ISSET(dbc, DBC_OPD) ||
287            dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
288                F_SET(cp, C_RECNUM);
289
290                /*
291                 * All btrees that support record numbers, optionally standard
292                 * recno trees, and all off-page duplicate recno trees have
293                 * mutable record numbers.
294                 */
295                if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
296                    F_ISSET(dbp, DB_BT_RECNUM | DB_RE_RENUMBER))
297                        F_SET(cp, C_RENUMBER);
298        }
299
300        return (0);
301}
302
303/*
304 * __bam_c_close --
305 *      Close down the cursor.
306 */
307static int
308__bam_c_close(dbc, root_pgno, rmroot)
309        DBC *dbc;
310        db_pgno_t root_pgno;
311        int *rmroot;
312{
313        BTREE_CURSOR *cp, *cp_opd, *cp_c;
314        DB *dbp;
315        DBC *dbc_opd, *dbc_c;
316        PAGE *h;
317        u_int32_t num;
318        int cdb_lock, ret, t_ret;
319
320        dbp = dbc->dbp;
321        cp = (BTREE_CURSOR *)dbc->internal;
322        cp_opd = (dbc_opd = cp->opd) == NULL ?
323            NULL : (BTREE_CURSOR *)dbc_opd->internal;
324        cdb_lock = ret = 0;
325
326        /*
327         * There are 3 ways this function is called:
328         *
329         * 1. Closing a primary cursor: we get called with a pointer to a
330         *    primary cursor that has a NULL opd field.  This happens when
331         *    closing a btree/recno database cursor without an associated
332         *    off-page duplicate tree.
333         *
334         * 2. Closing a primary and an off-page duplicate cursor stack: we
335         *    get called with a pointer to the primary cursor which has a
336         *    non-NULL opd field.  This happens when closing a btree cursor
337         *    into database with an associated off-page btree/recno duplicate
338         *    tree. (It can't be a primary recno database, recno databases
339         *    don't support duplicates.)
340         *
341         * 3. Closing an off-page duplicate cursor stack: we get called with
342         *    a pointer to the off-page duplicate cursor.  This happens when
343         *    closing a non-btree database that has an associated off-page
344         *    btree/recno duplicate tree or for a btree database when the
345         *    opd tree is not empty (root_pgno == PGNO_INVALID).
346         *
347         * If either the primary or off-page duplicate cursor deleted a btree
348         * key/data pair, check to see if the item is still referenced by a
349         * different cursor.  If it is, confirm that cursor's delete flag is
350         * set and leave it to that cursor to do the delete.
351         *
352         * NB: The test for == 0 below is correct.  Our caller already removed
353         * our cursor argument from the active queue, we won't find it when we
354         * search the queue in __bam_ca_delete().
355         * NB: It can't be true that both the primary and off-page duplicate
356         * cursors have deleted a btree key/data pair.  Either the primary
357         * cursor may have deleted an item and there's no off-page duplicate
358         * cursor, or there's an off-page duplicate cursor and it may have
359         * deleted an item.
360         *
361         * Primary recno databases aren't an issue here.  Recno keys are either
362         * deleted immediately or never deleted, and do not have to be handled
363         * here.
364         *
365         * Off-page duplicate recno databases are an issue here, cases #2 and
366         * #3 above can both be off-page recno databases.  The problem is the
367         * same as the final problem for off-page duplicate btree databases.
368         * If we no longer need the off-page duplicate tree, we want to remove
369         * it.  For off-page duplicate btrees, we are done with the tree when
370         * we delete the last item it contains, i.e., there can be no further
371         * references to it when it's empty.  For off-page duplicate recnos,
372         * we remove items from the tree as the application calls the remove
373         * function, so we are done with the tree when we close the last cursor
374         * that references it.
375         *
376         * We optionally take the root page number from our caller.  If the
377         * primary database is a btree, we can get it ourselves because dbc
378         * is the primary cursor.  If the primary database is not a btree,
379         * the problem is that we may be dealing with a stack of pages.  The
380         * cursor we're using to do the delete points at the bottom of that
381         * stack and we need the top of the stack.
382         */
383        if (F_ISSET(cp, C_DELETED)) {
384                dbc_c = dbc;
385                switch (dbc->dbtype) {
386                case DB_BTREE:                          /* Case #1, #3. */
387                        if (__bam_ca_delete(dbp, cp->pgno, cp->indx, 1) == 0)
388                                goto lock;
389                        goto done;
390                case DB_RECNO:
391                        if (!F_ISSET(dbc, DBC_OPD))     /* Case #1. */
392                                goto done;
393                                                        /* Case #3. */
394                        if (__ram_ca_delete(dbp, cp->root) == 0)
395                                goto lock;
396                        goto done;
397                default:
398                        return (__db_unknown_type(dbp->dbenv,
399                                "__bam_c_close", dbc->dbtype));
400                }
401        }
402
403        if (dbc_opd == NULL)
404                goto done;
405
406        if (F_ISSET(cp_opd, C_DELETED)) {               /* Case #2. */
407                /*
408                 * We will not have been provided a root page number.  Acquire
409                 * one from the primary database.
410                 */
411                if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &h)) != 0)
412                        goto err;
413                root_pgno = GET_BOVERFLOW(h, cp->indx + O_INDX)->pgno;
414                if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
415                        goto err;
416
417                dbc_c = dbc_opd;
418                switch (dbc_opd->dbtype) {
419                case DB_BTREE:
420                        if (__bam_ca_delete(
421                            dbp, cp_opd->pgno, cp_opd->indx, 1) == 0)
422                                goto lock;
423                        goto done;
424                case DB_RECNO:
425                        if (__ram_ca_delete(dbp, cp_opd->root) == 0)
426                                goto lock;
427                        goto done;
428                default:
429                        return (__db_unknown_type(dbp->dbenv,
430                                "__bam_c_close", dbc->dbtype));
431                }
432        }
433        goto done;
434
435lock:   cp_c = (BTREE_CURSOR *)dbc_c->internal;
436
437        /*
438         * If this is CDB, upgrade the lock if necessary.  While we acquired
439         * the write lock to logically delete the record, we released it when
440         * we returned from that call, and so may not be holding a write lock
441         * at the moment.  NB: to get here in CDB we must either be holding a
442         * write lock or be the only cursor that is permitted to acquire write
443         * locks.  The reason is that there can never be more than a single CDB
444         * write cursor (that cursor cannot be dup'd), and so that cursor must
445         * be closed and the item therefore deleted before any other cursor
446         * could acquire a reference to this item.
447         *
448         * Note that dbc may be an off-page dup cursor;  this is the sole
449         * instance in which an OPD cursor does any locking, but it's necessary
450         * because we may be closed by ourselves without a parent cursor
451         * handy, and we have to do a lock upgrade on behalf of somebody.
452         * If this is the case, the OPD has been given the parent's locking
453         * info in __db_c_get--the OPD is also a WRITEDUP.
454         */
455        if (CDB_LOCKING(dbp->dbenv)) {
456                DB_ASSERT(!F_ISSET(dbc, DBC_OPD) || F_ISSET(dbc, DBC_WRITEDUP));
457                if (!F_ISSET(dbc, DBC_WRITER)) {
458                        if ((ret =
459                            lock_get(dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE,
460                            &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0)
461                                goto err;
462                        cdb_lock = 1;
463                }
464
465                cp_c->lock.off = LOCK_INVALID;
466                if ((ret =
467                    memp_fget(dbp->mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
468                        goto err;
469
470                goto delete;
471        }
472
473        /*
474         * The variable dbc_c has been initialized to reference the cursor in
475         * which we're going to do the delete.  Initialize the cursor's page
476         * and lock structures as necessary.
477         *
478         * First, we may not need to acquire any locks.  If we're in case #3,
479         * that is, the primary database isn't a btree database, our caller
480         * is responsible for acquiring any necessary locks before calling us.
481         */
482        if (F_ISSET(dbc, DBC_OPD)) {
483                cp_c->lock.off = LOCK_INVALID;
484                if ((ret =
485                    memp_fget(dbp->mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
486                        goto err;
487                goto delete;
488        }
489
490        /*
491         * Otherwise, acquire a write lock.  If the cursor that did the initial
492         * logical deletion (and which had a write lock) is not the same as the
493         * cursor doing the physical deletion (which may have only ever had a
494         * read lock on the item), we need to upgrade.  The confusion comes as
495         * follows:
496         *
497         *      C1      created, acquires item read lock
498         *      C2      dup C1, create C2, also has item read lock.
499         *      C1      acquire write lock, delete item
500         *      C1      close
501         *      C2      close, needs a write lock to physically delete item.
502         *
503         * If we're in a TXN, we know that C2 will be able to acquire the write
504         * lock, because no locker other than the one shared by C1 and C2 can
505         * acquire a write lock -- the original write lock C1 acquire was never
506         * discarded.
507         *
508         * If we're not in a TXN, it's nastier.  Other cursors might acquire
509         * read locks on the item after C1 closed, discarding its write lock,
510         * and such locks would prevent C2 from acquiring a read lock.  That's
511         * OK, though, we'll simply wait until we can acquire a read lock, or
512         * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
513         *
514         * Lock the primary database page, regardless of whether we're deleting
515         * an item on a primary database page or an off-page duplicates page.
516         */
517        ACQUIRE(dbc, DB_LOCK_WRITE,
518            cp->pgno, cp_c->lock, cp_c->pgno, cp_c->page, ret);
519        if (ret != 0)
520                goto err;
521
522delete: /*
523         * If the delete occurred in a btree, delete the on-page physical item
524         * referenced by the cursor.
525         */
526        if (dbc_c->dbtype == DB_BTREE && (ret = __bam_c_physdel(dbc_c)) != 0)
527                goto err;
528
529        /*
530         * If we're not working in an off-page duplicate tree, then we're
531         * done.
532         */
533        if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
534                goto done;
535
536        /*
537         * We may have just deleted the last element in the off-page duplicate
538         * tree, and closed the last cursor in the tree.  For an off-page btree
539         * there are no other cursors in the tree by definition, if the tree is
540         * empty.  For an off-page recno we know we have closed the last cursor
541         * in the tree because the __ram_ca_delete call above returned 0 only
542         * in that case.  So, if the off-page duplicate tree is empty at this
543         * point, we want to remove it.
544         */
545        if ((ret = memp_fget(dbp->mpf, &root_pgno, 0, &h)) != 0)
546                goto err;
547        if ((num = NUM_ENT(h)) == 0) {
548                if ((ret = __db_free(dbc, h)) != 0)
549                        goto err;
550        } else {
551                if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
552                        goto err;
553                goto done;
554        }
555
556        /*
557         * When removing the tree, we have to do one of two things.  If this is
558         * case #2, that is, the primary tree is a btree, delete the key that's
559         * associated with the tree from the btree leaf page.  We know we are
560         * the only reference to it and we already have the correct lock.  We
561         * detect this case because the cursor that was passed to us references
562         * an off-page duplicate cursor.
563         *
564         * If this is case #3, that is, the primary tree isn't a btree, pass
565         * the information back to our caller, it's their job to do cleanup on
566         * the primary page.
567         */
568        if (dbc_opd != NULL) {
569                cp->lock.off = LOCK_INVALID;
570                if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
571                        goto err;
572                if ((ret = __bam_c_physdel(dbc)) != 0)
573                        goto err;
574        } else
575                *rmroot = 1;
576err:
577done:   /*
578         * Discard the page references and locks, and confirm that the stack
579         * has been emptied.
580         */
581        if (dbc_opd != NULL) {
582                DISCARD_CUR(dbc_opd, t_ret);
583                if (t_ret != 0 && ret == 0)
584                        ret = t_ret;
585        }
586        DISCARD_CUR(dbc, t_ret);
587        if (t_ret != 0 && ret == 0)
588                ret = t_ret;
589
590        /* Downgrade any CDB lock we acquired. */
591        if (cdb_lock)
592                (void)__lock_downgrade(
593                    dbp->dbenv, &dbc->mylock, DB_LOCK_IWRITE, 0);
594
595        return (ret);
596}
597
598/*
599 * __bam_c_destroy --
600 *      Close a single cursor -- internal version.
601 */
602static int
603__bam_c_destroy(dbc)
604        DBC *dbc;
605{
606        /* Discard the structures. */
607        __os_free(dbc->internal, sizeof(BTREE_CURSOR));
608
609        return (0);
610}
611
612/*
613 * __bam_c_count --
614 *      Return a count of on and off-page duplicates.
615 *
616 * PUBLIC: int __bam_c_count __P((DBC *, db_recno_t *));
617 */
618int
619__bam_c_count(dbc, recnop)
620        DBC *dbc;
621        db_recno_t *recnop;
622{
623        BTREE_CURSOR *cp;
624        DB *dbp;
625        db_indx_t indx, top;
626        db_recno_t recno;
627        int ret;
628
629        dbp = dbc->dbp;
630        cp = (BTREE_CURSOR *)dbc->internal;
631
632        /*
633         * Called with the top-level cursor that may reference an off-page
634         * duplicates page.  If it's a set of on-page duplicates, get the
635         * page and count.  Otherwise, get the root page of the off-page
636         * duplicate tree, and use the count.  We don't have to acquire any
637         * new locks, we have to have a read lock to even get here.
638         */
639        if (cp->opd == NULL) {
640                if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
641                        return (ret);
642
643                /*
644                 * Move back to the beginning of the set of duplicates and
645                 * then count forward.
646                 */
647                for (indx = cp->indx;; indx -= P_INDX)
648                        if (indx == 0 ||
649                            !IS_DUPLICATE(dbc, indx, indx - P_INDX))
650                                break;
651                for (recno = 1, top = NUM_ENT(cp->page) - P_INDX;
652                    indx < top; ++recno, indx += P_INDX)
653                        if (!IS_DUPLICATE(dbc, indx, indx + P_INDX))
654                                break;
655                *recnop = recno;
656        } else {
657                if ((ret = memp_fget(dbp->mpf,
658                    &cp->opd->internal->root, 0, &cp->page)) != 0)
659                        return (ret);
660
661                *recnop = RE_NREC(cp->page);
662        }
663
664        ret = memp_fput(dbp->mpf, cp->page, 0);
665        cp->page = NULL;
666
667        return (ret);
668}
669
670/*
671 * __bam_c_del --
672 *      Delete using a cursor.
673 */
674static int
675__bam_c_del(dbc)
676        DBC *dbc;
677{
678        BTREE_CURSOR *cp;
679        DB *dbp;
680        int ret, t_ret;
681
682        dbp = dbc->dbp;
683        cp = (BTREE_CURSOR *)dbc->internal;
684        ret = 0;
685
686        /* If the item was already deleted, return failure. */
687        if (F_ISSET(cp, C_DELETED))
688                return (DB_KEYEMPTY);
689
690        /*
691         * We don't physically delete the record until the cursor moves, so
692         * we have to have a long-lived write lock on the page instead of a
693         * a long-lived read lock.  Note, we have to have a read lock to even
694         * get here.
695         *
696         * If we're maintaining record numbers, we lock the entire tree, else
697         * we lock the single page.
698         */
699        if (F_ISSET(cp, C_RECNUM)) {
700                if ((ret = __bam_c_getstack(dbc)) != 0)
701                        goto err;
702        } else {
703                ACQUIRE_CUR(dbc, DB_LOCK_WRITE, ret);
704                if (ret != 0)
705                        goto err;
706        }
707
708        /* Log the change. */
709        if (DB_LOGGING(dbc) &&
710            (ret = __bam_cdel_log(dbp->dbenv, dbc->txn, &LSN(cp->page), 0,
711            dbp->log_fileid, PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
712                goto err;
713
714        /* Set the intent-to-delete flag on the page and update all cursors. */
715        if (TYPE(cp->page) == P_LBTREE)
716                B_DSET(GET_BKEYDATA(cp->page, cp->indx + O_INDX)->type);
717        else
718                B_DSET(GET_BKEYDATA(cp->page, cp->indx)->type);
719
720        /* Mark the page dirty. */
721        ret = memp_fset(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
722
723err:    /*
724         * If we've been successful so far and the tree has record numbers,
725         * adjust the record counts.  Either way, release any acquired pages.
726         */
727        if (F_ISSET(cp, C_RECNUM)) {
728                if (ret == 0)
729                        ret = __bam_adjust(dbc, -1);
730                (void)__bam_stkrel(dbc, STK_CLRDBC);
731        } else {
732                DISCARD_CUR(dbc, t_ret);
733                if (t_ret != 0 && ret == 0)
734                        ret = t_ret;
735        }
736
737        /* Update the cursors last, after all chance of failure is past. */
738        if (ret == 0)
739                (void)__bam_ca_delete(dbp, cp->pgno, cp->indx, 1);
740
741        return (ret);
742}
743
744/*
745 * __bam_c_dup --
746 *      Duplicate a btree cursor, such that the new one holds appropriate
747 *      locks for the position of the original.
748 *
749 * PUBLIC: int __bam_c_dup __P((DBC *, DBC *));
750 */
751int
752__bam_c_dup(orig_dbc, new_dbc)
753        DBC *orig_dbc, *new_dbc;
754{
755        BTREE_CURSOR *orig, *new;
756        int ret;
757
758        orig = (BTREE_CURSOR *)orig_dbc->internal;
759        new = (BTREE_CURSOR *)new_dbc->internal;
760
761        /*
762         * If we're holding a lock we need to acquire a copy of it, unless
763         * we're in a transaction.  We don't need to copy any lock we're
764         * holding inside a transaction because all the locks are retained
765         * until the transaction commits or aborts.
766         */
767        if (orig->lock.off != LOCK_INVALID && orig_dbc->txn == NULL) {
768                if ((ret = __db_lget(new_dbc,
769                    0, new->pgno, new->lock_mode, 0, &new->lock)) != 0)
770                        return (ret);
771        }
772        new->ovflsize = orig->ovflsize;
773        new->recno = orig->recno;
774        new->flags = orig->flags;
775
776        return (0);
777}
778
779/*
780 * __bam_c_get --
781 *      Get using a cursor (btree).
782 */
783static int
784__bam_c_get(dbc, key, data, flags, pgnop)
785        DBC *dbc;
786        DBT *key, *data;
787        u_int32_t flags;
788        db_pgno_t *pgnop;
789{
790        BTREE_CURSOR *cp;
791        DB *dbp;
792        db_pgno_t orig_pgno;
793        db_indx_t orig_indx;
794        int exact, newopd, ret;
795
796        dbp = dbc->dbp;
797        cp = (BTREE_CURSOR *)dbc->internal;
798        orig_pgno = cp->pgno;
799        orig_indx = cp->indx;
800
801        newopd = 0;
802        switch (flags) {
803        case DB_CURRENT:
804                /* It's not possible to return a deleted record. */
805                if (F_ISSET(cp, C_DELETED)) {
806                        ret = DB_KEYEMPTY;
807                        goto err;
808                }
809
810                /*
811                 * Acquire the current page.  We have at least a read-lock
812                 * already.  The caller may have set DB_RMW asking for a
813                 * write lock, but upgrading to a write lock has no better
814                 * chance of succeeding now instead of later, so don't try.
815                 */
816                if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
817                        goto err;
818                break;
819        case DB_FIRST:
820                newopd = 1;
821                if ((ret = __bam_c_first(dbc)) != 0)
822                        goto err;
823                break;
824        case DB_GET_BOTH:
825                /*
826                 * There are two ways to get here based on DBcursor->c_get
827                 * with the DB_GET_BOTH flag set:
828                 *
829                 * 1. Searching a sorted off-page duplicate tree: do a tree
830                 * search.
831                 *
832                 * 2. Searching btree: do a tree search.  If it returns a
833                 * reference to off-page duplicate tree, return immediately
834                 * and let our caller deal with it.  If the search doesn't
835                 * return a reference to off-page duplicate tree, start an
836                 * on-page search.
837                 */
838                if (F_ISSET(dbc, DBC_OPD)) {
839                        if ((ret = __bam_c_search(
840                            dbc, data, DB_GET_BOTH, &exact)) != 0)
841                                goto err;
842                        if (!exact) {
843                                ret = DB_NOTFOUND;
844                                goto err;
845                        }
846                } else {
847                        if ((ret = __bam_c_search(
848                            dbc, key, DB_GET_BOTH, &exact)) != 0)
849                                return (ret);
850                        if (!exact) {
851                                ret = DB_NOTFOUND;
852                                goto err;
853                        }
854
855                        if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
856                                newopd = 1;
857                                break;
858                        }
859                        if ((ret = __bam_getboth_finddatum(dbc, data)) != 0)
860                                goto err;
861                }
862                break;
863        case DB_GET_BOTHC:
864                if ((ret = __bam_getbothc(dbc, data)) != 0)
865                        goto err;
866                break;
867        case DB_LAST:
868                newopd = 1;
869                if ((ret = __bam_c_last(dbc)) != 0)
870                        goto err;
871                break;
872        case DB_NEXT:
873                newopd = 1;
874                if (cp->pgno == PGNO_INVALID) {
875                        if ((ret = __bam_c_first(dbc)) != 0)
876                                goto err;
877                } else
878                        if ((ret = __bam_c_next(dbc, 1)) != 0)
879                                goto err;
880                break;
881        case DB_NEXT_DUP:
882                if ((ret = __bam_c_next(dbc, 1)) != 0)
883                        goto err;
884                if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
885                        ret = DB_NOTFOUND;
886                        goto err;
887                }
888                break;
889        case DB_NEXT_NODUP:
890                newopd = 1;
891                if (cp->pgno == PGNO_INVALID) {
892                        if ((ret = __bam_c_first(dbc)) != 0)
893                                goto err;
894                } else
895                        do {
896                                if ((ret = __bam_c_next(dbc, 1)) != 0)
897                                        goto err;
898                        } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
899                break;
900        case DB_PREV:
901                newopd = 1;
902                if (cp->pgno == PGNO_INVALID) {
903                        if ((ret = __bam_c_last(dbc)) != 0)
904                                goto err;
905                } else
906                        if ((ret = __bam_c_prev(dbc)) != 0)
907                                goto err;
908                break;
909        case DB_PREV_NODUP:
910                newopd = 1;
911                if (cp->pgno == PGNO_INVALID) {
912                        if ((ret = __bam_c_last(dbc)) != 0)
913                                goto err;
914                } else
915                        do {
916                                if ((ret = __bam_c_prev(dbc)) != 0)
917                                        goto err;
918                        } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
919                break;
920        case DB_SET:
921        case DB_SET_RECNO:
922                newopd = 1;
923                if ((ret = __bam_c_search(dbc, key, flags, &exact)) != 0)
924                        goto err;
925                break;
926        case DB_SET_RANGE:
927                newopd = 1;
928                if ((ret = __bam_c_search(dbc, key, flags, &exact)) != 0)
929                        goto err;
930
931                /*
932                 * As we didn't require an exact match, the search function
933                 * may have returned an entry past the end of the page.  Or,
934                 * we may be referencing a deleted record.  If so, move to
935                 * the next entry.
936                 */
937                if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
938                        if ((ret = __bam_c_next(dbc, 0)) != 0)
939                                goto err;
940                break;
941        default:
942                ret = __db_unknown_flag(dbp->dbenv, "__bam_c_get", flags);
943                goto err;
944        }
945
946        /*
947         * We may have moved to an off-page duplicate tree.  Return that
948         * information to our caller.
949         */
950        if (newopd && pgnop != NULL)
951                (void)__bam_isopd(dbc, pgnop);
952
953        /* Don't return the key, it was passed to us */
954        if (flags == DB_SET)
955                F_SET(key, DB_DBT_ISSET);
956
957err:    /*
958         * Regardless of whether we were successful or not, if the cursor
959         * moved, clear the delete flag, DBcursor->c_get never references
960         * a deleted key, if it moved at all.
961         */
962        if (F_ISSET(cp, C_DELETED)
963            && (cp->pgno != orig_pgno || cp->indx != orig_indx))
964                F_CLR(cp, C_DELETED);
965
966        return (ret);
967}
968
969/*
970 * __bam_getbothc --
971 *      Search for a matching data item on a join.
972 */
973static int
974__bam_getbothc(dbc, data)
975        DBC *dbc;
976        DBT *data;
977{
978        BTREE_CURSOR *cp;
979        DB *dbp;
980        int cmp, exact, ret;
981
982        dbp = dbc->dbp;
983        cp = (BTREE_CURSOR *)dbc->internal;
984
985        /*
986         * Acquire the current page.  We have at least a read-lock
987         * already.  The caller may have set DB_RMW asking for a
988         * write lock, but upgrading to a write lock has no better
989         * chance of succeeding now instead of later, so don't try.
990         */
991        if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
992                return (ret);
993
994        /*
995         * An off-page duplicate cursor.  Search the remaining duplicates
996         * for one which matches (do a normal btree search, then verify
997         * that the retrieved record is greater than the original one).
998         */
999        if (F_ISSET(dbc, DBC_OPD)) {
1000                /*
1001                 * Check to make sure the desired item comes strictly after
1002                 * the current position;  if it doesn't, return DB_NOTFOUND.
1003                 */
1004                if ((ret = __bam_cmp(dbp, data, cp->page, cp->indx,
1005                    dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare,
1006                    &cmp)) != 0)
1007                        return (ret);
1008
1009                if (cmp <= 0)
1010                        return (DB_NOTFOUND);
1011
1012                /* Discard the current page, we're going to do a full search. */
1013                if ((ret = memp_fput(dbp->mpf, cp->page, 0)) != 0)
1014                        return (ret);
1015                cp->page = NULL;
1016
1017                return (__bam_c_search(dbc, data, DB_GET_BOTH, &exact));
1018        }
1019
1020        /*
1021         * We're doing a DBC->c_get(DB_GET_BOTHC) and we're already searching
1022         * a set of on-page duplicates (either sorted or unsorted).  Continue
1023         * a linear search from after the current position.
1024         *
1025         * (Note that we could have just finished a "set" of one duplicate,
1026         * i.e. not a duplicate at all, but the following check will always
1027         * return DB_NOTFOUND in this case, which is the desired behavior.)
1028         */
1029        if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1030            !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1031                return (DB_NOTFOUND);
1032        cp->indx += P_INDX;
1033
1034        return (__bam_getboth_finddatum(dbc, data));
1035}
1036
1037/*
1038 * __bam_getboth_finddatum --
1039 *      Find a matching on-page data item.
1040 */
1041static int
1042__bam_getboth_finddatum(dbc, data)
1043        DBC *dbc;
1044        DBT *data;
1045{
1046        BTREE_CURSOR *cp;
1047        DB *dbp;
1048        db_indx_t base, lim, top;
1049        int cmp, ret;
1050
1051        dbp = dbc->dbp;
1052        cp = (BTREE_CURSOR *)dbc->internal;
1053
1054        /*
1055         * Called (sometimes indirectly) from DBC->get to search on-page data
1056         * item(s) for a matching value.  If the original flag was DB_GET_BOTH,
1057         * the cursor argument is set to the first data item for the key.  If
1058         * the original flag was DB_GET_BOTHC, the cursor argument is set to
1059         * the first data item that we can potentially return.  In both cases,
1060         * there may or may not be additional duplicate data items to search.
1061         *
1062         * If the duplicates are not sorted, do a linear search.
1063         *
1064         * If the duplicates are sorted, do a binary search.  The reason for
1065         * this is that large pages and small key/data pairs result in large
1066         * numbers of on-page duplicates before they get pushed off-page.
1067         */
1068        if (dbp->dup_compare == NULL) {
1069                for (;; cp->indx += P_INDX) {
1070                        if (!IS_CUR_DELETED(dbc) &&
1071                            (ret = __bam_cmp(dbp, data, cp->page,
1072                            cp->indx + O_INDX, __bam_defcmp, &cmp)) != 0)
1073                                return (ret);
1074                        if (cmp == 0)
1075                                return (0);
1076
1077                        if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1078                            !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1079                                break;
1080                }
1081        } else {
1082                /*
1083                 * Find the top and bottom of the duplicate set.  Binary search
1084                 * requires at least two items, don't loop if there's only one.
1085                 */
1086                for (base = top = cp->indx;
1087                    top < NUM_ENT(cp->page); top += P_INDX)
1088                        if (!IS_DUPLICATE(dbc, cp->indx, top))
1089                                break;
1090                if (base == (top - P_INDX)) {
1091                        if  ((ret = __bam_cmp(dbp, data,
1092                            cp->page, cp->indx + O_INDX,
1093                            dbp->dup_compare, &cmp)) != 0)
1094                               return (ret);
1095                        return (cmp == 0 ? 0 : DB_NOTFOUND);
1096                }
1097
1098                for (lim =
1099                    (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
1100                        cp->indx = base + ((lim >> 1) * P_INDX);
1101                        if ((ret = __bam_cmp(dbp, data, cp->page,
1102                            cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1103                                return (ret);
1104                        if (cmp == 0) {
1105                                if (!IS_CUR_DELETED(dbc))
1106                                        return (0);
1107                                break;
1108                        }
1109                        if (cmp > 0) {
1110                                base = cp->indx + P_INDX;
1111                                --lim;
1112                        }
1113                }
1114        }
1115        return (DB_NOTFOUND);
1116}
1117
1118/*
1119 * __bam_c_put --
1120 *      Put using a cursor.
1121 */
1122static int
1123__bam_c_put(dbc, key, data, flags, pgnop)
1124        DBC *dbc;
1125        DBT *key, *data;
1126        u_int32_t flags;
1127        db_pgno_t *pgnop;
1128{
1129        BTREE_CURSOR *cp;
1130        DB *dbp;
1131        DBT dbt;
1132        u_int32_t iiop;
1133        int cmp, exact, needkey, ret, stack;
1134        void *arg;
1135
1136        dbp = dbc->dbp;
1137        cp = (BTREE_CURSOR *)dbc->internal;
1138
1139split:  needkey = ret = stack = 0;
1140        switch (flags) {
1141        case DB_AFTER:
1142        case DB_BEFORE:
1143        case DB_CURRENT:
1144                needkey = 1;
1145                iiop = flags;
1146
1147                /*
1148                 * If the tree has record numbers (and we're not just replacing
1149                 * an existing record), we need a complete stack so that we can
1150                 * adjust the record counts.
1151                 */
1152                if (F_ISSET(cp, C_RECNUM) &&
1153                    (flags != DB_CURRENT || F_ISSET(cp, C_DELETED))) {
1154                        if ((ret = __bam_c_getstack(dbc)) != 0)
1155                                goto err;
1156                        stack = 1;
1157                } else {
1158                        /* Acquire the current page with a write lock. */
1159                        ACQUIRE_WRITE_LOCK(dbc, ret);
1160                        if (ret != 0)
1161                                goto err;
1162                        if ((ret = memp_fget(
1163                            dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
1164                                goto err;
1165                }
1166                break;
1167        case DB_KEYFIRST:
1168        case DB_KEYLAST:
1169        case DB_NODUPDATA:
1170                /*
1171                 * Searching off-page, sorted duplicate tree: do a tree search
1172                 * for the correct item; __bam_c_search returns the smallest
1173                 * slot greater than the key, use it.
1174                 */
1175                if (F_ISSET(dbc, DBC_OPD)) {
1176                        if ((ret =
1177                            __bam_c_search(dbc, data, flags, &exact)) != 0)
1178                                goto err;
1179                        stack = 1;
1180
1181                        /* Disallow "sorted" duplicate duplicates. */
1182                        if (exact) {
1183                                ret = __db_duperr(dbp, flags);
1184                                goto err;
1185                        }
1186                        iiop = DB_BEFORE;
1187                        break;
1188                }
1189
1190                /* Searching a btree. */
1191                if ((ret = __bam_c_search(dbc, key,
1192                    flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
1193                    DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
1194                        goto err;
1195                stack = 1;
1196
1197                /*
1198                 * If we don't have an exact match, __bam_c_search returned
1199                 * the smallest slot greater than the key, use it.
1200                 */
1201                if (!exact) {
1202                        iiop = DB_KEYFIRST;
1203                        break;
1204                }
1205
1206                /*
1207                 * If duplicates aren't supported, replace the current item.
1208                 * (If implementing the DB->put function, our caller already
1209                 * checked the DB_NOOVERWRITE flag.)
1210                 */
1211                if (!F_ISSET(dbp, DB_AM_DUP)) {
1212                        iiop = DB_CURRENT;
1213                        break;
1214                }
1215
1216                /*
1217                 * If we find a matching entry, it may be an off-page duplicate
1218                 * tree.  Return the page number to our caller, we need a new
1219                 * cursor.
1220                 */
1221                if (pgnop != NULL && __bam_isopd(dbc, pgnop))
1222                        goto done;
1223
1224                /* If the duplicates aren't sorted, move to the right slot. */
1225                if (dbp->dup_compare == NULL) {
1226                        if (flags == DB_KEYFIRST)
1227                                iiop = DB_BEFORE;
1228                        else
1229                                for (;; cp->indx += P_INDX)
1230                                        if (cp->indx + P_INDX >=
1231                                            NUM_ENT(cp->page) ||
1232                                            !IS_DUPLICATE(dbc, cp->indx,
1233                                            cp->indx + P_INDX)) {
1234                                                iiop = DB_AFTER;
1235                                                break;
1236                                        }
1237                        break;
1238                }
1239
1240                /*
1241                 * We know that we're looking at the first of a set of sorted
1242                 * on-page duplicates.  Walk the list to find the right slot.
1243                 */
1244                for (;; cp->indx += P_INDX) {
1245                        if ((ret = __bam_cmp(dbp, data, cp->page,
1246                            cp->indx + O_INDX, dbp->dup_compare, &cmp)) !=0)
1247                                return (ret);
1248                        if (cmp < 0) {
1249                                iiop = DB_BEFORE;
1250                                break;
1251                        }
1252
1253                        /* Disallow "sorted" duplicate duplicates. */
1254                        if (cmp == 0) {
1255                                if (IS_DELETED(cp->page, cp->indx)) {
1256                                        iiop = DB_CURRENT;
1257                                        break;
1258                                }
1259                                ret = __db_duperr(dbp, flags);
1260                                goto err;
1261                        }
1262
1263                        if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1264                            ((PAGE *)cp->page)->inp[cp->indx] !=
1265                            ((PAGE *)cp->page)->inp[cp->indx + P_INDX]) {
1266                                iiop = DB_AFTER;
1267                                break;
1268                        }
1269                }
1270                break;
1271        default:
1272                ret = __db_unknown_flag(dbp->dbenv, "__bam_c_put", flags);
1273                goto err;
1274        }
1275
1276        switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) {
1277        case 0:
1278                break;
1279        case DB_NEEDSPLIT:
1280                /*
1281                 * To split, we need a key for the page.  Either use the key
1282                 * argument or get a copy of the key from the page.
1283                 */
1284                if (flags == DB_AFTER ||
1285                    flags == DB_BEFORE || flags == DB_CURRENT) {
1286                        memset(&dbt, 0, sizeof(DBT));
1287                        if ((ret = __db_ret(dbp, cp->page, 0, &dbt,
1288                            &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
1289                                goto err;
1290                        arg = &dbt;
1291                } else
1292                        arg = F_ISSET(dbc, DBC_OPD) ? data : key;
1293
1294                /*
1295                 * Discard any locks and pinned pages (the locks are discarded
1296                 * even if we're running with transactions, as they lock pages
1297                 * that we're sorry we ever acquired).  If stack is set and the
1298                 * cursor entries are valid, they point to the same entries as
1299                 * the stack, don't free them twice.
1300                 */
1301                if (stack)
1302                        ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
1303                else
1304                        DISCARD_CUR(dbc, ret);
1305                if (ret != 0)
1306                        goto err;
1307
1308                /* Split the tree. */
1309                if ((ret = __bam_split(dbc, arg)) != 0)
1310                        return (ret);
1311
1312                goto split;
1313        default:
1314                goto err;
1315        }
1316
1317err:
1318done:   /*
1319         * Discard any pages pinned in the tree and their locks, except for
1320         * the leaf page.  Note, the leaf page participated in any stack we
1321         * acquired, and so we have to adjust the stack as necessary.  If
1322         * there was only a single page on the stack, we don't have to free
1323         * further stack pages.
1324         */
1325        if (stack && BT_STK_POP(cp) != NULL)
1326                (void)__bam_stkrel(dbc, 0);
1327
1328        /*
1329         * Regardless of whether we were successful or not, clear the delete
1330         * flag.  If we're successful, we either moved the cursor or the item
1331         * is no longer deleted.  If we're not successful, then we're just a
1332         * copy, no need to have the flag set.
1333         */
1334        F_CLR(cp, C_DELETED);
1335
1336        return (ret);
1337}
1338
1339/*
1340 * __bam_c_rget --
1341 *      Return the record number for a cursor.
1342 *
1343 * PUBLIC: int __bam_c_rget __P((DBC *, DBT *, u_int32_t));
1344 */
1345int
1346__bam_c_rget(dbc, data, flags)
1347        DBC *dbc;
1348        DBT *data;
1349        u_int32_t flags;
1350{
1351        BTREE_CURSOR *cp;
1352        DB *dbp;
1353        DBT dbt;
1354        db_recno_t recno;
1355        int exact, ret;
1356
1357        COMPQUIET(flags, 0);
1358        dbp = dbc->dbp;
1359        cp = (BTREE_CURSOR *)dbc->internal;
1360
1361        /*
1362         * Get the page with the current item on it.
1363         * Get a copy of the key.
1364         * Release the page, making sure we don't release it twice.
1365         */
1366        if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
1367                return (ret);
1368        memset(&dbt, 0, sizeof(DBT));
1369        if ((ret = __db_ret(dbp, cp->page,
1370            cp->indx, &dbt, &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
1371                goto err;
1372        ret = memp_fput(dbp->mpf, cp->page, 0);
1373        cp->page = NULL;
1374        if (ret != 0)
1375                return (ret);
1376
1377        if ((ret = __bam_search(dbc, &dbt,
1378            F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND,
1379            1, &recno, &exact)) != 0)
1380                goto err;
1381
1382        ret = __db_retcopy(dbp, data,
1383            &recno, sizeof(recno), &dbc->rdata.data, &dbc->rdata.ulen);
1384
1385        /* Release the stack. */
1386err:    __bam_stkrel(dbc, 0);
1387
1388        return (ret);
1389}
1390
1391/*
1392 * __bam_c_writelock --
1393 *      Upgrade the cursor to a write lock.
1394 */
1395static int
1396__bam_c_writelock(dbc)
1397        DBC *dbc;
1398{
1399        BTREE_CURSOR *cp;
1400        int ret;
1401
1402        cp = (BTREE_CURSOR *)dbc->internal;
1403
1404        if (cp->lock_mode == DB_LOCK_WRITE)
1405                return (0);
1406
1407        /*
1408         * When writing to an off-page duplicate tree, we need to have the
1409         * appropriate page in the primary tree locked.  The general DBC
1410         * code calls us first with the primary cursor so we can acquire the
1411         * appropriate lock.
1412         */
1413        ACQUIRE_WRITE_LOCK(dbc, ret);
1414        return (ret);
1415}
1416
1417/*
1418 * __bam_c_first --
1419 *      Return the first record.
1420 */
1421static int
1422__bam_c_first(dbc)
1423        DBC *dbc;
1424{
1425        BTREE_CURSOR *cp;
1426        DB *dbp;
1427        db_pgno_t pgno;
1428        int ret;
1429
1430        dbp = dbc->dbp;
1431        cp = (BTREE_CURSOR *)dbc->internal;
1432        ret = 0;
1433
1434        /* Walk down the left-hand side of the tree. */
1435        for (pgno = cp->root;;) {
1436                ACQUIRE_CUR_SET(dbc, DB_LOCK_READ, pgno, ret);
1437                if (ret != 0)
1438                        return (ret);
1439
1440                /* If we find a leaf page, we're done. */
1441                if (ISLEAF(cp->page))
1442                        break;
1443
1444                pgno = GET_BINTERNAL(cp->page, 0)->pgno;
1445        }
1446
1447        /* If we want a write lock instead of a read lock, get it now. */
1448        if (F_ISSET(dbc, DBC_RMW)) {
1449                ACQUIRE_WRITE_LOCK(dbc, ret);
1450                if (ret != 0)
1451                        return (ret);
1452        }
1453
1454        /* If on an empty page or a deleted record, move to the next one. */
1455        if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
1456                if ((ret = __bam_c_next(dbc, 0)) != 0)
1457                        return (ret);
1458
1459        return (0);
1460}
1461
1462/*
1463 * __bam_c_last --
1464 *      Return the last record.
1465 */
1466static int
1467__bam_c_last(dbc)
1468        DBC *dbc;
1469{
1470        BTREE_CURSOR *cp;
1471        DB *dbp;
1472        db_pgno_t pgno;
1473        int ret;
1474
1475        dbp = dbc->dbp;
1476        cp = (BTREE_CURSOR *)dbc->internal;
1477        ret = 0;
1478
1479        /* Walk down the right-hand side of the tree. */
1480        for (pgno = cp->root;;) {
1481                ACQUIRE_CUR_SET(dbc, DB_LOCK_READ, pgno, ret);
1482                if (ret != 0)
1483                        return (ret);
1484
1485                /* If we find a leaf page, we're done. */
1486                if (ISLEAF(cp->page))
1487                        break;
1488
1489                pgno =
1490                    GET_BINTERNAL(cp->page, NUM_ENT(cp->page) - O_INDX)->pgno;
1491        }
1492
1493        /* If we want a write lock instead of a read lock, get it now. */
1494        if (F_ISSET(dbc, DBC_RMW)) {
1495                ACQUIRE_WRITE_LOCK(dbc, ret);
1496                if (ret != 0)
1497                        return (ret);
1498        }
1499
1500        cp->indx = NUM_ENT(cp->page) == 0 ? 0 :
1501            NUM_ENT(cp->page) -
1502            (TYPE(cp->page) == P_LBTREE ? P_INDX : O_INDX);
1503
1504        /* If on an empty page or a deleted record, move to the previous one. */
1505        if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
1506                if ((ret = __bam_c_prev(dbc)) != 0)
1507                        return (ret);
1508
1509        return (0);
1510}
1511
1512/*
1513 * __bam_c_next --
1514 *      Move to the next record.
1515 */
1516static int
1517__bam_c_next(dbc, initial_move)
1518        DBC *dbc;
1519        int initial_move;
1520{
1521        BTREE_CURSOR *cp;
1522        DB *dbp;
1523        db_indx_t adjust;
1524        db_lockmode_t lock_mode;
1525        db_pgno_t pgno;
1526        int ret;
1527
1528        dbp = dbc->dbp;
1529        cp = (BTREE_CURSOR *)dbc->internal;
1530        ret = 0;
1531
1532        /*
1533         * We're either moving through a page of duplicates or a btree leaf
1534         * page.
1535         *
1536         * !!!
1537         * This code handles empty pages and pages with only deleted entries.
1538         */
1539        if (F_ISSET(dbc, DBC_OPD)) {
1540                adjust = O_INDX;
1541                lock_mode = DB_LOCK_NG;
1542        } else {
1543                adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
1544                lock_mode =
1545                    F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
1546        }
1547        if (cp->page == NULL) {
1548                ACQUIRE_CUR(dbc, lock_mode, ret);
1549                if (ret != 0)
1550                        return (ret);
1551        }
1552
1553        if (initial_move)
1554                cp->indx += adjust;
1555
1556        for (;;) {
1557                /*
1558                 * If at the end of the page, move to a subsequent page.
1559                 *
1560                 * !!!
1561                 * Check for >= NUM_ENT.  If the original search landed us on
1562                 * NUM_ENT, we may have incremented indx before the test.
1563                 */
1564                if (cp->indx >= NUM_ENT(cp->page)) {
1565                        if ((pgno
1566                            = NEXT_PGNO(cp->page)) == PGNO_INVALID)
1567                                return (DB_NOTFOUND);
1568
1569                        ACQUIRE_CUR_SET(dbc, lock_mode, pgno, ret);
1570                        if (ret != 0)
1571                                return (ret);
1572                        continue;
1573                }
1574                if (IS_CUR_DELETED(dbc)) {
1575                        cp->indx += adjust;
1576                        continue;
1577                }
1578                break;
1579        }
1580        return (0);
1581}
1582
1583/*
1584 * __bam_c_prev --
1585 *      Move to the previous record.
1586 */
1587static int
1588__bam_c_prev(dbc)
1589        DBC *dbc;
1590{
1591        BTREE_CURSOR *cp;
1592        DB *dbp;
1593        db_indx_t adjust;
1594        db_lockmode_t lock_mode;
1595        db_pgno_t pgno;
1596        int ret;
1597
1598        dbp = dbc->dbp;
1599        cp = (BTREE_CURSOR *)dbc->internal;
1600        ret = 0;
1601
1602        /*
1603         * We're either moving through a page of duplicates or a btree leaf
1604         * page.
1605         *
1606         * !!!
1607         * This code handles empty pages and pages with only deleted entries.
1608         */
1609        if (F_ISSET(dbc, DBC_OPD)) {
1610                adjust = O_INDX;
1611                lock_mode = DB_LOCK_NG;
1612        } else {
1613                adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
1614                lock_mode =
1615                    F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
1616        }
1617        if (cp->page == NULL) {
1618                ACQUIRE_CUR(dbc, lock_mode, ret);
1619                if (ret != 0)
1620                        return (ret);
1621        }
1622
1623        for (;;) {
1624                /* If at the beginning of the page, move to a previous one. */
1625                if (cp->indx == 0) {
1626                        if ((pgno =
1627                            PREV_PGNO(cp->page)) == PGNO_INVALID)
1628                                return (DB_NOTFOUND);
1629
1630                        ACQUIRE_CUR_SET(dbc, lock_mode, pgno, ret);
1631                        if (ret != 0)
1632                                return (ret);
1633
1634                        if ((cp->indx = NUM_ENT(cp->page)) == 0)
1635                                continue;
1636                }
1637
1638                /* Ignore deleted records. */
1639                cp->indx -= adjust;
1640                if (IS_CUR_DELETED(dbc))
1641                        continue;
1642
1643                break;
1644        }
1645        return (0);
1646}
1647
1648/*
1649 * __bam_c_search --
1650 *      Move to a specified record.
1651 */
1652static int
1653__bam_c_search(dbc, key, flags, exactp)
1654        DBC *dbc;
1655        const DBT *key;
1656        u_int32_t flags;
1657        int *exactp;
1658{
1659        BTREE *t;
1660        BTREE_CURSOR *cp;
1661        DB *dbp;
1662        PAGE *h;
1663        db_indx_t indx;
1664        db_pgno_t bt_lpgno;
1665        db_recno_t recno;
1666        u_int32_t sflags;
1667        int cmp, ret;
1668
1669        dbp = dbc->dbp;
1670        cp = (BTREE_CURSOR *)dbc->internal;
1671        t = dbp->bt_internal;
1672        ret = 0;
1673
1674        /*
1675         * Find an entry in the database.  Discard any lock we currently hold,
1676         * we're going to search the tree.
1677         */
1678        DISCARD_CUR(dbc, ret);
1679        if (ret != 0)
1680                return (ret);
1681
1682        switch (flags) {
1683        case DB_SET_RECNO:
1684                if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
1685                        return (ret);
1686                sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
1687                if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
1688                        return (ret);
1689                break;
1690        case DB_SET:
1691        case DB_GET_BOTH:
1692                sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
1693                goto search;
1694        case DB_SET_RANGE:
1695                sflags =
1696                    (F_ISSET(dbc, DBC_RMW) ? S_WRITE : S_READ) | S_DUPFIRST;
1697                goto search;
1698        case DB_KEYFIRST:
1699                sflags = S_KEYFIRST;
1700                goto fast_search;
1701        case DB_KEYLAST:
1702        case DB_NODUPDATA:
1703                sflags = S_KEYLAST;
1704fast_search:    /*
1705                 * If the application has a history of inserting into the first
1706                 * or last pages of the database, we check those pages first to
1707                 * avoid doing a full search.
1708                 *
1709                 * If the tree has record numbers, we need a complete stack so
1710                 * that we can adjust the record counts, so fast_search isn't
1711                 * possible.
1712                 */
1713                if (F_ISSET(cp, C_RECNUM))
1714                        goto search;
1715
1716                /*
1717                 * !!!
1718                 * We do not mutex protect the t->bt_lpgno field, which means
1719                 * that it can only be used in an advisory manner.  If we find
1720                 * page we can use, great.  If we don't, we don't care, we do
1721                 * it the slow way instead.  Regardless, copy it into a local
1722                 * variable, otherwise we might acquire a lock for a page and
1723                 * then read a different page because it changed underfoot.
1724                 */
1725                bt_lpgno = t->bt_lpgno;
1726
1727                /*
1728                 * If the tree has no history of insertion, do it the slow way.
1729                 */
1730                if (bt_lpgno == PGNO_INVALID)
1731                        goto search;
1732
1733                /* Lock and retrieve the page on which we last inserted. */
1734                h = NULL;
1735                ACQUIRE(dbc,
1736                    DB_LOCK_WRITE, bt_lpgno, cp->lock, bt_lpgno, h, ret);
1737                if (ret != 0)
1738                        goto fast_miss;
1739
1740                /*
1741                 * It's okay if the page type isn't right or it's empty, it
1742                 * just means that the world changed.
1743                 */
1744                if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
1745                        goto fast_miss;
1746
1747                /*
1748                 * What we do here is test to see if we're at the beginning or
1749                 * end of the tree and if the new item sorts before/after the
1750                 * first/last page entry.  We don't try and catch inserts into
1751                 * the middle of the tree (although we could, as long as there
1752                 * were two keys on the page and we saved both the index and
1753                 * the page number of the last insert).
1754                 */
1755                if (h->next_pgno == PGNO_INVALID) {
1756                        indx = NUM_ENT(h) - P_INDX;
1757                        if ((ret = __bam_cmp(dbp,
1758                            key, h, indx, t->bt_compare, &cmp)) != 0)
1759                                return (ret);
1760
1761                        if (cmp < 0)
1762                                goto try_begin;
1763                        if (cmp > 0) {
1764                                indx += P_INDX;
1765                                goto fast_hit;
1766                        }
1767
1768                        /*
1769                         * Found a duplicate.  If doing DB_KEYLAST, we're at
1770                         * the correct position, otherwise, move to the first
1771                         * of the duplicates.  If we're looking at off-page
1772                         * duplicates, duplicate duplicates aren't permitted,
1773                         * so we're done.
1774                         */
1775                        if (flags == DB_KEYLAST)
1776                                goto fast_hit;
1777                        for (;
1778                            indx > 0 && h->inp[indx - P_INDX] == h->inp[indx];
1779                            indx -= P_INDX)
1780                                ;
1781                        goto fast_hit;
1782                }
1783try_begin:      if (h->prev_pgno == PGNO_INVALID) {
1784                        indx = 0;
1785                        if ((ret = __bam_cmp(dbp,
1786                            key, h, indx, t->bt_compare, &cmp)) != 0)
1787                                return (ret);
1788
1789                        if (cmp > 0)
1790                                goto fast_miss;
1791                        if (cmp < 0)
1792                                goto fast_hit;
1793
1794                        /*
1795                         * Found a duplicate.  If doing DB_KEYFIRST, we're at
1796                         * the correct position, otherwise, move to the last
1797                         * of the duplicates.  If we're looking at off-page
1798                         * duplicates, duplicate duplicates aren't permitted,
1799                         * so we're done.
1800                         */
1801                        if (flags == DB_KEYFIRST)
1802                                goto fast_hit;
1803                        for (;
1804                            indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
1805                            h->inp[indx] == h->inp[indx + P_INDX];
1806                            indx += P_INDX)
1807                                ;
1808                        goto fast_hit;
1809                }
1810                goto fast_miss;
1811
1812fast_hit:       /* Set the exact match flag, we may have found a duplicate. */
1813                *exactp = cmp == 0;
1814
1815                /*
1816                 * Insert the entry in the stack.  (Our caller is likely to
1817                 * call __bam_stkrel() after our return.)
1818                 */
1819                BT_STK_CLR(cp);
1820                BT_STK_ENTER(dbp->dbenv,
1821                    cp, h, indx, cp->lock, cp->lock_mode, ret);
1822                if (ret != 0)
1823                        return (ret);
1824                break;
1825
1826fast_miss:      /*
1827                 * This was not the right page, so we do not need to retain
1828                 * the lock even in the presence of transactions.
1829                 */
1830                DISCARD(dbc, 1, cp->lock, h, ret);
1831                if (ret != 0)
1832                        return (ret);
1833
1834search:         if ((ret =
1835                    __bam_search(dbc, key, sflags, 1, NULL, exactp)) != 0)
1836                        return (ret);
1837                break;
1838        default:
1839                return (__db_unknown_flag(dbp->dbenv, "__bam_c_search", flags));
1840        }
1841
1842        /* Initialize the cursor from the stack. */
1843        cp->page = cp->csp->page;
1844        cp->pgno = cp->csp->page->pgno;
1845        cp->indx = cp->csp->indx;
1846        cp->lock = cp->csp->lock;
1847        cp->lock_mode = cp->csp->lock_mode;
1848
1849        /*
1850         * If we inserted a key into the first or last slot of the tree,
1851         * remember where it was so we can do it more quickly next time.
1852         */
1853        if (TYPE(cp->page) == P_LBTREE &&
1854            (flags == DB_KEYFIRST || flags == DB_KEYLAST))
1855                t->bt_lpgno =
1856                    (NEXT_PGNO(cp->page) == PGNO_INVALID &&
1857                    cp->indx >= NUM_ENT(cp->page)) ||
1858                    (PREV_PGNO(cp->page) == PGNO_INVALID &&
1859                    cp->indx == 0) ? cp->pgno : PGNO_INVALID;
1860        return (0);
1861}
1862
1863/*
1864 * __bam_c_physdel --
1865 *      Physically remove an item from the page.
1866 */
1867static int
1868__bam_c_physdel(dbc)
1869        DBC *dbc;
1870{
1871        BTREE_CURSOR *cp;
1872        DB *dbp;
1873        DBT key;
1874        DB_LOCK lock;
1875        PAGE *h;
1876        db_pgno_t pgno;
1877        int delete_page, empty_page, exact, level, ret;
1878
1879        dbp = dbc->dbp;
1880        cp = (BTREE_CURSOR *)dbc->internal;
1881        delete_page = empty_page = ret = 0;
1882
1883        /* If the page is going to be emptied, consider deleting it. */
1884        delete_page = empty_page =
1885            NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
1886
1887        /*
1888         * Check if the application turned off reverse splits.  Applications
1889         * can't turn off reverse splits in off-page duplicate trees, that
1890         * space will never be reused unless the exact same key is specified.
1891         */
1892        if (delete_page &&
1893            !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_BT_REVSPLIT))
1894                delete_page = 0;
1895
1896        /*
1897         * We never delete the last leaf page.  (Not really true -- we delete
1898         * the last leaf page of off-page duplicate trees, but that's handled
1899         * by our caller, not down here.)
1900         */
1901        if (delete_page && cp->pgno == cp->root)
1902                delete_page = 0;
1903
1904        /*
1905         * To delete a leaf page other than an empty root page, we need a
1906         * copy of a key from the page.  Use the 0th page index since it's
1907         * the last key the page held.
1908         */
1909        if (delete_page) {
1910                memset(&key, 0, sizeof(DBT));
1911                if ((ret = __db_ret(dbp, cp->page,
1912                    0, &key, &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
1913                        return (ret);
1914        }
1915
1916        /*
1917         * Delete the items.  If page isn't empty, we adjust the cursors.
1918         *
1919         * !!!
1920         * The following operations to delete a page may deadlock.  The easy
1921         * scenario is if we're deleting an item because we're closing cursors
1922         * because we've already deadlocked and want to call txn_abort().  If
1923         * we fail due to deadlock, we'll leave a locked, possibly empty page
1924         * in the tree, which won't be empty long because we'll undo the delete
1925         * when we undo the transaction's modifications.
1926         *
1927         * !!!
1928         * Delete the key item first, otherwise the on-page duplicate checks
1929         * in __bam_ditem() won't work!
1930         */
1931        if (TYPE(cp->page) == P_LBTREE) {
1932                if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
1933                        return (ret);
1934                if (!empty_page)
1935                        if ((ret = __bam_ca_di(dbc,
1936                            PGNO(cp->page), cp->indx, -1)) != 0)
1937                                return (ret);
1938        }
1939        if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
1940                return (ret);
1941        if (!empty_page)
1942                if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
1943                        return (ret);
1944
1945        /* If we're not going to try and delete the page, we're done. */
1946        if (!delete_page)
1947                return (0);
1948
1949        /*
1950         * Call __bam_search to reacquire the empty leaf page, but this time
1951         * get both the leaf page and it's parent, locked.  Jump back up the
1952         * tree, until we have the top pair of pages that we want to delete.
1953         * Once we have the top page that we want to delete locked, lock the
1954         * underlying pages and check to make sure they're still empty.  If
1955         * they are, delete them.
1956         */
1957        for (level = LEAFLEVEL;; ++level) {
1958                /* Acquire a page and its parent, locked. */
1959                if ((ret = __bam_search(
1960                    dbc, &key, S_WRPAIR, level, NULL, &exact)) != 0)
1961                        return (ret);
1962
1963                /*
1964                 * If we reach the root or the parent page isn't going to be
1965                 * empty when we delete one record, stop.
1966                 */
1967                h = cp->csp[-1].page;
1968                if (h->pgno == cp->root || NUM_ENT(h) != 1)
1969                        break;
1970
1971                /* Discard the stack, retaining no locks. */
1972                (void)__bam_stkrel(dbc, STK_NOLOCK);
1973        }
1974
1975        /*
1976         * Move the stack pointer one after the last entry, we may be about
1977         * to push more items onto the page stack.
1978         */
1979        ++cp->csp;
1980
1981        /*
1982         * cp->csp[-2].page is now the parent page, which we may or may not be
1983         * going to delete, and cp->csp[-1].page is the first page we know we
1984         * are going to delete.  Walk down the chain of pages, acquiring pages
1985         * until we've acquired a leaf page.  Generally, this shouldn't happen;
1986         * we should only see a single internal page with one item and a single
1987         * leaf page with no items.  The scenario where we could see something
1988         * else is if reverse splits were turned off for awhile and then turned
1989         * back on.  That could result in all sorts of strangeness, e.g., empty
1990         * pages in the tree, trees that looked like linked lists, and so on.
1991         *
1992         * !!!
1993         * Sheer paranoia: if we find any pages that aren't going to be emptied
1994         * by the delete, someone else added an item while we were walking the
1995         * tree, and we discontinue the delete.  Shouldn't be possible, but we
1996         * check regardless.
1997         */
1998        for (h = cp->csp[-1].page;;) {
1999                if (ISLEAF(h)) {
2000                        if (NUM_ENT(h) != 0)
2001                                break;
2002                        break;
2003                } else
2004                        if (NUM_ENT(h) != 1)
2005                                break;
2006
2007                /*
2008                 * Get the next page, write lock it and push it onto the stack.
2009                 * We know it's index 0, because it can only have one element.
2010                 */
2011                switch (TYPE(h)) {
2012                case P_IBTREE:
2013                        pgno = GET_BINTERNAL(h, 0)->pgno;
2014                        break;
2015                case P_IRECNO:
2016                        pgno = GET_RINTERNAL(h, 0)->pgno;
2017                        break;
2018                default:
2019                        return (__db_pgfmt(dbp, PGNO(h)));
2020                }
2021
2022                if ((ret =
2023                    __db_lget(dbc, 0, pgno, DB_LOCK_WRITE, 0, &lock)) != 0)
2024                        break;
2025                if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0)
2026                        break;
2027                BT_STK_PUSH(dbp->dbenv, cp, h, 0, lock, DB_LOCK_WRITE, ret);
2028                if (ret != 0)
2029                        break;
2030        }
2031
2032        /* Adjust the cursor stack to reference the last page on the stack. */
2033        BT_STK_POP(cp);
2034
2035        /*
2036         * If everything worked, delete the stack, otherwise, release the
2037         * stack and page locks without further damage.
2038         */
2039        if (ret == 0)
2040                ret = __bam_dpages(dbc, cp->sp);
2041        else
2042                (void)__bam_stkrel(dbc, 0);
2043
2044        return (ret);
2045}
2046
2047/*
2048 * __bam_c_getstack --
2049 *      Acquire a full stack for a cursor.
2050 */
2051static int
2052__bam_c_getstack(dbc)
2053        DBC *dbc;
2054{
2055        BTREE_CURSOR *cp;
2056        DB *dbp;
2057        DBT dbt;
2058        PAGE *h;
2059        int exact, ret, t_ret;
2060
2061        dbp = dbc->dbp;
2062        cp = (BTREE_CURSOR *)dbc->internal;
2063
2064        /*
2065         * Get the page with the current item on it.  The caller of this
2066         * routine has to already hold a read lock on the page, so there
2067         * is no additional lock to acquire.
2068         */
2069        if ((ret = memp_fget(dbp->mpf, &cp->pgno, 0, &h)) != 0)
2070                return (ret);
2071
2072        /* Get a copy of a key from the page. */
2073        memset(&dbt, 0, sizeof(DBT));
2074        if ((ret = __db_ret(dbp,
2075            h, 0, &dbt, &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
2076                goto err;
2077
2078        /* Get a write-locked stack for the page. */
2079        exact = 0;
2080        ret = __bam_search(dbc, &dbt, S_KEYFIRST, 1, NULL, &exact);
2081
2082err:    /* Discard the key and the page. */
2083        if ((t_ret = memp_fput(dbp->mpf, h, 0)) != 0 && ret == 0)
2084                ret = t_ret;
2085
2086        if (ret == 0) {
2087                /*
2088                 * Initialize the cursor from the stack.  We don't take the
2089                 * page number or page index.  The former is unchanged, but
2090                 * the latter may have been explicitly set by our caller and
2091                 * we can't change it.
2092                 */
2093                cp->page = cp->csp->page;
2094                cp->lock = cp->csp->lock;
2095                cp->lock_mode = cp->csp->lock_mode;
2096        }
2097
2098        return (ret);
2099}
2100
2101/*
2102 * __bam_isopd --
2103 *      Return if the cursor references an off-page duplicate tree via its
2104 *      page number.
2105 */
2106static int
2107__bam_isopd(dbc, pgnop)
2108        DBC *dbc;
2109        db_pgno_t *pgnop;
2110{
2111        BOVERFLOW *bo;
2112
2113        if (TYPE(dbc->internal->page) != P_LBTREE)
2114                return (0);
2115
2116        bo = GET_BOVERFLOW(dbc->internal->page, dbc->internal->indx + O_INDX);
2117        if (B_TYPE(bo->type) == B_DUPLICATE) {
2118                *pgnop = bo->pgno;
2119                return (1);
2120        }
2121        return (0);
2122}
Note: See TracBrowser for help on using the repository browser.