root / hw / hw2 / MyStuff / gistget.c

Revision 65, 18.2 kB (checked in by cs186, 4 years ago)

checksplit should return value of resetoffset

Line 
1/*-------------------------------------------------------------------------
2 *
3 * gistget.c
4 *    fetch tuples from a GIST scan.
5 *
6 *
7 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 *    $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.62 2006/11/12 06:55:53 neilc Exp $
12 *
13 *-------------------------------------------------------------------------
14 */
15#include "postgres.h"
16
17#include "access/gist_private.h"
18#include "executor/execdebug.h"
19#include "pgstat.h"
20#include "utils/memutils.h"
21#include "utils/builtins.h"
22#include <stdio.h>
23
24
25static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n,
26       ScanDirection dir);
27static int  gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples);
28static double gistindex_keydistance(IndexTuple tuple, IndexScanDesc scan,
29          OffsetNumber offset);
30static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
31          OffsetNumber offset);
32
33static void
34killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr)
35{
36  Buffer    buffer = so->curbuf;
37
38  for (;;)
39  {
40    Page    p;
41    BlockNumber blkno;
42    OffsetNumber offset,
43          maxoff;
44
45    LockBuffer(buffer, GIST_SHARE);
46    gistcheckpage(r, buffer);
47    p = (Page) BufferGetPage(buffer);
48
49    if (buffer == so->curbuf && XLByteEQ(so->stack->lsn, PageGetLSN(p)))
50    {
51      /* page unchanged, so all is simple */
52      offset = ItemPointerGetOffsetNumber(iptr);
53      PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
54      SetBufferCommitInfoNeedsSave(buffer);
55      LockBuffer(buffer, GIST_UNLOCK);
56      break;
57    }
58
59    maxoff = PageGetMaxOffsetNumber(p);
60
61    for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
62    {
63      IndexTuple  ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
64
65      if (ItemPointerEquals(&(ituple->t_tid), iptr))
66      {
67        /* found */
68        PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
69        SetBufferCommitInfoNeedsSave(buffer);
70        LockBuffer(buffer, GIST_UNLOCK);
71        if (buffer != so->curbuf)
72          ReleaseBuffer(buffer);
73        return;
74      }
75    }
76
77    /* follow right link */
78
79    /*
80     * ??? is it good? if tuple dropped by concurrent vacuum, we will read
81     * all leaf pages...
82     */
83    blkno = GistPageGetOpaque(p)->rightlink;
84    LockBuffer(buffer, GIST_UNLOCK);
85    if (buffer != so->curbuf)
86      ReleaseBuffer(buffer);
87
88    if (blkno == InvalidBlockNumber)
89      /* can't found, dropped by somebody else */
90      return;
91    buffer = ReadBuffer(r, blkno);
92  }
93}
94
95/*
96 * gistgettuple() -- Get the next tuple in the scan
97 */
98Datum
99gistgettuple(PG_FUNCTION_ARGS)
100{
101  IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
102  ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
103  GISTScanOpaque so;
104  ItemPointerData tid;
105  bool    res;
106
107  so = (GISTScanOpaque) scan->opaque;
108
109  /*
110   * If we have produced an index tuple in the past and the executor has
111   * informed us we need to mark it as "killed", do so now.
112   */
113  if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
114    killtuple(scan->indexRelation, so, &(scan->currentItemData));
115
116  /*
117   * Get the next tuple that matches the search key. If asked to skip killed
118   * tuples, continue looping until we find a non-killed tuple that matches
119   * the search key.
120   */
121  res = (gistnext(scan, dir, &tid, 1, scan->ignore_killed_tuples)) ? true : false;
122
123  PG_RETURN_BOOL(res);
124}
125
126Datum
127gistgetmulti(PG_FUNCTION_ARGS)
128{
129  IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
130  ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1);
131  int32    max_tids = PG_GETARG_INT32(2);
132  int32     *returned_tids = (int32 *) PG_GETARG_POINTER(3);
133
134  *returned_tids = gistnext(scan, ForwardScanDirection, tids, max_tids, false);
135
136  PG_RETURN_BOOL(*returned_tids == max_tids);
137}
138
139/*
140 * CS186 HW2: RELEVANT CODE BEGINS HERE
141 */
142
143/* insert a new priority queue entry into so->pq */
144static void
145gist_pq_insertpq(GISTScanOpaque so, /* State of a GiST scan */
146                 GISTPQ *newpq /* a new GISTPQ entry */
147                 )
148{
149  /* CS186 HW2: fill me in! */
150}
151
152/*
153 * given the current state of the index scan, generate a GISTPQ to insert into
154 * the priority queue, and insert it. Note that the "stack" holds the blocks
155 * from the root to the current item.
156 * There are two cases to consider:
157 *   1) the item being inserted is an entry in a leaf node, identified by a
158 *      block number (from the stack), and the offset.  It points to a data
159 *      item in the database (isdata == true).
160 *   2) the item being inserted is the block number of an index node to
161 *      be visited next (isdata == false).  The key corresponding to this
162 *      block number is in the block's *parent* in the tree (which can be
163 *      found in the stack) at the given offset.
164 */
165static void
166gist_pq_insert(IndexScanDesc scan, /* scan descriptor */
167               OffsetNumber offset, /* "slot number" of item being inserted */
168               GISTSearchStack *stack, /* search stack to this item */
169               bool isData /* is the item being inserted a pointer to data? */
170               )
171{
172  /* CS186 HW2: fill me in! Should call gist_pq_insertpq. */
173
174}
175
176/* Find the minimum item in the priority queue (without deleting it) */
177static GISTPQ *
178gist_pq_findmin(GISTScanOpaque so)
179{
180  /* CS186 HW2: implement me */
181  return NULL;
182}
183
184/*
185 * Find the min item in the priority queue, remove it from the queue, and
186 * return it.  Should NOT free the space allocated for the min; the caller
187 * does that when they're done with it.
188 */
189static GISTPQ *
190gist_pq_deletemin(GISTScanOpaque so)
191{
192  /* CS186 HW2: implement me! */
193  return NULL;
194}
195
196/* Optional debugging routine -- dump the state of the priority queue */
197static void
198gist_pq_dump(GISTScanOpaque so)
199{
200  /* CS186 HW2: fill me in if you like. */
201}
202
203/* Support routine for gistnext, to load in the root element on first call */
204static void
205gistnext_get_root(IndexScanDesc scan, GISTScanOpaque so)
206{
207  GISTSearchStack *stk;
208
209  /* Being asked to fetch the first entry, so start at the root */
210  Assert(so->curbuf == InvalidBuffer);
211  Assert(so->stack == NULL);
212
213  so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
214
215  stk = so->stack = (GISTSearchStack *) palloc0(sizeof(GISTSearchStack));
216
217  stk->next = NULL;
218
219  pgstat_count_index_scan(&scan->xs_pgstat_info);
220}
221
222/*
223 * Support routine for gistnext.  Deals with concurrency control issues to
224 * check if the top node on the search stack was split by a concurrent
225 * inserter since the time it was put on the stack.  If so, the "rest"
226 * of the split can be found by adding the right-hand
227 * neighbor of the split node to the search stack.  See
228 * Kornacker/Mohan/Hellerstein, SIGMOD 1997, for details.
229 */
230static bool
231gistnext_checksplit(IndexScanDesc scan, GISTScanOpaque so, Page p,
232                    GISTPageOpaque opaque)
233{
234  bool            resetoffset = false;
235  GISTSearchStack *stk;
236
237  if (XLogRecPtrIsInvalid(so->stack->lsn) || !XLByteEQ(so->stack->lsn, PageGetLSN(p)))
238  {
239    /* page changed from last visit or visit first time , reset offset */
240    so->stack->lsn = PageGetLSN(p);
241    resetoffset = true;
242
243    /* check page split, occured from last visit or visit to parent */
244    if (!XLogRecPtrIsInvalid(so->stack->parentlsn) &&
245      XLByteLT(so->stack->parentlsn, opaque->nsn) &&
246      opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
247      (so->stack->next == NULL || so->stack->next->block != opaque->rightlink)    /* check if already
248        added */ )
249    {
250      /* detect page split, follow right link to add pages */
251
252      /*
253       * CS186 HW2: Fix me!
254       * Modify this next code to put the rightlinked block on
255       * the priority queue.
256       * Note: since this page was split since it was inserted into
257       * the priority queue, it has fewer items on it, and a different
258       * bounding box, so its priority may have changed.
259       */
260      stk = (GISTSearchStack *) palloc0(sizeof(GISTSearchStack));
261      stk->next = so->stack->next;
262      stk->block = opaque->rightlink;
263      stk->parentlsn = so->stack->parentlsn;
264      memset(&(stk->lsn), 0, sizeof(GistNSN));
265      so->stack->next = stk;
266    }
267  }
268  return resetoffset;
269}
270
271/*
272 * Fetch tuples that matchs the search key; this can be invoked
273 * either to fetch the first such tuple or subsequent matching
274 * tuples. Returns true iff a matching tuple was found.
275 */
276static int
277gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples)
278{
279  Page    p;
280  OffsetNumber n;
281  GISTScanOpaque so;
282  GISTSearchStack *stk;
283  IndexTuple  it;
284  GISTPageOpaque opaque;
285  bool    resetoffset = false;
286  int      ntids = 0;
287 
288  /* CS186 HW2: This code currently returns tuples that match the query in
289   * an arbitrary order (depth-first left-to-right).  Modify it to return
290   * matching tuples ordered by distance to search key
291   */
292
293  so = (GISTScanOpaque) scan->opaque;
294
295  if (ItemPointerIsValid(&scan->currentItemData) == false)
296  {
297    gistnext_get_root(scan, so);
298  }
299  else if (so->curbuf == InvalidBuffer)
300  {
301    return 0;
302  }
303
304  for (;;)
305  {
306    /* First of all, we need to get a "lightweight" lock on the buffer */
307    Assert(so->curbuf != InvalidBuffer);
308    LockBuffer(so->curbuf, GIST_SHARE);
309   
310    gistcheckpage(scan->indexRelation, so->curbuf);
311    p = BufferGetPage(so->curbuf);
312    opaque = GistPageGetOpaque(p);
313
314    /* deal with case where this page split since last we saw it */
315    resetoffset = gistnext_checksplit(scan, so, p, opaque);
316
317    /* if page is empty, then just skip it */
318    if (PageIsEmpty(p))
319    {
320      LockBuffer(so->curbuf, GIST_UNLOCK);
321      stk = so->stack->next;
322      pfree(so->stack);
323      so->stack = stk;
324
325      if (so->stack == NULL)
326      {
327        ReleaseBuffer(so->curbuf);
328        so->curbuf = InvalidBuffer;
329        return ntids;
330      }
331
332      so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
333                        stk->block);
334      continue;
335    }
336
337    if (!GistPageIsLeaf(p) || resetoffset ||
338      !ItemPointerIsValid(&scan->currentItemData))
339    {
340      if (ScanDirectionIsBackward(dir))
341        n = PageGetMaxOffsetNumber(p);
342      else
343        n = FirstOffsetNumber;
344    }
345    else
346    {
347      n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
348
349      if (ScanDirectionIsBackward(dir))
350        n = OffsetNumberPrev(n);
351      else
352        n = OffsetNumberNext(n);
353    }
354
355    /* wonderful, we can look at page */
356
357    for (;;)
358    {
359      n = gistfindnext(scan, n, dir);
360
361      if (!OffsetNumberIsValid(n))
362      {
363        /*
364         * We ran out of matching index entries on the current page,
365         * so pop the top stack entry and use it to continue the
366         * search.
367         */
368        LockBuffer(so->curbuf, GIST_UNLOCK);
369        stk = so->stack->next;
370        pfree(so->stack);
371        so->stack = stk;
372
373        /* If we're out of stack entries, we're done */
374
375        if (so->stack == NULL)
376        {
377          ReleaseBuffer(so->curbuf);
378          so->curbuf = InvalidBuffer;
379          return ntids;
380        }
381
382        so->curbuf = ReleaseAndReadBuffer(so->curbuf,
383                          scan->indexRelation,
384                          stk->block);
385        /* XXX  go up */
386        break;
387      }
388
389      if (GistPageIsLeaf(p))
390      {
391        /*
392         * We've found a matching index entry in a leaf page, so
393         * return success. Note that we keep "curbuf" pinned so that
394         * we can efficiently resume the index scan later.
395         */
396
397        ItemPointerSet(&(scan->currentItemData),
398                 BufferGetBlockNumber(so->curbuf), n);
399
400        if (!(ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n))))
401        {
402          it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
403          tids[ntids] = scan->xs_ctup.t_self = it->t_tid;
404          ntids++;
405
406          if (ntids == maxtids)
407          {
408            LockBuffer(so->curbuf, GIST_UNLOCK);
409            return ntids;
410          }
411        }
412      }
413      else
414      {
415        /*
416         * We've found an entry in an internal node whose key is
417         * consistent with the search key, so push it to stack
418         */
419
420        stk = (GISTSearchStack *) palloc0(sizeof(GISTSearchStack));
421
422        it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
423        stk->block = ItemPointerGetBlockNumber(&(it->t_tid));
424        memset(&(stk->lsn), 0, sizeof(GistNSN));
425        stk->parentlsn = so->stack->lsn;
426
427        stk->next = so->stack->next;
428        so->stack->next = stk;
429
430      }
431
432      if (ScanDirectionIsBackward(dir))
433        n = OffsetNumberPrev(n);
434      else
435        n = OffsetNumberNext(n);
436    }
437  }
438
439  return ntids;
440}
441
442
443/*
444 * gistindex_keytest() -- does this index tuple satisfy the scan key(s)?
445 *
446 * We must decompress the key in the IndexTuple before passing it to the
447 * sk_func (and we have previously overwritten the sk_func to use the
448 * user-defined Consistent method, so we actually are invoking that).
449 *
450 * Note that this function is always invoked in a short-lived memory context,
451 * so we don't need to worry about cleaning up allocated memory, either here
452 * or in the implementation of any Consistent methods.
453 */
454static bool
455gistindex_keytest(IndexTuple tuple,
456          IndexScanDesc scan,
457          OffsetNumber offset)
458{
459  int      keySize = scan->numberOfKeys;
460  ScanKey    key = scan->keyData;
461  Relation  r = scan->indexRelation;
462  GISTScanOpaque so;
463  Page    p;
464  GISTSTATE  *giststate;
465
466  so = (GISTScanOpaque) scan->opaque;
467  giststate = so->giststate;
468  p = BufferGetPage(so->curbuf);
469
470  IncrIndexProcessed();
471
472  /*
473   * Tuple doesn't restore after crash recovery because of incomplete insert
474   */
475  if (!GistPageIsLeaf(p) && GistTupleIsInvalid(tuple))
476    return true;
477
478  while (keySize > 0)
479  {
480    Datum    datum;
481    bool    isNull;
482    Datum    test;
483    GISTENTRY  de;
484
485    datum = index_getattr(tuple,
486                key->sk_attno,
487                giststate->tupdesc,
488                &isNull);
489
490    if (key->sk_flags & SK_ISNULL)
491    {
492      /*
493       * is the compared-to datum NULL? on non-leaf page it's possible
494       * to have nulls in childs :(
495       */
496
497      if (isNull || !GistPageIsLeaf(p))
498        return true;
499      return false;
500    }
501    else if (isNull)
502      return false;
503
504    /* decompress the key! */
505    gistdentryinit(giststate, key->sk_attno - 1, &de,
506             datum, r, p, offset,
507             FALSE, isNull);
508
509    /*
510     * Call the Consistent function to evaluate the test.  The arguments
511     * are the index datum (as a GISTENTRY*), the comparison datum, and
512     * the comparison operator's strategy number and subtype from pg_amop.
513     *
514     * (Presently there's no need to pass the subtype since it'll always
515     * be zero, but might as well pass it for possible future use.)
516     */
517    test = FunctionCall4(&key->sk_func,
518               PointerGetDatum(&de),
519               key->sk_argument,
520               Int32GetDatum(key->sk_strategy),
521               ObjectIdGetDatum(key->sk_subtype));
522
523    if (!DatumGetBool(test))
524      return false;
525
526    keySize--;
527    key++;
528  }
529
530  return true;
531}
532
533/*
534 * Use the mindistance function to compute the distance of the given index
535 * entry from the constant in the scan.  See gistindex_keytest for an
536 * analogous task.
537 */
538
539static double
540gistindex_keydistance(  IndexTuple tuple,
541            IndexScanDesc scan,
542            OffsetNumber offset)
543{
544  int      keySize = scan->numberOfKeys;
545  ScanKey    key = scan->keyData;
546  Relation  r = scan->indexRelation;
547  GISTScanOpaque so;
548  Page    p;
549  GISTSTATE  *giststate;
550  double retval = 0;
551 
552  so = (GISTScanOpaque) scan->opaque;
553  giststate = so->giststate;
554  p = BufferGetPage(so->curbuf);
555
556  while (keySize > 0)
557  {
558    Datum    datum;
559    bool    isNull;
560    Datum    test;
561    GISTENTRY  de;
562
563    datum = index_getattr(tuple,
564                key->sk_attno,
565                giststate->tupdesc,
566                &isNull);
567
568    if (key->sk_flags & SK_ISNULL)
569    {
570      /*
571       * is the compared-to datum NULL? on non-leaf page it's possible
572       * to have nulls in childs :(
573       */
574
575      if (isNull || !GistPageIsLeaf(p))
576        return get_float8_infinity();
577      return 0;
578    }
579    else if (isNull)
580      return 0;
581
582    gistdentryinit(giststate, key->sk_attno - 1, &de,
583             datum, r, p, offset,
584             FALSE, isNull);
585
586    /*
587     * Call the Distance function to evaluate the test.  The arguments
588     * are the index datum (as a GISTENTRY*), and the comparison datum.
589     */
590    test = FunctionCall2(&giststate->distanceFn[key->sk_attno - 1],
591               PointerGetDatum(&de),
592               key->sk_argument);
593
594    retval += DatumGetFloat8(test);
595
596    keySize--;
597    key++;
598  }
599
600  return retval;
601}
602
603/*
604 * Return the offset of the first index entry that is consistent with
605 * the search key after offset 'n' in the current page. If there are
606 * no more consistent entries, return InvalidOffsetNumber.
607 * Page should be locked....
608 */
609static OffsetNumber
610gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
611{
612  OffsetNumber maxoff;
613  IndexTuple  it;
614  GISTScanOpaque so;
615  MemoryContext oldcxt;
616  Page    p;
617
618  so = (GISTScanOpaque) scan->opaque;
619  p = BufferGetPage(so->curbuf);
620  maxoff = PageGetMaxOffsetNumber(p);
621
622  /*
623   * Make sure we're in a short-lived memory context when we invoke a
624   * user-supplied GIST method in gistindex_keytest(), so we don't leak
625   * memory
626   */
627  oldcxt = MemoryContextSwitchTo(so->tempCxt);
628
629  /*
630   * If we modified the index during the scan, we may have a pointer to a
631   * ghost tuple, before the scan.  If this is the case, back up one.
632   */
633  if (so->flags & GS_CURBEFORE)
634  {
635    so->flags &= ~GS_CURBEFORE;
636    n = OffsetNumberPrev(n);
637  }
638
639  while (n >= FirstOffsetNumber && n <= maxoff)
640  {
641    it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
642    if (gistindex_keytest(it, scan, n))
643      break;
644
645    if (ScanDirectionIsBackward(dir))
646      n = OffsetNumberPrev(n);
647    else
648      n = OffsetNumberNext(n);
649  }
650
651  MemoryContextSwitchTo(oldcxt);
652  MemoryContextReset(so->tempCxt);
653
654  /*
655   * If we found a matching entry, return its offset; otherwise return
656   * InvalidOffsetNumber to inform the caller to go to the next page.
657   */
658  if (n >= FirstOffsetNumber && n <= maxoff)
659    return n;
660  else
661    return InvalidOffsetNumber;
662}
Note: See TracBrowser for help on using the browser.