source: trunk/third/rpm/db/btree/bt_cursor.c @ 19079

Revision 19079, 71.6 KB checked in by ghudson, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r19078, which included commits to RCS files with non-trunk default branches.
Line 
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996-2002
5 *      Sleepycat Software.  All rights reserved.
6 */
7
8#include "db_config.h"
9
10#ifndef lint
11static const char revid[] = "Id: bt_cursor.c,v 11.147 2002/08/13 20:46:07 ubell Exp ";
12#endif /* not lint */
13
14#ifndef NO_SYSTEM_INCLUDES
15#include <sys/types.h>
16
17#include <string.h>
18#endif
19
20#include "db_int.h"
21#include "dbinc/db_page.h"
22#include "dbinc/db_shash.h"
23#include "dbinc/btree.h"
24#include "dbinc/lock.h"
25
26static int  __bam_bulk __P((DBC *, DBT *, u_int32_t));
27static int  __bam_c_close __P((DBC *, db_pgno_t, int *));
28static int  __bam_c_del __P((DBC *));
29static int  __bam_c_destroy __P((DBC *));
30static int  __bam_c_first __P((DBC *));
31static int  __bam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
32static int  __bam_c_getstack __P((DBC *));
33static int  __bam_c_last __P((DBC *));
34static int  __bam_c_next __P((DBC *, int, int));
35static int  __bam_c_physdel __P((DBC *));
36static int  __bam_c_prev __P((DBC *));
37static int  __bam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
38static int  __bam_c_search __P((DBC *,
39                db_pgno_t, const DBT *, u_int32_t, int *));
40static int  __bam_c_writelock __P((DBC *));
41static int  __bam_getboth_finddatum __P((DBC *, DBT *, u_int32_t));
42static int  __bam_getbothc __P((DBC *, DBT *));
43static int  __bam_get_prev __P((DBC *));
44static int  __bam_isopd __P((DBC *, db_pgno_t *));
45
46/*
47 * Acquire a new page/lock.  If we hold a page/lock, discard the page, and
48 * lock-couple the lock.
49 *
50 * !!!
51 * We have to handle both where we have a lock to lock-couple and where we
52 * don't -- we don't duplicate locks when we duplicate cursors if we are
53 * running in a transaction environment as there's no point if locks are
54 * never discarded.  This means that the cursor may or may not hold a lock.
55 * In the case where we are decending the tree we always want to
56 * unlock the held interior page so we use ACQUIRE_COUPLE.
57 */
58#undef  ACQUIRE
59#define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, ret) {            \
60        DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
61        if ((pagep) != NULL) {                                          \
62                ret = __mpf->put(__mpf, pagep, 0);                      \
63                pagep = NULL;                                           \
64        } else                                                          \
65                ret = 0;                                                \
66        if ((ret) == 0 && STD_LOCKING(dbc))                             \
67                ret = __db_lget(dbc, LCK_COUPLE, lpgno, mode, 0, &(lock));\
68        if ((ret) == 0)                                                 \
69                ret = __mpf->get(__mpf, &(fpgno), 0, &(pagep));         \
70}
71
72#undef  ACQUIRE_COUPLE
73#define ACQUIRE_COUPLE(dbc, mode, lpgno, lock, fpgno, pagep, ret) {     \
74        DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
75        if ((pagep) != NULL) {                                          \
76                ret = __mpf->put(__mpf, pagep, 0);                      \
77                pagep = NULL;                                           \
78        } else                                                          \
79                ret = 0;                                                \
80        if ((ret) == 0 && STD_LOCKING(dbc))                             \
81                ret = __db_lget(dbc,                                    \
82                    LCK_COUPLE_ALWAYS, lpgno, mode, 0, &(lock));        \
83        if ((ret) == 0)                                                 \
84                ret = __mpf->get(__mpf, &(fpgno), 0, &(pagep));         \
85}
86
87/* Acquire a new page/lock for a cursor. */
88#undef  ACQUIRE_CUR
89#define ACQUIRE_CUR(dbc, mode, p, ret) {                                \
90        BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
91        ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, ret);          \
92        if ((ret) == 0) {                                               \
93                __cp->pgno = p;                                         \
94                __cp->lock_mode = (mode);                               \
95        }                                                               \
96}
97
98/*
99 * Acquire a new page/lock for a cursor and release the previous.
100 * This is typically used when decending a tree and we do not
101 * want to hold the interior nodes locked.
102 */
103#undef  ACQUIRE_CUR_COUPLE
104#define ACQUIRE_CUR_COUPLE(dbc, mode, p, ret) {                         \
105        BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
106        ACQUIRE_COUPLE(dbc, mode, p, __cp->lock, p, __cp->page, ret);   \
107        if ((ret) == 0) {                                               \
108                __cp->pgno = p;                                         \
109                __cp->lock_mode = (mode);                               \
110        }                                                               \
111}
112
113/*
114 * Acquire a write lock if we don't already have one.
115 *
116 * !!!
117 * See ACQUIRE macro on why we handle cursors that don't have locks.
118 */
119#undef  ACQUIRE_WRITE_LOCK
120#define ACQUIRE_WRITE_LOCK(dbc, ret) {                                  \
121        BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
122        ret = 0;                                                        \
123        if (STD_LOCKING(dbc) &&                                         \
124            __cp->lock_mode != DB_LOCK_WRITE &&                         \
125            ((ret) = __db_lget(dbc,                                     \
126            LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0,                    \
127            __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) == 0)           \
128                __cp->lock_mode = DB_LOCK_WRITE;                        \
129}
130
131/* Discard the current page/lock. */
132#undef  DISCARD
133#define DISCARD(dbc, ldiscard, lock, pagep, ret) {                      \
134        DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
135        int __t_ret;                                                    \
136        if ((pagep) != NULL) {                                          \
137                ret = __mpf->put(__mpf, pagep, 0);                      \
138                pagep = NULL;                                           \
139        } else                                                          \
140                ret = 0;                                                \
141        if (ldiscard)                                                   \
142                __t_ret = __LPUT((dbc), lock);                          \
143        else                                                            \
144                __t_ret = __TLPUT((dbc), lock);                         \
145        if (__t_ret != 0 && (ret) == 0)                                 \
146                ret = __t_ret;                                          \
147}
148
149/* Discard the current page/lock for a cursor. */
150#undef  DISCARD_CUR
151#define DISCARD_CUR(dbc, ret) {                                         \
152        BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
153        DISCARD(dbc, 0, __cp->lock, __cp->page, ret);                   \
154        if ((ret) == 0)                                                 \
155                __cp->lock_mode = DB_LOCK_NG;                           \
156}
157
158/* If on-page item is a deleted record. */
159#undef  IS_DELETED
160#define IS_DELETED(dbp, page, indx)                                     \
161        B_DISSET(GET_BKEYDATA(dbp, page,                                \
162            (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type)
163#undef  IS_CUR_DELETED
164#define IS_CUR_DELETED(dbc)                                             \
165        IS_DELETED((dbc)->dbp, (dbc)->internal->page, (dbc)->internal->indx)
166
167/*
168 * Test to see if two cursors could point to duplicates of the same key.
169 * In the case of off-page duplicates they are they same, as the cursors
170 * will be in the same off-page duplicate tree.  In the case of on-page
171 * duplicates, the key index offsets must be the same.  For the last test,
172 * as the original cursor may not have a valid page pointer, we use the
173 * current cursor's.
174 */
175#undef  IS_DUPLICATE
176#define IS_DUPLICATE(dbc, i1, i2)                                       \
177            (P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i1] ==   \
178             P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i2])
179#undef  IS_CUR_DUPLICATE
180#define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)                     \
181        (F_ISSET(dbc, DBC_OPD) ||                                       \
182            (orig_pgno == (dbc)->internal->pgno &&                      \
183            IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx)))
184
185/*
186 * __bam_c_init --
187 *      Initialize the access private portion of a cursor
188 *
189 * PUBLIC: int __bam_c_init __P((DBC *, DBTYPE));
190 */
191int
192__bam_c_init(dbc, dbtype)
193        DBC *dbc;
194        DBTYPE dbtype;
195{
196        DB_ENV *dbenv;
197        int ret;
198
199        dbenv = dbc->dbp->dbenv;
200
201        /* Allocate/initialize the internal structure. */
202        if (dbc->internal == NULL && (ret =
203            __os_malloc(dbenv, sizeof(BTREE_CURSOR), &dbc->internal)) != 0)
204                return (ret);
205
206        /* Initialize methods. */
207        dbc->c_close = __db_c_close;
208        dbc->c_count = __db_c_count;
209        dbc->c_del = __db_c_del;
210        dbc->c_dup = __db_c_dup;
211        dbc->c_get = dbc->c_real_get = __db_c_get;
212        dbc->c_pget = __db_c_pget;
213        dbc->c_put = __db_c_put;
214        if (dbtype == DB_BTREE) {
215                dbc->c_am_bulk = __bam_bulk;
216                dbc->c_am_close = __bam_c_close;
217                dbc->c_am_del = __bam_c_del;
218                dbc->c_am_destroy = __bam_c_destroy;
219                dbc->c_am_get = __bam_c_get;
220                dbc->c_am_put = __bam_c_put;
221                dbc->c_am_writelock = __bam_c_writelock;
222        } else {
223                dbc->c_am_bulk = __bam_bulk;
224                dbc->c_am_close = __bam_c_close;
225                dbc->c_am_del = __ram_c_del;
226                dbc->c_am_destroy = __bam_c_destroy;
227                dbc->c_am_get = __ram_c_get;
228                dbc->c_am_put = __ram_c_put;
229                dbc->c_am_writelock = __bam_c_writelock;
230        }
231
232        return (0);
233}
234
235/*
236 * __bam_c_refresh
237 *      Set things up properly for cursor re-use.
238 *
239 * PUBLIC: int __bam_c_refresh __P((DBC *));
240 */
241int
242__bam_c_refresh(dbc)
243        DBC *dbc;
244{
245        BTREE *t;
246        BTREE_CURSOR *cp;
247        DB *dbp;
248
249        dbp = dbc->dbp;
250        t = dbp->bt_internal;
251        cp = (BTREE_CURSOR *)dbc->internal;
252
253        /*
254         * If our caller set the root page number, it's because the root was
255         * known.  This is always the case for off page dup cursors.  Else,
256         * pull it out of our internal information.
257         */
258        if (cp->root == PGNO_INVALID)
259                cp->root = t->bt_root;
260
261        LOCK_INIT(cp->lock);
262        cp->lock_mode = DB_LOCK_NG;
263
264        cp->sp = cp->csp = cp->stack;
265        cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
266
267        /*
268         * The btree leaf page data structures require that two key/data pairs
269         * (or four items) fit on a page, but other than that there's no fixed
270         * requirement.  The btree off-page duplicates only require two items,
271         * to be exact, but requiring four for them as well seems reasonable.
272         *
273         * Recno uses the btree bt_ovflsize value -- it's close enough.
274         */
275        cp->ovflsize = B_MINKEY_TO_OVFLSIZE(
276            dbp,  F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey, dbp->pgsize);
277
278        cp->recno = RECNO_OOB;
279        cp->order = INVALID_ORDER;
280        cp->flags = 0;
281
282        /* Initialize for record numbers. */
283        if (F_ISSET(dbc, DBC_OPD) ||
284            dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_AM_RECNUM)) {
285                F_SET(cp, C_RECNUM);
286
287                /*
288                 * All btrees that support record numbers, optionally standard
289                 * recno trees, and all off-page duplicate recno trees have
290                 * mutable record numbers.
291                 */
292                if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
293                    F_ISSET(dbp, DB_AM_RECNUM | DB_AM_RENUMBER))
294                        F_SET(cp, C_RENUMBER);
295        }
296
297        return (0);
298}
299
300/*
301 * __bam_c_close --
302 *      Close down the cursor.
303 */
304static int
305__bam_c_close(dbc, root_pgno, rmroot)
306        DBC *dbc;
307        db_pgno_t root_pgno;
308        int *rmroot;
309{
310        BTREE_CURSOR *cp, *cp_opd, *cp_c;
311        DB *dbp;
312        DBC *dbc_opd, *dbc_c;
313        DB_MPOOLFILE *mpf;
314        PAGE *h;
315        int cdb_lock, ret, t_ret;
316
317        dbp = dbc->dbp;
318        mpf = dbp->mpf;
319        cp = (BTREE_CURSOR *)dbc->internal;
320        cp_opd = (dbc_opd = cp->opd) == NULL ?
321            NULL : (BTREE_CURSOR *)dbc_opd->internal;
322        cdb_lock = ret = 0;
323
324        /*
325         * There are 3 ways this function is called:
326         *
327         * 1. Closing a primary cursor: we get called with a pointer to a
328         *    primary cursor that has a NULL opd field.  This happens when
329         *    closing a btree/recno database cursor without an associated
330         *    off-page duplicate tree.
331         *
332         * 2. Closing a primary and an off-page duplicate cursor stack: we
333         *    get called with a pointer to the primary cursor which has a
334         *    non-NULL opd field.  This happens when closing a btree cursor
335         *    into database with an associated off-page btree/recno duplicate
336         *    tree. (It can't be a primary recno database, recno databases
337         *    don't support duplicates.)
338         *
339         * 3. Closing an off-page duplicate cursor stack: we get called with
340         *    a pointer to the off-page duplicate cursor.  This happens when
341         *    closing a non-btree database that has an associated off-page
342         *    btree/recno duplicate tree or for a btree database when the
343         *    opd tree is not empty (root_pgno == PGNO_INVALID).
344         *
345         * If either the primary or off-page duplicate cursor deleted a btree
346         * key/data pair, check to see if the item is still referenced by a
347         * different cursor.  If it is, confirm that cursor's delete flag is
348         * set and leave it to that cursor to do the delete.
349         *
350         * NB: The test for == 0 below is correct.  Our caller already removed
351         * our cursor argument from the active queue, we won't find it when we
352         * search the queue in __bam_ca_delete().
353         * NB: It can't be true that both the primary and off-page duplicate
354         * cursors have deleted a btree key/data pair.  Either the primary
355         * cursor may have deleted an item and there's no off-page duplicate
356         * cursor, or there's an off-page duplicate cursor and it may have
357         * deleted an item.
358         *
359         * Primary recno databases aren't an issue here.  Recno keys are either
360         * deleted immediately or never deleted, and do not have to be handled
361         * here.
362         *
363         * Off-page duplicate recno databases are an issue here, cases #2 and
364         * #3 above can both be off-page recno databases.  The problem is the
365         * same as the final problem for off-page duplicate btree databases.
366         * If we no longer need the off-page duplicate tree, we want to remove
367         * it.  For off-page duplicate btrees, we are done with the tree when
368         * we delete the last item it contains, i.e., there can be no further
369         * references to it when it's empty.  For off-page duplicate recnos,
370         * we remove items from the tree as the application calls the remove
371         * function, so we are done with the tree when we close the last cursor
372         * that references it.
373         *
374         * We optionally take the root page number from our caller.  If the
375         * primary database is a btree, we can get it ourselves because dbc
376         * is the primary cursor.  If the primary database is not a btree,
377         * the problem is that we may be dealing with a stack of pages.  The
378         * cursor we're using to do the delete points at the bottom of that
379         * stack and we need the top of the stack.
380         */
381        if (F_ISSET(cp, C_DELETED)) {
382                dbc_c = dbc;
383                switch (dbc->dbtype) {
384                case DB_BTREE:                          /* Case #1, #3. */
385                        if (__bam_ca_delete(dbp, cp->pgno, cp->indx, 1) == 0)
386                                goto lock;
387                        goto done;
388                case DB_RECNO:
389                        if (!F_ISSET(dbc, DBC_OPD))     /* Case #1. */
390                                goto done;
391                                                        /* Case #3. */
392                        if (__ram_ca_delete(dbp, cp->root) == 0)
393                                goto lock;
394                        goto done;
395                default:
396                        return (__db_unknown_type(dbp->dbenv,
397                                "__bam_c_close", dbc->dbtype));
398                }
399        }
400
401        if (dbc_opd == NULL)
402                goto done;
403
404        if (F_ISSET(cp_opd, C_DELETED)) {               /* Case #2. */
405                /*
406                 * We will not have been provided a root page number.  Acquire
407                 * one from the primary database.
408                 */
409                if ((ret = mpf->get(mpf, &cp->pgno, 0, &h)) != 0)
410                        goto err;
411                root_pgno = GET_BOVERFLOW(dbp, h, cp->indx + O_INDX)->pgno;
412                if ((ret = mpf->put(mpf, h, 0)) != 0)
413                        goto err;
414
415                dbc_c = dbc_opd;
416                switch (dbc_opd->dbtype) {
417                case DB_BTREE:
418                        if (__bam_ca_delete(
419                            dbp, cp_opd->pgno, cp_opd->indx, 1) == 0)
420                                goto lock;
421                        goto done;
422                case DB_RECNO:
423                        if (__ram_ca_delete(dbp, cp_opd->root) == 0)
424                                goto lock;
425                        goto done;
426                default:
427                        return (__db_unknown_type(dbp->dbenv,
428                                "__bam_c_close", dbc->dbtype));
429                }
430        }
431        goto done;
432
433lock:   cp_c = (BTREE_CURSOR *)dbc_c->internal;
434
435        /*
436         * If this is CDB, upgrade the lock if necessary.  While we acquired
437         * the write lock to logically delete the record, we released it when
438         * we returned from that call, and so may not be holding a write lock
439         * at the moment.  NB: to get here in CDB we must either be holding a
440         * write lock or be the only cursor that is permitted to acquire write
441         * locks.  The reason is that there can never be more than a single CDB
442         * write cursor (that cursor cannot be dup'd), and so that cursor must
443         * be closed and the item therefore deleted before any other cursor
444         * could acquire a reference to this item.
445         *
446         * Note that dbc may be an off-page dup cursor;  this is the sole
447         * instance in which an OPD cursor does any locking, but it's necessary
448         * because we may be closed by ourselves without a parent cursor
449         * handy, and we have to do a lock upgrade on behalf of somebody.
450         * If this is the case, the OPD has been given the parent's locking
451         * info in __db_c_get--the OPD is also a WRITEDUP.
452         */
453        if (CDB_LOCKING(dbp->dbenv)) {
454                if (F_ISSET(dbc, DBC_WRITEDUP | DBC_WRITECURSOR)) {
455                        if ((ret = dbp->dbenv->lock_get(
456                            dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE,
457                            &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0)
458                                goto err;
459                        cdb_lock = 1;
460                }
461                if ((ret = mpf->get(mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
462                        goto err;
463
464                goto delete;
465        }
466
467        /*
468         * The variable dbc_c has been initialized to reference the cursor in
469         * which we're going to do the delete.  Initialize the cursor's page
470         * and lock structures as necessary.
471         *
472         * First, we may not need to acquire any locks.  If we're in case #3,
473         * that is, the primary database isn't a btree database, our caller
474         * is responsible for acquiring any necessary locks before calling us.
475         */
476        if (F_ISSET(dbc, DBC_OPD)) {
477                if ((ret = mpf->get(mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
478                        goto err;
479                goto delete;
480        }
481
482        /*
483         * Otherwise, acquire a write lock.  If the cursor that did the initial
484         * logical deletion (and which had a write lock) is not the same as the
485         * cursor doing the physical deletion (which may have only ever had a
486         * read lock on the item), we need to upgrade.  The confusion comes as
487         * follows:
488         *
489         *      C1      created, acquires item read lock
490         *      C2      dup C1, create C2, also has item read lock.
491         *      C1      acquire write lock, delete item
492         *      C1      close
493         *      C2      close, needs a write lock to physically delete item.
494         *
495         * If we're in a TXN, we know that C2 will be able to acquire the write
496         * lock, because no locker other than the one shared by C1 and C2 can
497         * acquire a write lock -- the original write lock C1 acquire was never
498         * discarded.
499         *
500         * If we're not in a TXN, it's nastier.  Other cursors might acquire
501         * read locks on the item after C1 closed, discarding its write lock,
502         * and such locks would prevent C2 from acquiring a read lock.  That's
503         * OK, though, we'll simply wait until we can acquire a read lock, or
504         * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
505         *
506         * Lock the primary database page, regardless of whether we're deleting
507         * an item on a primary database page or an off-page duplicates page.
508         */
509        ACQUIRE(dbc, DB_LOCK_WRITE,
510            cp->pgno, cp_c->lock, cp_c->pgno, cp_c->page, ret);
511        if (ret != 0)
512                goto err;
513
514delete: /*
515         * If the delete occurred in a btree, delete the on-page physical item
516         * referenced by the cursor.
517         */
518        if (dbc_c->dbtype == DB_BTREE && (ret = __bam_c_physdel(dbc_c)) != 0)
519                goto err;
520
521        /*
522         * If we're not working in an off-page duplicate tree, then we're
523         * done.
524         */
525        if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
526                goto done;
527
528        /*
529         * We may have just deleted the last element in the off-page duplicate
530         * tree, and closed the last cursor in the tree.  For an off-page btree
531         * there are no other cursors in the tree by definition, if the tree is
532         * empty.  For an off-page recno we know we have closed the last cursor
533         * in the tree because the __ram_ca_delete call above returned 0 only
534         * in that case.  So, if the off-page duplicate tree is empty at this
535         * point, we want to remove it.
536         */
537        if ((ret = mpf->get(mpf, &root_pgno, 0, &h)) != 0)
538                goto err;
539        if (NUM_ENT(h) == 0) {
540                if ((ret = __db_free(dbc, h)) != 0)
541                        goto err;
542        } else {
543                if ((ret = mpf->put(mpf, h, 0)) != 0)
544                        goto err;
545                goto done;
546        }
547
548        /*
549         * When removing the tree, we have to do one of two things.  If this is
550         * case #2, that is, the primary tree is a btree, delete the key that's
551         * associated with the tree from the btree leaf page.  We know we are
552         * the only reference to it and we already have the correct lock.  We
553         * detect this case because the cursor that was passed to us references
554         * an off-page duplicate cursor.
555         *
556         * If this is case #3, that is, the primary tree isn't a btree, pass
557         * the information back to our caller, it's their job to do cleanup on
558         * the primary page.
559         */
560        if (dbc_opd != NULL) {
561                if ((ret = mpf->get(mpf, &cp->pgno, 0, &cp->page)) != 0)
562                        goto err;
563                if ((ret = __bam_c_physdel(dbc)) != 0)
564                        goto err;
565        } else
566                *rmroot = 1;
567err:
568done:   /*
569         * Discard the page references and locks, and confirm that the stack
570         * has been emptied.
571         */
572        if (dbc_opd != NULL) {
573                DISCARD_CUR(dbc_opd, t_ret);
574                if (t_ret != 0 && ret == 0)
575                        ret = t_ret;
576        }
577        DISCARD_CUR(dbc, t_ret);
578        if (t_ret != 0 && ret == 0)
579                ret = t_ret;
580
581        /* Downgrade any CDB lock we acquired. */
582        if (cdb_lock)
583                (void)__lock_downgrade(
584                    dbp->dbenv, &dbc->mylock, DB_LOCK_IWRITE, 0);
585
586        return (ret);
587}
588
589/*
590 * __bam_c_destroy --
591 *      Close a single cursor -- internal version.
592 */
593static int
594__bam_c_destroy(dbc)
595        DBC *dbc;
596{
597        /* Discard the structures. */
598        __os_free(dbc->dbp->dbenv, dbc->internal);
599
600        return (0);
601}
602
603/*
604 * __bam_c_count --
605 *      Return a count of on and off-page duplicates.
606 *
607 * PUBLIC: int __bam_c_count __P((DBC *, db_recno_t *));
608 */
609int
610__bam_c_count(dbc, recnop)
611        DBC *dbc;
612        db_recno_t *recnop;
613{
614        BTREE_CURSOR *cp;
615        DB *dbp;
616        DB_MPOOLFILE *mpf;
617        db_indx_t indx, top;
618        db_recno_t recno;
619        int ret;
620
621        dbp = dbc->dbp;
622        mpf = dbp->mpf;
623        cp = (BTREE_CURSOR *)dbc->internal;
624
625        /*
626         * Called with the top-level cursor that may reference an off-page
627         * duplicates page.  If it's a set of on-page duplicates, get the
628         * page and count.  Otherwise, get the root page of the off-page
629         * duplicate tree, and use the count.  We don't have to acquire any
630         * new locks, we have to have a read lock to even get here.
631         */
632        if (cp->opd == NULL) {
633                if ((ret = mpf->get(mpf, &cp->pgno, 0, &cp->page)) != 0)
634                        return (ret);
635
636                /*
637                 * Move back to the beginning of the set of duplicates and
638                 * then count forward.
639                 */
640                for (indx = cp->indx;; indx -= P_INDX)
641                        if (indx == 0 ||
642                            !IS_DUPLICATE(dbc, indx, indx - P_INDX))
643                                break;
644                for (recno = 1, top = NUM_ENT(cp->page) - P_INDX;
645                    indx < top; ++recno, indx += P_INDX)
646                        if (!IS_DUPLICATE(dbc, indx, indx + P_INDX))
647                                break;
648                *recnop = recno;
649        } else {
650                if ((ret =
651                    mpf->get(mpf, &cp->opd->internal->root, 0, &cp->page)) != 0)
652                        return (ret);
653
654                *recnop = RE_NREC(cp->page);
655        }
656
657        ret = mpf->put(mpf, cp->page, 0);
658        cp->page = NULL;
659
660        return (ret);
661}
662
663/*
664 * __bam_c_del --
665 *      Delete using a cursor.
666 */
667static int
668__bam_c_del(dbc)
669        DBC *dbc;
670{
671        BTREE_CURSOR *cp;
672        DB *dbp;
673        DB_MPOOLFILE *mpf;
674        int ret, t_ret;
675
676        dbp = dbc->dbp;
677        mpf = dbp->mpf;
678        cp = (BTREE_CURSOR *)dbc->internal;
679        ret = 0;
680
681        /* If the item was already deleted, return failure. */
682        if (F_ISSET(cp, C_DELETED))
683                return (DB_KEYEMPTY);
684
685        /*
686         * This code is always called with a page lock but no page.
687         */
688        DB_ASSERT(cp->page == NULL);
689
690        /*
691         * We don't physically delete the record until the cursor moves, so
692         * we have to have a long-lived write lock on the page instead of a
693         * a long-lived read lock.  Note, we have to have a read lock to even
694         * get here.
695         *
696         * If we're maintaining record numbers, we lock the entire tree, else
697         * we lock the single page.
698         */
699        if (F_ISSET(cp, C_RECNUM)) {
700                if ((ret = __bam_c_getstack(dbc)) != 0)
701                        goto err;
702                cp->page = cp->csp->page;
703        } else {
704                ACQUIRE_CUR(dbc, DB_LOCK_WRITE, cp->pgno, ret);
705                if (ret != 0)
706                        goto err;
707        }
708
709        /* Log the change. */
710        if (DBC_LOGGING(dbc)) {
711                if ((ret = __bam_cdel_log(dbp, dbc->txn, &LSN(cp->page), 0,
712                    PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
713                        goto err;
714        } else
715                LSN_NOT_LOGGED(LSN(cp->page));
716
717        /* Set the intent-to-delete flag on the page. */
718        if (TYPE(cp->page) == P_LBTREE)
719                B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx + O_INDX)->type);
720        else
721                B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type);
722
723        /* Mark the page dirty. */
724        ret = mpf->set(mpf, cp->page, DB_MPOOL_DIRTY);
725
726err:    /*
727         * If we've been successful so far and the tree has record numbers,
728         * adjust the record counts.  Either way, release acquired page(s).
729         */
730        if (F_ISSET(cp, C_RECNUM)) {
731                if (ret == 0)
732                        ret = __bam_adjust(dbc, -1);
733                (void)__bam_stkrel(dbc, 0);
734        } else
735                if (cp->page != NULL &&
736                    (t_ret = mpf->put(mpf, cp->page, 0)) != 0 && ret == 0)
737                        ret = t_ret;
738
739        cp->page = NULL;
740
741        /* Update the cursors last, after all chance of failure is past. */
742        if (ret == 0)
743                (void)__bam_ca_delete(dbp, cp->pgno, cp->indx, 1);
744
745        return (ret);
746}
747
748/*
749 * __bam_c_dup --
750 *      Duplicate a btree cursor, such that the new one holds appropriate
751 *      locks for the position of the original.
752 *
753 * PUBLIC: int __bam_c_dup __P((DBC *, DBC *));
754 */
755int
756__bam_c_dup(orig_dbc, new_dbc)
757        DBC *orig_dbc, *new_dbc;
758{
759        BTREE_CURSOR *orig, *new;
760        int ret;
761
762        orig = (BTREE_CURSOR *)orig_dbc->internal;
763        new = (BTREE_CURSOR *)new_dbc->internal;
764
765        /*
766         * If we're holding a lock we need to acquire a copy of it, unless
767         * we're in a transaction.  We don't need to copy any lock we're
768         * holding inside a transaction because all the locks are retained
769         * until the transaction commits or aborts.
770         */
771        if (LOCK_ISSET(orig->lock) && orig_dbc->txn == NULL) {
772                if ((ret = __db_lget(new_dbc,
773                    0, new->pgno, new->lock_mode, 0, &new->lock)) != 0)
774                        return (ret);
775        }
776        new->ovflsize = orig->ovflsize;
777        new->recno = orig->recno;
778        new->flags = orig->flags;
779
780        return (0);
781}
782
783/*
784 * __bam_c_get --
785 *      Get using a cursor (btree).
786 */
787static int
788__bam_c_get(dbc, key, data, flags, pgnop)
789        DBC *dbc;
790        DBT *key, *data;
791        u_int32_t flags;
792        db_pgno_t *pgnop;
793{
794        BTREE_CURSOR *cp;
795        DB *dbp;
796        DB_MPOOLFILE *mpf;
797        db_pgno_t orig_pgno;
798        db_indx_t orig_indx;
799        int exact, newopd, ret;
800
801        dbp = dbc->dbp;
802        mpf = dbp->mpf;
803        cp = (BTREE_CURSOR *)dbc->internal;
804        orig_pgno = cp->pgno;
805        orig_indx = cp->indx;
806
807        newopd = 0;
808        switch (flags) {
809        case DB_CURRENT:
810                /* It's not possible to return a deleted record. */
811                if (F_ISSET(cp, C_DELETED)) {
812                        ret = DB_KEYEMPTY;
813                        goto err;
814                }
815
816                /*
817                 * Acquire the current page.  We have at least a read-lock
818                 * already.  The caller may have set DB_RMW asking for a
819                 * write lock, but upgrading to a write lock has no better
820                 * chance of succeeding now instead of later, so don't try.
821                 */
822                if ((ret = mpf->get(mpf, &cp->pgno, 0, &cp->page)) != 0)
823                        goto err;
824                break;
825        case DB_FIRST:
826                newopd = 1;
827                if ((ret = __bam_c_first(dbc)) != 0)
828                        goto err;
829                break;
830        case DB_GET_BOTH:
831        case DB_GET_BOTH_RANGE:
832                /*
833                 * There are two ways to get here based on DBcursor->c_get
834                 * with the DB_GET_BOTH/DB_GET_BOTH_RANGE flags set:
835                 *
836                 * 1. Searching a sorted off-page duplicate tree: do a tree
837                 * search.
838                 *
839                 * 2. Searching btree: do a tree search.  If it returns a
840                 * reference to off-page duplicate tree, return immediately
841                 * and let our caller deal with it.  If the search doesn't
842                 * return a reference to off-page duplicate tree, continue
843                 * with an on-page search.
844                 */
845                if (F_ISSET(dbc, DBC_OPD)) {
846                        if ((ret = __bam_c_search(
847                            dbc, PGNO_INVALID, data, flags, &exact)) != 0)
848                                goto err;
849                        if (flags == DB_GET_BOTH) {
850                                if (!exact) {
851                                        ret = DB_NOTFOUND;
852                                        goto err;
853                                }
854                                break;
855                        }
856
857                        /*
858                         * We didn't require an exact match, so the search may
859                         * may have returned an entry past the end of the page,
860                         * or we may be referencing a deleted record.  If so,
861                         * move to the next entry.
862                         */
863                        if ((cp->indx == NUM_ENT(cp->page) ||
864                            IS_CUR_DELETED(dbc)) &&
865                            (ret = __bam_c_next(dbc, 1, 0)) != 0)
866                                goto err;
867                } else {
868                        if ((ret = __bam_c_search(
869                            dbc, PGNO_INVALID, key, flags, &exact)) != 0)
870                                return (ret);
871                        if (!exact) {
872                                ret = DB_NOTFOUND;
873                                goto err;
874                        }
875
876                        if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
877                                newopd = 1;
878                                break;
879                        }
880                        if ((ret =
881                            __bam_getboth_finddatum(dbc, data, flags)) != 0)
882                                goto err;
883                }
884                break;
885        case DB_GET_BOTHC:
886                if ((ret = __bam_getbothc(dbc, data)) != 0)
887                        goto err;
888                break;
889        case DB_LAST:
890                newopd = 1;
891                if ((ret = __bam_c_last(dbc)) != 0)
892                        goto err;
893                break;
894        case DB_NEXT:
895                newopd = 1;
896                if (cp->pgno == PGNO_INVALID) {
897                        if ((ret = __bam_c_first(dbc)) != 0)
898                                goto err;
899                } else
900                        if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
901                                goto err;
902                break;
903        case DB_NEXT_DUP:
904                if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
905                        goto err;
906                if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
907                        ret = DB_NOTFOUND;
908                        goto err;
909                }
910                break;
911        case DB_NEXT_NODUP:
912                newopd = 1;
913                if (cp->pgno == PGNO_INVALID) {
914                        if ((ret = __bam_c_first(dbc)) != 0)
915                                goto err;
916                } else
917                        do {
918                                if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
919                                        goto err;
920                        } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
921                break;
922        case DB_PREV:
923                newopd = 1;
924                if (cp->pgno == PGNO_INVALID) {
925                        if ((ret = __bam_c_last(dbc)) != 0)
926                                goto err;
927                } else
928                        if ((ret = __bam_c_prev(dbc)) != 0)
929                                goto err;
930                break;
931        case DB_PREV_NODUP:
932                newopd = 1;
933                if (cp->pgno == PGNO_INVALID) {
934                        if ((ret = __bam_c_last(dbc)) != 0)
935                                goto err;
936                } else
937                        do {
938                                if ((ret = __bam_c_prev(dbc)) != 0)
939                                        goto err;
940                        } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
941                break;
942        case DB_SET:
943        case DB_SET_RECNO:
944                newopd = 1;
945                if ((ret = __bam_c_search(dbc,
946                    PGNO_INVALID, key, flags, &exact)) != 0)
947                        goto err;
948                break;
949        case DB_SET_RANGE:
950                newopd = 1;
951                if ((ret = __bam_c_search(dbc,
952                    PGNO_INVALID, key, flags, &exact)) != 0)
953                        goto err;
954
955                /*
956                 * As we didn't require an exact match, the search function
957                 * may have returned an entry past the end of the page.  Or,
958                 * we may be referencing a deleted record.  If so, move to
959                 * the next entry.
960                 */
961                if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
962                        if ((ret = __bam_c_next(dbc, 0, 0)) != 0)
963                                goto err;
964                break;
965        default:
966                ret = __db_unknown_flag(dbp->dbenv, "__bam_c_get", flags);
967                goto err;
968        }
969
970        /*
971         * We may have moved to an off-page duplicate tree.  Return that
972         * information to our caller.
973         */
974        if (newopd && pgnop != NULL)
975                (void)__bam_isopd(dbc, pgnop);
976
977        /*
978         * Don't return the key, it was passed to us (this is true even if the
979         * application defines a compare function returning equality for more
980         * than one key value, since in that case which actual value we store
981         * in the database is undefined -- and particularly true in the case of
982         * duplicates where we only store one key value).
983         */
984        if (flags == DB_GET_BOTH ||
985            flags == DB_GET_BOTH_RANGE || flags == DB_SET)
986                F_SET(key, DB_DBT_ISSET);
987
988err:    /*
989         * Regardless of whether we were successful or not, if the cursor
990         * moved, clear the delete flag, DBcursor->c_get never references
991         * a deleted key, if it moved at all.
992         */
993        if (F_ISSET(cp, C_DELETED) &&
994            (cp->pgno != orig_pgno || cp->indx != orig_indx))
995                F_CLR(cp, C_DELETED);
996
997        return (ret);
998}
999
1000static int
1001__bam_get_prev(dbc)
1002        DBC *dbc;
1003{
1004        BTREE_CURSOR *cp;
1005        DBT key, data;
1006        db_pgno_t pgno;
1007        int ret;
1008
1009        if ((ret = __bam_c_prev(dbc)) != 0)
1010                return (ret);
1011
1012        if (__bam_isopd(dbc, &pgno)) {
1013                cp = (BTREE_CURSOR *)dbc->internal;
1014                if ((ret = __db_c_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1015                        return (ret);
1016                if ((ret = cp->opd->c_am_get(cp->opd,
1017                    &key, &data, DB_LAST, NULL)) != 0)
1018                        return (ret);
1019        }
1020
1021        return (0);
1022}
1023
1024/*
1025 * __bam_bulk -- Return bulk data from a btree.
1026 */
1027static int
1028__bam_bulk(dbc, data, flags)
1029        DBC *dbc;
1030        DBT *data;
1031        u_int32_t flags;
1032{
1033        BKEYDATA *bk;
1034        BOVERFLOW *bo;
1035        BTREE_CURSOR *cp;
1036        PAGE *pg;
1037        db_indx_t *inp, indx, pg_keyoff;
1038        int32_t  *endp, key_off, *offp, *saveoffp;
1039        u_int8_t *dbuf, *dp, *np;
1040        u_int32_t key_size, size, space;
1041        int adj, is_key, need_pg, next_key, no_dup;
1042        int pagesize, rec_key, ret;
1043
1044        ret = 0;
1045        key_off = 0;
1046        size = 0;
1047        pagesize = dbc->dbp->pgsize;
1048        cp = (BTREE_CURSOR *)dbc->internal;
1049
1050        /*
1051         * dp tracks the beginging of the page in the buffer.
1052         * np is the next place to copy things into the buffer.
1053         * dbuf always stays at the beging of the buffer.
1054         */
1055        dbuf = data->data;
1056        np = dp = dbuf;
1057
1058        /* Keep track of space that is left.  There is a termination entry */
1059        space = data->ulen;
1060        space -= sizeof(*offp);
1061
1062        /* Build the offset/size table from the end up. */
1063        endp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
1064        endp--;
1065        offp = endp;
1066
1067        key_size = 0;
1068
1069        /*
1070         * Distinguish between BTREE and RECNO.
1071         * There are no keys in RECNO.  If MULTIPLE_KEY is specified
1072         * then we return the record numbers.
1073         * is_key indicates that multiple btree keys are returned.
1074         * rec_key is set if we are returning record numbers.
1075         * next_key is set if we are going after the next key rather than dup.
1076         */
1077        if (dbc->dbtype == DB_BTREE) {
1078                is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1: 0;
1079                rec_key = 0;
1080                next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1081                adj = 2;
1082        } else {
1083                is_key = 0;
1084                rec_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1085                next_key = LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1086                adj = 1;
1087        }
1088        no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
1089
1090next_pg:
1091        indx = cp->indx;
1092        pg = cp->page;
1093
1094        inp = P_INP(dbc->dbp, pg);
1095        /* The current page is not yet in the buffer. */
1096        need_pg = 1;
1097
1098        /*
1099         * Keep track of the offset of the current key on the page.
1100         * If we are returning keys, set it to 0 first so we force
1101         * the copy of the key to the buffer.
1102         */
1103        pg_keyoff = 0;
1104        if (is_key == 0)
1105                pg_keyoff = inp[indx];
1106
1107        do {
1108                if (IS_DELETED(dbc->dbp, pg, indx)) {
1109                        if (dbc->dbtype != DB_RECNO)
1110                                continue;
1111
1112                        cp->recno++;
1113                        /*
1114                         * If we are not returning recnos then we
1115                         * need to fill in every slot so the user
1116                         * can calculate the record numbers.
1117                         */
1118                        if (rec_key != 0)
1119                                continue;
1120
1121                        space -= 2 * sizeof(*offp);
1122                        /* Check if space as underflowed. */
1123                        if (space > data->ulen)
1124                                goto back_up;
1125
1126                        /* Just mark the empty recno slots. */
1127                        *offp-- = 0;
1128                        *offp-- = 0;
1129                        continue;
1130                }
1131
1132                /*
1133                 * Check to see if we have a new key.
1134                 * If so, then see if we need to put the
1135                 * key on the page.  If its already there
1136                 * then we just point to it.
1137                 */
1138                if (is_key && pg_keyoff != inp[indx]) {
1139                        bk = GET_BKEYDATA(dbc->dbp, pg, indx);
1140                        if (B_TYPE(bk->type) == B_OVERFLOW) {
1141                                bo = (BOVERFLOW *)bk;
1142                                size = key_size = bo->tlen;
1143                                if (key_size > space)
1144                                        goto get_key_space;
1145                                if ((ret = __bam_bulk_overflow(dbc,
1146                                    bo->tlen, bo->pgno, np)) != 0)
1147                                        return (ret);
1148                                space -= key_size;
1149                                key_off = (int32_t)(np - dbuf);
1150                                np += key_size;
1151                        } else {
1152                                if (need_pg) {
1153                                        dp = np;
1154                                        size = pagesize - HOFFSET(pg);
1155                                        if (space < size) {
1156get_key_space:
1157                                                /* Nothing added, then error. */
1158                                                if (offp == endp) {
1159                                                        data->size =
1160                                                            ALIGN(size +
1161                                                            pagesize,
1162                                                            sizeof(u_int32_t));
1163                                                        return (ENOMEM);
1164                                                }
1165                                                /*
1166                                                 * We need to back up to the
1167                                                 * last record put into the
1168                                                 * buffer so that it is
1169                                                 * CURRENT.
1170                                                 */
1171                                                if (indx != 0)
1172                                                        indx -= P_INDX;
1173                                                else {
1174                                                        if ((ret =
1175                                                            __bam_get_prev(
1176                                                            dbc)) != 0)
1177                                                                return (ret);
1178                                                        indx = cp->indx;
1179                                                        pg = cp->page;
1180                                                }
1181                                                break;
1182                                        }
1183                                        /*
1184                                         * Move the data part of the page
1185                                         * to the buffer.
1186                                         */
1187                                        memcpy(dp,
1188                                           (u_int8_t *)pg + HOFFSET(pg), size);
1189                                        need_pg = 0;
1190                                        space -= size;
1191                                        np += size;
1192                                }
1193                                key_size = bk->len;
1194                                key_off = (int32_t)(inp[indx] - HOFFSET(pg)
1195                                    + dp - dbuf + SSZA(BKEYDATA, data));
1196                                pg_keyoff = inp[indx];
1197                        }
1198                }
1199
1200                /*
1201                 * Reserve space for the pointers and sizes.
1202                 * Either key/data pair or just for a data item.
1203                 */
1204                space -= (is_key ? 4 : 2) * sizeof(*offp);
1205                if (rec_key)
1206                        space -= sizeof(*offp);
1207
1208                /* Check to see if space has underflowed. */
1209                if (space > data->ulen)
1210                        goto back_up;
1211
1212                /*
1213                 * Determine if the next record is in the
1214                 * buffer already or if it needs to be copied in.
1215                 * If we have an off page dup, then copy as many
1216                 * as will fit into the buffer.
1217                 */
1218                bk = GET_BKEYDATA(dbc->dbp, pg, indx + adj - 1);
1219                if (B_TYPE(bk->type) == B_DUPLICATE) {
1220                        bo = (BOVERFLOW *)bk;
1221                        if (is_key) {
1222                                *offp-- = key_off;
1223                                *offp-- = key_size;
1224                        }
1225                        /*
1226                         * We pass the offset of the current key.
1227                         * On return we check to see if offp has
1228                         * moved to see if any data fit.
1229                         */
1230                        saveoffp = offp;
1231                        if ((ret = __bam_bulk_duplicates(dbc, bo->pgno,
1232                            dbuf, is_key ? offp + P_INDX : NULL,
1233                            &offp, &np, &space, no_dup)) != 0) {
1234                                if (ret == ENOMEM) {
1235                                        size = space;
1236                                        /* If nothing was added, then error. */
1237                                        if (offp == saveoffp) {
1238                                                offp += 2;
1239                                                goto back_up;
1240                                        }
1241                                        goto get_space;
1242                                }
1243                                return (ret);
1244                        }
1245                } else if (B_TYPE(bk->type) == B_OVERFLOW) {
1246                        bo = (BOVERFLOW *)bk;
1247                        size = bo->tlen;
1248                        if (size > space)
1249                                goto back_up;
1250                        if ((ret =
1251                            __bam_bulk_overflow(dbc,
1252                                bo->tlen, bo->pgno, np)) != 0)
1253                                return (ret);
1254                        space -= size;
1255                        if (is_key) {
1256                                *offp-- = key_off;
1257                                *offp-- = key_size;
1258                        } else if (rec_key)
1259                                *offp-- = cp->recno;
1260                        *offp-- = (int32_t)(np - dbuf);
1261                        np += size;
1262                        *offp-- = size;
1263                } else {
1264                        if (need_pg) {
1265                                dp = np;
1266                                size = pagesize - HOFFSET(pg);
1267                                if (space < size) {
1268back_up:
1269                                        /*
1270                                         * Back up the index so that the
1271                                         * last record in the buffer is CURRENT
1272                                         */
1273                                        if (indx >= adj)
1274                                                indx -= adj;
1275                                        else {
1276                                                if ((ret =
1277                                                    __bam_get_prev(dbc)) != 0 &&
1278                                                    ret != DB_NOTFOUND)
1279                                                        return (ret);
1280                                                indx = cp->indx;
1281                                                pg = cp->page;
1282                                        }
1283                                        if (dbc->dbtype == DB_RECNO)
1284                                                cp->recno--;
1285get_space:
1286                                        /*
1287                                         * See if we put anything in the
1288                                         * buffer or if we are doing a DBP->get
1289                                         * did we get all of the data.
1290                                         */
1291                                        if (offp >=
1292                                            (is_key ? &endp[-1] : endp) ||
1293                                            F_ISSET(dbc, DBC_TRANSIENT)) {
1294                                                data->size = ALIGN(size +
1295                                                    data->ulen - space,
1296                                                    sizeof(u_int32_t));
1297                                                return (ENOMEM);
1298                                        }
1299                                        break;
1300                                }
1301                                memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
1302                                need_pg = 0;
1303                                space -= size;
1304                                np += size;
1305                        }
1306                        /*
1307                         * Add the offsets and sizes to the end of the buffer.
1308                         * First add the key info then the data info.
1309                         */
1310                        if (is_key) {
1311                                *offp-- = key_off;
1312                                *offp-- = key_size;
1313                        } else if (rec_key)
1314                                *offp-- = cp->recno;
1315                        *offp-- = (int32_t)(inp[indx + adj - 1] - HOFFSET(pg)
1316                            + dp - dbuf + SSZA(BKEYDATA, data));
1317                        *offp-- = bk->len;
1318                }
1319                if (dbc->dbtype == DB_RECNO)
1320                        cp->recno++;
1321                else if (no_dup) {
1322                        while (indx + adj < NUM_ENT(pg) &&
1323                            pg_keyoff == inp[indx + adj])
1324                                indx += adj;
1325                }
1326        /*
1327         * Stop when we either run off the page or we
1328         * move to the next key and we are not returning mulitple keys.
1329         */
1330        } while ((indx += adj) < NUM_ENT(pg) &&
1331            (next_key || pg_keyoff == inp[indx]));
1332
1333        /* If we are off the page then try to the next page. */
1334        if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
1335                cp->indx = indx;
1336                ret = __bam_c_next(dbc, 0, 1);
1337                if (ret == 0)
1338                        goto next_pg;
1339                if (ret != DB_NOTFOUND)
1340                        return (ret);
1341        }
1342
1343        /*
1344         * If we did a DBP->get we must error if we did not return
1345         * all the data for the current key because there is
1346         * no way to know if we did not get it all, nor any
1347         * interface to fetch the balance.
1348         */
1349
1350        if (ret == 0 &&
1351            F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) {
1352                data->size = (data->ulen - space) + size;
1353                return (ENOMEM);
1354        }
1355        /*
1356         * Must leave the index pointing at the last record fetched.
1357         * If we are not fetching keys, we may have stepped to the
1358         * next key.
1359         */
1360        if (next_key || pg_keyoff == inp[indx])
1361                cp->indx = indx;
1362        else
1363                cp->indx = indx - P_INDX;
1364
1365        if (rec_key == 1)
1366                *offp = (u_int32_t) RECNO_OOB;
1367        else
1368                *offp = (u_int32_t) -1;
1369        return (0);
1370}
1371
1372/*
1373 * __bam_bulk_overflow --
1374 *      Dump overflow record into the buffer.
1375 *      The space requirements have already been checked.
1376 * PUBLIC: int __bam_bulk_overflow
1377 * PUBLIC:    __P((DBC *, u_int32_t, db_pgno_t, u_int8_t *));
1378 */
1379int
1380__bam_bulk_overflow(dbc, len, pgno, dp)
1381        DBC *dbc;
1382        u_int32_t len;
1383        db_pgno_t pgno;
1384        u_int8_t *dp;
1385{
1386        DBT dbt;
1387
1388        memset(&dbt, 0, sizeof(dbt));
1389        F_SET(&dbt, DB_DBT_USERMEM);
1390        dbt.ulen = len;
1391        dbt.data = (void *)dp;
1392        return (__db_goff(dbc->dbp, &dbt, len, pgno, NULL, NULL));
1393}
1394
1395/*
1396 * __bam_bulk_duplicates --
1397 *      Put as many off page duplicates as will fit into the buffer.
1398 * This routine will adjust the cursor to reflect the position in
1399 * the overflow tree.
1400 * PUBLIC: int __bam_bulk_duplicates __P((DBC *,
1401 * PUBLIC:       db_pgno_t, u_int8_t *, int32_t *,
1402 * PUBLIC:       int32_t **, u_int8_t **, u_int32_t *, int));
1403 */
1404int
1405__bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
1406        DBC *dbc;
1407        db_pgno_t pgno;
1408        u_int8_t *dbuf;
1409        int32_t *keyoff, **offpp;
1410        u_int8_t **dpp;
1411        u_int32_t *spacep;
1412        int no_dup;
1413{
1414        DB *dbp;
1415        BKEYDATA *bk;
1416        BOVERFLOW *bo;
1417        BTREE_CURSOR *cp;
1418        DBC *opd;
1419        DBT key, data;
1420        PAGE *pg;
1421        db_indx_t indx, *inp;
1422        int32_t *offp;
1423        u_int32_t size, space;
1424        u_int8_t *dp, *np;
1425        int first, need_pg, pagesize, ret, t_ret;
1426
1427        ret = 0;
1428
1429        dbp = dbc->dbp;
1430        cp = (BTREE_CURSOR *)dbc->internal;
1431        opd = cp->opd;
1432
1433        if (opd == NULL) {
1434                if ((ret = __db_c_newopd(dbc, pgno, NULL, &opd)) != 0)
1435                        return (ret);
1436                cp->opd = opd;
1437                if ((ret = opd->c_am_get(opd,
1438                    &key, &data, DB_FIRST, NULL)) != 0)
1439                        return (ret);
1440        }
1441
1442        pagesize = opd->dbp->pgsize;
1443        cp = (BTREE_CURSOR *)opd->internal;
1444        space = *spacep;
1445        /* Get current offset slot. */
1446        offp = *offpp;
1447
1448        /*
1449         * np is the next place to put data.
1450         * dp is the begining of the current page in the buffer.
1451         */
1452        np = dp = *dpp;
1453        first = 1;
1454        indx = cp->indx;
1455
1456        do {
1457                /* Fetch the current record.  No initial move. */
1458                if ((ret = __bam_c_next(opd, 0, 0)) != 0)
1459                        break;
1460                pg = cp->page;
1461                indx = cp->indx;
1462                inp = P_INP(dbp, pg);
1463                /* We need to copy the page to the buffer. */
1464                need_pg = 1;
1465
1466                do {
1467                        if (IS_DELETED(dbp, pg, indx))
1468                                goto contin;
1469                        bk = GET_BKEYDATA(dbp, pg, indx);
1470                        space -= 2 * sizeof(*offp);
1471                        /* Allocate space for key if needed. */
1472                        if (first == 0 && keyoff != NULL)
1473                                space -= 2 * sizeof(*offp);
1474
1475                        /* Did space underflow? */
1476                        if (space > *spacep) {
1477                                ret = ENOMEM;
1478                                if (first == 1) {
1479                                        space = *spacep + -(int32_t)space;
1480                                        if (need_pg)
1481                                                space += pagesize - HOFFSET(pg);
1482                                }
1483                                break;
1484                        }
1485                        if (B_TYPE(bk->type) == B_OVERFLOW) {
1486                                bo = (BOVERFLOW *)bk;
1487                                size = bo->tlen;
1488                                if (size > space) {
1489                                        ret = ENOMEM;
1490                                        if (first == 1) {
1491                                                space = *spacep + size;
1492                                        }
1493                                        break;
1494                                }
1495                                if (first == 0 && keyoff != NULL) {
1496                                        *offp-- = keyoff[0];
1497                                        *offp-- = keyoff[-1];
1498                                }
1499                                if ((ret = __bam_bulk_overflow(dbc,
1500                                    bo->tlen, bo->pgno, np)) != 0)
1501                                        return (ret);
1502                                space -= size;
1503                                *offp-- = (int32_t)(np - dbuf);
1504                                np += size;
1505                        } else {
1506                                if (need_pg) {
1507                                        dp = np;
1508                                        size = pagesize - HOFFSET(pg);
1509                                        if (space < size) {
1510                                                ret = ENOMEM;
1511                                                /* Return space required. */
1512                                                if (first == 1) {
1513                                                        space = *spacep + size;
1514                                                }
1515                                                break;
1516                                        }
1517                                        memcpy(dp,
1518                                            (u_int8_t *)pg + HOFFSET(pg), size);
1519                                        need_pg = 0;
1520                                        space -= size;
1521                                        np += size;
1522                                }
1523                                if (first == 0 && keyoff != NULL) {
1524                                        *offp-- = keyoff[0];
1525                                        *offp-- = keyoff[-1];
1526                                }
1527                                size = bk->len;
1528                                *offp-- = (int32_t)(inp[indx] - HOFFSET(pg)
1529                                    + dp - dbuf + SSZA(BKEYDATA, data));
1530                        }
1531                        *offp-- = size;
1532                        first = 0;
1533                        if (no_dup)
1534                                break;
1535contin:
1536                        indx++;
1537                        if (opd->dbtype == DB_RECNO)
1538                                cp->recno++;
1539                } while (indx < NUM_ENT(pg));
1540                if (no_dup)
1541                        break;
1542                cp->indx = indx;
1543
1544        } while (ret == 0);
1545
1546        /* Return the updated information. */
1547        *spacep = space;
1548        *offpp = offp;
1549        *dpp = np;
1550
1551        /*
1552         * If we ran out of space back up the pointer.
1553         * If we did not return any dups or reached the end, close the opd.
1554         */
1555        if (ret == ENOMEM) {
1556                if (opd->dbtype == DB_RECNO) {
1557                        if (--cp->recno == 0)
1558                                goto close_opd;
1559                } else if (indx != 0)
1560                        cp->indx--;
1561                else {
1562                        t_ret = __bam_c_prev(opd);
1563                        if (t_ret == DB_NOTFOUND)
1564                                goto close_opd;
1565                        if (t_ret != 0)
1566                                ret = t_ret;
1567                }
1568        } else if (keyoff == NULL && ret == DB_NOTFOUND) {
1569                cp->indx--;
1570                if (opd->dbtype == DB_RECNO)
1571                        --cp->recno;
1572        } else if (indx == 0 || ret == DB_NOTFOUND) {
1573close_opd:
1574                opd->c_close(opd);
1575                ((BTREE_CURSOR *)dbc->internal)->opd = NULL;
1576        }
1577        if (ret == DB_NOTFOUND)
1578                ret = 0;
1579
1580        return (ret);
1581}
1582
1583/*
1584 * __bam_getbothc --
1585 *      Search for a matching data item on a join.
1586 */
1587static int
1588__bam_getbothc(dbc, data)
1589        DBC *dbc;
1590        DBT *data;
1591{
1592        BTREE_CURSOR *cp;
1593        DB *dbp;
1594        DB_MPOOLFILE *mpf;
1595        int cmp, exact, ret;
1596
1597        dbp = dbc->dbp;
1598        mpf = dbp->mpf;
1599        cp = (BTREE_CURSOR *)dbc->internal;
1600
1601        /*
1602         * Acquire the current page.  We have at least a read-lock
1603         * already.  The caller may have set DB_RMW asking for a
1604         * write lock, but upgrading to a write lock has no better
1605         * chance of succeeding now instead of later, so don't try.
1606         */
1607        if ((ret = mpf->get(mpf, &cp->pgno, 0, &cp->page)) != 0)
1608                return (ret);
1609
1610        /*
1611         * An off-page duplicate cursor.  Search the remaining duplicates
1612         * for one which matches (do a normal btree search, then verify
1613         * that the retrieved record is greater than the original one).
1614         */
1615        if (F_ISSET(dbc, DBC_OPD)) {
1616                /*
1617                 * Check to make sure the desired item comes strictly after
1618                 * the current position;  if it doesn't, return DB_NOTFOUND.
1619                 */
1620                if ((ret = __bam_cmp(dbp, data, cp->page, cp->indx,
1621                    dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare,
1622                    &cmp)) != 0)
1623                        return (ret);
1624
1625                if (cmp <= 0)
1626                        return (DB_NOTFOUND);
1627
1628                /* Discard the current page, we're going to do a full search. */
1629                if ((ret = mpf->put(mpf, cp->page, 0)) != 0)
1630                        return (ret);
1631                cp->page = NULL;
1632
1633                return (__bam_c_search(dbc,
1634                    PGNO_INVALID, data, DB_GET_BOTH, &exact));
1635        }
1636
1637        /*
1638         * We're doing a DBC->c_get(DB_GET_BOTHC) and we're already searching
1639         * a set of on-page duplicates (either sorted or unsorted).  Continue
1640         * a linear search from after the current position.
1641         *
1642         * (Note that we could have just finished a "set" of one duplicate,
1643         * i.e. not a duplicate at all, but the following check will always
1644         * return DB_NOTFOUND in this case, which is the desired behavior.)
1645         */
1646        if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1647            !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1648                return (DB_NOTFOUND);
1649        cp->indx += P_INDX;
1650
1651        return (__bam_getboth_finddatum(dbc, data, DB_GET_BOTH));
1652}
1653
1654/*
1655 * __bam_getboth_finddatum --
1656 *      Find a matching on-page data item.
1657 */
1658static int
1659__bam_getboth_finddatum(dbc, data, flags)
1660        DBC *dbc;
1661        DBT *data;
1662        u_int32_t flags;
1663{
1664        BTREE_CURSOR *cp;
1665        DB *dbp;
1666        db_indx_t base, lim, top;
1667        int cmp, ret;
1668
1669        dbp = dbc->dbp;
1670        cp = (BTREE_CURSOR *)dbc->internal;
1671
1672        /*
1673         * Called (sometimes indirectly) from DBC->get to search on-page data
1674         * item(s) for a matching value.  If the original flag was DB_GET_BOTH
1675         * or DB_GET_BOTH_RANGE, the cursor is set to the first undeleted data
1676         * item for the key.  If the original flag was DB_GET_BOTHC, the cursor
1677         * argument is set to the first data item we can potentially return.
1678         * In both cases, there may or may not be additional duplicate data
1679         * items to search.
1680         *
1681         * If the duplicates are not sorted, do a linear search.
1682         */
1683        if (dbp->dup_compare == NULL) {
1684                for (;; cp->indx += P_INDX) {
1685                        if (!IS_CUR_DELETED(dbc) &&
1686                            (ret = __bam_cmp(dbp, data, cp->page,
1687                            cp->indx + O_INDX, __bam_defcmp, &cmp)) != 0)
1688                                return (ret);
1689                        if (cmp == 0)
1690                                return (0);
1691
1692                        if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1693                            !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1694                                break;
1695                }
1696                return (DB_NOTFOUND);
1697        }
1698
1699        /*
1700         * If the duplicates are sorted, do a binary search.  The reason for
1701         * this is that large pages and small key/data pairs result in large
1702         * numbers of on-page duplicates before they get pushed off-page.
1703         *
1704         * Find the top and bottom of the duplicate set.  Binary search
1705         * requires at least two items, don't loop if there's only one.
1706         */
1707        for (base = top = cp->indx; top < NUM_ENT(cp->page); top += P_INDX)
1708                if (!IS_DUPLICATE(dbc, cp->indx, top))
1709                        break;
1710        if (base == (top - P_INDX)) {
1711                if  ((ret = __bam_cmp(dbp, data,
1712                    cp->page, cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1713                        return (ret);
1714                return (cmp == 0 ||
1715                    (cmp < 0 && flags == DB_GET_BOTH_RANGE) ? 0 : DB_NOTFOUND);
1716        }
1717
1718        for (lim = (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
1719                cp->indx = base + ((lim >> 1) * P_INDX);
1720                if ((ret = __bam_cmp(dbp, data, cp->page,
1721                    cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1722                        return (ret);
1723                if (cmp == 0) {
1724                        /*
1725                         * XXX
1726                         * No duplicate duplicates in sorted duplicate sets,
1727                         * so there can be only one.
1728                         */
1729                        if (!IS_CUR_DELETED(dbc))
1730                                return (0);
1731                        break;
1732                }
1733                if (cmp > 0) {
1734                        base = cp->indx + P_INDX;
1735                        --lim;
1736                }
1737        }
1738
1739        /* No match found; if we're looking for an exact match, we're done. */
1740        if (flags == DB_GET_BOTH)
1741                return (DB_NOTFOUND);
1742
1743        /*
1744         * Base is the smallest index greater than the data item, may be zero
1745         * or a last + O_INDX index, and may be deleted.  Find an undeleted
1746         * item.
1747         */
1748        cp->indx = base;
1749        while (cp->indx < top && IS_CUR_DELETED(dbc))
1750                cp->indx += P_INDX;
1751        return (cp->indx < top ? 0 : DB_NOTFOUND);
1752}
1753
1754/*
1755 * __bam_c_put --
1756 *      Put using a cursor.
1757 */
1758static int
1759__bam_c_put(dbc, key, data, flags, pgnop)
1760        DBC *dbc;
1761        DBT *key, *data;
1762        u_int32_t flags;
1763        db_pgno_t *pgnop;
1764{
1765        BTREE_CURSOR *cp;
1766        DB *dbp;
1767        DBT dbt;
1768        DB_MPOOLFILE *mpf;
1769        db_pgno_t root_pgno;
1770        u_int32_t iiop;
1771        int cmp, exact, ret, stack;
1772        void *arg;
1773
1774        dbp = dbc->dbp;
1775        mpf = dbp->mpf;
1776        cp = (BTREE_CURSOR *)dbc->internal;
1777        root_pgno = cp->root;
1778
1779split:  ret = stack = 0;
1780        switch (flags) {
1781        case DB_AFTER:
1782        case DB_BEFORE:
1783        case DB_CURRENT:
1784                iiop = flags;
1785
1786                /*
1787                 * If the Btree has record numbers (and we're not replacing an
1788                 * existing record), we need a complete stack so that we can
1789                 * adjust the record counts.  The check for flags == DB_CURRENT
1790                 * is superfluous but left in for clarity.  (If C_RECNUM is set
1791                 * we know that flags must be DB_CURRENT, as DB_AFTER/DB_BEFORE
1792                 * are illegal in a Btree unless it's configured for duplicates
1793                 * and you cannot configure a Btree for both record renumbering
1794                 * and duplicates.)
1795                 */
1796                if (flags == DB_CURRENT &&
1797                    F_ISSET(cp, C_RECNUM) && F_ISSET(cp, C_DELETED)) {
1798                        if ((ret = __bam_c_getstack(dbc)) != 0)
1799                                goto err;
1800                        /*
1801                         * Initialize the cursor from the stack.  Don't take
1802                         * the page number or page index, they should already
1803                         * be set.
1804                         */
1805                        cp->page = cp->csp->page;
1806                        cp->lock = cp->csp->lock;
1807                        cp->lock_mode = cp->csp->lock_mode;
1808
1809                        stack = 1;
1810                        break;
1811                }
1812
1813                /* Acquire the current page with a write lock. */
1814                ACQUIRE_WRITE_LOCK(dbc, ret);
1815                if (ret != 0)
1816                        goto err;
1817                if ((ret = mpf->get(mpf, &cp->pgno, 0, &cp->page)) != 0)
1818                        goto err;
1819                break;
1820        case DB_KEYFIRST:
1821        case DB_KEYLAST:
1822        case DB_NODUPDATA:
1823                /*
1824                 * Searching off-page, sorted duplicate tree: do a tree search
1825                 * for the correct item; __bam_c_search returns the smallest
1826                 * slot greater than the key, use it.
1827                 *
1828                 * See comment below regarding where we can start the search.
1829                 */
1830                if (F_ISSET(dbc, DBC_OPD)) {
1831                        if ((ret = __bam_c_search(dbc,
1832                            F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno,
1833                            data, flags, &exact)) != 0)
1834                                goto err;
1835                        stack = 1;
1836
1837                        /* Disallow "sorted" duplicate duplicates. */
1838                        if (exact) {
1839                                if (IS_DELETED(dbp, cp->page, cp->indx)) {
1840                                        iiop = DB_CURRENT;
1841                                        break;
1842                                }
1843                                ret = __db_duperr(dbp, flags);
1844                                goto err;
1845                        }
1846                        iiop = DB_BEFORE;
1847                        break;
1848                }
1849
1850                /*
1851                 * Searching a btree.
1852                 *
1853                 * If we've done a split, we can start the search from the
1854                 * parent of the split page, which __bam_split returned
1855                 * for us in root_pgno, unless we're in a Btree with record
1856                 * numbering.  In that case, we'll need the true root page
1857                 * in order to adjust the record count.
1858                 */
1859                if ((ret = __bam_c_search(dbc,
1860                    F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, key,
1861                    flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
1862                    DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
1863                        goto err;
1864                stack = 1;
1865
1866                /*
1867                 * If we don't have an exact match, __bam_c_search returned
1868                 * the smallest slot greater than the key, use it.
1869                 */
1870                if (!exact) {
1871                        iiop = DB_KEYFIRST;
1872                        break;
1873                }
1874
1875                /*
1876                 * If duplicates aren't supported, replace the current item.
1877                 * (If implementing the DB->put function, our caller already
1878                 * checked the DB_NOOVERWRITE flag.)
1879                 */
1880                if (!F_ISSET(dbp, DB_AM_DUP)) {
1881                        iiop = DB_CURRENT;
1882                        break;
1883                }
1884
1885                /*
1886                 * If we find a matching entry, it may be an off-page duplicate
1887                 * tree.  Return the page number to our caller, we need a new
1888                 * cursor.
1889                 */
1890                if (pgnop != NULL && __bam_isopd(dbc, pgnop))
1891                        goto done;
1892
1893                /* If the duplicates aren't sorted, move to the right slot. */
1894                if (dbp->dup_compare == NULL) {
1895                        if (flags == DB_KEYFIRST)
1896                                iiop = DB_BEFORE;
1897                        else
1898                                for (;; cp->indx += P_INDX)
1899                                        if (cp->indx + P_INDX >=
1900                                            NUM_ENT(cp->page) ||
1901                                            !IS_DUPLICATE(dbc, cp->indx,
1902                                            cp->indx + P_INDX)) {
1903                                                iiop = DB_AFTER;
1904                                                break;
1905                                        }
1906                        break;
1907                }
1908
1909                /*
1910                 * We know that we're looking at the first of a set of sorted
1911                 * on-page duplicates.  Walk the list to find the right slot.
1912                 */
1913                for (;; cp->indx += P_INDX) {
1914                        if ((ret = __bam_cmp(dbp, data, cp->page,
1915                            cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1916                                goto err;
1917                        if (cmp < 0) {
1918                                iiop = DB_BEFORE;
1919                                break;
1920                        }
1921
1922                        /* Disallow "sorted" duplicate duplicates. */
1923                        if (cmp == 0) {
1924                                if (IS_DELETED(dbp, cp->page, cp->indx)) {
1925                                        iiop = DB_CURRENT;
1926                                        break;
1927                                }
1928                                ret = __db_duperr(dbp, flags);
1929                                goto err;
1930                        }
1931
1932                        if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1933                            P_INP(dbp, ((PAGE *)cp->page))[cp->indx] !=
1934                            P_INP(dbp, ((PAGE *)cp->page))[cp->indx + P_INDX]) {
1935                                iiop = DB_AFTER;
1936                                break;
1937                        }
1938                }
1939                break;
1940        default:
1941                ret = __db_unknown_flag(dbp->dbenv, "__bam_c_put", flags);
1942                goto err;
1943        }
1944
1945        switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) {
1946        case 0:
1947                break;
1948        case DB_NEEDSPLIT:
1949                /*
1950                 * To split, we need a key for the page.  Either use the key
1951                 * argument or get a copy of the key from the page.
1952                 */
1953                if (flags == DB_AFTER ||
1954                    flags == DB_BEFORE || flags == DB_CURRENT) {
1955                        memset(&dbt, 0, sizeof(DBT));
1956                        if ((ret = __db_ret(dbp, cp->page, 0, &dbt,
1957                            &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
1958                                goto err;
1959                        arg = &dbt;
1960                } else
1961                        arg = F_ISSET(dbc, DBC_OPD) ? data : key;
1962
1963                /*
1964                 * Discard any locks and pinned pages (the locks are discarded
1965                 * even if we're running with transactions, as they lock pages
1966                 * that we're sorry we ever acquired).  If stack is set and the
1967                 * cursor entries are valid, they point to the same entries as
1968                 * the stack, don't free them twice.
1969                 */
1970                if (stack)
1971                        ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
1972                else
1973                        DISCARD_CUR(dbc, ret);
1974                if (ret != 0)
1975                        goto err;
1976
1977                /* Split the tree. */
1978                if ((ret = __bam_split(dbc, arg, &root_pgno)) != 0)
1979                        return (ret);
1980
1981                goto split;
1982        default:
1983                goto err;
1984        }
1985
1986err:
1987done:   /*
1988         * Discard any pages pinned in the tree and their locks, except for
1989         * the leaf page.  Note, the leaf page participated in any stack we
1990         * acquired, and so we have to adjust the stack as necessary.  If
1991         * there was only a single page on the stack, we don't have to free
1992         * further stack pages.
1993         */
1994        if (stack && BT_STK_POP(cp) != NULL)
1995                (void)__bam_stkrel(dbc, 0);
1996
1997        /*
1998         * Regardless of whether we were successful or not, clear the delete
1999         * flag.  If we're successful, we either moved the cursor or the item
2000         * is no longer deleted.  If we're not successful, then we're just a
2001         * copy, no need to have the flag set.
2002         */
2003        F_CLR(cp, C_DELETED);
2004
2005        return (ret);
2006}
2007
2008/*
2009 * __bam_c_rget --
2010 *      Return the record number for a cursor.
2011 *
2012 * PUBLIC: int __bam_c_rget __P((DBC *, DBT *));
2013 */
2014int
2015__bam_c_rget(dbc, data)
2016        DBC *dbc;
2017        DBT *data;
2018{
2019        BTREE_CURSOR *cp;
2020        DB *dbp;
2021        DBT dbt;
2022        DB_MPOOLFILE *mpf;
2023        db_recno_t recno;
2024        int exact, ret;
2025
2026        dbp = dbc->dbp;
2027        mpf = dbp->mpf;
2028        cp = (BTREE_CURSOR *)dbc->internal;
2029
2030        /*
2031         * Get the page with the current item on it.
2032         * Get a copy of the key.
2033         * Release the page, making sure we don't release it twice.
2034         */
2035        if ((ret = mpf->get(mpf, &cp->pgno, 0, &cp->page)) != 0)
2036                return (ret);
2037        memset(&dbt, 0, sizeof(DBT));
2038        if ((ret = __db_ret(dbp, cp->page,
2039            cp->indx, &dbt, &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
2040                goto err;
2041        ret = mpf->put(mpf, cp->page, 0);
2042        cp->page = NULL;
2043        if (ret != 0)
2044                return (ret);
2045
2046        if ((ret = __bam_search(dbc, PGNO_INVALID, &dbt,
2047            F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND,
2048            1, &recno, &exact)) != 0)
2049                goto err;
2050
2051        ret = __db_retcopy(dbp->dbenv, data,
2052            &recno, sizeof(recno), &dbc->rdata->data, &dbc->rdata->ulen);
2053
2054        /* Release the stack. */
2055err:    __bam_stkrel(dbc, 0);
2056
2057        return (ret);
2058}
2059
2060/*
2061 * __bam_c_writelock --
2062 *      Upgrade the cursor to a write lock.
2063 */
2064static int
2065__bam_c_writelock(dbc)
2066        DBC *dbc;
2067{
2068        BTREE_CURSOR *cp;
2069        int ret;
2070
2071        cp = (BTREE_CURSOR *)dbc->internal;
2072
2073        if (cp->lock_mode == DB_LOCK_WRITE)
2074                return (0);
2075
2076        /*
2077         * When writing to an off-page duplicate tree, we need to have the
2078         * appropriate page in the primary tree locked.  The general DBC
2079         * code calls us first with the primary cursor so we can acquire the
2080         * appropriate lock.
2081         */
2082        ACQUIRE_WRITE_LOCK(dbc, ret);
2083        return (ret);
2084}
2085
2086/*
2087 * __bam_c_first --
2088 *      Return the first record.
2089 */
2090static int
2091__bam_c_first(dbc)
2092        DBC *dbc;
2093{
2094        BTREE_CURSOR *cp;
2095        db_pgno_t pgno;
2096        int ret;
2097
2098        cp = (BTREE_CURSOR *)dbc->internal;
2099        ret = 0;
2100
2101        /* Walk down the left-hand side of the tree. */
2102        for (pgno = cp->root;;) {
2103                ACQUIRE_CUR_COUPLE(dbc, DB_LOCK_READ, pgno, ret);
2104                if (ret != 0)
2105                        return (ret);
2106
2107                /* If we find a leaf page, we're done. */
2108                if (ISLEAF(cp->page))
2109                        break;
2110
2111                pgno = GET_BINTERNAL(dbc->dbp, cp->page, 0)->pgno;
2112        }
2113
2114        /* If we want a write lock instead of a read lock, get it now. */
2115        if (F_ISSET(dbc, DBC_RMW)) {
2116                ACQUIRE_WRITE_LOCK(dbc, ret);
2117                if (ret != 0)
2118                        return (ret);
2119        }
2120
2121        cp->indx = 0;
2122
2123        /* If on an empty page or a deleted record, move to the next one. */
2124        if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
2125                if ((ret = __bam_c_next(dbc, 0, 0)) != 0)
2126                        return (ret);
2127
2128        return (0);
2129}
2130
2131/*
2132 * __bam_c_last --
2133 *      Return the last record.
2134 */
2135static int
2136__bam_c_last(dbc)
2137        DBC *dbc;
2138{
2139        BTREE_CURSOR *cp;
2140        db_pgno_t pgno;
2141        int ret;
2142
2143        cp = (BTREE_CURSOR *)dbc->internal;
2144        ret = 0;
2145
2146        /* Walk down the right-hand side of the tree. */
2147        for (pgno = cp->root;;) {
2148                ACQUIRE_CUR_COUPLE(dbc, DB_LOCK_READ, pgno, ret);
2149                if (ret != 0)
2150                        return (ret);
2151
2152                /* If we find a leaf page, we're done. */
2153                if (ISLEAF(cp->page))
2154                        break;
2155
2156                pgno = GET_BINTERNAL(dbc->dbp, cp->page,
2157                    NUM_ENT(cp->page) - O_INDX)->pgno;
2158        }
2159
2160        /* If we want a write lock instead of a read lock, get it now. */
2161        if (F_ISSET(dbc, DBC_RMW)) {
2162                ACQUIRE_WRITE_LOCK(dbc, ret);
2163                if (ret != 0)
2164                        return (ret);
2165        }
2166
2167        cp->indx = NUM_ENT(cp->page) == 0 ? 0 :
2168            NUM_ENT(cp->page) -
2169            (TYPE(cp->page) == P_LBTREE ? P_INDX : O_INDX);
2170
2171        /* If on an empty page or a deleted record, move to the previous one. */
2172        if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
2173                if ((ret = __bam_c_prev(dbc)) != 0)
2174                        return (ret);
2175
2176        return (0);
2177}
2178
2179/*
2180 * __bam_c_next --
2181 *      Move to the next record.
2182 */
2183static int
2184__bam_c_next(dbc, initial_move, deleted_okay)
2185        DBC *dbc;
2186        int initial_move, deleted_okay;
2187{
2188        BTREE_CURSOR *cp;
2189        db_indx_t adjust;
2190        db_lockmode_t lock_mode;
2191        db_pgno_t pgno;
2192        int ret;
2193
2194        cp = (BTREE_CURSOR *)dbc->internal;
2195        ret = 0;
2196
2197        /*
2198         * We're either moving through a page of duplicates or a btree leaf
2199         * page.
2200         *
2201         * !!!
2202         * This code handles empty pages and pages with only deleted entries.
2203         */
2204        if (F_ISSET(dbc, DBC_OPD)) {
2205                adjust = O_INDX;
2206                lock_mode = DB_LOCK_NG;
2207        } else {
2208                adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2209                lock_mode =
2210                    F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2211        }
2212        if (cp->page == NULL) {
2213                ACQUIRE_CUR(dbc, lock_mode, cp->pgno, ret);
2214                if (ret != 0)
2215                        return (ret);
2216        }
2217
2218        if (initial_move)
2219                cp->indx += adjust;
2220
2221        for (;;) {
2222                /*
2223                 * If at the end of the page, move to a subsequent page.
2224                 *
2225                 * !!!
2226                 * Check for >= NUM_ENT.  If the original search landed us on
2227                 * NUM_ENT, we may have incremented indx before the test.
2228                 */
2229                if (cp->indx >= NUM_ENT(cp->page)) {
2230                        if ((pgno
2231                            = NEXT_PGNO(cp->page)) == PGNO_INVALID)
2232                                return (DB_NOTFOUND);
2233
2234                        ACQUIRE_CUR(dbc, lock_mode, pgno, ret);
2235                        if (ret != 0)
2236                                return (ret);
2237                        cp->indx = 0;
2238                        continue;
2239                }
2240                if (!deleted_okay && IS_CUR_DELETED(dbc)) {
2241                        cp->indx += adjust;
2242                        continue;
2243                }
2244                break;
2245        }
2246        return (0);
2247}
2248
2249/*
2250 * __bam_c_prev --
2251 *      Move to the previous record.
2252 */
2253static int
2254__bam_c_prev(dbc)
2255        DBC *dbc;
2256{
2257        BTREE_CURSOR *cp;
2258        db_indx_t adjust;
2259        db_lockmode_t lock_mode;
2260        db_pgno_t pgno;
2261        int ret;
2262
2263        cp = (BTREE_CURSOR *)dbc->internal;
2264        ret = 0;
2265
2266        /*
2267         * We're either moving through a page of duplicates or a btree leaf
2268         * page.
2269         *
2270         * !!!
2271         * This code handles empty pages and pages with only deleted entries.
2272         */
2273        if (F_ISSET(dbc, DBC_OPD)) {
2274                adjust = O_INDX;
2275                lock_mode = DB_LOCK_NG;
2276        } else {
2277                adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2278                lock_mode =
2279                    F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2280        }
2281        if (cp->page == NULL) {
2282                ACQUIRE_CUR(dbc, lock_mode, cp->pgno, ret);
2283                if (ret != 0)
2284                        return (ret);
2285        }
2286
2287        for (;;) {
2288                /* If at the beginning of the page, move to a previous one. */
2289                if (cp->indx == 0) {
2290                        if ((pgno =
2291                            PREV_PGNO(cp->page)) == PGNO_INVALID)
2292                                return (DB_NOTFOUND);
2293
2294                        ACQUIRE_CUR(dbc, lock_mode, pgno, ret);
2295                        if (ret != 0)
2296                                return (ret);
2297
2298                        if ((cp->indx = NUM_ENT(cp->page)) == 0)
2299                                continue;
2300                }
2301
2302                /* Ignore deleted records. */
2303                cp->indx -= adjust;
2304                if (IS_CUR_DELETED(dbc))
2305                        continue;
2306
2307                break;
2308        }
2309        return (0);
2310}
2311
2312/*
2313 * __bam_c_search --
2314 *      Move to a specified record.
2315 */
2316static int
2317__bam_c_search(dbc, root_pgno, key, flags, exactp)
2318        DBC *dbc;
2319        db_pgno_t root_pgno;
2320        const DBT *key;
2321        u_int32_t flags;
2322        int *exactp;
2323{
2324        BTREE *t;
2325        BTREE_CURSOR *cp;
2326        DB *dbp;
2327        PAGE *h;
2328        db_indx_t indx, *inp;
2329        db_pgno_t bt_lpgno;
2330        db_recno_t recno;
2331        u_int32_t sflags;
2332        int cmp, ret;
2333
2334        dbp = dbc->dbp;
2335        cp = (BTREE_CURSOR *)dbc->internal;
2336        t = dbp->bt_internal;
2337        ret = 0;
2338
2339        /*
2340         * Find an entry in the database.  Discard any lock we currently hold,
2341         * we're going to search the tree.
2342         */
2343        DISCARD_CUR(dbc, ret);
2344        if (ret != 0)
2345                return (ret);
2346
2347        switch (flags) {
2348        case DB_SET_RECNO:
2349                if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
2350                        return (ret);
2351                sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
2352                if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
2353                        return (ret);
2354                break;
2355        case DB_SET:
2356        case DB_GET_BOTH:
2357                sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
2358                goto search;
2359        case DB_GET_BOTH_RANGE:
2360                sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND);
2361                goto search;
2362        case DB_SET_RANGE:
2363                sflags =
2364                    (F_ISSET(dbc, DBC_RMW) ? S_WRITE : S_READ) | S_DUPFIRST;
2365                goto search;
2366        case DB_KEYFIRST:
2367                sflags = S_KEYFIRST;
2368                goto fast_search;
2369        case DB_KEYLAST:
2370        case DB_NODUPDATA:
2371                sflags = S_KEYLAST;
2372fast_search:    /*
2373                 * If the application has a history of inserting into the first
2374                 * or last pages of the database, we check those pages first to
2375                 * avoid doing a full search.
2376                 *
2377                 * If the tree has record numbers, we need a complete stack so
2378                 * that we can adjust the record counts, so fast_search isn't
2379                 * possible.
2380                 */
2381                if (F_ISSET(cp, C_RECNUM))
2382                        goto search;
2383
2384                /*
2385                 * !!!
2386                 * We do not mutex protect the t->bt_lpgno field, which means
2387                 * that it can only be used in an advisory manner.  If we find
2388                 * page we can use, great.  If we don't, we don't care, we do
2389                 * it the slow way instead.  Regardless, copy it into a local
2390                 * variable, otherwise we might acquire a lock for a page and
2391                 * then read a different page because it changed underfoot.
2392                 */
2393                bt_lpgno = t->bt_lpgno;
2394
2395                /*
2396                 * If the tree has no history of insertion, do it the slow way.
2397                 */
2398                if (bt_lpgno == PGNO_INVALID)
2399                        goto search;
2400
2401                /* Lock and retrieve the page on which we last inserted. */
2402                h = NULL;
2403                ACQUIRE(dbc,
2404                    DB_LOCK_WRITE, bt_lpgno, cp->lock, bt_lpgno, h, ret);
2405                if (ret != 0)
2406                        goto fast_miss;
2407
2408                inp = P_INP(dbp, h);
2409                /*
2410                 * It's okay if the page type isn't right or it's empty, it
2411                 * just means that the world changed.
2412                 */
2413                if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
2414                        goto fast_miss;
2415
2416                /*
2417                 * What we do here is test to see if we're at the beginning or
2418                 * end of the tree and if the new item sorts before/after the
2419                 * first/last page entry.  We don't try and catch inserts into
2420                 * the middle of the tree (although we could, as long as there
2421                 * were two keys on the page and we saved both the index and
2422                 * the page number of the last insert).
2423                 */
2424                if (h->next_pgno == PGNO_INVALID) {
2425                        indx = NUM_ENT(h) - P_INDX;
2426                        if ((ret = __bam_cmp(dbp,
2427                            key, h, indx, t->bt_compare, &cmp)) != 0)
2428                                return (ret);
2429
2430                        if (cmp < 0)
2431                                goto try_begin;
2432                        if (cmp > 0) {
2433                                indx += P_INDX;
2434                                goto fast_hit;
2435                        }
2436
2437                        /*
2438                         * Found a duplicate.  If doing DB_KEYLAST, we're at
2439                         * the correct position, otherwise, move to the first
2440                         * of the duplicates.  If we're looking at off-page
2441                         * duplicates, duplicate duplicates aren't permitted,
2442                         * so we're done.
2443                         */
2444                        if (flags == DB_KEYLAST)
2445                                goto fast_hit;
2446                        for (;
2447                            indx > 0 && inp[indx - P_INDX] == inp[indx];
2448                            indx -= P_INDX)
2449                                ;
2450                        goto fast_hit;
2451                }
2452try_begin:      if (h->prev_pgno == PGNO_INVALID) {
2453                        indx = 0;
2454                        if ((ret = __bam_cmp(dbp,
2455                            key, h, indx, t->bt_compare, &cmp)) != 0)
2456                                return (ret);
2457
2458                        if (cmp > 0)
2459                                goto fast_miss;
2460                        if (cmp < 0)
2461                                goto fast_hit;
2462
2463                        /*
2464                         * Found a duplicate.  If doing DB_KEYFIRST, we're at
2465                         * the correct position, otherwise, move to the last
2466                         * of the duplicates.  If we're looking at off-page
2467                         * duplicates, duplicate duplicates aren't permitted,
2468                         * so we're done.
2469                         */
2470                        if (flags == DB_KEYFIRST)
2471                                goto fast_hit;
2472                        for (;
2473                            indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
2474                            inp[indx] == inp[indx + P_INDX];
2475                            indx += P_INDX)
2476                                ;
2477                        goto fast_hit;
2478                }
2479                goto fast_miss;
2480
2481fast_hit:       /* Set the exact match flag, we may have found a duplicate. */
2482                *exactp = cmp == 0;
2483
2484                /*
2485                 * Insert the entry in the stack.  (Our caller is likely to
2486                 * call __bam_stkrel() after our return.)
2487                 */
2488                BT_STK_CLR(cp);
2489                BT_STK_ENTER(dbp->dbenv,
2490                    cp, h, indx, cp->lock, cp->lock_mode, ret);
2491                if (ret != 0)
2492                        return (ret);
2493                break;
2494
2495fast_miss:      /*
2496                 * This was not the right page, so we do not need to retain
2497                 * the lock even in the presence of transactions.
2498                 */
2499                DISCARD(dbc, 1, cp->lock, h, ret);
2500                if (ret != 0)
2501                        return (ret);
2502
2503search:         if ((ret = __bam_search(dbc, root_pgno,
2504                    key, sflags, 1, NULL, exactp)) != 0)
2505                        return (ret);
2506                break;
2507        default:
2508                return (__db_unknown_flag(dbp->dbenv, "__bam_c_search", flags));
2509        }
2510
2511        /* Initialize the cursor from the stack. */
2512        cp->page = cp->csp->page;
2513        cp->pgno = cp->csp->page->pgno;
2514        cp->indx = cp->csp->indx;
2515        cp->lock = cp->csp->lock;
2516        cp->lock_mode = cp->csp->lock_mode;
2517
2518        /*
2519         * If we inserted a key into the first or last slot of the tree,
2520         * remember where it was so we can do it more quickly next time.
2521         * If there are duplicates and we are inserting into the last slot,
2522         * the cursor will point _to_ the last item, not after it, which
2523         * is why we subtract P_INDX below.
2524         */
2525        if (TYPE(cp->page) == P_LBTREE &&
2526            (flags == DB_KEYFIRST || flags == DB_KEYLAST))
2527                t->bt_lpgno =
2528                    (NEXT_PGNO(cp->page) == PGNO_INVALID &&
2529                    cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
2530                    (PREV_PGNO(cp->page) == PGNO_INVALID &&
2531                    cp->indx == 0) ? cp->pgno : PGNO_INVALID;
2532        return (0);
2533}
2534
2535/*
2536 * __bam_c_physdel --
2537 *      Physically remove an item from the page.
2538 */
2539static int
2540__bam_c_physdel(dbc)
2541        DBC *dbc;
2542{
2543        BTREE_CURSOR *cp;
2544        DB *dbp;
2545        DBT key;
2546        DB_LOCK lock;
2547        DB_MPOOLFILE *mpf;
2548        PAGE *h;
2549        db_pgno_t pgno;
2550        int delete_page, empty_page, exact, level, ret;
2551
2552        dbp = dbc->dbp;
2553        mpf = dbp->mpf;
2554        cp = (BTREE_CURSOR *)dbc->internal;
2555        delete_page = empty_page = ret = 0;
2556
2557        /* If the page is going to be emptied, consider deleting it. */
2558        delete_page = empty_page =
2559            NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
2560
2561        /*
2562         * Check if the application turned off reverse splits.  Applications
2563         * can't turn off reverse splits in off-page duplicate trees, that
2564         * space will never be reused unless the exact same key is specified.
2565         */
2566        if (delete_page &&
2567            !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_AM_REVSPLITOFF))
2568                delete_page = 0;
2569
2570        /*
2571         * We never delete the last leaf page.  (Not really true -- we delete
2572         * the last leaf page of off-page duplicate trees, but that's handled
2573         * by our caller, not down here.)
2574         */
2575        if (delete_page && cp->pgno == cp->root)
2576                delete_page = 0;
2577
2578        /*
2579         * To delete a leaf page other than an empty root page, we need a
2580         * copy of a key from the page.  Use the 0th page index since it's
2581         * the last key the page held.
2582         *
2583         * !!!
2584         * Note that because __bam_c_physdel is always called from a cursor
2585         * close, it should be safe to use the cursor's own "my_rkey" memory
2586         * to temporarily hold this key.  We shouldn't own any returned-data
2587         * memory of interest--if we do, we're in trouble anyway.
2588         */
2589        if (delete_page) {
2590                memset(&key, 0, sizeof(DBT));
2591                if ((ret = __db_ret(dbp, cp->page,
2592                    0, &key, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2593                        return (ret);
2594        }
2595
2596        /*
2597         * Delete the items.  If page isn't empty, we adjust the cursors.
2598         *
2599         * !!!
2600         * The following operations to delete a page may deadlock.  The easy
2601         * scenario is if we're deleting an item because we're closing cursors
2602         * because we've already deadlocked and want to call txn->abort.  If
2603         * we fail due to deadlock, we'll leave a locked, possibly empty page
2604         * in the tree, which won't be empty long because we'll undo the delete
2605         * when we undo the transaction's modifications.
2606         *
2607         * !!!
2608         * Delete the key item first, otherwise the on-page duplicate checks
2609         * in __bam_ditem() won't work!
2610         */
2611        if (TYPE(cp->page) == P_LBTREE) {
2612                if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2613                        return (ret);
2614                if (!empty_page)
2615                        if ((ret = __bam_ca_di(dbc,
2616                            PGNO(cp->page), cp->indx, -1)) != 0)
2617                                return (ret);
2618        }
2619        if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2620                return (ret);
2621        if (!empty_page)
2622                if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
2623                        return (ret);
2624
2625        /* If we're not going to try and delete the page, we're done. */
2626        if (!delete_page)
2627                return (0);
2628
2629        /*
2630         * Call __bam_search to reacquire the empty leaf page, but this time
2631         * get both the leaf page and it's parent, locked.  Jump back up the
2632         * tree, until we have the top pair of pages that we want to delete.
2633         * Once we have the top page that we want to delete locked, lock the
2634         * underlying pages and check to make sure they're still empty.  If
2635         * they are, delete them.
2636         */
2637        for (level = LEAFLEVEL;; ++level) {
2638                /* Acquire a page and its parent, locked. */
2639                if ((ret = __bam_search(dbc, PGNO_INVALID,
2640                    &key, S_WRPAIR, level, NULL, &exact)) != 0)
2641                        return (ret);
2642
2643                /*
2644                 * If we reach the root or the parent page isn't going to be
2645                 * empty when we delete one record, stop.
2646                 */
2647                h = cp->csp[-1].page;
2648                if (h->pgno == cp->root || NUM_ENT(h) != 1)
2649                        break;
2650
2651                /* Discard the stack, retaining no locks. */
2652                (void)__bam_stkrel(dbc, STK_NOLOCK);
2653        }
2654
2655        /*
2656         * Move the stack pointer one after the last entry, we may be about
2657         * to push more items onto the page stack.
2658         */
2659        ++cp->csp;
2660
2661        /*
2662         * cp->csp[-2].page is now the parent page, which we may or may not be
2663         * going to delete, and cp->csp[-1].page is the first page we know we
2664         * are going to delete.  Walk down the chain of pages, acquiring pages
2665         * until we've acquired a leaf page.  Generally, this shouldn't happen;
2666         * we should only see a single internal page with one item and a single
2667         * leaf page with no items.  The scenario where we could see something
2668         * else is if reverse splits were turned off for awhile and then turned
2669         * back on.  That could result in all sorts of strangeness, e.g., empty
2670         * pages in the tree, trees that looked like linked lists, and so on.
2671         *
2672         * !!!
2673         * Sheer paranoia: if we find any pages that aren't going to be emptied
2674         * by the delete, someone else added an item while we were walking the
2675         * tree, and we discontinue the delete.  Shouldn't be possible, but we
2676         * check regardless.
2677         */
2678        for (h = cp->csp[-1].page;;) {
2679                if (ISLEAF(h)) {
2680                        if (NUM_ENT(h) != 0)
2681                                break;
2682                        break;
2683                } else
2684                        if (NUM_ENT(h) != 1)
2685                                break;
2686
2687                /*
2688                 * Get the next page, write lock it and push it onto the stack.
2689                 * We know it's index 0, because it can only have one element.
2690                 */
2691                switch (TYPE(h)) {
2692                case P_IBTREE:
2693                        pgno = GET_BINTERNAL(dbp, h, 0)->pgno;
2694                        break;
2695                case P_IRECNO:
2696                        pgno = GET_RINTERNAL(dbp, h, 0)->pgno;
2697                        break;
2698                default:
2699                        return (__db_pgfmt(dbp->dbenv, PGNO(h)));
2700                }
2701
2702                if ((ret =
2703                    __db_lget(dbc, 0, pgno, DB_LOCK_WRITE, 0, &lock)) != 0)
2704                        break;
2705                if ((ret = mpf->get(mpf, &pgno, 0, &h)) != 0)
2706                        break;
2707                BT_STK_PUSH(dbp->dbenv, cp, h, 0, lock, DB_LOCK_WRITE, ret);
2708                if (ret != 0)
2709                        break;
2710        }
2711
2712        /* Adjust the cursor stack to reference the last page on the stack. */
2713        BT_STK_POP(cp);
2714
2715        /*
2716         * If everything worked, delete the stack, otherwise, release the
2717         * stack and page locks without further damage.
2718         */
2719        if (ret == 0)
2720                ret = __bam_dpages(dbc, cp->sp);
2721        else
2722                (void)__bam_stkrel(dbc, 0);
2723
2724        return (ret);
2725}
2726
2727/*
2728 * __bam_c_getstack --
2729 *      Acquire a full stack for a cursor.
2730 */
2731static int
2732__bam_c_getstack(dbc)
2733        DBC *dbc;
2734{
2735        BTREE_CURSOR *cp;
2736        DB *dbp;
2737        DBT dbt;
2738        DB_MPOOLFILE *mpf;
2739        PAGE *h;
2740        int exact, ret, t_ret;
2741
2742        dbp = dbc->dbp;
2743        mpf = dbp->mpf;
2744        cp = (BTREE_CURSOR *)dbc->internal;
2745
2746        /*
2747         * Get the page with the current item on it.  The caller of this
2748         * routine has to already hold a read lock on the page, so there
2749         * is no additional lock to acquire.
2750         */
2751        if ((ret = mpf->get(mpf, &cp->pgno, 0, &h)) != 0)
2752                return (ret);
2753
2754        /* Get a copy of a key from the page. */
2755        memset(&dbt, 0, sizeof(DBT));
2756        if ((ret = __db_ret(dbp,
2757            h, 0, &dbt, &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
2758                goto err;
2759
2760        /* Get a write-locked stack for the page. */
2761        exact = 0;
2762        ret = __bam_search(dbc, PGNO_INVALID,
2763            &dbt, S_KEYFIRST, 1, NULL, &exact);
2764
2765err:    /* Discard the key and the page. */
2766        if ((t_ret = mpf->put(mpf, h, 0)) != 0 && ret == 0)
2767                ret = t_ret;
2768
2769        return (ret);
2770}
2771
2772/*
2773 * __bam_isopd --
2774 *      Return if the cursor references an off-page duplicate tree via its
2775 *      page number.
2776 */
2777static int
2778__bam_isopd(dbc, pgnop)
2779        DBC *dbc;
2780        db_pgno_t *pgnop;
2781{
2782        BOVERFLOW *bo;
2783
2784        if (TYPE(dbc->internal->page) != P_LBTREE)
2785                return (0);
2786
2787        bo = GET_BOVERFLOW(dbc->dbp,
2788            dbc->internal->page, dbc->internal->indx + O_INDX);
2789        if (B_TYPE(bo->type) == B_DUPLICATE) {
2790                *pgnop = bo->pgno;
2791                return (1);
2792        }
2793        return (0);
2794}
Note: See TracBrowser for help on using the repository browser.