[Groonga-commit] groonga/groonga [master] table: implement push/pull commands.

Back to archive index

null+****@clear***** null+****@clear*****
2012年 6月 15日 (金) 16:14:16 JST


Daijiro MORI	2012-06-15 16:14:16 +0900 (Fri, 15 Jun 2012)

  New Revision: d5e3c445b2ffbcdd50534f1deb83958b1d9bae5c

  Log:
    table: implement push/pull commands.

  Modified files:
    lib/hash.c
    lib/hash.h
    plugins/table/table.c

  Modified: lib/hash.c (+19 -10)
===================================================================
--- lib/hash.c    2012-06-15 14:49:48 +0900 (e0d0cfc)
+++ lib/hash.c    2012-06-15 16:14:16 +0900 (ab5fc33)
@@ -320,16 +320,6 @@ grn_io_array_bit_flip(grn_ctx *ctx, grn_io *io,
 
 /* grn_table_queue */
 
-typedef struct _grn_table_queue grn_table_queue;
-
-struct _grn_table_queue {
-  grn_mutex mutex;
-  grn_cond cond;
-  grn_id head;
-  grn_id tail;
-  grn_id cap;
-};
-
 static void
 grn_table_queue_lock_init(grn_ctx *ctx, grn_table_queue *queue)
 {
@@ -493,6 +483,18 @@ grn_array_queue_lock_clear(grn_ctx *ctx, grn_array *array)
   grn_table_queue_lock_init(ctx, &header->queue);
 }
 
+grn_table_queue *
+grn_array_queue(grn_ctx *ctx, grn_array *array)
+{
+  if (grn_array_is_io_array(array)) {
+    struct grn_array_header *header;
+    header = grn_io_header(array->io);
+    return &header->queue;
+  } else {
+    return NULL;
+  }
+}
+
 static grn_rc
 grn_array_init(grn_ctx *ctx, grn_array *array,
                const char *path, uint32_t value_size, uint32_t flags)
@@ -1023,6 +1025,13 @@ grn_array_add_to_io_array(grn_ctx *ctx, grn_array *array, void **value)
   return id;
 }
 
