source: trunk/third/evolution-data-server/libdb/btree/bt_split.c @ 21189

Revision 21189, 34.6 KB checked in by ghudson, 20 years ago
This commit was generated by cvs2svn to compensate for changes in r21188, which included commits to RCS files with non-trunk default branches.
/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996-2002
 *      Sleepycat Software.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *      Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *      The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include "db_config.h"

#ifndef lint
static const char revid[] = "$Id: bt_split.c,v 1.1.1.1 2004-12-17 17:26:45 ghudson Exp $";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>

#include <limits.h>
#include <string.h>
#endif

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_shash.h"
#include "dbinc/lock.h"
#include "dbinc/btree.h"

static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
static int __bam_page __P((DBC *, EPG *, EPG *));
static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *, int));
static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
static int __bam_root __P((DBC *, EPG *));
static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));

/*
 * __bam_split --
 *      Split a page.
 *
 * PUBLIC: int __bam_split __P((DBC *, void *, db_pgno_t *));
 */
int
__bam_split(dbc, arg, root_pgnop)
        DBC *dbc;
        void *arg;
        db_pgno_t *root_pgnop;
{
        BTREE_CURSOR *cp;
        enum { UP, DOWN } dir;
        db_pgno_t root_pgno;
        int exact, level, ret;

        cp = (BTREE_CURSOR *)dbc->internal;
        root_pgno = cp->root;

        /*
         * The locking protocol we use to avoid deadlock is to acquire locks
         * by walking down the tree, but we do it as lazily as possible,
         * locking the root only as a last resort.  We expect all stack pages
         * to have been discarded before we're called; we discard all
         * short-term locks.
         *
         * When __bam_split is first called, we know that a leaf page was too
         * full for an insert.  We don't know what leaf page it was, but we
         * have the key/recno that caused the problem.  We call XX_search to
         * reacquire the leaf page, but this time get both the leaf page and
         * its parent, locked.  We then split the leaf page and see if the new
         * internal key will fit into the parent page.  If it will, we're done.
         *
         * If it won't, we discard our current locks and repeat the process,
         * only this time acquiring the parent page and its parent, locked.
         * This process repeats until we succeed in the split, splitting the
         * root page as the final resort.  The entire process then repeats,
         * as necessary, until we split a leaf page.
         *
         * XXX
         * A traditional method of speeding this up is to maintain a stack of
         * the pages traversed in the original search.  You can detect if the
         * stack is correct by storing the page's LSN when it was searched and
         * comparing that LSN with the current one when it's locked during the
         * split.  This would be an easy change for this code, but I have no
         * numbers that indicate it's worthwhile.
         */
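        /*
         * An illustrative trace of the loop below: a leaf (level 1) split
         * whose new key doesn't fit in its parent fails with DB_NEEDSPLIT,
         * so we come back around with level == 2 and split the parent.
         * Once a split above the leaf succeeds, dir flips to DOWN and we
         * walk back down, re-splitting each level until the leaf split
         * itself succeeds.
         */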
        for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
                /*
                 * Acquire a page and its parent, locked.
                 */
                if ((ret = (dbc->dbtype == DB_BTREE ?
                    __bam_search(dbc, PGNO_INVALID,
                        arg, S_WRPAIR, level, NULL, &exact) :
                    __bam_rsearch(dbc,
                        (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
                        return (ret);

                if (root_pgnop != NULL)
                        *root_pgnop = cp->csp[0].page->pgno == root_pgno ?
                            root_pgno : cp->csp[-1].page->pgno;
                /*
                 * Split the page if it still needs it (it's possible another
                 * thread of control has already split the page).  If we are
                 * guaranteed that two items will fit on the page, the split
                 * is no longer necessary.
                 */
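                /*
                 * For example, if the largest item that can be stored
                 * on-page is 1KB, the test below skips the split whenever
                 * the page still has at least 2KB free, since any two
                 * incoming items are then guaranteed to fit.
                 */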
                if (2 * B_MAXSIZEONPAGE(cp->ovflsize)
                    <= (db_indx_t)P_FREESPACE(dbc->dbp, cp->csp[0].page)) {
                        __bam_stkrel(dbc, STK_NOLOCK);
                        return (0);
                }
                ret = cp->csp[0].page->pgno == root_pgno ?
                    __bam_root(dbc, &cp->csp[0]) :
                    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
                BT_STK_CLR(cp);

                switch (ret) {
                case 0:
                        /* Once we've split the leaf page, we're done. */
                        if (level == LEAFLEVEL)
                                return (0);

                        /* Switch directions. */
                        if (dir == UP)
                                dir = DOWN;
                        break;
                case DB_NEEDSPLIT:
                        /*
                         * It's possible to fail to split repeatedly, as other
                         * threads may be modifying the tree, or the page usage
                         * is sufficiently bad that we don't get enough space
                         * the first time.
                         */
                        if (dir == DOWN)
                                dir = UP;
                        break;
                default:
                        return (ret);
                }
        }
        /* NOTREACHED */
}

/*
 * __bam_root --
 *      Split the root page of a btree.
 */
static int
__bam_root(dbc, cp)
        DBC *dbc;
        EPG *cp;
{
        DB *dbp;
        DBT log_dbt;
        DB_LSN log_lsn;
        DB_MPOOLFILE *mpf;
        PAGE *lp, *rp;
        db_indx_t split;
        u_int32_t opflags;
        int ret;

        dbp = dbc->dbp;
        mpf = dbp->mpf;

        /* Yeah, right. */
        if (cp->page->level >= MAXBTREELEVEL) {
                __db_err(dbp->dbenv,
                    "Too many btree levels: %d", cp->page->level);
                ret = ENOSPC;
                goto err;
        }

        /* Create new left and right pages for the split. */
        lp = rp = NULL;
        if ((ret = __db_new(dbc, TYPE(cp->page), &lp)) != 0 ||
            (ret = __db_new(dbc, TYPE(cp->page), &rp)) != 0)
                goto err;
        P_INIT(lp, dbp->pgsize, lp->pgno,
            PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
            cp->page->level, TYPE(cp->page));
        P_INIT(rp, dbp->pgsize, rp->pgno,
            ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
            cp->page->level, TYPE(cp->page));

        /* Split the page. */
        if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
                goto err;

        /* Log the change. */
        if (DBC_LOGGING(dbc)) {
                memset(&log_dbt, 0, sizeof(log_dbt));
                log_dbt.data = cp->page;
                log_dbt.size = dbp->pgsize;
                ZERO_LSN(log_lsn);
                opflags = F_ISSET(
                    (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
                if ((ret = __bam_split_log(dbp,
                    dbc->txn, &LSN(cp->page), 0, PGNO(lp), &LSN(lp), PGNO(rp),
                    &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &log_lsn,
                    dbc->internal->root, &log_dbt, opflags)) != 0)
                        goto err;
        } else
                LSN_NOT_LOGGED(LSN(cp->page));
        LSN(lp) = LSN(cp->page);
        LSN(rp) = LSN(cp->page);

        /* Clean up the new root page. */
        if ((ret = (dbc->dbtype == DB_RECNO ?
            __ram_root(dbc, cp->page, lp, rp) :
            __bam_broot(dbc, cp->page, lp, rp))) != 0)
                goto err;

        /* Adjust any cursors. */
        if ((ret = __bam_ca_split(dbc,
            cp->page->pgno, lp->pgno, rp->pgno, split, 1)) != 0)
                goto err;

        /* Success -- write the real pages back to the store. */
        (void)mpf->put(mpf, cp->page, DB_MPOOL_DIRTY);
        (void)__TLPUT(dbc, cp->lock);
        (void)mpf->put(mpf, lp, DB_MPOOL_DIRTY);
        (void)mpf->put(mpf, rp, DB_MPOOL_DIRTY);

        return (0);

err:    if (lp != NULL)
                (void)mpf->put(mpf, lp, 0);
        if (rp != NULL)
                (void)mpf->put(mpf, rp, 0);
        (void)mpf->put(mpf, cp->page, 0);
        (void)__TLPUT(dbc, cp->lock);
        return (ret);
}

/*
 * __bam_page --
 *      Split the non-root page of a btree.
 */
static int
__bam_page(dbc, pp, cp)
        DBC *dbc;
        EPG *pp, *cp;
{
        BTREE_CURSOR *bc;
        DBT log_dbt;
        DB_LSN log_lsn;
        DB *dbp;
        DB_LOCK rplock, tplock;
        DB_MPOOLFILE *mpf;
        DB_LSN save_lsn;
        PAGE *lp, *rp, *alloc_rp, *tp;
        db_indx_t split;
        u_int32_t opflags;
        int ret, t_ret;

        dbp = dbc->dbp;
        mpf = dbp->mpf;
        alloc_rp = lp = rp = tp = NULL;
        LOCK_INIT(rplock);
        LOCK_INIT(tplock);
        ret = -1;

        /*
         * Create a new right page for the split, and fill in everything
         * except its LSN and page number.
         *
         * We malloc space for both the left and right pages, so we don't get
         * a new page from the underlying buffer pool until we know the split
         * is going to succeed.  The reason is that we can't release locks
         * acquired during the get-a-new-page process because metadata page
         * locks can't be discarded on failure since we may have modified the
         * free list.  So, if you assume that we're holding a write lock on the
         * leaf page which ran out of space and started this split (e.g., we
         * have already written records to the page, or we retrieved a record
         * from it with the DB_RMW flag set), failing in a split with both a
         * leaf page locked and the metadata page locked can potentially lock
         * up the tree badly, because we've violated the rule of always locking
         * down the tree, and never up.
         */
        if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, &rp)) != 0)
                goto err;
        P_INIT(rp, dbp->pgsize, 0,
            ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
            ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
            cp->page->level, TYPE(cp->page));

        /*
         * Create a new left page for the split, and fill in everything
         * except its LSN and next-page page number.
         */
        if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, &lp)) != 0)
                goto err;
        P_INIT(lp, dbp->pgsize, PGNO(cp->page),
            ISINTERNAL(cp->page) ?  PGNO_INVALID : PREV_PGNO(cp->page),
            ISINTERNAL(cp->page) ?  PGNO_INVALID : 0,
            cp->page->level, TYPE(cp->page));

        /*
         * Split right.
         *
         * Only the indices are sorted on the page, i.e., the key/data pairs
         * aren't, so it's simpler to copy the data from the split page onto
         * two new pages instead of copying half the data to a new right page
         * and compacting the left page in place.  Since the left page can't
         * change, we swap the original and the allocated left page after the
         * split.
         */
        if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
                goto err;

        /*
         * Test to see if we are going to be able to insert the new pages into
         * the parent page.  The interesting failure here is that the parent
         * page can't hold the new keys, and has to be split in turn, in which
         * case we want to release all the locks we can.
         */
        if ((ret = __bam_pinsert(dbc, pp, lp, rp, 1)) != 0)
                goto err;

        /*
         * Fix up the previous pointer of any leaf page following the split
         * page.
         *
         * There are interesting deadlock situations here as we try to
         * write-lock a page that's not in our direct ancestry.  Consider a
         * cursor walking backward through the leaf pages that has our
         * following page locked, and is waiting on a lock for the page we're
         * splitting.  In that case we're going to deadlock here.  It's
         * probably OK; stepping backward through the tree isn't a common
         * operation.
         */
        if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID) {
                if ((ret = __db_lget(dbc,
                    0, NEXT_PGNO(cp->page), DB_LOCK_WRITE, 0, &tplock)) != 0)
                        goto err;
                if ((ret = mpf->get(mpf, &NEXT_PGNO(cp->page), 0, &tp)) != 0)
                        goto err;
        }

        /*
         * We've got everything locked down that we need, and we know the
         * split is going to succeed.  Go and get the additional page we'll
         * need.
         */
        if ((ret = __db_new(dbc, TYPE(cp->page), &alloc_rp)) != 0)
                goto err;

        /*
         * Lock the new page.  We need to do this because someone could get
         * here through bt_lpgno if this page was recently deallocated.
         * They can't look at it before we commit.
         */
        if ((ret = __db_lget(dbc,
            0, PGNO(alloc_rp), DB_LOCK_WRITE, 0, &rplock)) != 0)
                goto err;

        /*
         * Fix up the page numbers we didn't have before.  We have to do this
         * before calling __bam_pinsert because it may copy a page number onto
         * the parent page and it takes the page number from its page argument.
         */
        PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);

        /* Actually update the parent page. */
        if ((ret = __bam_pinsert(dbc, pp, lp, rp, 0)) != 0)
                goto err;

        bc = (BTREE_CURSOR *)dbc->internal;
        /* Log the change. */
        if (DBC_LOGGING(dbc)) {
                memset(&log_dbt, 0, sizeof(log_dbt));
                log_dbt.data = cp->page;
                log_dbt.size = dbp->pgsize;
                if (tp == NULL)
                        ZERO_LSN(log_lsn);
                opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
                if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0,
                    PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp),
                    &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp),
                    tp == NULL ? 0 : PGNO(tp),
                    tp == NULL ? &log_lsn : &LSN(tp),
                    PGNO_INVALID, &log_dbt, opflags)) != 0)
                        goto err;

        } else
                LSN_NOT_LOGGED(LSN(cp->page));

        /* Update the LSNs for all involved pages. */
        LSN(alloc_rp) = LSN(cp->page);
        LSN(lp) = LSN(cp->page);
        LSN(rp) = LSN(cp->page);
        if (tp != NULL)
                LSN(tp) = LSN(cp->page);

        /*
         * Copy the left and right pages into place.  There are two paths
         * through here: if we are logging, the LSNs were set in the logging
         * code above; if we are not logging, lp and rp do not have valid
         * LSNs.  The correct LSNs to use are the ones on the page we got
         * from __db_new or the one that was originally on cp->page.  In
         * both cases, we save the LSN from the real database page (not a
         * malloc'd one) and reapply it after we do the copy.
         */
        save_lsn = alloc_rp->lsn;
        memcpy(alloc_rp, rp, LOFFSET(dbp, rp));
        memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
            (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
        alloc_rp->lsn = save_lsn;

        save_lsn = cp->page->lsn;
        memcpy(cp->page, lp, LOFFSET(dbp, lp));
        memcpy((u_int8_t *)cp->page + HOFFSET(lp),
            (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
        cp->page->lsn = save_lsn;

        /* Fix up the next-page link. */
        if (tp != NULL)
                PREV_PGNO(tp) = PGNO(rp);

        /* Adjust any cursors. */
        if ((ret = __bam_ca_split(dbc,
            PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
                goto err;

        __os_free(dbp->dbenv, lp);
        __os_free(dbp->dbenv, rp);

        /*
         * Success -- write the real pages back to the store.  We release
         * the new page's lock before releasing locks on the pages that
         * reference it.  We're finished modifying the page, so it's not
         * really necessary, but it's neater.
         */
        if ((t_ret = mpf->put(mpf, alloc_rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
                ret = t_ret;
        (void)__TLPUT(dbc, rplock);
        if ((t_ret = mpf->put(mpf, pp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
                ret = t_ret;
        (void)__TLPUT(dbc, pp->lock);
        if ((t_ret = mpf->put(mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
                ret = t_ret;
        (void)__TLPUT(dbc, cp->lock);
        if (tp != NULL) {
                if ((t_ret =
                    mpf->put(mpf, tp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
                        ret = t_ret;
                (void)__TLPUT(dbc, tplock);
        }
        return (ret);

err:    if (lp != NULL)
                __os_free(dbp->dbenv, lp);
        if (rp != NULL)
                __os_free(dbp->dbenv, rp);
        if (alloc_rp != NULL)
                (void)mpf->put(mpf, alloc_rp, 0);
        if (tp != NULL)
                (void)mpf->put(mpf, tp, 0);

        /* We never updated the new or next pages, so we can release their locks. */
        (void)__LPUT(dbc, rplock);
        (void)__LPUT(dbc, tplock);

        (void)mpf->put(mpf, pp->page, 0);
        if (ret == DB_NEEDSPLIT)
                (void)__LPUT(dbc, pp->lock);
        else
                (void)__TLPUT(dbc, pp->lock);

        (void)mpf->put(mpf, cp->page, 0);
        if (ret == DB_NEEDSPLIT)
                (void)__LPUT(dbc, cp->lock);
        else
                (void)__TLPUT(dbc, cp->lock);

        return (ret);
}

/*
 * __bam_broot --
 *      Fix up the btree root page after it has been split.
 */
static int
__bam_broot(dbc, rootp, lp, rp)
        DBC *dbc;
        PAGE *rootp, *lp, *rp;
{
        BINTERNAL bi, *child_bi;
        BKEYDATA *child_bk;
        BTREE_CURSOR *cp;
        DB *dbp;
        DBT hdr, data;
        db_pgno_t root_pgno;
        int ret;

        dbp = dbc->dbp;
        cp = (BTREE_CURSOR *)dbc->internal;

        /*
         * If the root page was a leaf page, change it into an internal page.
         * We copy the key we split on (but not the key's data, in the case of
         * a leaf page) to the new root page.
         */
        root_pgno = cp->root;
        P_INIT(rootp, dbp->pgsize,
            root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

        memset(&data, 0, sizeof(data));
        memset(&hdr, 0, sizeof(hdr));

        /*
         * The btree comparison code guarantees that the left-most key on any
         * internal btree page is never used, so it doesn't need to be filled
         * in.  Set the record count if necessary.
         */
        memset(&bi, 0, sizeof(bi));
        bi.len = 0;
        B_TSET(bi.type, B_KEYDATA, 0);
        bi.pgno = lp->pgno;
        if (F_ISSET(cp, C_RECNUM)) {
                bi.nrecs = __bam_total(dbp, lp);
                RE_NREC_SET(rootp, bi.nrecs);
        }
        hdr.data = &bi;
        hdr.size = SSZA(BINTERNAL, data);
        if ((ret =
            __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
                return (ret);

        switch (TYPE(rp)) {
        case P_IBTREE:
                /* Copy the first key of the child page onto the root page. */
                child_bi = GET_BINTERNAL(dbp, rp, 0);

                bi.len = child_bi->len;
                B_TSET(bi.type, child_bi->type, 0);
                bi.pgno = rp->pgno;
                if (F_ISSET(cp, C_RECNUM)) {
                        bi.nrecs = __bam_total(dbp, rp);
                        RE_NREC_ADJ(rootp, bi.nrecs);
                }
                hdr.data = &bi;
                hdr.size = SSZA(BINTERNAL, data);
                data.data = child_bi->data;
                data.size = child_bi->len;
                if ((ret = __db_pitem(dbc, rootp, 1,
                    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
                        return (ret);

                /* Increment the overflow ref count. */
                if (B_TYPE(child_bi->type) == B_OVERFLOW)
                        if ((ret = __db_ovref(dbc,
                            ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
                                return (ret);
                break;
        case P_LDUP:
        case P_LBTREE:
                /* Copy the first key of the child page onto the root page. */
                child_bk = GET_BKEYDATA(dbp, rp, 0);
                switch (B_TYPE(child_bk->type)) {
                case B_KEYDATA:
                        bi.len = child_bk->len;
                        B_TSET(bi.type, child_bk->type, 0);
                        bi.pgno = rp->pgno;
                        if (F_ISSET(cp, C_RECNUM)) {
                                bi.nrecs = __bam_total(dbp, rp);
                                RE_NREC_ADJ(rootp, bi.nrecs);
                        }
                        hdr.data = &bi;
                        hdr.size = SSZA(BINTERNAL, data);
                        data.data = child_bk->data;
                        data.size = child_bk->len;
                        if ((ret = __db_pitem(dbc, rootp, 1,
                            BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
                                return (ret);
                        break;
                case B_DUPLICATE:
                case B_OVERFLOW:
                        bi.len = BOVERFLOW_SIZE;
                        B_TSET(bi.type, child_bk->type, 0);
                        bi.pgno = rp->pgno;
                        if (F_ISSET(cp, C_RECNUM)) {
                                bi.nrecs = __bam_total(dbp, rp);
                                RE_NREC_ADJ(rootp, bi.nrecs);
                        }
                        hdr.data = &bi;
                        hdr.size = SSZA(BINTERNAL, data);
                        data.data = child_bk;
                        data.size = BOVERFLOW_SIZE;
                        if ((ret = __db_pitem(dbc, rootp, 1,
                            BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
                                return (ret);

                        /* Increment the overflow ref count. */
                        if (B_TYPE(child_bk->type) == B_OVERFLOW)
                                if ((ret = __db_ovref(dbc,
                                    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
                                        return (ret);
                        break;
                default:
                        return (__db_pgfmt(dbp->dbenv, rp->pgno));
                }
                break;
        default:
                return (__db_pgfmt(dbp->dbenv, rp->pgno));
        }
        return (0);
}

/*
 * __ram_root --
 *      Fix up the recno root page after it has been split.
 */
static int
__ram_root(dbc, rootp, lp, rp)
        DBC *dbc;
        PAGE *rootp, *lp, *rp;
{
        DB *dbp;
        DBT hdr;
        RINTERNAL ri;
        db_pgno_t root_pgno;
        int ret;

        dbp = dbc->dbp;
        root_pgno = dbc->internal->root;

        /* Initialize the page. */
        P_INIT(rootp, dbp->pgsize,
            root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);

        /* Initialize the header. */
        memset(&hdr, 0, sizeof(hdr));
        hdr.data = &ri;
        hdr.size = RINTERNAL_SIZE;

        /* Insert the left and right keys, set the header information. */
        ri.pgno = lp->pgno;
        ri.nrecs = __bam_total(dbp, lp);
        if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
                return (ret);
        RE_NREC_SET(rootp, ri.nrecs);
        ri.pgno = rp->pgno;
        ri.nrecs = __bam_total(dbp, rp);
        if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
                return (ret);
        RE_NREC_ADJ(rootp, ri.nrecs);
        return (0);
}

/*
 * __bam_pinsert --
 *      Insert a new key into a parent page, completing the split.
 */
static int
__bam_pinsert(dbc, parent, lchild, rchild, space_check)
        DBC *dbc;
        EPG *parent;
        PAGE *lchild, *rchild;
        int space_check;
{
        BINTERNAL bi, *child_bi;
        BKEYDATA *child_bk, *tmp_bk;
        BTREE *t;
        BTREE_CURSOR *cp;
        DB *dbp;
        DBT a, b, hdr, data;
        PAGE *ppage;
        RINTERNAL ri;
        db_indx_t off;
        db_recno_t nrecs;
        size_t (*func) __P((DB *, const DBT *, const DBT *));
        u_int32_t n, nbytes, nksize;
        int ret;

        dbp = dbc->dbp;
        cp = (BTREE_CURSOR *)dbc->internal;
        t = dbp->bt_internal;
        ppage = parent->page;

        /* If handling record numbers, count records split to the right page. */
        nrecs = F_ISSET(cp, C_RECNUM) &&
            !space_check ? __bam_total(dbp, rchild) : 0;

        /*
         * Now we insert the new page's first key into the parent page, which
         * completes the split.  The parent points to a PAGE and a page index
         * offset, where the new key goes ONE AFTER the index, because we split
         * to the right.
         *
         * XXX
         * Some btree algorithms replace the key for the old page as well as
         * the new page.  We don't, as there's no reason to believe that the
         * first key on the old page is any better than the key we have, and,
         * in the case of a key being placed at index 0 causing the split, the
         * key is unavailable.
         */
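        /*
         * For example, if parent->indx is 2 -- the slot whose key routed
         * the search to the page we just split -- the new right page's key
         * is inserted at slot 3, immediately after it.
         */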
        off = parent->indx + O_INDX;

        /*
         * Calculate the space needed on the parent page.
         *
         * Prefix trees: space hack used when inserting into BINTERNAL pages.
         * Retain only what's needed to distinguish between the new entry and
         * the LAST entry on the page to its left.  If the keys compare equal,
         * retain the entire key.  We ignore overflow keys, and the entire key
         * must be retained for the next-to-leftmost key on the leftmost page
         * of each level, or the search will fail.  Applicable ONLY to internal
         * pages that have leaf pages as children.  Further reduction of the
         * key between pairs of internal pages loses too much information.
         */
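        /*
         * For example, with the default prefix callback: if the last key on
         * the left page is "apple" and the right page's first key is
         * "applied", only "appli" -- the shared prefix plus one
         * distinguishing byte -- needs to be promoted to the parent.
         */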
        switch (TYPE(rchild)) {
        case P_IBTREE:
                child_bi = GET_BINTERNAL(dbp, rchild, 0);
                nbytes = BINTERNAL_PSIZE(child_bi->len);

                if (P_FREESPACE(dbp, ppage) < nbytes)
                        return (DB_NEEDSPLIT);
                if (space_check)
                        return (0);

                /* Add a new record for the right page. */
                memset(&bi, 0, sizeof(bi));
                bi.len = child_bi->len;
                B_TSET(bi.type, child_bi->type, 0);
                bi.pgno = rchild->pgno;
                bi.nrecs = nrecs;
                memset(&hdr, 0, sizeof(hdr));
                hdr.data = &bi;
                hdr.size = SSZA(BINTERNAL, data);
                memset(&data, 0, sizeof(data));
                data.data = child_bi->data;
                data.size = child_bi->len;
                if ((ret = __db_pitem(dbc, ppage, off,
                    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
                        return (ret);

                /* Increment the overflow ref count. */
                if (B_TYPE(child_bi->type) == B_OVERFLOW)
                        if ((ret = __db_ovref(dbc,
                            ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
                                return (ret);
                break;
        case P_LDUP:
        case P_LBTREE:
                child_bk = GET_BKEYDATA(dbp, rchild, 0);
                switch (B_TYPE(child_bk->type)) {
                case B_KEYDATA:
                        /*
                         * We set t->bt_prefix to NULL if we have a comparison
                         * callback but no prefix compression callback.  But,
                         * if we're splitting in an off-page duplicates tree,
                         * we still have to do some checking.  If using the
                         * default off-page duplicates comparison routine we
                         * can use the default prefix compression callback. If
                         * not using the default off-page duplicates comparison
                         * routine, we can't do any kind of prefix compression
                         * as there's no way for an application to specify a
                         * prefix compression callback that corresponds to its
                         * comparison callback.
                         */
                        if (F_ISSET(dbc, DBC_OPD)) {
                                if (dbp->dup_compare == __bam_defcmp)
                                        func = __bam_defpfx;
                                else
                                        func = NULL;
                        } else
                                func = t->bt_prefix;

                        nbytes = BINTERNAL_PSIZE(child_bk->len);
                        nksize = child_bk->len;
                        if (func == NULL)
                                goto noprefix;
                        if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
                                goto noprefix;
                        tmp_bk = GET_BKEYDATA(dbp, lchild, NUM_ENT(lchild) -
                            (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
                        if (B_TYPE(tmp_bk->type) != B_KEYDATA)
                                goto noprefix;
                        memset(&a, 0, sizeof(a));
                        a.size = tmp_bk->len;
                        a.data = tmp_bk->data;
                        memset(&b, 0, sizeof(b));
                        b.size = child_bk->len;
                        b.data = child_bk->data;
                        nksize = (u_int32_t)func(dbp, &a, &b);
                        if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
                                nbytes = n;
                        else
noprefix:                       nksize = child_bk->len;

                        if (P_FREESPACE(dbp, ppage) < nbytes)
                                return (DB_NEEDSPLIT);
                        if (space_check)
                                return (0);

                        memset(&bi, 0, sizeof(bi));
                        bi.len = nksize;
                        B_TSET(bi.type, child_bk->type, 0);
                        bi.pgno = rchild->pgno;
                        bi.nrecs = nrecs;
                        memset(&hdr, 0, sizeof(hdr));
                        hdr.data = &bi;
                        hdr.size = SSZA(BINTERNAL, data);
                        memset(&data, 0, sizeof(data));
                        data.data = child_bk->data;
                        data.size = nksize;
                        if ((ret = __db_pitem(dbc, ppage, off,
                            BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
                                return (ret);
                        break;
                case B_DUPLICATE:
                case B_OVERFLOW:
                        nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

                        if (P_FREESPACE(dbp, ppage) < nbytes)
                                return (DB_NEEDSPLIT);
                        if (space_check)
                                return (0);

                        memset(&bi, 0, sizeof(bi));
                        bi.len = BOVERFLOW_SIZE;
                        B_TSET(bi.type, child_bk->type, 0);
                        bi.pgno = rchild->pgno;
                        bi.nrecs = nrecs;
                        memset(&hdr, 0, sizeof(hdr));
                        hdr.data = &bi;
                        hdr.size = SSZA(BINTERNAL, data);
                        memset(&data, 0, sizeof(data));
                        data.data = child_bk;
                        data.size = BOVERFLOW_SIZE;
                        if ((ret = __db_pitem(dbc, ppage, off,
                            BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
                                return (ret);

                        /* Increment the overflow ref count. */
                        if (B_TYPE(child_bk->type) == B_OVERFLOW)
                                if ((ret = __db_ovref(dbc,
                                    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
                                        return (ret);
                        break;
                default:
                        return (__db_pgfmt(dbp->dbenv, rchild->pgno));
                }
                break;
        case P_IRECNO:
        case P_LRECNO:
                nbytes = RINTERNAL_PSIZE;

                if (P_FREESPACE(dbp, ppage) < nbytes)
                        return (DB_NEEDSPLIT);
                if (space_check)
                        return (0);

                /* Add a new record for the right page. */
                memset(&hdr, 0, sizeof(hdr));
                hdr.data = &ri;
                hdr.size = RINTERNAL_SIZE;
                ri.pgno = rchild->pgno;
                ri.nrecs = nrecs;
                if ((ret = __db_pitem(dbc,
                    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
                        return (ret);
                break;
        default:
                return (__db_pgfmt(dbp->dbenv, rchild->pgno));
        }

        /*
         * If this is a Recno tree, a Btree with record numbers, or an
         * off-page duplicates tree, adjust the parent page's left page
         * record count.
         */
        if (F_ISSET(cp, C_RECNUM)) {
                /* Log the change. */
                if (DBC_LOGGING(dbc)) {
                        if ((ret = __bam_cadjust_log(dbp, dbc->txn,
                            &LSN(ppage), 0, PGNO(ppage),
                            &LSN(ppage), parent->indx, -(int32_t)nrecs, 0)) != 0)
                                return (ret);
                } else
                        LSN_NOT_LOGGED(LSN(ppage));

                /* Update the left page count. */
                if (dbc->dbtype == DB_RECNO)
                        GET_RINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
                else
                        GET_BINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
        }

        return (0);
}

/*
 * __bam_psplit --
 *      Do the real work of splitting the page.
 */
static int
__bam_psplit(dbc, cp, lp, rp, splitret)
        DBC *dbc;
        EPG *cp;
        PAGE *lp, *rp;
        db_indx_t *splitret;
{
        DB *dbp;
        PAGE *pp;
        db_indx_t half, *inp, nbytes, off, splitp, top;
        int adjust, cnt, iflag, isbigkey, ret;

        dbp = dbc->dbp;
        pp = cp->page;
        inp = P_INP(dbp, pp);
        adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;

        /*
         * If we're splitting the first (last) page on a level because we're
         * inserting (appending) a key to it, it's likely that the data is
         * sorted.  Moving a single item to the new page is less work and can
943         * push the fill factor higher than normal.  This is trivial when we
944         * are splitting a new page before the beginning of the tree, all of
945         * the interesting tests are against values of 0.
946         *
947         * Catching appends to the tree is harder.  In a simple append, we're
948         * inserting an item that sorts past the end of the tree; the cursor
949         * will point past the last element on the page.  But, in trees with
950         * duplicates, the cursor may point to the last entry on the page --
951         * in this case, the entry will also be the last element of a duplicate
952         * set (the last because the search call specified the S_DUPLAST flag).
953         * The only way to differentiate between an insert immediately before
954         * the last item in a tree or an append after a duplicate set which is
955         * also the last item in the tree is to call the comparison function.
956         * When splitting internal pages during an append, the search code
957         * guarantees the cursor always points to the largest page item less
958         * than the new internal entry.  To summarize, we want to catch three
959         * possible index values:
960         *
961         *      NUM_ENT(page)           Btree/Recno leaf insert past end-of-tree
962         *      NUM_ENT(page) - O_INDX  Btree or Recno internal insert past EOT
963         *      NUM_ENT(page) - P_INDX  Btree leaf insert past EOT after a set
964         *                                  of duplicates
965         *
966         * two of which, (NUM_ENT(page) - O_INDX or P_INDX) might be an insert
967         * near the end of the tree, and not after the end of the tree at all.
968         * Do a simple test which might be wrong because calling the comparison
969         * functions is expensive.  Regardless, it's not a big deal if we're
         * wrong; we'll do the split the right way next time.
         */
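        /*
         * Concretely, on a Btree leaf (P_INDX == 2) holding three key/data
         * pairs, NUM_ENT(page) is 6: an insert past the end of the tree
         * leaves cp->indx == 6, while an append after a trailing duplicate
         * set leaves cp->indx == 4, the index of the last on-page key.
         */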
        off = 0;
        if (NEXT_PGNO(pp) == PGNO_INVALID && cp->indx >= NUM_ENT(pp) - adjust)
                off = NUM_ENT(pp) - adjust;
        else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
                off = adjust;
        if (off != 0)
                goto sort;

        /*
         * Split the data to the left and right pages.  Try not to split on
         * an overflow key.  (Overflow keys on internal pages will slow down
         * searches.)  Refuse to split in the middle of a set of duplicates.
         *
         * First, find the optimum place to split.
         *
         * It's possible to try and split past the last record on the page if
         * there's a very large record at the end of the page.  Make sure this
         * doesn't happen by bounding the check at the next-to-last entry on
         * the page.
         *
         * Note, we try and split half the data present on the page.  This is
         * because another process may have already split the page and left
         * it half empty.  We don't try and skip the split -- we don't know
         * how much space we're going to need on the page, and we may need up
         * to half the page for a big item, so there's no easy test to decide
         * if we need to split or not.  Besides, if two threads are inserting
         * data into the same place in the database, we're probably going to
         * need more space soon anyway.
         */
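        /*
         * For example, on a 4096-byte page where HOFFSET(pp) is 1096, half
         * is (4096 - 1096) / 2 = 1500 bytes; the loop below advances the
         * split point until roughly that many bytes would stay on the left
         * page.
         */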
        top = NUM_ENT(pp) - adjust;
        half = (dbp->pgsize - HOFFSET(pp)) / 2;
        for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
                switch (TYPE(pp)) {
                case P_IBTREE:
                        if (B_TYPE(
                            GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA)
                                nbytes += BINTERNAL_SIZE(
                                   GET_BINTERNAL(dbp, pp, off)->len);
                        else
                                nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
                        break;
                case P_LBTREE:
                        if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
                            B_KEYDATA)
                                nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
                                    pp, off)->len);
                        else
                                nbytes += BOVERFLOW_SIZE;

                        ++off;
                        /* FALLTHROUGH */
                case P_LDUP:
                case P_LRECNO:
                        if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
                            B_KEYDATA)
                                nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
                                    pp, off)->len);
                        else
                                nbytes += BOVERFLOW_SIZE;
                        break;
                case P_IRECNO:
                        nbytes += RINTERNAL_SIZE;
                        break;
                default:
                        return (__db_pgfmt(dbp->dbenv, pp->pgno));
                }
sort:   splitp = off;

        /*
         * Splitp is either at or just past the optimum split point.  If the
         * tree type is such that we're going to promote a key to an internal
         * page, and our current choice is an overflow key, look for something
         * close by that's smaller.
         */
        switch (TYPE(pp)) {
        case P_IBTREE:
                iflag = 1;
                isbigkey =
                    B_TYPE(GET_BINTERNAL(dbp, pp, off)->type) != B_KEYDATA;
                break;
        case P_LBTREE:
        case P_LDUP:
                iflag = 0;
                isbigkey = B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) !=
                    B_KEYDATA;
                break;
        default:
                iflag = isbigkey = 0;
        }
        if (isbigkey)
                for (cnt = 1; cnt <= 3; ++cnt) {
                        off = splitp + cnt * adjust;
                        if (off < (db_indx_t)NUM_ENT(pp) &&
                            ((iflag && B_TYPE(
                            GET_BINTERNAL(dbp, pp,off)->type) == B_KEYDATA) ||
                            B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
                            B_KEYDATA)) {
                                splitp = off;
                                break;
                        }
                        if (splitp <= (db_indx_t)(cnt * adjust))
                                continue;
                        off = splitp - cnt * adjust;
                        if (iflag ? B_TYPE(
                            GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA :
                            B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
                            B_KEYDATA) {
                                splitp = off;
                                break;
                        }
                }

        /*
         * We can't split in the middle of a set of duplicates.  We know that
         * no duplicate set can take up more than about 25% of the page,
         * because that's the point where we push it off onto a duplicate
         * page set.  So, this loop can't be unbounded.
         */
        if (TYPE(pp) == P_LBTREE &&
            inp[splitp] == inp[splitp - adjust])
                for (cnt = 1;; ++cnt) {
                        off = splitp + cnt * adjust;
                        if (off < NUM_ENT(pp) &&
                            inp[splitp] != inp[off]) {
                                splitp = off;
                                break;
                        }
                        if (splitp <= (db_indx_t)(cnt * adjust))
                                continue;
                        off = splitp - cnt * adjust;
                        if (inp[splitp] != inp[off]) {
                                splitp = off + adjust;
                                break;
                        }
                }

        /* We're going to split at splitp. */
        if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
                return (ret);
        if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
                return (ret);

        *splitret = splitp;
        return (0);
}

/*
 * __bam_copy --
 *      Copy a set of records from one page to another.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
        DB *dbp;
        PAGE *pp, *cp;
        u_int32_t nxt, stop;
{
        db_indx_t *cinp, nbytes, off, *pinp;

        cinp = P_INP(dbp, cp);
        pinp = P_INP(dbp, pp);
        /*
         * Nxt is the offset of the next record to be placed on the target page.
         */
        for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
                switch (TYPE(pp)) {
                case P_IBTREE:
                        if (B_TYPE(
                            GET_BINTERNAL(dbp, pp, nxt)->type) == B_KEYDATA)
                                nbytes = BINTERNAL_SIZE(
                                    GET_BINTERNAL(dbp, pp, nxt)->len);
                        else
                                nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
                        break;
                case P_LBTREE:
                        /*
                         * If we're on a key and it's a duplicate, just copy
                         * the offset.
                         */
                        if (off != 0 && (nxt % P_INDX) == 0 &&
                            pinp[nxt] == pinp[nxt - P_INDX]) {
                                cinp[off] = cinp[off - P_INDX];
                                continue;
                        }
                        /* FALLTHROUGH */
                case P_LDUP:
                case P_LRECNO:
                        if (B_TYPE(GET_BKEYDATA(dbp, pp, nxt)->type) ==
                            B_KEYDATA)
                                nbytes = BKEYDATA_SIZE(GET_BKEYDATA(dbp,
                                    pp, nxt)->len);
                        else
                                nbytes = BOVERFLOW_SIZE;
                        break;
                case P_IRECNO:
                        nbytes = RINTERNAL_SIZE;
                        break;
                default:
                        return (__db_pgfmt(dbp->dbenv, pp->pgno));
                }
                cinp[off] = HOFFSET(cp) -= nbytes;
                memcpy(P_ENTRY(dbp, cp, off), P_ENTRY(dbp, pp, nxt), nbytes);
        }
        return (0);
}