source: trunk/third/rpm/db/btree/bt_recno.c @ 19079

Revision 19079, 34.0 KB checked in by ghudson, 22 years ago (diff)
This commit was generated by cvs2svn to compensate for changes in r19078, which included commits to RCS files with non-trunk default branches.
Line 
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1997-2002
5 *      Sleepycat Software.  All rights reserved.
6 */
7
8#include "db_config.h"
9
10#ifndef lint
11static const char revid[] = "Id: bt_recno.c,v 11.106 2002/08/16 04:56:30 ubell Exp ";
12#endif /* not lint */
13
14#ifndef NO_SYSTEM_INCLUDES
15#include <sys/types.h>
16
17#include <limits.h>
18#include <stdio.h>
19#include <string.h>
20#endif
21
22#include "db_int.h"
23#include "dbinc/db_page.h"
24#include "dbinc/btree.h"
25#include "dbinc/db_shash.h"
26#include "dbinc/lock.h"
27
28static int  __ram_add __P((DBC *, db_recno_t *, DBT *, u_int32_t, u_int32_t));
29static int  __ram_source __P((DB *));
30static int  __ram_sread __P((DBC *, db_recno_t));
31static int  __ram_update __P((DBC *, db_recno_t, int));
32
33/*
34 * In recno, there are two meanings to the on-page "deleted" flag.  If we're
35 * re-numbering records, it means the record was implicitly created.  We skip
36 * over implicitly created records if doing a cursor "next" or "prev", and
37 * return DB_KEYEMPTY if they're explicitly requested..  If not re-numbering
38 * records, it means that the record was implicitly created, or was deleted.
39 * We skip over implicitly created or deleted records if doing a cursor "next"
40 * or "prev", and return DB_KEYEMPTY if they're explicitly requested.
41 *
42 * If we're re-numbering records, then we have to detect in the cursor that
43 * a record was deleted, and adjust the cursor as necessary on the next get.
44 * If we're not re-numbering records, then we can detect that a record has
45 * been deleted by looking at the actual on-page record, so we completely
46 * ignore the cursor's delete flag.  This is different from the B+tree code.
47 * It also maintains whether the cursor references a deleted record in the
48 * cursor, and it doesn't always check the on-page value.
49 */
50#define CD_SET(cp) {                                                    \
51        if (F_ISSET(cp, C_RENUMBER))                                    \
52                F_SET(cp, C_DELETED);                                   \
53}
54#define CD_CLR(cp) {                                                    \
55        if (F_ISSET(cp, C_RENUMBER)) {                                  \
56                F_CLR(cp, C_DELETED);                                   \
57                cp->order = INVALID_ORDER;                              \
58        }                                                               \
59}
60#define CD_ISSET(cp)                                                    \
61        (F_ISSET(cp, C_RENUMBER) && F_ISSET(cp, C_DELETED))
62
63/*
64 * Macros for comparing the ordering of two cursors.
65 * cp1 comes before cp2 iff one of the following holds:
66 *      cp1's recno is less than cp2's recno
67 *      recnos are equal, both deleted, and cp1's order is less than cp2's
68 *      recnos are equal, cp1 deleted, and cp2 not deleted
69 */
70#define C_LESSTHAN(cp1, cp2)                                            \
71    (((cp1)->recno < (cp2)->recno) ||                                   \
72    (((cp1)->recno == (cp2)->recno) &&                                  \
73    ((CD_ISSET((cp1)) && CD_ISSET((cp2)) && (cp1)->order < (cp2)->order) || \
74    (CD_ISSET((cp1)) && !CD_ISSET((cp2))))))
75
76/*
77 * cp1 is equal to cp2 iff their recnos and delete flags are identical,
78 * and if the delete flag is set their orders are also identical.
79 */
80#define C_EQUAL(cp1, cp2)                                               \
81    ((cp1)->recno == (cp2)->recno && CD_ISSET((cp1)) == CD_ISSET((cp2)) && \
82    (!CD_ISSET((cp1)) || (cp1)->order == (cp2)->order))
83
84/*
85 * Do we need to log the current cursor adjustment?
86 */
87#define CURADJ_LOG(dbc)                                                 \
88        (DBC_LOGGING((dbc)) && (dbc)->txn != NULL && (dbc)->txn->parent != NULL)
89
90/*
91 * After a search, copy the found page into the cursor, discarding any
92 * currently held lock.
93 */
94#define STACK_TO_CURSOR(cp) {                                           \
95        (cp)->page = (cp)->csp->page;                                   \
96        (cp)->pgno = (cp)->csp->page->pgno;                             \
97        (cp)->indx = (cp)->csp->indx;                                   \
98        (void)__TLPUT(dbc, (cp)->lock);                                 \
99        (cp)->lock = (cp)->csp->lock;                                   \
100        (cp)->lock_mode = (cp)->csp->lock_mode;                         \
101}
102
103/*
104 * __ram_open --
105 *      Recno open function.
106 *
107 * PUBLIC: int __ram_open __P((DB *,
108 * PUBLIC:      DB_TXN *, const char *, db_pgno_t, u_int32_t));
109 */
110int
111__ram_open(dbp, txn, name, base_pgno, flags)
112        DB *dbp;
113        DB_TXN *txn;
114        const char *name;
115        db_pgno_t base_pgno;
116        u_int32_t flags;
117{
118        BTREE *t;
119        DBC *dbc;
120        int ret, t_ret;
121
122        COMPQUIET(name, NULL);
123        t = dbp->bt_internal;
124
125        /* Initialize the remaining fields/methods of the DB. */
126        dbp->stat = __bam_stat;
127
128        /* Start up the tree. */
129        if ((ret = __bam_read_root(dbp, txn, base_pgno, flags)) != 0)
130                return (ret);
131
132        /*
133         * If the user specified a source tree, open it and map it in.
134         *
135         * !!!
136         * We don't complain if the user specified transactions or threads.
137         * It's possible to make it work, but you'd better know what you're
138         * doing!
139         */
140        if (t->re_source != NULL && (ret = __ram_source(dbp)) != 0)
141                return (ret);
142
143        /* If we're snapshotting an underlying source file, do it now. */
144        if (F_ISSET(dbp, DB_AM_SNAPSHOT)) {
145                /* Allocate a cursor. */
146                if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
147                        return (ret);
148
149                /* Do the snapshot. */
150                if ((ret = __ram_update(dbc,
151                    DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND)
152                        ret = 0;
153
154                /* Discard the cursor. */
155                if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
156                        ret = t_ret;
157        }
158
159        return (ret);
160}
161
162/*
163 * __ram_append --
164 *      Recno append function.
165 *
166 * PUBLIC: int __ram_append __P((DBC *, DBT *, DBT *));
167 */
168int
169__ram_append(dbc, key, data)
170        DBC *dbc;
171        DBT *key, *data;
172{
173        BTREE_CURSOR *cp;
174        int ret;
175
176        cp = (BTREE_CURSOR *)dbc->internal;
177
178        /*
179         * Make sure we've read in all of the backing source file.  If
180         * we found the record or it simply didn't exist, add the
181         * user's record.
182         */
183        ret = __ram_update(dbc, DB_MAX_RECORDS, 0);
184        if (ret == 0 || ret == DB_NOTFOUND)
185                ret = __ram_add(dbc, &cp->recno, data, DB_APPEND, 0);
186
187        /* Return the record number. */
188        if (ret == 0)
189                ret = __db_retcopy(dbc->dbp->dbenv, key, &cp->recno,
190                    sizeof(cp->recno), &dbc->rkey->data, &dbc->rkey->ulen);
191
192        return (ret);
193}
194
195/*
196 * __ram_c_del --
197 *      Recno cursor->c_del function.
198 *
199 * PUBLIC: int __ram_c_del __P((DBC *));
200 */
201int
202__ram_c_del(dbc)
203        DBC *dbc;
204{
205        BKEYDATA bk;
206        BTREE *t;
207        BTREE_CURSOR *cp;
208        DB *dbp;
209        DB_LSN lsn;
210        DBT hdr, data;
211        EPG *epg;
212        int exact, ret, stack;
213
214        dbp = dbc->dbp;
215        cp = (BTREE_CURSOR *)dbc->internal;
216        t = dbp->bt_internal;
217        stack = 0;
218
219        /*
220         * The semantics of cursors during delete are as follows: in
221         * non-renumbering recnos, records are replaced with a marker
222         * containing a delete flag.  If the record referenced by this cursor
223         * has already been deleted, we will detect that as part of the delete
224         * operation, and fail.
225         *
226         * In renumbering recnos, cursors which represent deleted items
227         * are flagged with the C_DELETED flag, and it is an error to
228         * call c_del a second time without an intervening cursor motion.
229         */
230        if (CD_ISSET(cp))
231                return (DB_KEYEMPTY);
232
233        /* Search the tree for the key; delete only deletes exact matches. */
234        if ((ret = __bam_rsearch(dbc, &cp->recno, S_DELETE, 1, &exact)) != 0)
235                goto err;
236        if (!exact) {
237                ret = DB_NOTFOUND;
238                goto err;
239        }
240        stack = 1;
241
242        /* Copy the page into the cursor. */
243        STACK_TO_CURSOR(cp);
244
245        /*
246         * If re-numbering records, the on-page deleted flag can only mean
247         * that this record was implicitly created.  Applications aren't
248         * permitted to delete records they never created, return an error.
249         *
250         * If not re-numbering records, the on-page deleted flag means that
251         * this record was implicitly created, or, was deleted at some time.
252         * The former is an error because applications aren't permitted to
253         * delete records they never created, the latter is an error because
254         * if the record was "deleted", we could never have found it.
255         */
256        if (B_DISSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type)) {
257                ret = DB_KEYEMPTY;
258                goto err;
259        }
260
261        if (F_ISSET(cp, C_RENUMBER)) {
262                /* Delete the item, adjust the counts, adjust the cursors. */
263                if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
264                        goto err;
265                __bam_adjust(dbc, -1);
266                if (__ram_ca(dbc, CA_DELETE) > 0 &&
267                    CURADJ_LOG(dbc) && (ret = __bam_rcuradj_log(dbp, dbc->txn,
268                    &lsn, 0, CA_DELETE, cp->root, cp->recno, cp->order)) != 0)
269                        goto err;
270
271                /*
272                 * If the page is empty, delete it.
273                 *
274                 * We never delete a root page.  First, root pages of primary
275                 * databases never go away, recno or otherwise.  However, if
276                 * it's the root page of an off-page duplicates database, then
277                 * it can be deleted.   We don't delete it here because we have
278                 * no way of telling the primary database page holder (e.g.,
279                 * the hash access method) that its page element should cleaned
280                 * up because the underlying tree is gone.  So, we keep the page
281                 * around until the last cursor referencing the empty tree is
282                 * are closed, and then clean it up.
283                 */
284                if (NUM_ENT(cp->page) == 0 && PGNO(cp->page) != cp->root) {
285                        /*
286                         * We already have a locked stack of pages.  However,
287                         * there are likely entries in the stack that aren't
288                         * going to be emptied by removing the single reference
289                         * to the emptied page (or one of its parents).
290                         */
291                        for (epg = cp->csp; epg >= cp->sp; --epg)
292                                if (NUM_ENT(epg->page) > 1)
293                                        break;
294
295                        /*
296                         * We want to delete a single item out of the last page
297                         * that we're not deleting.
298                         */
299                        ret = __bam_dpages(dbc, epg);
300
301                        /*
302                         * Regardless of the return from __bam_dpages, it will
303                         * discard our stack and pinned page.
304                         */
305                        stack = 0;
306                        cp->page = NULL;
307                }
308        } else {
309                /* Use a delete/put pair to replace the record with a marker. */
310                if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
311                        goto err;
312
313                B_TSET(bk.type, B_KEYDATA, 1);
314                bk.len = 0;
315                memset(&hdr, 0, sizeof(hdr));
316                hdr.data = &bk;
317                hdr.size = SSZA(BKEYDATA, data);
318                memset(&data, 0, sizeof(data));
319                data.data = (void *)"";
320                data.size = 0;
321                if ((ret = __db_pitem(dbc,
322                    cp->page, cp->indx, BKEYDATA_SIZE(0), &hdr, &data)) != 0)
323                        goto err;
324        }
325
326        t->re_modified = 1;
327
328err:    if (stack)
329                __bam_stkrel(dbc, STK_CLRDBC);
330
331        return (ret);
332}
333
334/*
335 * __ram_c_get --
336 *      Recno cursor->c_get function.
337 *
338 * PUBLIC: int __ram_c_get
339 * PUBLIC:     __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
340 */
341int
342__ram_c_get(dbc, key, data, flags, pgnop)
343        DBC *dbc;
344        DBT *key, *data;
345        u_int32_t flags;
346        db_pgno_t *pgnop;
347{
348        BTREE_CURSOR *cp;
349        DB *dbp;
350        int cmp, exact, ret;
351
352        COMPQUIET(pgnop, NULL);
353
354        dbp = dbc->dbp;
355        cp = (BTREE_CURSOR *)dbc->internal;
356
357        LF_CLR(DB_MULTIPLE|DB_MULTIPLE_KEY);
358retry:  switch (flags) {
359        case DB_CURRENT:
360                /*
361                 * If we're using mutable records and the deleted flag is
362                 * set, the cursor is pointing at a nonexistent record;
363                 * return an error.
364                 */
365                if (CD_ISSET(cp))
366                        return (DB_KEYEMPTY);
367                break;
368        case DB_NEXT_DUP:
369                /*
370                 * If we're not in an off-page dup set, we know there's no
371                 * next duplicate since recnos don't have them.  If we
372                 * are in an off-page dup set, the next item assuredly is
373                 * a dup, so we set flags to DB_NEXT and keep going.
374                 */
375                if (!F_ISSET(dbc, DBC_OPD))
376                        return (DB_NOTFOUND);
377                /* FALLTHROUGH */
378        case DB_NEXT_NODUP:
379                /*
380                 * Recno databases don't have duplicates, set flags to DB_NEXT
381                 * and keep going.
382                 */
383                /* FALLTHROUGH */
384        case DB_NEXT:
385                flags = DB_NEXT;
386                /*
387                 * If record numbers are mutable: if we just deleted a record,
388                 * we have to avoid incrementing the record number so that we
389                 * return the right record by virtue of renumbering the tree.
390                 */
391                if (CD_ISSET(cp))
392                        break;
393
394                if (cp->recno != RECNO_OOB) {
395                        ++cp->recno;
396                        break;
397                }
398                /* FALLTHROUGH */
399        case DB_FIRST:
400                flags = DB_NEXT;
401                cp->recno = 1;
402                break;
403        case DB_PREV_NODUP:
404                /*
405                 * Recno databases don't have duplicates, set flags to DB_PREV
406                 * and keep going.
407                 */
408                /* FALLTHROUGH */
409        case DB_PREV:
410                flags = DB_PREV;
411                if (cp->recno != RECNO_OOB) {
412                        if (cp->recno == 1) {
413                                ret = DB_NOTFOUND;
414                                goto err;
415                        }
416                        --cp->recno;
417                        break;
418                }
419                /* FALLTHROUGH */
420        case DB_LAST:
421                flags = DB_PREV;
422                if (((ret = __ram_update(dbc,
423                    DB_MAX_RECORDS, 0)) != 0) && ret != DB_NOTFOUND)
424                        goto err;
425                if ((ret = __bam_nrecs(dbc, &cp->recno)) != 0)
426                        goto err;
427                if (cp->recno == 0) {
428                        ret = DB_NOTFOUND;
429                        goto err;
430                }
431                break;
432        case DB_GET_BOTHC:
433                /*
434                 * If we're doing a join and these are offpage dups,
435                 * we want to keep searching forward from after the
436                 * current cursor position.  Increment the recno by 1,
437                 * then proceed as for a DB_SET.
438                 *
439                 * Otherwise, we know there are no additional matching
440                 * data, as recnos don't have dups.  return DB_NOTFOUND.
441                 */
442                if (F_ISSET(dbc, DBC_OPD)) {
443                        cp->recno++;
444                        break;
445                }
446                ret = DB_NOTFOUND;
447                goto err;
448                /* NOTREACHED */
449        case DB_GET_BOTH:
450        case DB_GET_BOTH_RANGE:
451                /*
452                 * If we're searching a set of off-page dups, we start
453                 * a new linear search from the first record.  Otherwise,
454                 * we compare the single data item associated with the
455                 * requested record for a match.
456                 */
457                if (F_ISSET(dbc, DBC_OPD)) {
458                        cp->recno = 1;
459                        break;
460                }
461                /* FALLTHROUGH */
462        case DB_SET:
463        case DB_SET_RANGE:
464                if ((ret = __ram_getno(dbc, key, &cp->recno, 0)) != 0)
465                        goto err;
466                break;
467        default:
468                ret = __db_unknown_flag(dbp->dbenv, "__ram_c_get", flags);
469                goto err;
470        }
471
472        /*
473         * For DB_PREV, DB_LAST, DB_SET and DB_SET_RANGE, we have already
474         * called __ram_update() to make sure sufficient records have been
475         * read from the backing source file.  Do it now for DB_CURRENT (if
476         * the current record was deleted we may need more records from the
477         * backing file for a DB_CURRENT operation), DB_FIRST and DB_NEXT.
478         * (We don't have to test for flags == DB_FIRST, because the switch
479         * statement above re-set flags to DB_NEXT in that case.)
480         */
481        if ((flags == DB_NEXT || flags == DB_CURRENT) && ((ret =
482            __ram_update(dbc, cp->recno, 0)) != 0) && ret != DB_NOTFOUND)
483                goto err;
484
485        for (;; ++cp->recno) {
486                /* Search the tree for the record. */
487                if ((ret = __bam_rsearch(dbc, &cp->recno,
488                    F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND,
489                    1, &exact)) != 0)
490                        goto err;
491                if (!exact) {
492                        ret = DB_NOTFOUND;
493                        goto err;
494                }
495
496                /* Copy the page into the cursor. */
497                STACK_TO_CURSOR(cp);
498
499                /*
500                 * If re-numbering records, the on-page deleted flag means this
501                 * record was implicitly created.  If not re-numbering records,
502                 * the on-page deleted flag means this record was implicitly
503                 * created, or, it was deleted at some time.  Regardless, we
504                 * skip such records if doing cursor next/prev operations or
505                 * walking through off-page duplicates, and fail if they were
506                 * requested explicitly by the application.
507                 */
508                if (B_DISSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type))
509                        switch (flags) {
510                        case DB_NEXT:
511                        case DB_PREV:
512                                (void)__bam_stkrel(dbc, STK_CLRDBC);
513                                goto retry;
514                        case DB_GET_BOTH:
515                        case DB_GET_BOTH_RANGE:
516                                /*
517                                 * If we're an OPD tree, we don't care about
518                                 * matching a record number on a DB_GET_BOTH
519                                 * -- everything belongs to the same tree.  A
520                                 * normal recno should give up and return
521                                 * DB_NOTFOUND if the matching recno is deleted.
522                                 */
523                                if (F_ISSET(dbc, DBC_OPD)) {
524                                        (void)__bam_stkrel(dbc, STK_CLRDBC);
525                                        continue;
526                                }
527                                ret = DB_NOTFOUND;
528                                goto err;
529                        default:
530                                ret = DB_KEYEMPTY;
531                                goto err;
532                        }
533
534                if (flags == DB_GET_BOTH ||
535                    flags == DB_GET_BOTHC || flags == DB_GET_BOTH_RANGE) {
536                        if ((ret = __bam_cmp(dbp, data,
537                            cp->page, cp->indx, __bam_defcmp, &cmp)) != 0)
538                                return (ret);
539                        if (cmp == 0)
540                                break;
541                        if (!F_ISSET(dbc, DBC_OPD)) {
542                                ret = DB_NOTFOUND;
543                                goto err;
544                        }
545                        (void)__bam_stkrel(dbc, STK_CLRDBC);
546                } else
547                        break;
548        }
549
550        /* Return the key if the user didn't give us one. */
551        if (!F_ISSET(dbc, DBC_OPD)) {
552                if (flags != DB_GET_BOTH && flags != DB_GET_BOTH_RANGE &&
553                    flags != DB_SET && flags != DB_SET_RANGE)
554                        ret = __db_retcopy(dbp->dbenv,
555                            key, &cp->recno, sizeof(cp->recno),
556                            &dbc->rkey->data, &dbc->rkey->ulen);
557                F_SET(key, DB_DBT_ISSET);
558        }
559
560        /* The cursor was reset, no further delete adjustment is necessary. */
561err:    CD_CLR(cp);
562
563        return (ret);
564}
565
566/*
567 * __ram_c_put --
568 *      Recno cursor->c_put function.
569 *
570 * PUBLIC: int __ram_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
571 */
572int
573__ram_c_put(dbc, key, data, flags, pgnop)
574        DBC *dbc;
575        DBT *key, *data;
576        u_int32_t flags;
577        db_pgno_t *pgnop;
578{
579        BTREE_CURSOR *cp;
580        DB *dbp;
581        DB_LSN lsn;
582        int exact, nc, ret, t_ret;
583        u_int32_t iiflags;
584        void *arg;
585
586        COMPQUIET(pgnop, NULL);
587
588        dbp = dbc->dbp;
589        cp = (BTREE_CURSOR *)dbc->internal;
590
591        /*
592         * DB_KEYFIRST and DB_KEYLAST mean different things if they're
593         * used in an off-page duplicate tree.  If we're an off-page
594         * duplicate tree, they really mean "put at the beginning of the
595         * tree" and "put at the end of the tree" respectively, so translate
596         * them to something else.
597         */
598        if (F_ISSET(dbc, DBC_OPD))
599                switch (flags) {
600                case DB_KEYFIRST:
601                        cp->recno = 1;
602                        flags = DB_BEFORE;
603                        break;
604                case DB_KEYLAST:
605                        if ((ret = __ram_add(dbc,
606                            &cp->recno, data, DB_APPEND, 0)) != 0)
607                                return (ret);
608                        if (CURADJ_LOG(dbc) &&
609                            (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0,
610                            CA_ICURRENT, cp->root, cp->recno, cp->order)))
611                                return (ret);
612                        return (0);
613                }
614
615        /*
616         * Handle normal DB_KEYFIRST/DB_KEYLAST;  for a recno, which has
617         * no duplicates, these are identical and mean "put the given
618         * datum at the given recno".
619         *
620         * Note that the code here used to be in __ram_put;  now, we
621         * go through the access-method-common __db_put function, which
622         * handles DB_NOOVERWRITE, so we and __ram_add don't have to.
623         */
624        if (flags == DB_KEYFIRST || flags == DB_KEYLAST) {
625                ret = __ram_getno(dbc, key, &cp->recno, 1);
626                if (ret == 0 || ret == DB_NOTFOUND)
627                        ret = __ram_add(dbc, &cp->recno, data, 0, 0);
628                return (ret);
629        }
630
631        /*
632         * If we're putting with a cursor that's marked C_DELETED, we need to
633         * take special care;  the cursor doesn't "really" reference the item
634         * corresponding to its current recno, but instead is "between" that
635         * record and the current one.  Translate the actual insert into
636         * DB_BEFORE, and let the __ram_ca work out the gory details of what
637         * should wind up pointing where.
638         */
639        if (CD_ISSET(cp))
640                iiflags = DB_BEFORE;
641        else
642                iiflags = flags;
643
644split:  if ((ret = __bam_rsearch(dbc, &cp->recno, S_INSERT, 1, &exact)) != 0)
645                goto err;
646        /*
647         * An inexact match is okay;  it just means we're one record past the
648         * end, which is reasonable if we're marked deleted.
649         */
650        DB_ASSERT(exact || CD_ISSET(cp));
651
652        /* Copy the page into the cursor. */
653        STACK_TO_CURSOR(cp);
654
655        ret = __bam_iitem(dbc, key, data, iiflags, 0);
656        t_ret = __bam_stkrel(dbc, STK_CLRDBC);
657
658        if (t_ret != 0 && (ret == 0 || ret == DB_NEEDSPLIT))
659                ret = t_ret;
660        else if (ret == DB_NEEDSPLIT) {
661                arg = &cp->recno;
662                if ((ret = __bam_split(dbc, arg, NULL)) != 0)
663                        goto err;
664                goto split;
665        }
666        if (ret != 0)
667                goto err;
668
669        switch (flags) {                        /* Adjust the cursors. */
670        case DB_AFTER:
671                nc = __ram_ca(dbc, CA_IAFTER);
672
673                /*
674                 * We only need to adjust this cursor forward if we truly added
675                 * the item after the current recno, rather than remapping it
676                 * to DB_BEFORE.
677                 */
678                if (iiflags == DB_AFTER)
679                        ++cp->recno;
680
681                /* Only log if __ram_ca found any relevant cursors. */
682                if (nc > 0 && CURADJ_LOG(dbc) &&
683                    (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0, CA_IAFTER,
684                    cp->root, cp->recno, cp->order)) != 0)
685                        goto err;
686                break;
687        case DB_BEFORE:
688                nc = __ram_ca(dbc, CA_IBEFORE);
689                --cp->recno;
690
691                /* Only log if __ram_ca found any relevant cursors. */
692                if (nc > 0 && CURADJ_LOG(dbc) &&
693                    (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0, CA_IBEFORE,
694                    cp->root, cp->recno, cp->order)) != 0)
695                        goto err;
696                break;
697        case DB_CURRENT:
698                /*
699                 * We only need to do an adjustment if we actually
700                 * added an item, which we only would have done if the
701                 * cursor was marked deleted.
702                 *
703                 * Only log if __ram_ca found any relevant cursors.
704                 */
705                if (CD_ISSET(cp) && __ram_ca(dbc, CA_ICURRENT) > 0 &&
706                    CURADJ_LOG(dbc) &&
707                    (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0,
708                    CA_ICURRENT, cp->root, cp->recno, cp->order)) != 0)
709                        goto err;
710                break;
711        }
712
713        /* Return the key if we've created a new record. */
714        if (!F_ISSET(dbc, DBC_OPD) && (flags == DB_AFTER || flags == DB_BEFORE))
715                ret = __db_retcopy(dbp->dbenv, key, &cp->recno,
716                    sizeof(cp->recno), &dbc->rkey->data, &dbc->rkey->ulen);
717
718        /* The cursor was reset, no further delete adjustment is necessary. */
719err:    CD_CLR(cp);
720
721        return (ret);
722}
723
724/*
725 * __ram_ca --
726 *      Adjust cursors.  Returns the number of relevant cursors.
727 *
728 * PUBLIC: int __ram_ca __P((DBC *, ca_recno_arg));
729 */
730int
731__ram_ca(dbc_arg, op)
732        DBC *dbc_arg;
733        ca_recno_arg op;
734{
735        BTREE_CURSOR *cp, *cp_arg;
736        DB *dbp, *ldbp;
737        DB_ENV *dbenv;
738        DBC *dbc;
739        db_recno_t recno;
740        int adjusted, found;
741        u_int32_t order;
742
743        dbp = dbc_arg->dbp;
744        dbenv = dbp->dbenv;
745        cp_arg = (BTREE_CURSOR *)dbc_arg->internal;
746        recno = cp_arg->recno;
747
748        found = 0;
749
750        /*
751         * It only makes sense to adjust cursors if we're a renumbering
752         * recno;  we should only be called if this is one.
753         */
754        DB_ASSERT(F_ISSET(cp_arg, C_RENUMBER));
755
756        MUTEX_THREAD_LOCK(dbenv, dbenv->dblist_mutexp);
757        /*
758         * Adjust the cursors.  See the comment in __bam_ca_delete().
759         */
760        /*
761         * If we're doing a delete, we need to find the highest
762         * order of any cursor currently pointing at this item,
763         * so we can assign a higher order to the newly deleted
764         * cursor.  Unfortunately, this requires a second pass through
765         * the cursor list.
766         */
767        if (op == CA_DELETE) {
768                order = 1;
769                for (ldbp = __dblist_get(dbenv, dbp->adj_fileid);
770                    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
771                    ldbp = LIST_NEXT(ldbp, dblistlinks)) {
772                        MUTEX_THREAD_LOCK(dbenv, dbp->mutexp);
773                        for (dbc = TAILQ_FIRST(&ldbp->active_queue);
774                            dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
775                                cp = (BTREE_CURSOR *)dbc->internal;
776                                if (cp_arg->root == cp->root &&
777                                    recno == cp->recno && CD_ISSET(cp) &&
778                                    order <= cp->order)
779                                        order = cp->order + 1;
780                        }
781                        MUTEX_THREAD_UNLOCK(dbenv, dbp->mutexp);
782                }
783        } else
784                order = INVALID_ORDER;
785
786        /* Now go through and do the actual adjustments. */
787        for (ldbp = __dblist_get(dbenv, dbp->adj_fileid);
788            ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
789            ldbp = LIST_NEXT(ldbp, dblistlinks)) {
790                MUTEX_THREAD_LOCK(dbenv, dbp->mutexp);
791                for (dbc = TAILQ_FIRST(&ldbp->active_queue);
792                    dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
793                        cp = (BTREE_CURSOR *)dbc->internal;
794                        if (cp_arg->root != cp->root)
795                                continue;
796                        ++found;
797                        adjusted = 0;
798                        switch (op) {
799                        case CA_DELETE:
800                                if (recno < cp->recno) {
801                                        --cp->recno;
802                                        /*
803                                         * If the adjustment made them equal,
804                                         * we have to merge the orders.
805                                         */
806                                        if (recno == cp->recno && CD_ISSET(cp))
807                                                cp->order += order;
808                                } else if (recno == cp->recno &&
809                                    !CD_ISSET(cp)) {
810                                        CD_SET(cp);
811                                        cp->order = order;
812                                }
813                                break;
814                        case CA_IBEFORE:
815                                /*
816                                 * IBEFORE is just like IAFTER, except that we
817                                 * adjust cursors on the current record too.
818                                 */
819                                if (C_EQUAL(cp_arg, cp)) {
820                                        ++cp->recno;
821                                        adjusted = 1;
822                                }
823                                goto iafter;
824                        case CA_ICURRENT:
825
826                                /*
827                                 * If the original cursor wasn't deleted, we
828                                 * just did a replacement and so there's no
829                                 * need to adjust anything--we shouldn't have
830                                 * gotten this far.  Otherwise, we behave
831                                 * much like an IAFTER, except that all
832                                 * cursors pointing to the current item get
833                                 * marked undeleted and point to the new
834                                 * item.
835                                 */
836                                DB_ASSERT(CD_ISSET(cp_arg));
837                                if (C_EQUAL(cp_arg, cp)) {
838                                        CD_CLR(cp);
839                                        break;
840                                }
841                                /* FALLTHROUGH */
842                        case CA_IAFTER:
843iafter:                         if (!adjusted && C_LESSTHAN(cp_arg, cp)) {
844                                        ++cp->recno;
845                                        adjusted = 1;
846                                }
847                                if (recno == cp->recno && adjusted)
848                                        /*
849                                         * If we've moved this cursor's recno,
850                                         * split its order number--i.e.,
851                                         * decrement it by enough so that
852                                         * the lowest cursor moved has order 1.
853                                         * cp_arg->order is the split point,
854                                         * so decrement by one less than that.
855                                         */
856                                        cp->order -= (cp_arg->order - 1);
857                                break;
858                        }
859                }
860                MUTEX_THREAD_UNLOCK(dbp->dbenv, dbp->mutexp);
861        }
862        MUTEX_THREAD_UNLOCK(dbenv, dbenv->dblist_mutexp);
863
864        return (found);
865}
866
867/*
868 * __ram_getno --
869 *      Check the user's record number, and make sure we've seen it.
870 *
871 * PUBLIC: int __ram_getno __P((DBC *, const DBT *, db_recno_t *, int));
872 */
873int
874__ram_getno(dbc, key, rep, can_create)
875        DBC *dbc;
876        const DBT *key;
877        db_recno_t *rep;
878        int can_create;
879{
880        DB *dbp;
881        db_recno_t recno;
882
883        dbp = dbc->dbp;
884
885        /* Check the user's record number. */
886        if ((recno = *(db_recno_t *)key->data) == 0) {
887                __db_err(dbp->dbenv, "illegal record number of 0");
888                return (EINVAL);
889        }
890        if (rep != NULL)
891                *rep = recno;
892
893        /*
894         * Btree can neither create records nor read them in.  Recno can
895         * do both, see if we can find the record.
896         */
897        return (dbc->dbtype == DB_RECNO ?
898            __ram_update(dbc, recno, can_create) : 0);
899}
900
901/*
902 * __ram_update --
903 *      Ensure the tree has records up to and including the specified one.
904 */
905static int
906__ram_update(dbc, recno, can_create)
907        DBC *dbc;
908        db_recno_t recno;
909        int can_create;
910{
911        BTREE *t;
912        DB *dbp;
913        DBT *rdata;
914        db_recno_t nrecs;
915        int ret;
916
917        dbp = dbc->dbp;
918        t = dbp->bt_internal;
919
920        /*
921         * If we can't create records and we've read the entire backing input
922         * file, we're done.
923         */
924        if (!can_create && t->re_eof)
925                return (0);
926
927        /*
928         * If we haven't seen this record yet, try to get it from the original
929         * file.
930         */
931        if ((ret = __bam_nrecs(dbc, &nrecs)) != 0)
932                return (ret);
933        if (!t->re_eof && recno > nrecs) {
934                if ((ret = __ram_sread(dbc, recno)) != 0 && ret != DB_NOTFOUND)
935                        return (ret);
936                if ((ret = __bam_nrecs(dbc, &nrecs)) != 0)
937                        return (ret);
938        }
939
940        /*
941         * If we can create records, create empty ones up to the requested
942         * record.
943         */
944        if (!can_create || recno <= nrecs + 1)
945                return (0);
946
947        rdata = &dbc->my_rdata;
948        rdata->flags = 0;
949        rdata->size = 0;
950
951        while (recno > ++nrecs)
952                if ((ret = __ram_add(dbc,
953                    &nrecs, rdata, 0, BI_DELETED)) != 0)
954                        return (ret);
955        return (0);
956}
957
958/*
959 * __ram_source --
960 *      Load information about the backing file.
961 */
962static int
963__ram_source(dbp)
964        DB *dbp;
965{
966        BTREE *t;
967        char *source;
968        int ret;
969
970        t = dbp->bt_internal;
971
972        /* Find the real name, and swap out the one we had before. */
973        if ((ret = __db_appname(dbp->dbenv,
974            DB_APP_DATA, t->re_source, 0, NULL, &source)) != 0)
975                return (ret);
976        __os_free(dbp->dbenv, t->re_source);
977        t->re_source = source;
978
979        /*
980         * !!!
981         * It's possible that the backing source file is read-only.  We don't
982         * much care other than we'll complain if there are any modifications
983         * when it comes time to write the database back to the source.
984         */
985        if ((t->re_fp = fopen(t->re_source, "r")) == NULL) {
986                ret = errno;
987                __db_err(dbp->dbenv, "%s: %s", t->re_source, db_strerror(ret));
988                return (ret);
989        }
990
991        t->re_eof = 0;
992        return (0);
993}
994
995/*
996 * __ram_writeback --
997 *      Rewrite the backing file.
998 *
999 * PUBLIC: int __ram_writeback __P((DB *));
1000 */
1001int
1002__ram_writeback(dbp)
1003        DB *dbp;
1004{
1005        BTREE *t;
1006        DB_ENV *dbenv;
1007        DBC *dbc;
1008        DBT key, data;
1009        FILE *fp;
1010        db_recno_t keyno;
1011        int ret, t_ret;
1012        u_int8_t delim, *pad;
1013
1014        t = dbp->bt_internal;
1015        dbenv = dbp->dbenv;
1016        fp = NULL;
1017        pad = NULL;
1018
1019        /* If the file wasn't modified, we're done. */
1020        if (!t->re_modified)
1021                return (0);
1022
1023        /* If there's no backing source file, we're done. */
1024        if (t->re_source == NULL) {
1025                t->re_modified = 0;
1026                return (0);
1027        }
1028
1029        /* Allocate a cursor. */
1030        if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
1031                return (ret);
1032
1033        /*
1034         * Read any remaining records into the tree.
1035         *
1036         * !!!
1037         * This is why we can't support transactions when applications specify
1038         * backing (re_source) files.  At this point we have to read in the
1039         * rest of the records from the file so that we can write all of the
1040         * records back out again, which could modify a page for which we'd
1041         * have to log changes and which we don't have locked.  This could be
1042         * partially fixed by taking a snapshot of the entire file during the
1043         * DB->open as DB->open is transaction protected.  But, if a checkpoint
1044         * occurs then, the part of the log holding the copy of the file could
1045         * be discarded, and that would make it impossible to recover in the
1046         * face of disaster.  This could all probably be fixed, but it would
1047         * require transaction protecting the backing source file.
1048         *
1049         * XXX
1050         * This could be made to work now that we have transactions protecting
1051         * file operations.  Margo has specifically asked for the privilege of
1052         * doing this work.
1053         */
1054        if ((ret =
1055            __ram_update(dbc, DB_MAX_RECORDS, 0)) != 0 && ret != DB_NOTFOUND)
1056                return (ret);
1057
1058        /*
1059         * Close any existing file handle and re-open the file, truncating it.
1060         */
1061        if (t->re_fp != NULL) {
1062                if (fclose(t->re_fp) != 0) {
1063                        ret = errno;
1064                        goto err;
1065                }
1066                t->re_fp = NULL;
1067        }
1068        if ((fp = fopen(t->re_source, "w")) == NULL) {
1069                ret = errno;
1070                __db_err(dbenv, "%s: %s", t->re_source, db_strerror(ret));
1071                goto err;
1072        }
1073
1074        /*
1075         * We step through the records, writing each one out.  Use the record
1076         * number and the dbp->get() function, instead of a cursor, so we find
1077         * and write out "deleted" or non-existent records.  The DB handle may
1078         * be threaded, so allocate memory as we go.
1079         */
1080        memset(&key, 0, sizeof(key));
1081        key.size = sizeof(db_recno_t);
1082        key.data = &keyno;
1083        memset(&data, 0, sizeof(data));
1084        F_SET(&data, DB_DBT_REALLOC);
1085
1086        /*
1087         * We'll need the delimiter if we're doing variable-length records,
1088         * and the pad character if we're doing fixed-length records.
1089         */
1090        delim = t->re_delim;
1091        if (F_ISSET(dbp, DB_AM_FIXEDLEN)) {
1092                if ((ret = __os_malloc(dbenv, t->re_len, &pad)) != 0)
1093                        goto err;
1094                memset(pad, t->re_pad, t->re_len);
1095        }
1096        for (keyno = 1;; ++keyno) {
1097                switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
1098                case 0:
1099                        if (data.size != 0 && (u_int32_t)fwrite(
1100                            data.data, 1, data.size, fp) != data.size)
1101                                goto write_err;
1102                        break;
1103                case DB_KEYEMPTY:
1104                        if (F_ISSET(dbp, DB_AM_FIXEDLEN) &&
1105                            (u_int32_t)fwrite(pad, 1, t->re_len, fp) !=
1106                            t->re_len)
1107                                goto write_err;
1108                        break;
1109                case DB_NOTFOUND:
1110                        ret = 0;
1111                        goto done;
1112                default:
1113                        goto err;
1114                }
1115                if (!F_ISSET(dbp, DB_AM_FIXEDLEN) &&
1116                    fwrite(&delim, 1, 1, fp) != 1) {
1117write_err:              ret = errno;
1118                        __db_err(dbp->dbenv,
1119                            "%s: write failed to backing file: %s",
1120                            t->re_source, strerror(ret));
1121                        goto err;
1122                }
1123        }
1124
1125err:
1126done:   /* Close the file descriptor. */
1127        if (fp != NULL && fclose(fp) != 0) {
1128                if (ret == 0)
1129                        ret = errno;
1130                __db_err(dbenv, "%s: %s", t->re_source, db_strerror(errno));
1131        }
1132
1133        /* Discard the cursor. */
1134        if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
1135                ret = t_ret;
1136
1137        /* Discard memory allocated to hold the data items. */
1138        if (data.data != NULL)
1139                __os_ufree(dbenv, data.data);
1140        if (pad != NULL)
1141                __os_free(dbenv, pad);
1142
1143        if (ret == 0)
1144                t->re_modified = 0;
1145
1146        return (ret);
1147}
1148
1149/*
1150 * __ram_sread --
1151 *      Read records from a source file.
1152 */
1153static int
1154__ram_sread(dbc, top)
1155        DBC *dbc;
1156        db_recno_t top;
1157{
1158        BTREE *t;
1159        DB *dbp;
1160        DBT data, *rdata;
1161        db_recno_t recno;
1162        size_t len;
1163        int ch, ret, was_modified;
1164
1165        t = dbc->dbp->bt_internal;
1166        dbp = dbc->dbp;
1167        was_modified = t->re_modified;
1168
1169        if ((ret = __bam_nrecs(dbc, &recno)) != 0)
1170                return (ret);
1171
1172        /*
1173         * Use the record key return memory, it's only a short-term use.
1174         * The record data return memory is used by __bam_iitem, which
1175         * we'll indirectly call, so use the key so as not to collide.
1176         */
1177        len = F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_len : 256;
1178        rdata = &dbc->my_rkey;
1179        if (rdata->ulen < len) {
1180                if ((ret = __os_realloc(
1181                    dbp->dbenv, len, &rdata->data)) != 0) {
1182                        rdata->ulen = 0;
1183                        rdata->data = NULL;
1184                        return (ret);
1185                }
1186                rdata->ulen = (u_int32_t)len;
1187        }
1188
1189        memset(&data, 0, sizeof(data));
1190        while (recno < top) {
1191                data.data = rdata->data;
1192                data.size = 0;
1193                if (F_ISSET(dbp, DB_AM_FIXEDLEN))
1194                        for (len = t->re_len; len > 0; --len) {
1195                                if ((ch = getc(t->re_fp)) == EOF) {
1196                                        if (data.size == 0)
1197                                                goto eof;
1198                                        break;
1199                                }
1200                                ((u_int8_t *)data.data)[data.size++] = ch;
1201                        }
1202                else
1203                        for (;;) {
1204                                if ((ch = getc(t->re_fp)) == EOF) {
1205                                        if (data.size == 0)
1206                                                goto eof;
1207                                        break;
1208                                }
1209                                if (ch == t->re_delim)
1210                                        break;
1211
1212                                ((u_int8_t *)data.data)[data.size++] = ch;
1213                                if (data.size == rdata->ulen) {
1214                                        if ((ret = __os_realloc(dbp->dbenv,
1215                                            rdata->ulen *= 2,
1216                                            &rdata->data)) != 0) {
1217                                                rdata->ulen = 0;
1218                                                rdata->data = NULL;
1219                                                return (ret);
1220                                        } else
1221                                                data.data = rdata->data;
1222                                }
1223                        }
1224
1225                /*
1226                 * Another process may have read this record from the input
1227                 * file and stored it into the database already, in which
1228                 * case we don't need to repeat that operation.  We detect
1229                 * this by checking if the last record we've read is greater
1230                 * or equal to the number of records in the database.
1231                 */
1232                if (t->re_last >= recno) {
1233                        ++recno;
1234                        if ((ret = __ram_add(dbc, &recno, &data, 0, 0)) != 0)
1235                                goto err;
1236                }
1237                ++t->re_last;
1238        }
1239
1240        if (0) {
1241eof:            t->re_eof = 1;
1242                ret = DB_NOTFOUND;
1243        }
1244err:    if (!was_modified)
1245                t->re_modified = 0;
1246
1247        return (ret);
1248}
1249
1250/*
1251 * __ram_add --
1252 *      Add records into the tree.
1253 */
1254static int
1255__ram_add(dbc, recnop, data, flags, bi_flags)
1256        DBC *dbc;
1257        db_recno_t *recnop;
1258        DBT *data;
1259        u_int32_t flags, bi_flags;
1260{
1261        BTREE_CURSOR *cp;
1262        int exact, ret, stack;
1263
1264        cp = (BTREE_CURSOR *)dbc->internal;
1265
1266retry:  /* Find the slot for insertion. */
1267        if ((ret = __bam_rsearch(dbc, recnop,
1268            S_INSERT | (flags == DB_APPEND ? S_APPEND : 0), 1, &exact)) != 0)
1269                return (ret);
1270        stack = 1;
1271
1272        /* Copy the page into the cursor. */
1273        STACK_TO_CURSOR(cp);
1274
1275        /*
1276         * The application may modify the data based on the selected record
1277         * number.
1278         */
1279        if (flags == DB_APPEND && dbc->dbp->db_append_recno != NULL &&
1280            (ret = dbc->dbp->db_append_recno(dbc->dbp, data, *recnop)) != 0)
1281                goto err;
1282
1283        /*
1284         * Select the arguments for __bam_iitem() and do the insert.  If the
1285         * key is an exact match, or we're replacing the data item with a
1286         * new data item, replace the current item.  If the key isn't an exact
1287         * match, we're inserting a new key/data pair, before the search
1288         * location.
1289         */
1290        switch (ret = __bam_iitem(dbc,
1291            NULL, data, exact ? DB_CURRENT : DB_BEFORE, bi_flags)) {
1292        case 0:
1293                /*
1294                 * Don't adjust anything.
1295                 *
1296                 * If we inserted a record, no cursors need adjusting because
1297                 * the only new record it's possible to insert is at the very
1298                 * end of the tree.  The necessary adjustments to the internal
1299                 * page counts were made by __bam_iitem().
1300                 *
1301                 * If we overwrote a record, no cursors need adjusting because
1302                 * future DBcursor->get calls will simply return the underlying
1303                 * record (there's no adjustment made for the DB_CURRENT flag
1304                 * when a cursor get operation immediately follows a cursor
1305                 * delete operation, and the normal adjustment for the DB_NEXT
1306                 * flag is still correct).
1307                 */
1308                break;
1309        case DB_NEEDSPLIT:
1310                /* Discard the stack of pages and split the page. */
1311                (void)__bam_stkrel(dbc, STK_CLRDBC);
1312                stack = 0;
1313
1314                if ((ret = __bam_split(dbc, recnop, NULL)) != 0)
1315                        goto err;
1316
1317                goto retry;
1318                /* NOTREACHED */
1319        default:
1320                goto err;
1321        }
1322
1323err:    if (stack)
1324                __bam_stkrel(dbc, STK_CLRDBC);
1325
1326        return (ret);
1327}
Note: See TracBrowser for help on using the repository browser.