+void
+grn_array_clear_curr_rec(grn_ctx *ctx, grn_array *array)
+{
+  struct grn_array_header * const header = array->header;
+  header->curr_rec = GRN_ID_NIL;
+}
+
 grn_id
 grn_array_add(grn_ctx *ctx, grn_array *array, void **value)
 {

  Modified: lib/hash.h (+16 -1)
===================================================================
--- lib/hash.h    2012-06-15 14:49:48 +0900 (03f00f2)
+++ lib/hash.h    2012-06-15 16:14:16 +0900 (ded36fc)
@@ -166,7 +166,6 @@ struct _grn_array_cursor {
 grn_rc grn_array_truncate(grn_ctx *ctx, grn_array *array);
 grn_rc grn_array_copy_sort_key(grn_ctx *ctx, grn_array *array,
                                grn_table_sort_key *keys, int n_keys);
-void grn_array_queue_lock_clear(grn_ctx *ctx, grn_array *array);
 
 /**** grn_hash ****/
 
@@ -279,6 +278,22 @@ grn_rc grn_hash_clear_lock(grn_ctx *ctx, grn_hash *hash);
 
 #define GRN_HASH_SIZE(hash) (*((hash)->n_entries))
 
+/* grn_table_queue */
+
+typedef struct _grn_table_queue grn_table_queue;
+
+struct _grn_table_queue {
+  grn_mutex mutex;
+  grn_cond cond;
+  grn_id head;
+  grn_id tail;
+  grn_id cap;
+};
+
+void grn_array_queue_lock_clear(grn_ctx *ctx, grn_array *array);
+void grn_array_clear_curr_rec(grn_ctx *ctx, grn_array *array);
+grn_table_queue *grn_array_queue(grn_ctx *ctx, grn_array *array);
+
 /* private */
 typedef enum {
   grn_rec_document = 0,

  Modified: plugins/table/table.c (+148 -0)
===================================================================
--- plugins/table/table.c    2012-06-15 14:49:48 +0900 (51035dc)
+++ plugins/table/table.c    2012-06-15 16:14:16 +0900 (f6bb508)
@@ -317,6 +317,148 @@ command_set(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
   return NULL;
 }
 
+uint32_t
+grn_table_queue_size(grn_table_queue *queue)
+{
+  return (queue->head < queue->tail)
+    ? 2 * queue->cap + queue->head - queue->tail
+    : queue->head - queue->tail;
+}
+
+void
+grn_table_queue_head_increment(grn_table_queue *queue)
+{
+  if (queue->head == 2 * queue->cap) {
+    queue->head = 1;
+  } else {
+    queue->head++;
+  }
+}
+
+void
+grn_table_queue_tail_increment(grn_table_queue *queue)
+{
+  if (queue->tail == 2 * queue->cap) {
+    queue->tail = 1;
+  } else {
+    queue->tail++;
+  }
+}
+
+grn_id
+grn_table_queue_head(grn_table_queue *queue)
+{
+  return queue->head > queue->cap
+    ? queue->head - queue->cap
+    : queue->head;
+}
+
+grn_id
+grn_table_queue_tail(grn_table_queue *queue)
+{
+  return queue->tail > queue->cap
+    ? queue->tail - queue->cap
+    : queue->tail;
+}
+
+static grn_obj *
+command_push(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
+{
+  grn_obj *table = grn_ctx_get(ctx, GRN_TEXT_VALUE(VAR(0)), GRN_TEXT_LEN(VAR(0)));
+  if (table) {
+    switch (table->header.type) {
+    case GRN_TABLE_NO_KEY:
+      {
+        grn_array *array = (grn_array *)table;
+        grn_table_queue *queue = grn_array_queue(ctx, array);
+        if (queue) {
+          MUTEX_LOCK(queue->mutex);
+          if (grn_table_queue_head(queue) == queue->cap) {
+            grn_array_clear_curr_rec(ctx, array);
+          }
+          grn_load_(ctx, GRN_CONTENT_JSON,
+                    GRN_TEXT_VALUE(VAR(0)), GRN_TEXT_LEN(VAR(0)),
+                    NULL, 0,
+                    GRN_TEXT_VALUE(VAR(1)), GRN_TEXT_LEN(VAR(1)),
+                    NULL, 0, NULL, 0, 0);
+          if (grn_table_queue_size == queue->cap) {
+            grn_table_queue_tail_increment(queue);
+          }
+          grn_table_queue_head_increment(queue);
+          COND_SIGNAL(queue->cond);
+          MUTEX_UNLOCK(queue->mutex);
+          GRN_OUTPUT_BOOL(ctx->impl->loader.nrecords);
+          if (ctx->impl->loader.table) {
+            grn_db_touch(ctx, DB_OBJ(ctx->impl->loader.table)->db);
+          }
+        } else {
+          ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support push",
+              (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+        }
+      }
+      break;
+    default :
+      ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support push",
+          (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+    }
+  } else {
+    ERR(GRN_INVALID_ARGUMENT, "table '%.*s' does not exist.",
+        (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+  }
+  return NULL;
+}
+
+static grn_obj *
+command_pull(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
+{
+  grn_obj *table = grn_ctx_get(ctx, GRN_TEXT_VALUE(VAR(0)), GRN_TEXT_LEN(VAR(0)));
+  if (table) {
+    switch (table->header.type) {
+    case GRN_TABLE_NO_KEY:
+      {
+        grn_array *array = (grn_array *)table;
+        grn_table_queue *queue = grn_array_queue(ctx, array);
+        if (queue) {
+          MUTEX_LOCK(queue->mutex);
+          while (grn_table_queue_size(queue) == 0) {
+            if (GRN_TEXT_LEN(VAR(2))) {
+              MUTEX_UNLOCK(queue->mutex);
+              GRN_OUTPUT_BOOL(0);
+              return NULL;
+            }
+            COND_WAIT(queue->cond, queue->mutex);
+          }
+          grn_table_queue_tail_increment(queue);
+          {
+            grn_obj obj;
+            grn_obj_format format;
+            GRN_RECORD_INIT(&obj, 0, ((grn_db_obj *)table)->id);
+            GRN_OBJ_FORMAT_INIT(&format, 1, 0, 1, 0);
+            GRN_RECORD_SET(ctx, &obj, grn_table_queue_tail(queue));
+            grn_obj_columns(ctx, table, GRN_TEXT_VALUE(VAR(1)), GRN_TEXT_LEN(VAR(1)),
+                            &format.columns);
+            format.flags = 0 /* GRN_OBJ_FORMAT_WITH_COLUMN_NAMES */;
+            GRN_OUTPUT_OBJ(&obj, &format);
+            GRN_OBJ_FORMAT_FIN(ctx, &format);
+          }
+          MUTEX_UNLOCK(queue->mutex);
+        } else {
+          ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support pull",
+              (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+        }
+      }
+      break;
+    default :
+      ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support pull",
+          (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+    }
+  } else {
+    ERR(GRN_INVALID_ARGUMENT, "table '%.*s' does not exist.",
+        (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+  }
+  return NULL;
+}
+
 grn_rc
 GRN_PLUGIN_INIT(grn_ctx *ctx)
 {
@@ -377,8 +519,14 @@ GRN_PLUGIN_REGISTER(grn_ctx *ctx)
   DEF_VAR(vars[4], "output_columns");
   DEF_VAR(vars[5], "id");
   DEF_COMMAND("add", command_add, 2, vars);
+  DEF_COMMAND("push", command_push, 2, vars);
   DEF_COMMAND("set", command_set, 6, vars);
 
+  DEF_VAR(vars[0], "table");
+  DEF_VAR(vars[1], "output_columns");
+  DEF_VAR(vars[2], "non_block");
+  DEF_COMMAND("pull", command_pull, 3, vars);
+
   return ctx->rc;
 }
 




Groonga-commit メーリングリストの案内
Back to archive index