null+****@clear*****
null+****@clear*****
2012年 1月 19日 (木) 19:56:19 JST
Daijiro MORI 2012-01-19 19:56:19 +0900 (Thu, 19 Jan 2012)
New Revision: 9d6bcc7fa960736d7295955900da6373bbaabc97
Log:
Added an offline index builder
Modified files:
lib/ii.c
lib/ii.h
Modified: lib/ii.c (+496 -0)
===================================================================
--- lib/ii.c 2012-01-19 17:42:02 +0900 (6d0e8e3)
+++ lib/ii.c 2012-01-19 19:56:19 +0900 (a1be7c6)
@@ -6335,3 +6335,499 @@ grn_ii_inspect_elements(grn_ctx *ctx, grn_ii *ii, grn_obj *buf)
GRN_TEXT_PUTS(ctx, buf, "]");
}
+/********************** offline index builder ***********************/
+
+/*
+ * Tunables of the offline index builder.  They are not exported through
+ * lib/ii.h, so they are given internal linkage to keep them out of the
+ * global namespace.
+ */
+/* High bit set on a blockbuf entry marks it as a record id, not a term id. */
+static const grn_id BUILD_RID_FLAG = 0x80000000;
+/* Order in which the lexicon is walked when blocks are written and merged. */
+#ifdef BUILD_ORDER_BY_ID
+static const int BUILD_ORDER = GRN_CURSOR_BY_ID;
+#else /* BUILD_ORDER_BY_ID */
+static const int BUILD_ORDER = GRN_CURSOR_BY_KEY;
+#endif /* BUILD_ORDER_BY_ID */
+/* The pending chunk is flushed once a term buffer holds this many terms. */
+static const uint16_t BUILD_NTERMS_PER_BUFFER = 16300;
+/* Flush threshold for packed data; the buffer itself is allocated at twice this size. */
+static const uint32_t BUILD_PACKED_BUFFER_SIZE = 0x4000000;
+/*
+ * Temporary file holding the sorted blocks between the parse and merge phases.
+ * NOTE(review): a fixed, predictable path in /tmp means concurrent builds
+ * clobber each other; consider mkstemp().
+ */
+static const char *const TMPFILE_PATH = "/tmp/grn_ii_builder_tmp";
+/* Extra counter slots allocated beyond the current lexicon size. */
+static const uint32_t BUILD_NCOUNTERS_MARGIN = 0x100000;
+/* Capacity of the in-memory block buffer, in grn_id entries. */
+static const size_t BUILD_BLOCK_SIZE = 0x100000;
+
+/* Per-term statistics for the block currently being accumulated. */
+typedef struct {
+ /* number of distinct records in the current block that contain the term */
+ uint32_t nrecs;
+ /* number of occurrences of the term; reused by grn_ii_builder_flush as the
+    write cursor into the term's position area of the output buffer */
+ uint32_t nposts;
+ /* last record id counted in nrecs, used to detect a new record */
+ grn_id lastrec;
+ /* set by grn_ii_builder_flush: index of the term's current record-id slot
+    in the output buffer */
+ uint32_t offset;
+} builder_counter;
+
+/* Read state of one sorted block stored in the temporary file. */
+typedef struct {
+ /* file offset of the next unread byte of this block */
+ off_t head;
+ /* file offset just past the end of this block */
+ off_t tail;
+ /* number of uint32_t words to read on the next fetch */
+ uint32_t nextsize;
+ /* read buffer and its capacity in uint32_t words */
+ uint32_t *buffer;
+ uint32_t buffersize;
+ /* next unparsed entry inside buffer */
+ uint32_t *bufcur;
+ /* number of unparsed words left in buffer */
+ uint32_t rest;
+ /* term id of the current entry; 0 when the block is exhausted */
+ grn_id tid;
+ /* record and posting counts of the current entry */
+ uint32_t nrecs;
+ uint32_t nposts;
+ /* pointers into buffer: record ids, term frequencies, positions */
+ grn_id *recs;
+ uint32_t *tfs;
+ uint32_t *posts;
+} builder_block;
+
+/* Whole state of one offline index build (parse phase, then merge phase). */
+typedef struct {
+ /* table whose records are enumerated during parsing */
+ grn_obj *target;
+ /* lexicon used for tokenizing and for ordering terms */
+ grn_obj *lexicon;
+ /* object the values to be indexed are read from */
+ grn_obj *source;
+ /* descriptors of the blocks written to the temporary file */
+ builder_block *blocks;
+ uint32_t nblocks;
+ /* descriptor of the temporary file (write-only while parsing, read-only while merging) */
+ int tmpfd;
+ // stuff for parsing
+ /* number of bytes written to the temporary file so far */
+ off_t filepos;
+ /* token stream of the current block: record-id markers followed by term ids */
+ grn_id *blockbuf;
+ /* number of entries used in blockbuf */
+ uint32_t blockpos;
+ /* per-term counters indexed by (term id - 1), and their capacity */
+ builder_counter *counters;
+ uint32_t ncounters;
+ // stuff for merging
+ /* index being built */
+ grn_ii *ii;
+ /* logical and physical segment numbers of the current term buffer */
+ uint32_t lseg;
+ uint32_t dseg;
+ /* term buffer being filled, or NULL when none is open */
+ buffer *term_buffer;
+ /* scratch vectors handed to grn_p_encv */
+ datavec data_vectors[MAX_N_ELEMENTS + 1];
+ /* packed postings waiting to be written as a chunk, and their length in bytes */
+ uint8_t *packed;
+ uint32_t packed_len;
+ /* sum of packed_len over all chunks flushed by this builder */
+ uint32_t total_chunk_size;
+} grn_ii_builder;
+
+/*
+ * Writes the tokens accumulated in builder->blockbuf to the temporary file as
+ * one block sorted in lexicon order, and registers it in builder->blocks.
+ *
+ * Layout of one term entry (uint32_t words):
+ *   tid, nrecs, nposts, rid[nrecs], tf[nrecs], position[nposts]
+ * Entries are grouped into sub-blocks of a little over 0x80000 words; the last
+ * word of each sub-block holds the size of the following one, and the size of
+ * the first sub-block is kept in block->nextsize.
+ *
+ * On any failure the pending tokens are dropped (a warning is logged) but the
+ * counters and builder->blockpos are always reset, so the caller can keep
+ * appending to blockbuf safely.
+ */
+static void
+grn_ii_builder_flush(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  uint32_t pos = 0, pos_ = 0;
+  uint32_t *outbuf = NULL;
+  size_t outbuf_len;
+  builder_block *block;
+  grn_table_cursor *tc;
+  if (!(builder->nblocks & 0x3ff)) {
+    /* grow the block table by 0x400 entries at a time */
+    builder_block *blocks = GRN_REALLOC(builder->blocks,
+                                        (builder->nblocks + 0x400) * sizeof(builder_block));
+    if (!blocks) {
+      /* keep the old table; it is still valid */
+      GRN_LOG(ctx, GRN_LOG_WARNING, "grn_ii_builder_flush: realloc of blocks failed");
+      goto exit;
+    }
+    builder->blocks = blocks;
+  }
+  /*
+   * Worst case: every token is a distinct term seen once, which costs
+   * 3 header words + rid + tf + position = 6 words per token, plus one
+   * chaining word per sub-block.
+   */
+  outbuf_len = (size_t)builder->blockpos * 6;
+  outbuf_len += outbuf_len / 0x80000 + 1;
+  outbuf = (uint32_t *)GRN_MALLOC(outbuf_len * sizeof(uint32_t));
+  if (!outbuf) {
+    GRN_LOG(ctx, GRN_LOG_WARNING, "grn_ii_builder_flush: malloc of outbuf failed");
+    goto exit;
+  }
+  tc = grn_table_cursor_open(ctx, builder->lexicon, NULL, 0, NULL, 0, 0, -1, BUILD_ORDER);
+  if (!tc) {
+    GRN_LOG(ctx, GRN_LOG_WARNING, "grn_ii_builder_flush: failed to open lexicon cursor");
+    goto exit;
+  }
+  block = &builder->blocks[builder->nblocks];
+  block->head = builder->filepos;
+  block->tail = builder->filepos;
+  block->nextsize = 0;
+  block->rest = 0;
+  block->buffer = NULL;
+  block->buffersize = 0;
+  block->tid = GRN_ID_NIL;
+  {
+    /* pass 1: lay out one entry per term that occurs in this block */
+    grn_id tid;
+    uint32_t *pnext = &block->nextsize;
+    while ((tid = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
+      if (tid <= builder->ncounters) {
+        builder_counter *counter = &builder->counters[tid - 1];
+        if (counter->nrecs) {
+          uint32_t nposts = counter->nposts;
+          outbuf[pos++] = tid;
+          outbuf[pos++] = counter->nrecs;
+          outbuf[pos++] = counter->nposts;
+          /* offset: current rid slot; 0 in that slot means "not filled yet" */
+          counter->offset = pos;
+          outbuf[pos] = 0;
+          pos += counter->nrecs * 2;
+          /* nposts is reused as the write cursor of the position area */
+          counter->nposts = pos;
+          pos += nposts;
+        }
+      }
+      if (pos_ + 0x80000 < pos) {
+        /* close the sub-block; its trailing word will hold the next size */
+        *pnext = pos - pos_+ 1;
+        pnext = &outbuf[pos++];
+        pos_ = pos;
+      }
+    }
+    grn_table_cursor_close(ctx, tc);
+    if (pos_ < pos) { *pnext = pos - pos_; }
+  }
+  {
+    /* pass 2: distribute record ids, term frequencies and positions */
+    grn_id rid = 0;
+    uint32_t post = 0;
+    uint32_t rest;
+    grn_id *bp;
+    for (bp = builder->blockbuf, rest = builder->blockpos; rest; bp++, rest--) {
+      grn_id id = *bp;
+      if (id & BUILD_RID_FLAG) {
+        rid = id - BUILD_RID_FLAG;
+        post = 0;
+      } else {
+        builder_counter *counter = &builder->counters[id - 1];
+        outbuf[counter->nposts++] = post;
+        if (outbuf[counter->offset]) {
+          if (outbuf[counter->offset] == rid) {
+            outbuf[counter->offset + counter->nrecs]++;
+          } else {
+            outbuf[++counter->offset] = rid;
+            outbuf[counter->offset + counter->nrecs] = 1;
+          }
+        } else {
+          outbuf[counter->offset] = rid;
+          outbuf[counter->offset + counter->nrecs] = 1;
+        }
+        post++;
+      }
+    }
+  }
+  {
+    /* write everything; a short write is retried, an error drops the block */
+    const char *p = (const char *)outbuf;
+    size_t rest = (size_t)pos * sizeof(uint32_t);
+    while (rest) {
+      ssize_t r = write(builder->tmpfd, p, rest);
+      if (r <= 0) { break; }
+      p += r;
+      rest -= (size_t)r;
+      builder->filepos += r;
+    }
+    if (rest) {
+      /* bytes already written stay orphaned in the file; the block is not registered */
+      GRN_LOG(ctx, GRN_LOG_WARNING, "grn_ii_builder_flush: write to temporary file failed");
+    } else {
+      block->tail = builder->filepos;
+      builder->nblocks++;
+    }
+  }
+exit :
+  if (outbuf) { GRN_FREE(outbuf); }
+  {
+    /* reset the per-term counters for the next block */
+    builder_counter *counter;
+    grn_id tid, tid_max = grn_table_size(ctx, builder->lexicon);
+    if (tid_max > builder->ncounters) { tid_max = builder->ncounters; }
+    for (counter = builder->counters, tid = 1; tid <= tid_max; counter++, tid++) {
+      counter->nrecs = 0;
+      counter->nposts = 0;
+      counter->lastrec = 0;
+      counter->offset = 0;
+    }
+  }
+  builder->blockpos = 0;
+}
+
+/*
+ * Tokenizes `value` of record `rid` and appends the result to
+ * builder->blockbuf: one record-id marker (rid + BUILD_RID_FLAG) followed by
+ * the term id of every token, in order.  The per-term counters are updated
+ * along the way and grown when the lexicon has gained new terms.
+ *
+ * The pending block is flushed first when the value might not fit.  If a
+ * single value produces more tokens than the block buffer can hold, the
+ * surplus tokens are dropped with a warning instead of overrunning the buffer.
+ */
+static void
+grn_ii_builder_tokenize(grn_ctx *ctx, grn_ii_builder *builder, grn_id rid, grn_obj *value)
+{
+  uint32_t pos;
+  grn_token *token;
+  grn_id *buffer = builder->blockbuf;
+  /* nothing to gain from flushing an empty buffer */
+  if (builder->blockpos &&
+      BUILD_BLOCK_SIZE <= builder->blockpos + GRN_TEXT_LEN(value) * 2) {
+    grn_ii_builder_flush(ctx, builder);
+  }
+  pos = builder->blockpos;
+  buffer[pos++] = rid + BUILD_RID_FLAG;
+  if (GRN_TEXT_LEN(value) &&
+      (token = grn_token_open(ctx, builder->lexicon,
+                              GRN_TEXT_VALUE(value), GRN_TEXT_LEN(value), grn_token_add))) {
+    uint32_t pos_ = pos;
+    while (!token->status) {
+      builder_counter *counter;
+      grn_id tid = grn_token_next(ctx, token);
+      if (!tid) { continue; }
+      if (tid > builder->ncounters) {
+        uint32_t ncounters = grn_table_size(ctx, builder->lexicon) + BUILD_NCOUNTERS_MARGIN;
+        builder_counter *counters;
+        /* the new capacity must cover tid whatever the table size says */
+        if (ncounters < tid) { ncounters = tid + BUILD_NCOUNTERS_MARGIN; }
+        counters = GRN_REALLOC(builder->counters, ncounters * sizeof(builder_counter));
+        if (!counters) {
+          /* fall through to grn_token_close() instead of leaking the token */
+          GRN_LOG(ctx, GRN_LOG_WARNING, "grn_ii_builder_tokenize: realloc of counters failed");
+          break;
+        }
+        memset(&counters[builder->ncounters], 0,
+               (ncounters - builder->ncounters) * sizeof(builder_counter));
+        builder->ncounters = ncounters;
+        builder->counters = counters;
+      }
+      if (pos >= BUILD_BLOCK_SIZE) {
+        GRN_LOG(ctx, GRN_LOG_WARNING,
+                "grn_ii_builder_tokenize: block buffer full, rest of record %u dropped",
+                (unsigned int)rid);
+        break;
+      }
+      counter = &builder->counters[tid - 1];
+      buffer[pos++] = tid;
+      if (counter->lastrec != rid) {
+        counter->lastrec = rid;
+        counter->nrecs++;
+      }
+      counter->nposts++;
+    }
+    grn_token_close(ctx, token);
+    if (pos - pos_ > GRN_TEXT_LEN(value)) {
+      GRN_LOG(ctx, GRN_LOG_WARNING, "%u > %u",
+              (unsigned int)(pos - pos_), (unsigned int)GRN_TEXT_LEN(value));
+    }
+  }
+  builder->blockpos = pos;
+}
+
+/*
+ * Parse phase: reads the source value of every record of builder->target in
+ * id order, tokenizes it and spills the sorted blocks to TMPFILE_PATH.
+ * On return builder->blocks / builder->nblocks describe the blocks written and
+ * builder->filepos holds the number of bytes written; the counters, the block
+ * buffer and the file descriptor used for parsing are released.
+ *
+ * If a buffer cannot be allocated or the temporary file cannot be opened,
+ * a warning is logged and no block is produced.
+ */
+static void
+grn_ii_builder_parse(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  grn_table_cursor *tc;
+  builder->ncounters = grn_table_size(ctx, builder->lexicon) + BUILD_NCOUNTERS_MARGIN;
+  builder->counters = GRN_CALLOC(builder->ncounters * sizeof(builder_counter));
+  builder->blockbuf = (grn_id *)GRN_MALLOC(BUILD_BLOCK_SIZE * sizeof(grn_id));
+  builder->blockpos = 0;
+  builder->filepos = 0;
+  /* 0600: the file holds indexed content and is only read back by this process */
+  builder->tmpfd = open(TMPFILE_PATH, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, 0600);
+  if (!builder->counters || !builder->blockbuf || builder->tmpfd == -1) {
+    GRN_LOG(ctx, GRN_LOG_WARNING,
+            "grn_ii_builder_parse: failed to allocate buffers or to open %s", TMPFILE_PATH);
+    goto exit;
+  }
+  if ((tc = grn_table_cursor_open(ctx, builder->target,
+                                  NULL, 0, NULL, 0, 0, -1, GRN_CURSOR_BY_ID))) {
+    grn_id id;
+    grn_obj rv;
+    GRN_TEXT_INIT(&rv, 0);
+    while ((id = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
+      GRN_BULK_REWIND(&rv);
+      grn_obj_get_value(ctx, builder->source, id, &rv);
+      grn_ii_builder_tokenize(ctx, builder, id, &rv);
+    }
+    GRN_OBJ_FIN(ctx, &rv);
+    /* spill whatever is left in the block buffer */
+    if (builder->blockpos) {
+      grn_ii_builder_flush(ctx, builder);
+    }
+    grn_table_cursor_close(ctx, tc);
+  }
+exit :
+  if (builder->tmpfd != -1) {
+    close(builder->tmpfd);
+    builder->tmpfd = -1;
+  }
+  if (builder->blockbuf) {
+    GRN_FREE(builder->blockbuf);
+    builder->blockbuf = NULL;
+  }
+  if (builder->counters) {
+    GRN_FREE(builder->counters);
+    builder->counters = NULL;
+  }
+  GRN_LOG(ctx, GRN_LOG_WARNING, "nblocks: %d", builder->nblocks);
+}
+
+/*
+ * Advances `block` to its next term entry.
+ *
+ * When the in-memory buffer is used up and the block still has data in the
+ * temporary file, the next sub-block (block->nextsize words) is read; unless
+ * it is the last one, its trailing word is the size of the following
+ * sub-block.  Then block->tid / nrecs / nposts and the recs / tfs / posts
+ * pointers are set up for the entry at the cursor.
+ *
+ * block->tid is set to 0 when the block has no more entries.  Read errors,
+ * allocation failures and inconsistent sizes are logged and also end the
+ * block, so the caller never sees a stale or out-of-range entry.
+ */
+static void
+grn_ii_builder_fetch(grn_ctx *ctx, grn_ii_builder *builder, builder_block *block)
+{
+  if (!block->rest && block->head < block->tail) {
+    size_t bytesize = (size_t)block->nextsize * sizeof(uint32_t);
+    ssize_t nread;
+    if (!block->nextsize) {
+      GRN_LOG(ctx, GRN_LOG_WARNING, "fetch error: empty sub-block");
+      goto exhausted;
+    }
+    if (block->buffersize < block->nextsize) {
+      void *r = GRN_REALLOC(block->buffer, bytesize);
+      if (!r) {
+        GRN_LOG(ctx, GRN_LOG_WARNING, "realloc: %lu", (unsigned long)bytesize);
+        goto exhausted;
+      }
+      block->buffer = (uint32_t *)r;
+      block->buffersize = block->nextsize;
+    }
+    nread = pread(builder->tmpfd, block->buffer, bytesize, block->head);
+    if (nread < 0 || (size_t)nread != bytesize) {
+      GRN_LOG(ctx, GRN_LOG_WARNING, "fetch error: pread returned %ld, expected %lu",
+              (long)nread, (unsigned long)bytesize);
+      goto exhausted;
+    }
+    block->head += bytesize;
+    block->bufcur = block->buffer;
+    if (block->head >= block->tail) {
+      /* last sub-block: no trailing size word */
+      if (block->head > block->tail) {
+        GRN_LOG(ctx, GRN_LOG_WARNING, "fetch error: %jd > %jd",
+                (intmax_t)block->head, (intmax_t)block->tail);
+      }
+      block->rest = block->nextsize;
+      block->nextsize = 0;
+    } else {
+      /* the trailing word is the size of the following sub-block */
+      block->rest = block->nextsize - 1;
+      block->nextsize = block->buffer[block->rest];
+    }
+  }
+  if (!block->rest) {
+    block->tid = 0;
+    return;
+  }
+  if (block->rest < 3) {
+    GRN_LOG(ctx, GRN_LOG_WARNING, "fetch error: truncated entry header, rest: %u",
+            (unsigned int)block->rest);
+    goto exhausted;
+  }
+  block->tid = block->bufcur[0];
+  block->nrecs = block->bufcur[1];
+  block->nposts = block->bufcur[2];
+  {
+    /* 64-bit arithmetic so that corrupt counts cannot wrap around */
+    uint64_t entrysize = 3 + (uint64_t)block->nrecs * 2 + block->nposts;
+    if (entrysize > block->rest) {
+      GRN_LOG(ctx, GRN_LOG_WARNING, "rest: %u, %llu",
+              (unsigned int)block->rest, (unsigned long long)entrysize);
+      goto exhausted;
+    }
+    block->recs = block->bufcur + 3;
+    block->tfs = block->recs + block->nrecs;
+    block->posts = block->tfs + block->nrecs;
+    block->rest -= (uint32_t)entrysize;
+    block->bufcur += entrysize;
+  }
+  return;
+exhausted :
+  /* give up on this block; block->buffer stays owned by the block */
+  block->head = block->tail;
+  block->rest = 0;
+  block->nextsize = 0;
+  block->tid = 0;
+}
+
+/*
+ * Stores the packed postings collected so far as a new chunk of the index,
+ * records that chunk in the header of the current term buffer, publishes the
+ * buffer segment and resets the builder so that the next term starts a fresh
+ * term buffer and packed area.
+ *
+ * NOTE(review): builder->packed is set to NULL here without an explicit free;
+ * presumably fake_map2()/grn_io_win_unmap2() take ownership - confirm.
+ */
+static void
+grn_ii_builder_chunk_flush(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  grn_io_win iw;
+  uint32_t chunk;
+  const uint32_t len = builder->packed_len;
+  buffer *tb = builder->term_buffer;
+
+  /* allocate the chunk and hand the packed bytes over to it */
+  chunk_new(ctx, builder->ii, &chunk, len);
+  GRN_LOG(ctx, GRN_LOG_INFO, "chunk:%d, packed_len:%d", chunk, len);
+  fake_map2(ctx, builder->ii->chunk, &iw, builder->packed, chunk, len);
+  grn_io_win_unmap2(&iw);
+
+  /* describe the chunk in the term buffer header */
+  tb->header.chunk = chunk;
+  tb->header.chunk_size = len;
+  tb->header.buffer_free =
+    S_SEGMENT - sizeof(buffer_header) - tb->header.nterms * sizeof(buffer_term);
+  tb->header.nterms_void = 0;
+  buffer_segment_update(builder->ii, builder->lseg, builder->dseg);
+
+  /* bookkeeping, then start over with no open buffer */
+  builder->ii->header->total_chunk_size += len;
+  builder->term_buffer = NULL;
+  builder->total_chunk_size += len;
+  builder->packed = NULL;
+  builder->packed_len = 0;
+}
+
+/*
+ * Merges the entries of term `tid` found in `hits[0..nhits-1]` into the index
+ * and advances every hit block to its next entry.
+ *
+ * A term with exactly one record and one posting is stored directly in its
+ * array slot.  Otherwise the record ids and positions are delta-encoded into
+ * builder->data_vectors, packed with grn_p_encv() behind the data already in
+ * builder->packed, and described by a new buffer_term in the current term
+ * buffer.  The pending chunk is flushed when the term buffer or the packed
+ * area reaches its limit.
+ */
+static void
+grn_ii_builder_merge_one(grn_ctx *ctx, grn_ii_builder *builder,
+ grn_id tid, builder_block *hits[], int nhits)
+{
+ /* NOTE(review): the result of array_get() is used unchecked - confirm it cannot fail here */
+ uint32_t *a = array_get(ctx, builder->ii, tid);
+ if (nhits == 1 && hits[0]->nrecs == 1 && hits[0]->nposts == 1) {
+ /* single posting: record id (shifted, low bit set) and position go straight into the slot */
+ builder_block *block = hits[0];
+ a[0] = (block->recs[0] << 1) + 1;
+ a[1] = block->posts[0];
+ grn_ii_builder_fetch(ctx, builder, block);
+ } else {
+ /* sum of position deltas, used below to choose the encoding of the positions */
+ uint64_t spos = 0;
+ uint32_t nrecs = 0;
+ uint32_t nposts = 0;
+ uint16_t nterm;
+ buffer_term *bt;
+ if (!builder->term_buffer) {
+ /* open a new term buffer: first unassigned logical segment plus a fresh physical one */
+ uint32_t lseg;
+ void *term_buffer;
+ /* NOTE(review): if no entry is NOT_ASSIGNED, lseg ends up as GRN_II_MAX_LSEG - confirm this is handled */
+ for (lseg = 0; lseg < GRN_II_MAX_LSEG; lseg++) {
+ if (builder->ii->header->binfo[lseg] == NOT_ASSIGNED) { break; }
+ }
+ builder->lseg = lseg;
+ builder->dseg = segment_get(ctx, builder->ii);
+ GRN_IO_SEG_REF(builder->ii->seg, builder->dseg, term_buffer);
+ builder->term_buffer = (buffer *)term_buffer;
+ }
+ nterm = builder->term_buffer->header.nterms++;
+ bt = &builder->term_buffer->terms[nterm];
+ /* the array slot points at this term's buffer_term inside the segment */
+ a[0] = SEG2POS(builder->lseg, (sizeof(buffer_header) + sizeof(buffer_term) * nterm));
+ {
+ /* total number of records and postings over all hit blocks */
+ int i;
+ for (i = 0; i < nhits; i++) {
+ builder_block *block = hits[i];
+ nrecs += block->nrecs;
+ nposts += block->nposts;
+ }
+ }
+ datavec_reset(ctx, builder->data_vectors, builder->ii->n_elements, nrecs, nrecs * 2 + nposts);
+ {
+ uint32_t *ridp = builder->data_vectors[0].data;
+ uint32_t *tfp = builder->data_vectors[1].data;
+ uint32_t *posp = builder->data_vectors[2].data;
+ /* last record id written; carried across blocks so the deltas continue from one block to the next */
+ uint32_t lr = 0;
+ int i;
+ for (i = 0; i < nhits; i++) {
+ builder_block *block = hits[i];
+ uint32_t *rp = block->recs;
+ uint32_t *tp = block->tfs;
+ uint32_t *pp = block->posts;
+ uint32_t n;
+ for (n = block->nrecs; n; n--) {
+ /* last position written; restarts at 0 for every record */
+ uint32_t lp = 0;
+ uint32_t np;
+ *ridp++ = *rp - lr; lr = *rp++;
+ for (np = *tp; np; np--) {
+ *posp = *pp - lp; lp = *pp++;
+ spos += *posp++;
+ }
+ /* the term frequency is stored minus one */
+ *tfp++ = *tp++ - 1;
+ }
+ /* this block's entry is consumed; move it to its next term */
+ grn_ii_builder_fetch(ctx, builder, block);
+ }
+
+ /* sanity checks: every vector must have been filled exactly */
+ if (ridp != builder->data_vectors[0].data + nrecs) {
+ GRN_LOG(ctx, GRN_LOG_WARNING, "wrong ridp! %d, %d",
+ ridp - builder->data_vectors[0].data, nrecs);
+ }
+ if (tfp != builder->data_vectors[1].data + nrecs) {
+ GRN_LOG(ctx, GRN_LOG_WARNING, "wrong tfp! %d, %d",
+ tfp - builder->data_vectors[1].data, nrecs);
+ }
+ if (posp != builder->data_vectors[2].data + nposts) {
+ GRN_LOG(ctx, GRN_LOG_WARNING, "wrong posp! %d, %d",
+ posp - builder->data_vectors[2].data, nposts);
+ }
+
+ builder->data_vectors[0].data_size = nrecs;
+ builder->data_vectors[1].data_size = nrecs;
+ builder->data_vectors[2].data_size = nposts;
+
+ /* USE_P_ENC is switched off for short vectors and for vectors whose deltas are large on average */
+ builder->data_vectors[0].flags = ((nrecs < 16) || (nrecs <= (lr >> 8))) ? 0 : USE_P_ENC;
+ builder->data_vectors[1].flags = (nrecs < 3) ? 0 : USE_P_ENC;
+ builder->data_vectors[2].flags =
+ (((nposts < 32) || (nposts <= (spos >> 13))) ? 0 : USE_P_ENC)|ODD;
+ }
+ /* NOTE(review): the result of GRN_MALLOC is not checked before grn_p_encv() writes to it */
+ if (!builder->packed) { builder->packed = GRN_MALLOC(BUILD_PACKED_BUFFER_SIZE * 2); }
+ {
+ /* append the encoded postings to the packed area and record where they are */
+ int packed_len = grn_p_encv(ctx, builder->data_vectors, builder->ii->n_elements,
+ builder->packed + builder->packed_len);
+ bt->tid = tid;
+ bt->size_in_buffer = 0;
+ bt->pos_in_buffer = 0;
+ bt->size_in_chunk = packed_len;
+ bt->pos_in_chunk = builder->packed_len;
+ builder->packed_len += packed_len;
+ }
+ /* flush once the term buffer or the packed area has reached its limit */
+ if (nterm == BUILD_NTERMS_PER_BUFFER || builder->packed_len > BUILD_PACKED_BUFFER_SIZE) {
+ grn_ii_builder_chunk_flush(ctx, builder);
+ }
+ }
+}
+
+/*
+ * Merge phase: reads the blocks written by the parse phase back from
+ * TMPFILE_PATH and, walking the lexicon in BUILD_ORDER, merges the entries of
+ * each term from all blocks into the index.  The temporary file is removed
+ * at the end.
+ *
+ * Every block is positioned on its first entry up front; for each term the
+ * blocks currently positioned on that term are collected in `hits` and handed
+ * to grn_ii_builder_merge_one(), which advances them.  The walk ends as soon
+ * as every block is exhausted.
+ */
+static void
+grn_ii_builder_merge(grn_ctx *ctx, grn_ii_builder *builder)
+{
+  builder_block **hits = NULL;
+  grn_table_cursor *tc = NULL;
+  builder->term_buffer = NULL;
+  builder->packed = NULL;
+  builder->packed_len = 0;
+  builder->total_chunk_size = 0;
+  builder->tmpfd = open(TMPFILE_PATH, O_RDONLY);
+  if (builder->tmpfd == -1) {
+    GRN_LOG(ctx, GRN_LOG_WARNING, "grn_ii_builder_merge: failed to open %s", TMPFILE_PATH);
+    return;
+  }
+  datavec_init(ctx, builder->data_vectors, builder->ii->n_elements, 0, 0);
+  if (builder->nblocks) {
+    /* heap allocation: nblocks is unbounded, so a stack array is not safe */
+    hits = (builder_block **)GRN_MALLOC(builder->nblocks * sizeof(builder_block *));
+    if (hits) {
+      tc = grn_table_cursor_open(ctx, builder->lexicon, NULL, 0, NULL, 0, 0, -1, BUILD_ORDER);
+    }
+    if (!hits || !tc) {
+      GRN_LOG(ctx, GRN_LOG_WARNING,
+              "grn_ii_builder_merge: failed to allocate hits or to open lexicon cursor");
+    } else {
+      grn_id tid;
+      uint32_t i;
+      /* position every block on its first entry */
+      for (i = 0; i < builder->nblocks; i++) {
+        grn_ii_builder_fetch(ctx, builder, &builder->blocks[i]);
+      }
+      while ((tid = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
+        int nrests = 0;
+        int nhits = 0;
+        for (i = 0; i < builder->nblocks; i++) {
+          if (builder->blocks[i].tid == tid) {
+            hits[nhits++] = &builder->blocks[i];
+          }
+          if (builder->blocks[i].tid) { nrests++; }
+        }
+        if (nhits) { grn_ii_builder_merge_one(ctx, builder, tid, hits, nhits); }
+        if (!nrests) { break; }
+        if (!nhits) {
+          /* blocks still hold data but none matches the current term */
+          GRN_LOG(ctx, GRN_LOG_WARNING, "merge error! tid=%d, nrests=%d", tid, nrests);
+          break;
+        }
+      }
+      if (builder->packed_len) {
+        grn_ii_builder_chunk_flush(ctx, builder);
+      }
+    }
+  }
+  /* grn_ii_builder_chunk_flush() clears builder->packed, so this only frees an unflushed buffer */
+  if (builder->packed) {
+    GRN_FREE(builder->packed);
+    builder->packed = NULL;
+  }
+  if (tc) { grn_table_cursor_close(ctx, tc); }
+  if (hits) { GRN_FREE(hits); }
+  datavec_fin(ctx, builder->data_vectors);
+  GRN_LOG(ctx, GRN_LOG_NOTICE, "tmpfile_size:%jd, total_chunk_size:%u",
+          (intmax_t)builder->filepos, (unsigned int)builder->total_chunk_size);
+  close(builder->tmpfd);
+  builder->tmpfd = -1;
+  unlink(TMPFILE_PATH);
+}
+
+/*
+ * Builds the inverted index `ii` offline from its first source: all records
+ * are tokenized into sorted blocks in a temporary file (parse phase), which
+ * are then merged into the index (merge phase).
+ *
+ * Returns GRN_INVALID_ARGUMENT when `ii` has no source or when the source or
+ * its table cannot be resolved, and GRN_SUCCESS once both phases have run.
+ * Failures inside the phases are only reported through the log.
+ */
+grn_rc
+grn_ii_build(grn_ctx *ctx, grn_ii *ii)
+{
+  grn_rc rc = GRN_INVALID_ARGUMENT;
+  grn_ii_builder builder;
+  grn_id *s = ii->obj.source;
+  grn_obj *src, *target;
+  if (!(ii->obj.source_size) || !s) { goto exit; }
+  if (!(src = grn_ctx_at(ctx, *s))) {
+    goto exit;
+  }
+  /* records come from the source itself when it is a table, else from its domain */
+  if (!(target = GRN_OBJ_TABLEP(src) ? src : grn_ctx_at(ctx, src->header.domain))) {
+    goto exit;
+  }
+
+  /* start from a fully defined state; the phases fill in the rest */
+  memset(&builder, 0, sizeof(builder));
+  builder.tmpfd = -1;
+  builder.ii = ii;
+  builder.source = src;
+  builder.target = target;
+  builder.lexicon = ii->lexicon;
+
+  builder.nblocks = 0;
+  builder.blocks = NULL;
+
+  grn_ii_builder_parse(ctx, &builder);
+  grn_ii_builder_merge(ctx, &builder);
+
+  {
+    uint32_t i;
+    for (i = 0; i < builder.nblocks; i++) {
+      if (builder.blocks[i].buffer) { GRN_FREE(builder.blocks[i].buffer); }
+    }
+  }
+  /* blocks stays NULL when nothing was flushed */
+  if (builder.blocks) { GRN_FREE(builder.blocks); }
+  rc = GRN_SUCCESS;
+exit :
+  return rc;
+}
Modified: lib/ii.h (+2 -0)
===================================================================
--- lib/ii.h 2012-01-19 17:42:02 +0900 (c0c609e)
+++ lib/ii.h 2012-01-19 19:56:19 +0900 (c5d05f1)
@@ -192,6 +192,8 @@ grn_rc grn_ii_at(grn_ctx *ctx, grn_ii *ii, grn_id id, grn_hash *s, grn_operator
void grn_ii_inspect_elements(grn_ctx *ctx, grn_ii *ii, grn_obj *buf);
void grn_ii_cursor_inspect(grn_ctx *ctx, grn_ii_cursor *c, grn_obj *buf);
+/*
+ * Offline index builder: tokenizes every record of the first source of `ii`
+ * and merges the resulting postings into `ii`.
+ * Returns GRN_INVALID_ARGUMENT when `ii` has no source or the source cannot
+ * be resolved.
+ */
+grn_rc grn_ii_build(grn_ctx *ctx, grn_ii *ii);
+
#ifdef __cplusplus
}
#endif