[Groonga-commit] groonga/groonga [master] grn_ii_buffer: calculate the outbuf size accurately.

Back to archive index

null+****@clear***** null+****@clear*****
2012年 2月 15日 (水) 11:06:17 JST


Daijiro MORI	2012-02-15 11:06:17 +0900 (Wed, 15 Feb 2012)

  New Revision: 57ffad03a73c3731ea18401b6695e986daf2b5a3

  Log:
    grn_ii_buffer: calculate the outbuf size accurately.

  Modified files:
    lib/ii.c

  Modified: lib/ii.c (+153 -102)
===================================================================
--- lib/ii.c    2012-02-14 20:03:27 +0900 (23f5e4a)
+++ lib/ii.c    2012-02-15 11:06:17 +0900 (090563e)
@@ -6405,20 +6405,17 @@ struct _grn_ii_buffer {
   uint64_t total_chunk_size;
 };
 
-static void
-grn_ii_buffer_flush(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
+static ii_buffer_block *
+block_new(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
 {
-  uint8_t *outbuf, *outbufp, *outbufp_;
   ii_buffer_block *block;
-  GRN_LOG(ctx, GRN_LOG_NOTICE, "flushing:%d npostings:%u",
-          ii_buffer->nblocks, ii_buffer->blockpos);
-  outbuf = (uint8_t *)GRN_MALLOC(ii_buffer->blockpos * 7 * sizeof(uint32_t));
-  /* if (!outbuf) { err } */
-  outbufp_ = outbufp = outbuf;
   if (!(ii_buffer->nblocks & 0x3ff)) {
-    ii_buffer_block *blocks = GRN_REALLOC(ii_buffer->blocks,
-                                        (ii_buffer->nblocks + 0x400) * sizeof(ii_buffer_block));
-    if (!blocks) { /* err */ }
+    ii_buffer_block *blocks;
+    if (!(blocks = GRN_REALLOC(ii_buffer->blocks,
+                         (ii_buffer->nblocks + 0x400) *
+                               sizeof(ii_buffer_block)))) {
+      return NULL;
+    }
     ii_buffer->blocks = blocks;
   }
   block = &ii_buffer->blocks[ii_buffer->nblocks];
@@ -6426,114 +6423,168 @@ grn_ii_buffer_flush(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
   block->rest = 0;
   block->buffer = NULL;
   block->buffersize = 0;
-  {
-    ii_buffer_counter *counter;
-    grn_id tid, tid_max = grn_table_size(ctx, ii_buffer->tmp_lexicon);
-    for (counter = ii_buffer->counters, tid = 1; tid <= tid_max; counter++, tid++) {
-      counter->offset_tf += GRN_B_ENC_SIZE(counter->last_tf - 1);
-      counter->last_rid = 0;
-      counter->last_tf = 0;
-    }
-  }
-  {
-    grn_id tid;
-    grn_table_cursor  *tc;
-    uint8_t *pnext = (uint8_t *)&block->nextsize;
-    tc = grn_table_cursor_open(ctx, ii_buffer->tmp_lexicon, NULL, 0, NULL, 0, 0, -1, II_BUFFER_ORDER);
-    while ((tid = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
-      unsigned int key_size;
-      const char *key = _grn_table_key(ctx, ii_buffer->tmp_lexicon, tid, &key_size);
-      grn_id gtid = grn_table_add(ctx, ii_buffer->lexicon, key, key_size, NULL);
-      ii_buffer_counter *counter = &ii_buffer->counters[tid - 1];
-      if (counter->nrecs) {
-        uint32_t offset_rid = counter->offset_rid;
-        uint32_t offset_tf = counter->offset_tf;
-        uint32_t offset_pos = counter->offset_pos;
-        GRN_B_ENC(gtid, outbufp);
-        GRN_B_ENC(counter->nrecs, outbufp);
-        GRN_B_ENC(counter->nposts, outbufp);
-        counter->offset_rid = outbufp - outbuf;
-        outbufp += offset_rid;
-        counter->offset_tf = outbufp - outbuf;
-        outbufp += offset_tf;
-        counter->offset_pos = outbufp - outbuf;
-        outbufp += offset_pos;
-      }
-      if (outbufp_ + II_BUFFER_BLOCK_READ_UNIT_SIZE < outbufp) {
-        uint32_t size = outbufp - outbufp_ + sizeof(uint32_t);
-        memcpy(pnext, &size, sizeof(uint32_t));
-        pnext = outbufp;
-        outbufp += sizeof(uint32_t);
-        outbufp_ = outbufp;
-      }
-    }
-    grn_table_cursor_close(ctx, tc);
-    if (outbufp_ < outbufp) {
-      uint32_t size = outbufp - outbufp_;
+  return block;
+}
+
+static uint8_t *
+allocate_outbuf(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
+{
+  size_t bufsize = 0, bufsize_ = 0;
+  ii_buffer_counter *counter = ii_buffer->counters;
+  grn_id tid, tid_max = grn_table_size(ctx, ii_buffer->tmp_lexicon);
+  for (tid = 1; tid <= tid_max; counter++, tid++) {
+    counter->offset_tf += GRN_B_ENC_SIZE(counter->last_tf - 1);
+    counter->last_rid = 0;
+    counter->last_tf = 0;
+    bufsize += 5;
+    bufsize += GRN_B_ENC_SIZE(counter->nrecs);
+    bufsize += GRN_B_ENC_SIZE(counter->nposts);
+    bufsize += counter->offset_rid;
+    bufsize += counter->offset_tf;
+    bufsize += counter->offset_pos;
+    if (bufsize_ + II_BUFFER_BLOCK_READ_UNIT_SIZE < bufsize) {
+      bufsize += sizeof(uint32_t);
+      bufsize_ = bufsize;
+    }
+  }
+  GRN_LOG(ctx, GRN_LOG_INFO, "flushing:%d bufsize:%zu",
+          ii_buffer->nblocks, bufsize);
+  return (uint8_t *)GRN_MALLOC(bufsize);
+}
+
+static size_t
+encode_terms(grn_ctx *ctx, grn_ii_buffer *ii_buffer,
+             uint8_t *outbuf, ii_buffer_block *block)
+{
+  grn_id tid;
+  uint8_t *outbufp = outbuf;
+  uint8_t *outbufp_ = outbuf;
+  grn_table_cursor  *tc;
+  uint8_t *pnext = (uint8_t *)&block->nextsize;
+  tc = grn_table_cursor_open(ctx, ii_buffer->tmp_lexicon,
+                             NULL, 0, NULL, 0, 0, -1, II_BUFFER_ORDER);
+  while ((tid = grn_table_cursor_next(ctx, tc)) != GRN_ID_NIL) {
+    unsigned int key_size;
+    const char *key = _grn_table_key(ctx, ii_buffer->tmp_lexicon,
+                                     tid, &key_size);
+    grn_id gtid = grn_table_add(ctx, ii_buffer->lexicon, key, key_size, NULL);
+    ii_buffer_counter *counter = &ii_buffer->counters[tid - 1];
+    if (counter->nrecs) {
+      uint32_t offset_rid = counter->offset_rid;
+      uint32_t offset_tf = counter->offset_tf;
+      uint32_t offset_pos = counter->offset_pos;
+      GRN_B_ENC(gtid, outbufp);
+      GRN_B_ENC(counter->nrecs, outbufp);
+      GRN_B_ENC(counter->nposts, outbufp);
+      counter->offset_rid = outbufp - outbuf;
+      outbufp += offset_rid;
+      counter->offset_tf = outbufp - outbuf;
+      outbufp += offset_tf;
+      counter->offset_pos = outbufp - outbuf;
+      outbufp += offset_pos;
+    }
+    if (outbufp_ + II_BUFFER_BLOCK_READ_UNIT_SIZE < outbufp) {
+      uint32_t size = outbufp - outbufp_ + sizeof(uint32_t);
       memcpy(pnext, &size, sizeof(uint32_t));
+      pnext = outbufp;
+      outbufp += sizeof(uint32_t);
+      outbufp_ = outbufp;
     }
   }
-  {
-    grn_id rid = 0;
-    uint32_t pos = 0;
-    uint32_t rest;
-    grn_id *bp;
-    for (bp = ii_buffer->blockbuf, rest = ii_buffer->blockpos; rest; bp++, rest--) {
-      grn_id id = *bp;
-      if (id & II_BUFFER_RID_FLAG) {
-        rid = id - II_BUFFER_RID_FLAG;
-        pos = 0;
+  grn_table_cursor_close(ctx, tc);
+  if (outbufp_ < outbufp) {
+    uint32_t size = outbufp - outbufp_;
+    memcpy(pnext, &size, sizeof(uint32_t));
+  }
+  return outbufp - outbuf;
+}
+
+static void
+encode_postings(grn_ctx *ctx, grn_ii_buffer *ii_buffer, uint8_t *outbuf)
+{
+  grn_id rid = 0;
+  uint32_t pos = 0;
+  uint32_t rest;
+  grn_id *bp = ii_buffer->blockbuf;
+  for (rest = ii_buffer->blockpos; rest; bp++, rest--) {
+    grn_id id = *bp;
+    if (id & II_BUFFER_RID_FLAG) {
+      rid = id - II_BUFFER_RID_FLAG;
+      pos = 0;
+    } else {
+      ii_buffer_counter *counter = &ii_buffer->counters[id - 1];
+      if (counter->last_rid == rid) {
+        counter->last_tf++;
       } else {
-        ii_buffer_counter *counter = &ii_buffer->counters[id - 1];
-        if (counter->last_rid == rid) {
-          counter->last_tf++;
-        } else {
-          if (counter->last_tf) {
-            uint8_t *p = outbuf + counter->offset_tf;
-            GRN_B_ENC(counter->last_tf - 1, p);
-            counter->offset_tf = p - outbuf;
-          }
-          {
-            uint8_t *p = outbuf + counter->offset_rid;
-            GRN_B_ENC(rid - counter->last_rid, p);
-            counter->offset_rid = p - outbuf;
-          }
-          counter->last_rid = rid;
-          counter->last_sid = 0;
-          counter->last_tf = 1;
-          counter->last_pos = 0;
+        if (counter->last_tf) {
+          uint8_t *p = outbuf + counter->offset_tf;
+          GRN_B_ENC(counter->last_tf - 1, p);
+          counter->offset_tf = p - outbuf;
         }
         {
-          uint8_t *p = outbuf + counter->offset_pos;
-          GRN_B_ENC(pos - counter->last_pos, p);
-          counter->offset_pos = p - outbuf;
+          uint8_t *p = outbuf + counter->offset_rid;
+          GRN_B_ENC(rid - counter->last_rid, p);
+          counter->offset_rid = p - outbuf;
         }
-        counter->last_pos = pos;
-        pos++;
+        counter->last_rid = rid;
+        counter->last_sid = 0;
+        counter->last_tf = 1;
+        counter->last_pos = 0;
       }
+      {
+        uint8_t *p = outbuf + counter->offset_pos;
+        GRN_B_ENC(pos - counter->last_pos, p);
+        counter->offset_pos = p - outbuf;
+      }
+      counter->last_pos = pos;
+      pos++;
     }
   }
-  {
-    ii_buffer_counter *counter;
-    grn_id tid, tid_max = grn_table_size(ctx, ii_buffer->tmp_lexicon);
-    for (counter = ii_buffer->counters, tid = 1; tid <= tid_max; counter++, tid++) {
-      uint8_t *p = outbuf + counter->offset_tf;
-      GRN_B_ENC(counter->last_tf - 1, p);
-    }
-    memset(ii_buffer->counters, 0, tid_max * sizeof(ii_buffer_counter));
+}
+
+static void
+encode_last_tf(grn_ctx *ctx, grn_ii_buffer *ii_buffer, uint8_t *outbuf)
+{
+  ii_buffer_counter *counter = ii_buffer->counters;
+  grn_id tid, tid_max = grn_table_size(ctx, ii_buffer->tmp_lexicon);
+  for (tid = 1; tid <= tid_max; counter++, tid++) {
+    uint8_t *p = outbuf + counter->offset_tf;
+    GRN_B_ENC(counter->last_tf - 1, p);
   }
+}
+
+static void
+grn_ii_buffer_flush(grn_ctx *ctx, grn_ii_buffer *ii_buffer)
+{
+  size_t encsize;
+  uint8_t *outbuf;
+  ii_buffer_block *block;
+  GRN_LOG(ctx, GRN_LOG_NOTICE, "flushing:%d npostings:%u",
+          ii_buffer->nblocks, ii_buffer->blockpos);
+  if (!(block = block_new(ctx, ii_buffer))) { return; }
+  if (!(outbuf = allocate_outbuf(ctx, ii_buffer))) { return; }
+  encsize = encode_terms(ctx, ii_buffer, outbuf, block);
+  encode_postings(ctx, ii_buffer, outbuf);
+  encode_last_tf(ctx, ii_buffer, outbuf);
   {
-    ssize_t r = write(ii_buffer->tmpfd, outbuf, outbufp - outbuf);
-    if (r > 0) { ii_buffer->filepos += r; }
+    ssize_t r = write(ii_buffer->tmpfd, outbuf, encsize);
+    if (r != encsize) {
+      ERR(GRN_INPUT_OUTPUT_ERROR, "write returned %d != %d", r, encsize);
+      return;
+    }
+    ii_buffer->filepos += r;
     block->tail = ii_buffer->filepos;
-    GRN_LOG(ctx, GRN_LOG_NOTICE, "flushed: %d encoded_size:%jdKB",
-            ii_buffer->nblocks, r >> 10);
   }
-  ii_buffer->nblocks++;
   GRN_FREE(outbuf);
-  ii_buffer->blockpos = 0;
+  memset(ii_buffer->counters, 0,
+         grn_table_size(ctx, ii_buffer->tmp_lexicon) *
+         sizeof(ii_buffer_counter));
   grn_obj_close(ctx, ii_buffer->tmp_lexicon);
+  GRN_LOG(ctx, GRN_LOG_NOTICE, "flushed: %d encsize:%zu",
+          ii_buffer->nblocks, encsize);
   ii_buffer->tmp_lexicon = NULL;
+  ii_buffer->nblocks++;
+  ii_buffer->blockpos = 0;
 }
 
 const uint32_t PAT_CACHE_SIZE = 1<<20;




Groonga-commit メーリングリストの案内
Back to archive index