source: trunk/third/rpm/db/btree/bt_put.c @ 19079

Revision 19079, 23.1 KB checked in by ghudson, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r19078, which included commits to RCS files with non-trunk default branches.
Line 
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996-2002
5 *      Sleepycat Software.  All rights reserved.
6 */
7/*
8 * Copyright (c) 1990, 1993, 1994, 1995, 1996
9 *      Keith Bostic.  All rights reserved.
10 */
11/*
12 * Copyright (c) 1990, 1993, 1994, 1995
13 *      The Regents of the University of California.  All rights reserved.
14 *
15 * This code is derived from software contributed to Berkeley by
16 * Mike Olson.
17 *
18 * Redistribution and use in source and binary forms, with or without
19 * modification, are permitted provided that the following conditions
20 * are met:
21 * 1. Redistributions of source code must retain the above copyright
22 *    notice, this list of conditions and the following disclaimer.
23 * 2. Redistributions in binary form must reproduce the above copyright
24 *    notice, this list of conditions and the following disclaimer in the
25 *    documentation and/or other materials provided with the distribution.
26 * 3. Neither the name of the University nor the names of its contributors
27 *    may be used to endorse or promote products derived from this software
28 *    without specific prior written permission.
29 *
30 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
31 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
32 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
34 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
35 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
36 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
38 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
39 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
40 * SUCH DAMAGE.
41 */
42
43#include "db_config.h"
44
#ifndef lint
/* RCS revision identifier for this file; omitted when building under lint. */
static const char revid[] = "Id: bt_put.c,v 11.69 2002/08/06 06:11:12 bostic Exp ";
#endif /* not lint */
48
49#ifndef NO_SYSTEM_INCLUDES
50#include <sys/types.h>
51
52#include <string.h>
53#endif
54
55#include "db_int.h"
56#include "dbinc/db_page.h"
57#include "dbinc/btree.h"
58
59static int __bam_build
60               __P((DBC *, u_int32_t, DBT *, PAGE *, u_int32_t, u_int32_t));
61static int __bam_dup_convert __P((DBC *, PAGE *, u_int32_t));
62static int __bam_ovput
63               __P((DBC *, u_int32_t, db_pgno_t, PAGE *, u_int32_t, DBT *));
64static u_int32_t
65           __bam_partsize __P((DB *, u_int32_t, DBT *, PAGE *, u_int32_t));
66
67/*
68 * __bam_iitem --
69 *      Insert an item into the tree.
70 *
71 * PUBLIC: int __bam_iitem __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t));
72 */
73int
74__bam_iitem(dbc, key, data, op, flags)
75        DBC *dbc;
76        DBT *key, *data;
77        u_int32_t op, flags;
78{
79        BKEYDATA *bk, bk_tmp;
80        BTREE *t;
81        BTREE_CURSOR *cp;
82        DB *dbp;
83        DBT bk_hdr, tdbt;
84        DB_MPOOLFILE *mpf;
85        PAGE *h;
86        db_indx_t indx;
87        u_int32_t data_size, have_bytes, need_bytes, needed;
88        int cmp, bigkey, bigdata, dupadjust, padrec, replace, ret, was_deleted;
89
90        COMPQUIET(bk, NULL);
91
92        dbp = dbc->dbp;
93        mpf = dbp->mpf;
94        cp = (BTREE_CURSOR *)dbc->internal;
95        t = dbp->bt_internal;
96        h = cp->page;
97        indx = cp->indx;
98        dupadjust = replace = was_deleted = 0;
99
100        /*
101         * Fixed-length records with partial puts: it's an error to specify
102         * anything other simple overwrite.
103         */
104        if (F_ISSET(dbp, DB_AM_FIXEDLEN) &&
105            F_ISSET(data, DB_DBT_PARTIAL) && data->dlen != data->size) {
106                data_size = data->size;
107                goto len_err;
108        }
109
110        /*
111         * Figure out how much space the data will take, including if it's a
112         * partial record.
113         *
114         * Fixed-length records: it's an error to specify a record that's
115         * longer than the fixed-length, and we never require less than
116         * the fixed-length record size.
117         */
118        data_size = F_ISSET(data, DB_DBT_PARTIAL) ?
119            __bam_partsize(dbp, op, data, h, indx) : data->size;
120        padrec = 0;
121        if (F_ISSET(dbp, DB_AM_FIXEDLEN)) {
122                if (data_size > t->re_len) {
123len_err:                __db_err(dbp->dbenv,
124                            "Length improper for fixed length record %lu",
125                            (u_long)data_size);
126                        return (EINVAL);
127                }
128
129                /* Records that are deleted anyway needn't be padded out. */
130                if (!LF_ISSET(BI_DELETED) && data_size < t->re_len) {
131                        padrec = 1;
132                        data_size = t->re_len;
133                }
134        }
135
136        /*
137         * Handle partial puts or short fixed-length records: build the
138         * real record.
139         */
140        if (padrec || F_ISSET(data, DB_DBT_PARTIAL)) {
141                tdbt = *data;
142                if ((ret =
143                    __bam_build(dbc, op, &tdbt, h, indx, data_size)) != 0)
144                        return (ret);
145                data = &tdbt;
146        }
147
148        /*
149         * If the user has specified a duplicate comparison function, return
150         * an error if DB_CURRENT was specified and the replacement data
151         * doesn't compare equal to the current data.  This stops apps from
152         * screwing up the duplicate sort order.  We have to do this after
153         * we build the real record so that we're comparing the real items.
154         */
155        if (op == DB_CURRENT && dbp->dup_compare != NULL) {
156                if ((ret = __bam_cmp(dbp, data, h,
157                    indx + (TYPE(h) == P_LBTREE ? O_INDX : 0),
158                    dbp->dup_compare, &cmp)) != 0)
159                        return (ret);
160                if (cmp != 0) {
161                        __db_err(dbp->dbenv,
162                            "Current data differs from put data");
163                        return (EINVAL);
164                }
165        }
166
167        /*
168         * If the key or data item won't fit on a page, we'll have to store
169         * them on overflow pages.
170         */
171        needed = 0;
172        bigdata = data_size > cp->ovflsize;
173        switch (op) {
174        case DB_KEYFIRST:
175                /* We're adding a new key and data pair. */
176                bigkey = key->size > cp->ovflsize;
177                if (bigkey)
178                        needed += BOVERFLOW_PSIZE;
179                else
180                        needed += BKEYDATA_PSIZE(key->size);
181                if (bigdata)
182                        needed += BOVERFLOW_PSIZE;
183                else
184                        needed += BKEYDATA_PSIZE(data_size);
185                break;
186        case DB_AFTER:
187        case DB_BEFORE:
188        case DB_CURRENT:
189                /*
190                 * We're either overwriting the data item of a key/data pair
191                 * or we're creating a new on-page duplicate and only adding
192                 * a data item.
193                 *
194                 * !!!
195                 * We're not currently correcting for space reclaimed from
196                 * already deleted items, but I don't think it's worth the
197                 * complexity.
198                 */
199                bigkey = 0;
200                if (op == DB_CURRENT) {
201                        bk = GET_BKEYDATA(dbp, h,
202                            indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
203                        if (B_TYPE(bk->type) == B_KEYDATA)
204                                have_bytes = BKEYDATA_PSIZE(bk->len);
205                        else
206                                have_bytes = BOVERFLOW_PSIZE;
207                        need_bytes = 0;
208                } else {
209                        have_bytes = 0;
210                        need_bytes = sizeof(db_indx_t);
211                }
212                if (bigdata)
213                        need_bytes += BOVERFLOW_PSIZE;
214                else
215                        need_bytes += BKEYDATA_PSIZE(data_size);
216
217                if (have_bytes < need_bytes)
218                        needed += need_bytes - have_bytes;
219                break;
220        default:
221                return (__db_unknown_flag(dbp->dbenv, "__bam_iitem", op));
222        }
223
224        /*
225         * If there's not enough room, or the user has put a ceiling on the
226         * number of keys permitted in the page, split the page.
227         *
228         * XXX
229         * The t->bt_maxkey test here may be insufficient -- do we have to
230         * check in the btree split code, so we don't undo it there!?!?
231         */
232        if (P_FREESPACE(dbp, h) < needed ||
233            (t->bt_maxkey != 0 && NUM_ENT(h) > t->bt_maxkey))
234                return (DB_NEEDSPLIT);
235
236        /*
237         * The code breaks it up into five cases:
238         *
239         * 1. Insert a new key/data pair.
240         * 2. Append a new data item (a new duplicate).
241         * 3. Insert a new data item (a new duplicate).
242         * 4. Delete and re-add the data item (overflow item).
243         * 5. Overwrite the data item.
244         */
245        switch (op) {
246        case DB_KEYFIRST:               /* 1. Insert a new key/data pair. */
247                if (bigkey) {
248                        if ((ret = __bam_ovput(dbc,
249                            B_OVERFLOW, PGNO_INVALID, h, indx, key)) != 0)
250                                return (ret);
251                } else
252                        if ((ret = __db_pitem(dbc, h, indx,
253                            BKEYDATA_SIZE(key->size), NULL, key)) != 0)
254                                return (ret);
255
256                if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0)
257                        return (ret);
258                ++indx;
259                break;
260        case DB_AFTER:                  /* 2. Append a new data item. */
261                if (TYPE(h) == P_LBTREE) {
262                        /* Copy the key for the duplicate and adjust cursors. */
263                        if ((ret =
264                            __bam_adjindx(dbc, h, indx + P_INDX, indx, 1)) != 0)
265                                return (ret);
266                        if ((ret =
267                            __bam_ca_di(dbc, PGNO(h), indx + P_INDX, 1)) != 0)
268                                return (ret);
269
270                        indx += 3;
271                        dupadjust = 1;
272
273                        cp->indx += 2;
274                } else {
275                        ++indx;
276                        cp->indx += 1;
277                }
278                break;
279        case DB_BEFORE:                 /* 3. Insert a new data item. */
280                if (TYPE(h) == P_LBTREE) {
281                        /* Copy the key for the duplicate and adjust cursors. */
282                        if ((ret = __bam_adjindx(dbc, h, indx, indx, 1)) != 0)
283                                return (ret);
284                        if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0)
285                                return (ret);
286
287                        ++indx;
288                        dupadjust = 1;
289                }
290                break;
291        case DB_CURRENT:
292                 /*
293                  * Clear the cursor's deleted flag.  The problem is that if
294                  * we deadlock or fail while deleting the overflow item or
295                  * replacing the non-overflow item, a subsequent cursor close
296                  * will try and remove the item because the cursor's delete
297                  * flag is set
298                  */
299                (void)__bam_ca_delete(dbp, PGNO(h), indx, 0);
300
301                if (TYPE(h) == P_LBTREE) {
302                        ++indx;
303                        dupadjust = 1;
304
305                        /*
306                         * In a Btree deleted records aren't counted (deleted
307                         * records are counted in a Recno because all accesses
308                         * are based on record number).  If it's a Btree and
309                         * it's a DB_CURRENT operation overwriting a previously
310                         * deleted record, increment the record count.
311                         */
312                        was_deleted = B_DISSET(bk->type);
313                }
314
315                /*
316                 * 4. Delete and re-add the data item.
317                 *
318                 * If we're changing the type of the on-page structure, or we
319                 * are referencing offpage items, we have to delete and then
320                 * re-add the item.  We do not do any cursor adjustments here
321                 * because we're going to immediately re-add the item into the
322                 * same slot.
323                 */
324                if (bigdata || B_TYPE(bk->type) != B_KEYDATA) {
325                        if ((ret = __bam_ditem(dbc, h, indx)) != 0)
326                                return (ret);
327                        break;
328                }
329
330                /* 5. Overwrite the data item. */
331                replace = 1;
332                break;
333        default:
334                return (__db_unknown_flag(dbp->dbenv, "__bam_iitem", op));
335        }
336
337        /* Add the data. */
338        if (bigdata) {
339                /*
340                 * We do not have to handle deleted (BI_DELETED) records
341                 * in this case; the actual records should never be created.
342                 */
343                DB_ASSERT(!LF_ISSET(BI_DELETED));
344                if ((ret = __bam_ovput(dbc,
345                    B_OVERFLOW, PGNO_INVALID, h, indx, data)) != 0)
346                        return (ret);
347        } else {
348                if (LF_ISSET(BI_DELETED)) {
349                        B_TSET(bk_tmp.type, B_KEYDATA, 1);
350                        bk_tmp.len = data->size;
351                        bk_hdr.data = &bk_tmp;
352                        bk_hdr.size = SSZA(BKEYDATA, data);
353                        ret = __db_pitem(dbc, h, indx,
354                            BKEYDATA_SIZE(data->size), &bk_hdr, data);
355                } else if (replace)
356                        ret = __bam_ritem(dbc, h, indx, data);
357                else
358                        ret = __db_pitem(dbc, h, indx,
359                            BKEYDATA_SIZE(data->size), NULL, data);
360                if (ret != 0)
361                        return (ret);
362        }
363        if ((ret = mpf->set(mpf, h, DB_MPOOL_DIRTY)) != 0)
364                return (ret);
365
366        /*
367         * Re-position the cursors if necessary and reset the current cursor
368         * to point to the new item.
369         */
370        if (op != DB_CURRENT) {
371                if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0)
372                        return (ret);
373                cp->indx = TYPE(h) == P_LBTREE ? indx - O_INDX : indx;
374        }
375
376        /*
377         * If we've changed the record count, update the tree.  There's no
378         * need to adjust the count if the operation not performed on the
379         * current record or when the current record was previously deleted.
380         */
381        if (F_ISSET(cp, C_RECNUM) && (op != DB_CURRENT || was_deleted))
382                if ((ret = __bam_adjust(dbc, 1)) != 0)
383                        return (ret);
384
385        /*
386         * If a Btree leaf page is at least 50% full and we may have added or
387         * modified a duplicate data item, see if the set of duplicates takes
388         * up at least 25% of the space on the page.  If it does, move it onto
389         * its own page.
390         */
391        if (dupadjust && P_FREESPACE(dbp, h) <= dbp->pgsize / 2) {
392                if ((ret = __bam_dup_convert(dbc, h, indx - O_INDX)) != 0)
393                        return (ret);
394        }
395
396        /* If we've modified a recno file, set the flag. */
397        if (dbc->dbtype == DB_RECNO)
398                t->re_modified = 1;
399
400        return (ret);
401}
402
403/*
404 * __bam_partsize --
405 *      Figure out how much space a partial data item is in total.
406 */
407static u_int32_t
408__bam_partsize(dbp, op, data, h, indx)
409        DB *dbp;
410        u_int32_t op, indx;
411        DBT *data;
412        PAGE *h;
413{
414        BKEYDATA *bk;
415        u_int32_t nbytes;
416
417        /*
418         * If the record doesn't already exist, it's simply the data we're
419         * provided.
420         */
421        if (op != DB_CURRENT)
422                return (data->doff + data->size);
423
424        /*
425         * Otherwise, it's the data provided plus any already existing data
426         * that we're not replacing.
427         */
428        bk = GET_BKEYDATA(dbp, h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0));
429        nbytes =
430            B_TYPE(bk->type) == B_OVERFLOW ? ((BOVERFLOW *)bk)->tlen : bk->len;
431
432        return (__db_partsize(nbytes, data));
433}
434
435/*
436 * __bam_build --
437 *      Build the real record for a partial put, or short fixed-length record.
438 */
439static int
440__bam_build(dbc, op, dbt, h, indx, nbytes)
441        DBC *dbc;
442        u_int32_t op, indx, nbytes;
443        DBT *dbt;
444        PAGE *h;
445{
446        BKEYDATA *bk, tbk;
447        BOVERFLOW *bo;
448        BTREE *t;
449        DB *dbp;
450        DBT copy, *rdata;
451        u_int32_t len, tlen;
452        u_int8_t *p;
453        int ret;
454
455        COMPQUIET(bo, NULL);
456
457        dbp = dbc->dbp;
458        t = dbp->bt_internal;
459
460        /* We use the record data return memory, it's only a short-term use. */
461        rdata = &dbc->my_rdata;
462        if (rdata->ulen < nbytes) {
463                if ((ret = __os_realloc(dbp->dbenv,
464                    nbytes, &rdata->data)) != 0) {
465                        rdata->ulen = 0;
466                        rdata->data = NULL;
467                        return (ret);
468                }
469                rdata->ulen = nbytes;
470        }
471
472        /*
473         * We use nul or pad bytes for any part of the record that isn't
474         * specified; get it over with.
475         */
476        memset(rdata->data,
477           F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_pad : 0, nbytes);
478
479        /*
480         * In the next clauses, we need to do three things: a) set p to point
481         * to the place at which to copy the user's data, b) set tlen to the
482         * total length of the record, not including the bytes contributed by
483         * the user, and c) copy any valid data from an existing record.  If
484         * it's not a partial put (this code is called for both partial puts
485         * and fixed-length record padding) or it's a new key, we can cut to
486         * the chase.
487         */
488        if (!F_ISSET(dbt, DB_DBT_PARTIAL) || op != DB_CURRENT) {
489                p = (u_int8_t *)rdata->data + dbt->doff;
490                tlen = dbt->doff;
491                goto user_copy;
492        }
493
494        /* Find the current record. */
495        if (indx < NUM_ENT(h)) {
496                bk = GET_BKEYDATA(dbp, h, indx + (TYPE(h) == P_LBTREE ?
497                    O_INDX : 0));
498                bo = (BOVERFLOW *)bk;
499        } else {
500                bk = &tbk;
501                B_TSET(bk->type, B_KEYDATA, 0);
502                bk->len = 0;
503        }
504        if (B_TYPE(bk->type) == B_OVERFLOW) {
505                /*
506                 * In the case of an overflow record, we shift things around
507                 * in the current record rather than allocate a separate copy.
508                 */
509                memset(&copy, 0, sizeof(copy));
510                if ((ret = __db_goff(dbp, &copy, bo->tlen,
511                    bo->pgno, &rdata->data, &rdata->ulen)) != 0)
512                        return (ret);
513
514                /* Skip any leading data from the original record. */
515                tlen = dbt->doff;
516                p = (u_int8_t *)rdata->data + dbt->doff;
517
518                /*
519                 * Copy in any trailing data from the original record.
520                 *
521                 * If the original record was larger than the original offset
522                 * plus the bytes being deleted, there is trailing data in the
523                 * original record we need to preserve.  If we aren't deleting
524                 * the same number of bytes as we're inserting, copy it up or
525                 * down, into place.
526                 *
527                 * Use memmove(), the regions may overlap.
528                 */
529                if (bo->tlen > dbt->doff + dbt->dlen) {
530                        len = bo->tlen - (dbt->doff + dbt->dlen);
531                        if (dbt->dlen != dbt->size)
532                                memmove(p + dbt->size, p + dbt->dlen, len);
533                        tlen += len;
534                }
535        } else {
536                /* Copy in any leading data from the original record. */
537                memcpy(rdata->data,
538                    bk->data, dbt->doff > bk->len ? bk->len : dbt->doff);
539                tlen = dbt->doff;
540                p = (u_int8_t *)rdata->data + dbt->doff;
541
542                /* Copy in any trailing data from the original record. */
543                len = dbt->doff + dbt->dlen;
544                if (bk->len > len) {
545                        memcpy(p + dbt->size, bk->data + len, bk->len - len);
546                        tlen += bk->len - len;
547                }
548        }
549
550user_copy:
551        /*
552         * Copy in the application provided data -- p and tlen must have been
553         * initialized above.
554         */
555        memcpy(p, dbt->data, dbt->size);
556        tlen += dbt->size;
557
558        /* Set the DBT to reference our new record. */
559        rdata->size = F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_len : tlen;
560        rdata->dlen = 0;
561        rdata->doff = 0;
562        rdata->flags = 0;
563        *dbt = *rdata;
564        return (0);
565}
566
567/*
568 * __bam_ritem --
569 *      Replace an item on a page.
570 *
571 * PUBLIC: int __bam_ritem __P((DBC *, PAGE *, u_int32_t, DBT *));
572 */
573int
574__bam_ritem(dbc, h, indx, data)
575        DBC *dbc;
576        PAGE *h;
577        u_int32_t indx;
578        DBT *data;
579{
580        BKEYDATA *bk;
581        DB *dbp;
582        DBT orig, repl;
583        db_indx_t cnt, lo, ln, min, off, prefix, suffix;
584        int32_t nbytes;
585        int ret;
586        db_indx_t *inp;
587        u_int8_t *p, *t;
588
589        dbp = dbc->dbp;
590
591        /*
592         * Replace a single item onto a page.  The logic figuring out where
593         * to insert and whether it fits is handled in the caller.  All we do
594         * here is manage the page shuffling.
595         */
596        bk = GET_BKEYDATA(dbp, h, indx);
597
598        /* Log the change. */
599        if (DBC_LOGGING(dbc)) {
600                /*
601                 * We might as well check to see if the two data items share
602                 * a common prefix and suffix -- it can save us a lot of log
603                 * message if they're large.
604                 */
605                min = data->size < bk->len ? data->size : bk->len;
606                for (prefix = 0,
607                    p = bk->data, t = data->data;
608                    prefix < min && *p == *t; ++prefix, ++p, ++t)
609                        ;
610
611                min -= prefix;
612                for (suffix = 0,
613                    p = (u_int8_t *)bk->data + bk->len - 1,
614                    t = (u_int8_t *)data->data + data->size - 1;
615                    suffix < min && *p == *t; ++suffix, --p, --t)
616                        ;
617
618                /* We only log the parts of the keys that have changed. */
619                orig.data = (u_int8_t *)bk->data + prefix;
620                orig.size = bk->len - (prefix + suffix);
621                repl.data = (u_int8_t *)data->data + prefix;
622                repl.size = data->size - (prefix + suffix);
623                if ((ret = __bam_repl_log(dbp, dbc->txn, &LSN(h), 0, PGNO(h),
624                    &LSN(h), (u_int32_t)indx, (u_int32_t)B_DISSET(bk->type),
625                    &orig, &repl, (u_int32_t)prefix, (u_int32_t)suffix)) != 0)
626                        return (ret);
627        } else
628                LSN_NOT_LOGGED(LSN(h));
629
630        /*
631         * Set references to the first in-use byte on the page and the
632         * first byte of the item being replaced.
633         */
634        inp = P_INP(dbp, h);
635        p = (u_int8_t *)h + HOFFSET(h);
636        t = (u_int8_t *)bk;
637
638        /*
639         * If the entry is growing in size, shift the beginning of the data
640         * part of the page down.  If the entry is shrinking in size, shift
641         * the beginning of the data part of the page up.  Use memmove(3),
642         * the regions overlap.
643         */
644        lo = BKEYDATA_SIZE(bk->len);
645        ln = (db_indx_t)BKEYDATA_SIZE(data->size);
646        if (lo != ln) {
647                nbytes = lo - ln;               /* Signed difference. */
648                if (p == t)                     /* First index is fast. */
649                        inp[indx] += nbytes;
650                else {                          /* Else, shift the page. */
651                        memmove(p + nbytes, p, t - p);
652
653                        /* Adjust the indices' offsets. */
654                        off = inp[indx];
655                        for (cnt = 0; cnt < NUM_ENT(h); ++cnt)
656                                if (inp[cnt] <= off)
657                                        inp[cnt] += nbytes;
658                }
659
660                /* Clean up the page and adjust the item's reference. */
661                HOFFSET(h) += nbytes;
662                t += nbytes;
663        }
664
665        /* Copy the new item onto the page. */
666        bk = (BKEYDATA *)t;
667        B_TSET(bk->type, B_KEYDATA, 0);
668        bk->len = data->size;
669        memcpy(bk->data, data->data, data->size);
670
671        return (0);
672}
673
674/*
675 * __bam_dup_convert --
676 *      Check to see if the duplicate set at indx should have its own page.
677 *      If it should, create it.
678 */
679static int
680__bam_dup_convert(dbc, h, indx)
681        DBC *dbc;
682        PAGE *h;
683        u_int32_t indx;
684{
685        BKEYDATA *bk;
686        DB *dbp;
687        DBT hdr;
688        DB_MPOOLFILE *mpf;
689        PAGE *dp;
690        db_indx_t cnt, cpindx, dindx, first, *inp, sz;
691        int ret;
692
693        dbp = dbc->dbp;
694        mpf = dbp->mpf;
695        inp = P_INP(dbp, h);
696
697        /*
698         * Count the duplicate records and calculate how much room they're
699         * using on the page.
700         */
701        while (indx > 0 && inp[indx] == inp[indx - P_INDX])
702                indx -= P_INDX;
703        for (cnt = 0, sz = 0, first = indx;; ++cnt, indx += P_INDX) {
704                if (indx >= NUM_ENT(h) || inp[first] != inp[indx])
705                        break;
706                bk = GET_BKEYDATA(dbp, h, indx);
707                sz += B_TYPE(bk->type) == B_KEYDATA ?
708                    BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
709                bk = GET_BKEYDATA(dbp, h, indx + O_INDX);
710                sz += B_TYPE(bk->type) == B_KEYDATA ?
711                    BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE;
712        }
713
714        /*
715         * We have to do these checks when the user is replacing the cursor's
716         * data item -- if the application replaces a duplicate item with a
717         * larger data item, it can increase the amount of space used by the
718         * duplicates, requiring this check.  But that means we may have done
719         * this check when it wasn't a duplicate item after all.
720         */
721        if (cnt == 1)
722                return (0);
723
724        /*
725         * If this set of duplicates is using more than 25% of the page, move
726         * them off.  The choice of 25% is a WAG, but the value must be small
727         * enough that we can always split a page without putting duplicates
728         * on two different pages.
729         */
730        if (sz < dbp->pgsize / 4)
731                return (0);
732
733        /* Get a new page. */
734        if ((ret = __db_new(dbc,
735            dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, &dp)) != 0)
736                return (ret);
737        P_INIT(dp, dbp->pgsize, dp->pgno,
738            PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));
739
740        /*
741         * Move this set of duplicates off the page.  First points to the first
742         * key of the first duplicate key/data pair, cnt is the number of pairs
743         * we're dealing with.
744         */
745        memset(&hdr, 0, sizeof(hdr));
746        dindx = first;
747        indx = first;
748        cpindx = 0;
749        do {
750                /* Move cursors referencing the old entry to the new entry. */
751                if ((ret = __bam_ca_dup(dbc, first,
752                    PGNO(h), indx, PGNO(dp), cpindx)) != 0)
753                        goto err;
754
755                /*
756                 * Copy the entry to the new page.  If the off-duplicate page
757                 * If the off-duplicate page is a Btree page (i.e. dup_compare
758                 * will be non-NULL, we use Btree pages for sorted dups,
759                 * and Recno pages for unsorted dups), move all entries
760                 * normally, even deleted ones.  If it's a Recno page,
761                 * deleted entries are discarded (if the deleted entry is
762                 * overflow, then free up those pages).
763                 */
764                bk = GET_BKEYDATA(dbp, h, dindx + 1);
765                hdr.data = bk;
766                hdr.size = B_TYPE(bk->type) == B_KEYDATA ?
767                    BKEYDATA_SIZE(bk->len) : BOVERFLOW_SIZE;
768                if (dbp->dup_compare == NULL && B_DISSET(bk->type)) {
769                        /*
770                         * Unsorted dups, i.e. recno page, and we have
771                         * a deleted entry, don't move it, but if it was
772                         * an overflow entry, we need to free those pages.
773                         */
774                        if (B_TYPE(bk->type) == B_OVERFLOW &&
775                            (ret = __db_doff(dbc,
776                            (GET_BOVERFLOW(dbp, h, dindx + 1))->pgno)) != 0)
777                                goto err;
778                } else {
779                        if ((ret = __db_pitem(
780                            dbc, dp, cpindx, hdr.size, &hdr, NULL)) != 0)
781                                goto err;
782                        ++cpindx;
783                }
784                /* Delete all but the last reference to the key. */
785                if (cnt != 1) {
786                        if ((ret = __bam_adjindx(dbc,
787                            h, dindx, first + 1, 0)) != 0)
788                                goto err;
789                } else
790                        dindx++;
791
792                /* Delete the data item. */
793                if ((ret = __db_ditem(dbc, h, dindx, hdr.size)) != 0)
794                        goto err;
795                indx += P_INDX;
796        } while (--cnt);
797
798        /* Put in a new data item that points to the duplicates page. */
799        if ((ret = __bam_ovput(dbc,
800            B_DUPLICATE, dp->pgno, h, first + 1, NULL)) != 0)
801                goto err;
802
803        /* Adjust cursors for all the above movments. */
804        if ((ret = __bam_ca_di(dbc,
805            PGNO(h), first + P_INDX, first + P_INDX - indx)) != 0)
806                goto err;
807
808        return (mpf->put(mpf, dp, DB_MPOOL_DIRTY));
809
810err:    (void)mpf->put(mpf, dp, 0);
811        return (ret);
812}
813
814/*
815 * __bam_ovput --
816 *      Build an item for an off-page duplicates page or overflow page and
817 *      insert it on the page.
818 */
819static int
820__bam_ovput(dbc, type, pgno, h, indx, item)
821        DBC *dbc;
822        u_int32_t type, indx;
823        db_pgno_t pgno;
824        PAGE *h;
825        DBT *item;
826{
827        BOVERFLOW bo;
828        DBT hdr;
829        int ret;
830
831        UMRW_SET(bo.unused1);
832        B_TSET(bo.type, type, 0);
833        UMRW_SET(bo.unused2);
834
835        /*
836         * If we're creating an overflow item, do so and acquire the page
837         * number for it.  If we're creating an off-page duplicates tree,
838         * we are giving the page number as an argument.
839         */
840        if (type == B_OVERFLOW) {
841                if ((ret = __db_poff(dbc, item, &bo.pgno)) != 0)
842                        return (ret);
843                bo.tlen = item->size;
844        } else {
845                bo.pgno = pgno;
846                bo.tlen = 0;
847        }
848
849        /* Store the new record on the page. */
850        memset(&hdr, 0, sizeof(hdr));
851        hdr.data = &bo;
852        hdr.size = BOVERFLOW_SIZE;
853        return (__db_pitem(dbc, h, indx, BOVERFLOW_SIZE, &hdr, NULL));
854}
Note: See TracBrowser for help on using the repository browser.