null+****@clear*****
null+****@clear*****
2012年 2月 15日 (水) 18:46:10 JST
Daijiro MORI 2012-02-15 18:46:10 +0900 (Wed, 15 Feb 2012)
New Revision: 6236f255bbb4ecbbfb10faeda69d28b1e3b13b3c
Log:
grn_ii_buffer: calculate the size of block_buf accurately.
Modified files:
lib/ii.c
Modified: lib/ii.c (+205 -155)
===================================================================
--- lib/ii.c 2012-02-15 18:41:55 +0900 (d14f8d0)
+++ lib/ii.c 2012-02-15 18:46:10 +0900 (6d91332)
@@ -6388,8 +6388,9 @@ struct _grn_ii_buffer {
int tmpfd;
// stuff for parsing
off_t filepos;
- grn_id *blockbuf;
- uint32_t blockpos;
+ grn_id *block_buf;
+ uint32_t block_buf_size;
+ uint32_t block_pos;
ii_buffer_counter *counters;
uint32_t ncounters;
// stuff for merging
@@ -6504,8 +6505,8 @@ encode_postings(grn_ctx *ctx, grn_ii_buffer *ii_buffer, uint8_t *outbuf)
grn_id rid = 0;
uint32_t pos = 0;
uint32_t rest;
- grn_id *bp = ii_buffer->blockbuf;
- for (rest = ii_buffer->blockpos; rest; bp++, rest--) {
+ grn_id *bp = ii_buffer->block_buf;
+ for (rest = ii_buffer->block_pos; rest; bp++, rest--) {
grn_id id = *bp;
if (id & II_BUFFER_RID_FLAG) {
rid = id - II_BUFFER_RID_FLAG;
@@ -6559,7 +6560,7 @@ grn_ii_buffer_flush(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
uint8_t *outbuf;
ii_buffer_block *block;
GRN_LOG(ctx, GRN_LOG_NOTICE, "flushing:%d npostings:%u",
- ii_buffer->nblocks, ii_buffer->blockpos);
+ ii_buffer->nblocks, ii_buffer->block_pos);
if (!(block = block_new(ctx, ii_buffer))) { return; }
if (!(outbuf = allocate_outbuf(ctx, ii_buffer))) { return; }
encsize = encode_terms(ctx, ii_buffer, outbuf, block);
@@ -6583,57 +6584,86 @@ grn_ii_buffer_flush(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
ii_buffer->nblocks, encsize);
ii_buffer->tmp_lexicon = NULL;
ii_buffer->nblocks++;
- ii_buffer->blockpos = 0;
+ ii_buffer->block_pos = 0;
}
const uint32_t PAT_CACHE_SIZE = 1<<20;
+static grn_obj *
+get_tmp_lexicon(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
+{
+ grn_obj *tmp_lexicon = ii_buffer->tmp_lexicon;
+ if (!tmp_lexicon) {
+ grn_obj *domain = grn_ctx_at(ctx, ii_buffer->lexicon->header.domain);
+ grn_obj *range = grn_ctx_at(ctx, DB_OBJ(ii_buffer->lexicon)->range);
+ grn_obj *tokenizer;
+ grn_obj_flags flags;
+ grn_table_get_info(ctx, ii_buffer->lexicon, &flags, NULL, &tokenizer);
+ flags &= ~GRN_OBJ_PERSISTENT;
+ tmp_lexicon = grn_table_create(ctx, NULL, 0, NULL, flags, domain, range);
+ if (tmp_lexicon) {
+ ii_buffer->tmp_lexicon = tmp_lexicon;
+ grn_obj_set_info(ctx, tmp_lexicon,
+ GRN_INFO_DEFAULT_TOKENIZER, tokenizer);
+ if (flags & GRN_OBJ_TABLE_PAT_KEY) {
+ grn_pat_cache_enable(ctx, (grn_pat *)tmp_lexicon, PAT_CACHE_SIZE);
+ }
+ }
+ }
+ return tmp_lexicon;
+}
+
+static ii_buffer_counter *
+get_buffer_counter(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
+ grn_obj *tmp_lexicon, grn_id tid)
+{
+ if (tid > ii_buffer->ncounters) {
+ ii_buffer_counter *counters;
+ uint32_t ncounters =
+ grn_table_size(ctx, tmp_lexicon) + II_BUFFER_NCOUNTERS_MARGIN;
+ counters = GRN_REALLOC(ii_buffer->counters,
+ ncounters * sizeof(ii_buffer_counter));
+ if (!counters) { return NULL; }
+ memset(&counters[ii_buffer->ncounters], 0,
+ (ncounters - ii_buffer->ncounters) * sizeof(ii_buffer_counter));
+ ii_buffer->ncounters = ncounters;
+ ii_buffer->counters = counters;
+ }
+ return &ii_buffer->counters[tid - 1];
+}
+
static void
grn_ii_buffer_tokenize(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
grn_id rid, unsigned int section, grn_obj *value)
{
- if (GRN_TEXT_LEN(value)) {
- uint32_t blockpos;
- grn_token *token;
- grn_id *buffer = ii_buffer->blockbuf;
- if (II_BUFFER_BLOCK_SIZE <= ii_buffer->blockpos + GRN_TEXT_LEN(value) * 2) {
+ uint32_t value_len = GRN_TEXT_LEN(value);
+ if (value_len) {
+ grn_obj *tmp_lexicon;
+ if (ii_buffer->block_buf_size < ii_buffer->block_pos + value_len) {
grn_ii_buffer_flush(ctx, ii_buffer);
}
- if (!ii_buffer->tmp_lexicon) {
- grn_obj *domain = grn_ctx_at(ctx, ii_buffer->lexicon->header.domain);
- grn_obj *range = grn_ctx_at(ctx, DB_OBJ(ii_buffer->lexicon)->range);
- grn_obj *tokenizer;
- grn_obj_flags flags;
- grn_table_get_info(ctx, ii_buffer->lexicon, &flags, NULL, &tokenizer);
- flags &= ~GRN_OBJ_PERSISTENT;
- ii_buffer->tmp_lexicon = grn_table_create(ctx, NULL, 0, NULL, flags, domain, range);
- grn_obj_set_info(ctx, ii_buffer->tmp_lexicon, GRN_INFO_DEFAULT_TOKENIZER, tokenizer);
- if (flags & GRN_OBJ_TABLE_PAT_KEY) {
- grn_pat_cache_enable(ctx, (grn_pat *)ii_buffer->tmp_lexicon, PAT_CACHE_SIZE);
- }
- }
- blockpos = ii_buffer->blockpos;
- buffer[blockpos++] = rid + II_BUFFER_RID_FLAG;
- if ((token = grn_token_open(ctx, ii_buffer->tmp_lexicon,
- GRN_TEXT_VALUE(value), GRN_TEXT_LEN(value), grn_token_add))) {
- uint32_t pos, blockpos_ = blockpos;
- for (pos = 0; !token->status; pos++) {
- grn_id tid;
- if ((tid = grn_token_next(ctx, token))) {
- if (tid > ii_buffer->ncounters) {
- uint32_t ncounters = grn_table_size(ctx, ii_buffer->tmp_lexicon) +
- II_BUFFER_NCOUNTERS_MARGIN;
- ii_buffer_counter *counters = GRN_REALLOC(ii_buffer->counters,
- ncounters * sizeof(ii_buffer_counter));
- if (!counters) { return; }
- memset(&counters[ii_buffer->ncounters], 0,
- (ncounters - ii_buffer->ncounters) * sizeof(ii_buffer_counter));
- ii_buffer->ncounters = ncounters;
- ii_buffer->counters = counters;
- }
- {
- ii_buffer_counter *counter = &ii_buffer->counters[tid - 1];
- buffer[blockpos++] = tid;
+ if (ii_buffer->block_buf_size < value_len) {
+ grn_id *block_buf = (grn_id *)GRN_REALLOC(ii_buffer->block_buf,
+ value_len * sizeof(grn_id));
+ if (!block_buf) { return; }
+ ii_buffer->block_buf = block_buf;
+ ii_buffer->block_buf_size = value_len;
+ }
+ if ((tmp_lexicon = get_tmp_lexicon(ctx, ii_buffer))) {
+ grn_token *token;
+ grn_id *buffer = ii_buffer->block_buf;
+ uint32_t block_pos = ii_buffer->block_pos;
+ buffer[block_pos++] = rid + II_BUFFER_RID_FLAG;
+ if ((token = grn_token_open(ctx, tmp_lexicon, GRN_TEXT_VALUE(value),
+ value_len, grn_token_add))) {
+ uint32_t pos;
+ for (pos = 0; !token->status; pos++) {
+ grn_id tid;
+ if ((tid = grn_token_next(ctx, token))) {
+ ii_buffer_counter *counter;
+ counter = get_buffer_counter(ctx, ii_buffer, tmp_lexicon, tid);
+ if (!counter) { return; }
+ buffer[block_pos++] = tid;
if (counter->last_rid != rid) {
counter->offset_rid += GRN_B_ENC_SIZE(rid - counter->last_rid);
counter->last_rid = rid;
@@ -6642,28 +6672,28 @@ grn_ii_buffer_tokenize(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
counter->last_tf = 0;
}
counter->last_pos = 0;
-
counter->nrecs++;
}
counter->offset_pos += GRN_B_ENC_SIZE(pos - counter->last_pos);
counter->last_pos = pos;
counter->last_tf++;
-
counter->nposts++;
}
}
+ grn_token_close(ctx, token);
+ if (ii_buffer->block_pos + value_len < block_pos) {
+ GRN_LOG(ctx, GRN_LOG_WARNING, "%d < %d",
+ value_len, block_pos - ii_buffer->block_pos);
+ }
}
- grn_token_close(ctx, token);
- if (blockpos - blockpos_ > GRN_TEXT_LEN(value)) {
- GRN_LOG(ctx, GRN_LOG_WARNING, "%d > %d", blockpos - blockpos_, GRN_TEXT_LEN(value));
- }
+ ii_buffer->block_pos = block_pos;
}
- ii_buffer->blockpos = blockpos;
}
}
static void
-grn_ii_buffer_fetch(grn_ctx *ctx, grn_ii_buffer *ii_buffer, ii_buffer_block *block)
+grn_ii_buffer_fetch(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
+ ii_buffer_block *block)
{
if (!block->rest) {
if (block->head < block->tail) {
@@ -6683,13 +6713,15 @@ grn_ii_buffer_fetch(grn_ctx *ctx, grn_ii_buffer *ii_buffer, ii_buffer_block *blo
block->bufcur = block->buffer;
if (block->head >= block->tail) {
if (block->head > block->tail) {
- GRN_LOG(ctx, GRN_LOG_WARNING, "fetch error: %jd > %jd", block->head, block->tail);
+ GRN_LOG(ctx, GRN_LOG_WARNING,
+ "fetch error: %jd > %jd", block->head, block->tail);
}
block->rest = block->nextsize;
block->nextsize = 0;
} else {
block->rest = block->nextsize - sizeof(uint32_t);
- memcpy(&block->nextsize, &block->buffer[block->rest], sizeof(uint32_t));
+ memcpy(&block->nextsize,
+ &block->buffer[block->rest], sizeof(uint32_t));
}
}
}
@@ -6735,6 +6767,88 @@ grn_ii_buffer_chunk_flush(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
ii_buffer->packed_buf_size = 0;
}
+static uint32_t
+merge_hit_blocks(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
+ ii_buffer_block *hits[], int nhits)
+{
+ uint32_t nrecs = 0;
+ uint32_t nposts = 0;
+ uint32_t max_size;
+ int i;
+ for (i = 0; i < nhits; i++) {
+ ii_buffer_block *block = hits[i];
+ nrecs += block->nrecs;
+ nposts += block->nposts;
+ }
+ max_size = nrecs * 2 + nposts;
+ datavec_reset(ctx, ii_buffer->data_vectors,
+ ii_buffer->ii->n_elements, nrecs, max_size);
+ {
+ uint32_t *ridp = ii_buffer->data_vectors[0].data;
+ uint32_t *tfp = ii_buffer->data_vectors[1].data;
+ uint32_t *posp = ii_buffer->data_vectors[2].data;
+ uint32_t lr = 0;
+ uint64_t spos = 0;
+ int i;
+ for (i = 0; i < nhits; i++) {
+ ii_buffer_block *block = hits[i];
+ uint8_t *p = block->bufcur;
+ uint32_t n = block->nrecs;
+ if (n) {
+ GRN_B_DEC(*ridp, p);
+ *ridp -= lr;
+ lr += *ridp++;
+ while (--n) {
+ GRN_B_DEC(*ridp, p);
+ lr += *ridp++;
+ }
+ }
+ for (n = block->nrecs; n; n--) {
+ GRN_B_DEC(*tfp++, p);
+ }
+ for (n = block->nposts; n; n--) {
+ GRN_B_DEC(*posp, p);
+ spos += *posp++;
+ }
+ block->rest -= (p - block->bufcur);
+ block->bufcur = p;
+ grn_ii_buffer_fetch(ctx, ii_buffer, block);
+ }
+ ii_buffer->data_vectors[0].data_size = nrecs;
+ ii_buffer->data_vectors[1].data_size = nrecs;
+ ii_buffer->data_vectors[2].data_size = nposts;
+
+ ii_buffer->data_vectors[0].flags =
+ ((nrecs < 16) || (nrecs <= (lr >> 8))) ? 0 : USE_P_ENC;
+ ii_buffer->data_vectors[1].flags =
+ (nrecs < 3) ? 0 : USE_P_ENC;
+ ii_buffer->data_vectors[2].flags =
+ (((nposts < 32) || (nposts <= (spos >> 13))) ? 0 : USE_P_ENC)|ODD;
+ }
+ return max_size;
+}
+
+static buffer *
+get_term_buffer(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
+{
+ if (!ii_buffer->term_buffer) {
+ uint32_t lseg;
+ void *term_buffer;
+ for (lseg = 0; lseg < GRN_II_MAX_LSEG; lseg++) {
+ if (ii_buffer->ii->header->binfo[lseg] == NOT_ASSIGNED) { break; }
+ }
+ if (lseg == GRN_II_MAX_LSEG) {
+ ERR(GRN_NO_MEMORY_AVAILABLE, "segment allocate failed");
+ return NULL;
+ }
+ ii_buffer->lseg = lseg;
+ ii_buffer->dseg = segment_get(ctx, ii_buffer->ii);
+ GRN_IO_SEG_REF(ii_buffer->ii->seg, ii_buffer->dseg, term_buffer);
+ ii_buffer->term_buffer = (buffer *)term_buffer;
+ }
+ return ii_buffer->term_buffer;
+}
+
static void
grn_ii_buffer_merge(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
grn_id tid, ii_buffer_block *hits[], int nhits)
@@ -6757,105 +6871,40 @@ grn_ii_buffer_merge(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
a[1] = pos;
grn_ii_buffer_fetch(ctx, ii_buffer, block);
} else {
- uint64_t spos = 0;
- uint32_t nrecs = 0;
- uint32_t nposts = 0;
- {
- int i;
- for (i = 0; i < nhits; i++) {
- ii_buffer_block *block = hits[i];
- nrecs += block->nrecs;
- nposts += block->nposts;
- }
- }
- datavec_reset(ctx, ii_buffer->data_vectors, ii_buffer->ii->n_elements, nrecs, nrecs * 2 + nposts);
- {
- uint32_t *ridp = ii_buffer->data_vectors[0].data;
- uint32_t *tfp = ii_buffer->data_vectors[1].data;
- uint32_t *posp = ii_buffer->data_vectors[2].data;
- uint32_t lr = 0;
- int i;
- for (i = 0; i < nhits; i++) {
- ii_buffer_block *block = hits[i];
- uint8_t *p = block->bufcur;
- uint32_t n = block->nrecs;
- if (n) {
- GRN_B_DEC(*ridp, p);
- *ridp -= lr;
- lr += *ridp++;
- while (--n) {
- GRN_B_DEC(*ridp, p);
- lr += *ridp++;
- }
- }
- for (n = block->nrecs; n; n--) {
- GRN_B_DEC(*tfp++, p);
- }
- for (n = block->nposts; n; n--) {
- GRN_B_DEC(*posp, p);
- spos += *posp++;
- }
- block->rest -= (p - block->bufcur);
- block->bufcur = p;
- grn_ii_buffer_fetch(ctx, ii_buffer, block);
- }
- ii_buffer->data_vectors[0].data_size = nrecs;
- ii_buffer->data_vectors[1].data_size = nrecs;
- ii_buffer->data_vectors[2].data_size = nposts;
-
- ii_buffer->data_vectors[0].flags =
- ((nrecs < 16) || (nrecs <= (lr >> 8))) ? 0 : USE_P_ENC;
- ii_buffer->data_vectors[1].flags =
- (nrecs < 3) ? 0 : USE_P_ENC;
- ii_buffer->data_vectors[2].flags =
- (((nposts < 32) || (nposts <= (spos >> 13))) ? 0 : USE_P_ENC)|ODD;
+ uint32_t max_size = merge_hit_blocks(ctx, ii_buffer, hits, nhits);
+ if (ii_buffer->packed_buf &&
+ ii_buffer->packed_buf_size <
+ ii_buffer->packed_len + max_size) {
+ grn_ii_buffer_chunk_flush(ctx, ii_buffer);
}
- {
- uint32_t max_size = (nrecs * 2 + nposts);
- if (ii_buffer->packed_buf &&
- ii_buffer->packed_buf_size <
- ii_buffer->packed_len + max_size) {
- grn_ii_buffer_chunk_flush(ctx, ii_buffer);
- }
- if (!ii_buffer->packed_buf) {
- uint32_t buf_size = (max_size > II_BUFFER_PACKED_BUF_SIZE)
- ? max_size : II_BUFFER_PACKED_BUF_SIZE;
- if ((ii_buffer->packed_buf = GRN_MALLOC(buf_size))) {
- ii_buffer->packed_buf_size = buf_size;
- }
+ if (!ii_buffer->packed_buf) {
+ uint32_t buf_size = (max_size > II_BUFFER_PACKED_BUF_SIZE)
+ ? max_size : II_BUFFER_PACKED_BUF_SIZE;
+ if ((ii_buffer->packed_buf = GRN_MALLOC(buf_size))) {
+ ii_buffer->packed_buf_size = buf_size;
}
}
{
uint16_t nterm;
+ int packed_len;
buffer_term *bt;
- if (!ii_buffer->term_buffer) {
- uint32_t lseg;
- void *term_buffer;
- for (lseg = 0; lseg < GRN_II_MAX_LSEG; lseg++) {
- if (ii_buffer->ii->header->binfo[lseg] == NOT_ASSIGNED) { break; }
- }
- ii_buffer->lseg = lseg;
- ii_buffer->dseg = segment_get(ctx, ii_buffer->ii);
- GRN_IO_SEG_REF(ii_buffer->ii->seg, ii_buffer->dseg, term_buffer);
- ii_buffer->term_buffer = (buffer *)term_buffer;
- }
- nterm = ii_buffer->term_buffer->header.nterms++;
- bt = &ii_buffer->term_buffer->terms[nterm];
+ buffer *term_buffer = get_term_buffer(ctx, ii_buffer);
+ if (!term_buffer) { return; }
+ nterm = term_buffer->header.nterms++;
+ bt = &term_buffer->terms[nterm];
a[0] = SEG2POS(ii_buffer->lseg,
(sizeof(buffer_header) + sizeof(buffer_term) * nterm));
- {
- int packed_len = grn_p_encv(ctx, ii_buffer->data_vectors,
- ii_buffer->ii->n_elements,
- ii_buffer->packed_buf +
- ii_buffer->packed_len);
- bt->tid = tid;
- bt->size_in_buffer = 0;
- bt->pos_in_buffer = 0;
- bt->size_in_chunk = packed_len;
- bt->pos_in_chunk = ii_buffer->packed_len;
- ii_buffer->packed_len += packed_len;
- }
- if (ii_buffer->term_buffer->header.nterms == II_BUFFER_NTERMS_PER_BUFFER) {
+ packed_len = grn_p_encv(ctx, ii_buffer->data_vectors,
+ ii_buffer->ii->n_elements,
+ ii_buffer->packed_buf +
+ ii_buffer->packed_len);
+ bt->tid = tid;
+ bt->size_in_buffer = 0;
+ bt->pos_in_buffer = 0;
+ bt->size_in_chunk = packed_len;
+ bt->pos_in_chunk = ii_buffer->packed_len;
+ ii_buffer->packed_len += packed_len;
+ if (term_buffer->header.nterms == II_BUFFER_NTERMS_PER_BUFFER) {
grn_ii_buffer_chunk_flush(ctx, ii_buffer);
}
}
@@ -6874,27 +6923,28 @@ grn_ii_buffer_open(grn_ctx *ctx, grn_ii *ii)
ii_buffer->nblocks = 0;
ii_buffer->blocks = NULL;
ii_buffer->ncounters = II_BUFFER_NCOUNTERS_MARGIN;
- ii_buffer->blockpos = 0;
+ ii_buffer->block_pos = 0;
ii_buffer->filepos = 0;
ii_buffer->counters = GRN_CALLOC(ii_buffer->ncounters *
sizeof(ii_buffer_counter));
if (ii_buffer->counters) {
- ii_buffer->blockbuf = (grn_id *)GRN_MALLOC(II_BUFFER_BLOCK_SIZE *
- sizeof(grn_id));
- if (ii_buffer->blockbuf) {
+ ii_buffer->block_buf = GRN_MALLOCN(grn_id, II_BUFFER_BLOCK_SIZE);
+ if (ii_buffer->block_buf) {
+ ii_buffer->block_buf_size = II_BUFFER_BLOCK_SIZE;
ii_buffer->tmpfd = open(TMPFILE_PATH,
O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, 0666);
if (ii_buffer->tmpfd) {
grn_obj_flags flags;
grn_table_get_info(ctx, ii->lexicon, &flags, NULL, NULL);
if (flags & GRN_OBJ_TABLE_PAT_KEY) {
- grn_pat_cache_enable(ctx, (grn_pat *)ii->lexicon, PAT_CACHE_SIZE);
+ grn_pat_cache_enable(ctx, (grn_pat *)ii->lexicon,
+ PAT_CACHE_SIZE);
}
return ii_buffer;
} else {
ERR(GRN_INVALID_ARGUMENT, "temporary file open failed");
}
- GRN_FREE(ii_buffer->blockbuf);
+ GRN_FREE(ii_buffer->block_buf);
}
GRN_FREE(ii_buffer->counters);
}
@@ -6917,12 +6967,12 @@ grn_ii_buffer_append(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
grn_rc
grn_ii_buffer_commit(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
{
- if (ii_buffer->blockpos) {
+ if (ii_buffer->block_pos) {
grn_ii_buffer_flush(ctx, ii_buffer);
}
close(ii_buffer->tmpfd);
- GRN_FREE(ii_buffer->blockbuf);
+ GRN_FREE(ii_buffer->block_buf);
GRN_FREE(ii_buffer->counters);
GRN_LOG(ctx, GRN_LOG_NOTICE, "nblocks: %d", ii_buffer->nblocks);