[Groonga-commit] groonga/groonga [master] Added an offline index builder

Back to archive index

null+****@clear***** null+****@clear*****
2012年 1月 19日 (木) 19:56:19 JST


Daijiro MORI	2012-01-19 19:56:19 +0900 (Thu, 19 Jan 2012)

  New Revision: 9d6bcc7fa960736d7295955900da6373bbaabc97

  Log:
    Added an offline index builder

  Modified files:
    lib/ii.c
    lib/ii.h

  Modified: lib/ii.c (+496 -0)
===================================================================
--- lib/ii.c    2012-01-19 17:42:02 +0900 (6d0e8e3)
+++ lib/ii.c    2012-01-19 19:56:19 +0900 (a1be7c6)
@@ -6335,3 +6335,499 @@ grn_ii_inspect_elements(grn_ctx *ctx, grn_ii *ii, grn_obj *buf)
   GRN_TEXT_PUTS(ctx, buf, "]");
 }
 
+/********************** offline index builder ***********************/
+
+const grn_id BUILD_RID_FLAG = 0x80000000;
+#ifdef BUILD_ORDER_BY_ID
+const int BUILD_ORDER = GRN_CURSOR_BY_ID;
+#else /* BUILD_ORDER_BY_ID */
+const int BUILD_ORDER = GRN_CURSOR_BY_KEY;
+#endif /* BUILD_ORDER_BY_ID */
+const uint16_t BUILD_NTERMS_PER_BUFFER = 16300;
+const uint32_t BUILD_PACKED_BUFFER_SIZE = 0x4000000;
+const char *TMPFILE_PATH = "/tmp/grn_ii_builder_tmp";
+const uint32_t BUILD_NCOUNTERS_MARGIN = 0x100000;
+const size_t BUILD_BLOCK_SIZE = 0x100000;
+
+typedef struct {
+  uint32_t nrecs;
+  uint32_t nposts;
+  grn_id lastrec;
+  uint32_t offset;
+} builder_counter;
+
+typedef struct {
+  off_t head;
+  off_t tail;
+  uint32_t nextsize;
+  uint32_t *buffer;
+  uint32_t buffersize;
+  uint32_t *bufcur;
+  uint32_t rest;
+  grn_id tid;
+  uint32_t nrecs;
+  uint32_t nposts;
+  grn_id *recs;
+  uint32_t *tfs;
+  uint32_t *posts;
+} builder_block;
+
+typedef struct {
+  grn_obj *target;
+  grn_obj *lexicon;
+  grn_obj *source;
+  builder_block *blocks;
+  uint32_t nblocks;
+  int tmpfd;
+  // stuff for parsing
+  off_t filepos;
+  grn_id *blockbuf;
+  uint32_t blockpos;
+  builder_counter *counters;
+  uint32_t ncounters;
+  // stuff for merging
+  grn_ii *ii;
+  uint32_t lseg;
+  uint32_t dseg;
+  buffer *term_buffer;
+  datavec data_vectors[MAX_N_ELEMENTS + 1];
+  uint8_t *packed;
+  uint32_t packed_len;
+  uint32_t total_chunk_size;
+} grn_ii_builder;
+
+static void
+grn_ii_builder_flush(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  uint32_t pos = 0, pos_ = 0;
+  uint32_t *outbuf = (uint32_t *)GRN_MALLOC(builder->blockpos * 3 * sizeof(uint32_t));
+  builder_block *block;
+  if (!(builder->nblocks & 0x3ff)) {
+    builder_block *blocks = GRN_REALLOC(builder->blocks,
+                                        (builder->nblocks + 0x400) * sizeof(builder_block));
+    if (!blocks) { /* err */ }
+    builder->blocks = blocks;
+  }
+  block = &builder->blocks[builder->nblocks];
+  block->head = builder->filepos;
+  block->rest = 0;
+  block->buffer = NULL;
+  block->buffersize = 0;
+  {
+    grn_id tid;
+    grn_table_cursor  *tc;
+    uint32_t *pnext = &block->nextsize;
+    tc = grn_table_cursor_open(ctx, builder->lexicon, NULL, 0, NULL, 0, 0, -1, BUILD_ORDER);
+    while ((tid = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
+      builder_counter *counter = &builder->counters[tid - 1];
+      if (counter->nrecs) {
+        uint32_t nposts = counter->nposts;
+        outbuf[pos++] = tid;
+        outbuf[pos++] = counter->nrecs;
+        outbuf[pos++] = counter->nposts;
+        counter->offset = pos;
+        outbuf[pos] = 0;
+        pos += counter->nrecs * 2;
+        counter->nposts = pos;
+        pos += nposts;
+      }
+      if (pos_ + 0x80000 < pos) {
+        *pnext = pos - pos_+ 1;
+        pnext = &outbuf[pos++];
+        pos_ = pos;
+      }
+    }
+    grn_table_cursor_close(ctx, tc);
+    if (pos_ < pos) { *pnext = pos - pos_; }
+  }
+  {
+    grn_id rid = 0;
+    uint32_t post = 0;
+    uint32_t rest;
+    grn_id *bp;
+    for (bp = builder->blockbuf, rest = builder->blockpos; rest; bp++, rest--) {
+      grn_id id = *bp;
+      if (id & BUILD_RID_FLAG) {
+        rid = id - BUILD_RID_FLAG;
+        post = 0;
+      } else {
+        builder_counter *counter = &builder->counters[id - 1];
+        outbuf[counter->nposts++] = post;
+        if (outbuf[counter->offset]) {
+          if (outbuf[counter->offset] == rid) {
+            outbuf[counter->offset + counter->nrecs]++;
+          } else {
+            outbuf[++counter->offset] = rid;
+            outbuf[counter->offset + counter->nrecs] = 1;
+          }
+        } else {
+          outbuf[counter->offset] = rid;
+          outbuf[counter->offset + counter->nrecs] = 1;
+        }
+        post++;
+      }
+    }
+  }
+  {
+    ssize_t r = write(builder->tmpfd, outbuf, pos * sizeof(uint32_t));
+    if (r > 0) { builder->filepos += r; }
+    block->tail = builder->filepos;
+  }
+  builder->nblocks++;
+  GRN_FREE(outbuf);
+  {
+    builder_counter *counter;
+    grn_id tid, tid_max = grn_table_size(ctx, builder->lexicon);
+    for (counter = builder->counters, tid = 1; tid <= tid_max; counter++, tid++) {
+      counter->nrecs = 0;
+      counter->nposts = 0;
+      counter->lastrec = 0;
+      counter->offset = 0;
+    }
+  }
+  builder->blockpos = 0;
+}
+
+static void
+grn_ii_builder_tokenize(grn_ctx *ctx, grn_ii_builder *builder, grn_id rid, grn_obj *value)
+{
+  uint32_t pos;
+  grn_token *token;
+  grn_id *buffer = builder->blockbuf;
+  if (BUILD_BLOCK_SIZE <= builder->blockpos + GRN_TEXT_LEN(value) * 2) {
+    grn_ii_builder_flush(ctx, builder);
+  }
+  pos = builder->blockpos;
+  buffer[pos++] = rid + BUILD_RID_FLAG;
+  if (GRN_TEXT_LEN(value) &&
+      (token = grn_token_open(ctx, builder->lexicon,
+                              GRN_TEXT_VALUE(value), GRN_TEXT_LEN(value), grn_token_add))) {
+    uint32_t pos_ = pos;
+    while (!token->status) {
+      grn_id tid;
+      if ((tid = grn_token_next(ctx, token))) {
+        if (tid > builder->ncounters) {
+          uint32_t ncounters = grn_table_size(ctx, builder->lexicon) + BUILD_NCOUNTERS_MARGIN;
+          builder_counter *counters = GRN_REALLOC(builder->counters,
+                                                  ncounters * sizeof(builder_counter));
+          if (!counters) { return; }
+          memset(&counters[builder->ncounters], 0,
+                 (ncounters - builder->ncounters) * sizeof(builder_counter));
+          builder->ncounters = ncounters;
+          builder->counters = counters;
+        }
+        {
+          builder_counter *counter = &builder->counters[tid - 1];
+          buffer[pos++] = tid;
+          if (counter->lastrec != rid) {
+            counter->lastrec = rid;
+            counter->nrecs++;
+          }
+          counter->nposts++;
+        }
+      }
+    }
+    grn_token_close(ctx, token);
+    if (pos - pos_ > GRN_TEXT_LEN(value)) {
+      GRN_LOG(ctx, GRN_LOG_WARNING, "%d > %d", pos - pos_, GRN_TEXT_LEN(value));
+    }
+  }
+  builder->blockpos = pos;
+}
+
+static void
+grn_ii_builder_parse(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  grn_table_cursor  *tc;
+  builder->ncounters = grn_table_size(ctx, builder->lexicon) + BUILD_NCOUNTERS_MARGIN;
+  builder->counters = GRN_CALLOC(builder->ncounters * sizeof(builder_counter));
+  builder->blockbuf = (grn_id *)GRN_MALLOC(BUILD_BLOCK_SIZE * sizeof(grn_id));
+  builder->blockpos = 0;
+  builder->tmpfd = open(TMPFILE_PATH, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, 0666);
+  builder->filepos = 0;
+  if ((tc = grn_table_cursor_open(ctx, builder->target,
+                                  NULL, 0, NULL, 0, 0, -1, GRN_CURSOR_BY_ID))) {
+    grn_id id;
+    grn_obj rv;
+    GRN_TEXT_INIT(&rv, 0);
+    while ((id = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
+      GRN_BULK_REWIND(&rv);
+      grn_obj_get_value(ctx, builder->source, id, &rv);
+      grn_ii_builder_tokenize(ctx, builder, id, &rv);
+    }
+    GRN_OBJ_FIN(ctx, &rv);
+    if (builder->blockpos) {
+      grn_ii_builder_flush(ctx, builder);
+    }
+    grn_table_cursor_close(ctx, tc);
+  }
+  close(builder->tmpfd);
+  GRN_FREE(builder->blockbuf);
+  GRN_FREE(builder->counters);
+  GRN_LOG(ctx, GRN_LOG_WARNING, "nblocks: %d", builder->nblocks);
+}
+
+static void
+grn_ii_builder_fetch(grn_ctx *ctx, grn_ii_builder *builder, builder_block *block)
+{
+  if (!block->rest) {
+    if (block->head < block->tail) {
+      size_t bytesize = block->nextsize * sizeof(uint32_t);
+      if (block->buffersize < block->nextsize) {
+        void *r = GRN_REALLOC(block->buffer, bytesize);
+        if (r) {
+          block->buffer = (uint32_t *)r;
+          block->buffersize = block->nextsize;
+        } else {
+          GRN_LOG(ctx, GRN_LOG_WARNING, "realloc: %d", bytesize);
+          return;
+        }
+      }
+      pread(builder->tmpfd, block->buffer, bytesize, block->head);
+      block->head += bytesize;
+      block->bufcur = block->buffer;
+      if (block->head >= block->tail) {
+        if (block->head > block->tail) {
+          GRN_LOG(ctx, GRN_LOG_WARNING, "fetch error: %jd > %jd", block->head, block->tail);
+        }
+        block->rest = block->nextsize;
+        block->nextsize = 0;
+      } else {
+        block->rest = block->nextsize - 1;
+        block->nextsize = block->buffer[block->rest];
+      }
+    }
+  }
+  if (block->rest) {
+    block->tid = block->bufcur[0];
+    block->nrecs = block->bufcur[1];
+    block->nposts = block->bufcur[2];
+    {
+      uint32_t entrysize = 3 + block->nrecs * 2 + block->nposts;
+      if (entrysize > block->rest) {
+        GRN_LOG(ctx, GRN_LOG_WARNING, "rest: %d, %d", block->rest, entrysize);
+      }
+      block->recs = block->bufcur + 3;
+      block->tfs = block->recs + block->nrecs;
+      block->posts = block->tfs + block->nrecs;
+      block->rest -= entrysize;
+      block->bufcur += entrysize;
+    }
+  } else {
+    block->tid = 0;
+  }
+}
+
+static void
+grn_ii_builder_chunk_flush(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  grn_io_win io_win;
+  uint32_t chunk_number;
+  chunk_new(ctx, builder->ii, &chunk_number, builder->packed_len);
+  GRN_LOG(ctx, GRN_LOG_INFO, "chunk:%d, packed_len:%d",
+          chunk_number, builder->packed_len);
+  fake_map2(ctx, builder->ii->chunk, &io_win, builder->packed,
+            chunk_number, builder->packed_len);
+  grn_io_win_unmap2(&io_win);
+  builder->term_buffer->header.chunk = chunk_number;
+  builder->term_buffer->header.chunk_size = builder->packed_len;
+  builder->term_buffer->header.buffer_free =
+    S_SEGMENT - sizeof(buffer_header) -
+    builder->term_buffer->header.nterms * sizeof(buffer_term);
+  builder->term_buffer->header.nterms_void = 0;
+  buffer_segment_update(builder->ii, builder->lseg, builder->dseg);
+  builder->ii->header->total_chunk_size += builder->packed_len;
+  builder->term_buffer = NULL;
+  builder->total_chunk_size += builder->packed_len;
+  builder->packed = NULL;
+  builder->packed_len = 0;
+}
+
+static void
+grn_ii_builder_merge_one(grn_ctx *ctx, grn_ii_builder *builder,
+                         grn_id tid, builder_block *hits[], int nhits)
+{
+  uint32_t *a = array_get(ctx, builder->ii, tid);
+  if (nhits == 1 && hits[0]->nrecs == 1 && hits[0]->nposts == 1) {
+    builder_block *block = hits[0];
+    a[0] = (block->recs[0] << 1) + 1;
+    a[1] = block->posts[0];
+    grn_ii_builder_fetch(ctx, builder, block);
+  } else {
+    uint64_t spos = 0;
+    uint32_t nrecs = 0;
+    uint32_t nposts = 0;
+    uint16_t nterm;
+    buffer_term *bt;
+    if (!builder->term_buffer) {
+      uint32_t lseg;
+      void *term_buffer;
+      for (lseg = 0; lseg < GRN_II_MAX_LSEG; lseg++) {
+        if (builder->ii->header->binfo[lseg] == NOT_ASSIGNED) { break; }
+      }
+      builder->lseg = lseg;
+      builder->dseg = segment_get(ctx, builder->ii);
+      GRN_IO_SEG_REF(builder->ii->seg, builder->dseg, term_buffer);
+      builder->term_buffer = (buffer *)term_buffer;
+    }
+    nterm = builder->term_buffer->header.nterms++;
+    bt = &builder->term_buffer->terms[nterm];
+    a[0] = SEG2POS(builder->lseg, (sizeof(buffer_header) + sizeof(buffer_term) * nterm));
+    {
+      int i;
+      for (i = 0; i < nhits; i++) {
+        builder_block *block = hits[i];
+        nrecs += block->nrecs;
+        nposts += block->nposts;
+      }
+    }
+    datavec_reset(ctx, builder->data_vectors, builder->ii->n_elements, nrecs, nrecs * 2 + nposts);
+    {
+      uint32_t *ridp = builder->data_vectors[0].data;
+      uint32_t *tfp = builder->data_vectors[1].data;
+      uint32_t *posp = builder->data_vectors[2].data;
+      uint32_t lr = 0;
+      int i;
+      for (i = 0; i < nhits; i++) {
+        builder_block *block = hits[i];
+        uint32_t *rp = block->recs;
+        uint32_t *tp = block->tfs;
+        uint32_t *pp = block->posts;
+        uint32_t n;
+        for (n = block->nrecs; n; n--) {
+          uint32_t lp = 0;
+          uint32_t np;
+          *ridp++ = *rp - lr; lr = *rp++;
+          for (np = *tp; np; np--) {
+            *posp = *pp - lp; lp = *pp++;
+            spos += *posp++;
+          }
+          *tfp++ = *tp++ - 1;
+        }
+        grn_ii_builder_fetch(ctx, builder, block);
+      }
+
+      if (ridp != builder->data_vectors[0].data + nrecs) {
+        GRN_LOG(ctx, GRN_LOG_WARNING, "wrong ridp! %d, %d",
+                ridp - builder->data_vectors[0].data, nrecs);
+      }
+      if (tfp != builder->data_vectors[1].data + nrecs) {
+        GRN_LOG(ctx, GRN_LOG_WARNING, "wrong tfp! %d, %d",
+                tfp - builder->data_vectors[1].data, nrecs);
+      }
+      if (posp != builder->data_vectors[2].data + nposts) {
+        GRN_LOG(ctx, GRN_LOG_WARNING, "wrong posp! %d, %d",
+                posp - builder->data_vectors[2].data, nposts);
+      }
+
+      builder->data_vectors[0].data_size = nrecs;
+      builder->data_vectors[1].data_size = nrecs;
+      builder->data_vectors[2].data_size = nposts;
+
+      builder->data_vectors[0].flags = ((nrecs < 16) || (nrecs <= (lr >> 8))) ? 0 : USE_P_ENC;
+      builder->data_vectors[1].flags = (nrecs < 3) ? 0 : USE_P_ENC;
+      builder->data_vectors[2].flags =
+        (((nposts < 32) || (nposts <= (spos >> 13))) ? 0 : USE_P_ENC)|ODD;
+    }
+    if (!builder->packed) { builder->packed = GRN_MALLOC(BUILD_PACKED_BUFFER_SIZE * 2); }
+    {
+      int packed_len = grn_p_encv(ctx, builder->data_vectors, builder->ii->n_elements,
+                                  builder->packed + builder->packed_len);
+      bt->tid = tid;
+      bt->size_in_buffer = 0;
+      bt->pos_in_buffer = 0;
+      bt->size_in_chunk = packed_len;
+      bt->pos_in_chunk = builder->packed_len;
+      builder->packed_len += packed_len;
+    }
+    if (nterm == BUILD_NTERMS_PER_BUFFER || builder->packed_len > BUILD_PACKED_BUFFER_SIZE) {
+      grn_ii_builder_chunk_flush(ctx, builder);
+    }
+  }
+}
+
+static void
+grn_ii_builder_merge(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  builder->term_buffer = NULL;
+  builder->packed = NULL;
+  builder->packed_len = 0;
+  builder->total_chunk_size = 0;
+  builder->tmpfd = open(TMPFILE_PATH, O_RDONLY);
+  datavec_init(ctx, builder->data_vectors, builder->ii->n_elements, 0, 0);
+  {
+    uint32_t i;
+    for (i = 0; i < builder->nblocks; i++) {
+      grn_ii_builder_fetch(ctx, builder, &builder->blocks[i]);
+    }
+  }
+  {
+    builder_block *hits[builder->nblocks];
+    grn_id tid;
+    grn_table_cursor *tc;
+    tc = grn_table_cursor_open(ctx, builder->lexicon, NULL, 0, NULL, 0, 0, -1, BUILD_ORDER);
+    while ((tid = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
+      int nrests = 0;
+      int nhits = 0;
+      uint32_t i;
+      for (i = 0; i < builder->nblocks; i++) {
+        if (builder->blocks[i].tid == tid) {
+          hits[nhits++] = &builder->blocks[i];
+        }
+        if (builder->blocks[i].tid) { nrests++; }
+      }
+      if (nhits) { grn_ii_builder_merge_one(ctx, builder, tid, hits, nhits); }
+      if (!nrests) { break; }
+      if (!nhits) {
+        GRN_LOG(ctx, GRN_LOG_WARNING, "merge error! tid=%d, nrests=%d", tid, nrests);
+        break;
+      }
+    }
+    if (builder->packed_len) {
+      grn_ii_builder_chunk_flush(ctx, builder);
+    }
+    grn_table_cursor_close(ctx, tc);
+  }
+  datavec_fin(ctx, builder->data_vectors);
+  GRN_LOG(ctx, GRN_LOG_NOTICE, "tmpfile_size:%d, total_chunk_size:%d",
+          builder->filepos, builder->total_chunk_size);
+  close(builder->tmpfd);
+  unlink(TMPFILE_PATH);
+}
+
+grn_rc
+grn_ii_build(grn_ctx *ctx, grn_ii *ii)
+{
+  grn_rc rc = GRN_INVALID_ARGUMENT;
+  grn_ii_builder builder;
+  grn_id *s = ii->obj.source;
+  grn_obj *src, *target;
+  if (!(ii->obj.source_size) || !s) { goto exit; }
+  if (!(src = grn_ctx_at(ctx, *s))) {
+    goto exit;
+  }
+  if (!(target = GRN_OBJ_TABLEP(src) ? src : grn_ctx_at(ctx, src->header.domain))) {
+    goto exit;
+  }
+
+  builder.ii = ii;
+  builder.source = src;
+  builder.target = target;
+  builder.lexicon = ii->lexicon;
+
+  builder.nblocks = 0;
+  builder.blocks = NULL;
+
+  grn_ii_builder_parse(ctx, &builder);
+  grn_ii_builder_merge(ctx, &builder);
+
+  {
+    uint32_t i;
+    for (i = 0; i < builder.nblocks; i++) {
+      if (builder.blocks[i].buffer) { GRN_FREE(builder.blocks[i].buffer); }
+    }
+  }
+  GRN_FREE(builder.blocks);
+exit :
+  return rc;
+}

  Modified: lib/ii.h (+2 -0)
===================================================================
--- lib/ii.h    2012-01-19 17:42:02 +0900 (c0c609e)
+++ lib/ii.h    2012-01-19 19:56:19 +0900 (c5d05f1)
@@ -192,6 +192,8 @@ grn_rc grn_ii_at(grn_ctx *ctx, grn_ii *ii, grn_id id, grn_hash *s, grn_operator
 void grn_ii_inspect_elements(grn_ctx *ctx, grn_ii *ii, grn_obj *buf);
 void grn_ii_cursor_inspect(grn_ctx *ctx, grn_ii_cursor *c, grn_obj *buf);
 
+grn_rc grn_ii_build(grn_ctx *ctx, grn_ii *ii);
+
 #ifdef __cplusplus
 }
 #endif




Groonga-commit メーリングリストの案内
Back to archive index