null+****@clear*****
null+****@clear*****
2012年 6月 15日 (金) 16:14:16 JST
Daijiro MORI 2012-06-15 16:14:16 +0900 (Fri, 15 Jun 2012)
New Revision: d5e3c445b2ffbcdd50534f1deb83958b1d9bae5c
Log:
table: implement push/pull commands.
Modified files:
lib/hash.c
lib/hash.h
plugins/table/table.c
Modified: lib/hash.c (+19 -10)
===================================================================
--- lib/hash.c 2012-06-15 14:49:48 +0900 (e0d0cfc)
+++ lib/hash.c 2012-06-15 16:14:16 +0900 (ab5fc33)
@@ -320,16 +320,6 @@ grn_io_array_bit_flip(grn_ctx *ctx, grn_io *io,
/* grn_table_queue */
-typedef struct _grn_table_queue grn_table_queue;
-
-struct _grn_table_queue {
- grn_mutex mutex;
- grn_cond cond;
- grn_id head;
- grn_id tail;
- grn_id cap;
-};
-
static void
grn_table_queue_lock_init(grn_ctx *ctx, grn_table_queue *queue)
{
@@ -493,6 +483,18 @@ grn_array_queue_lock_clear(grn_ctx *ctx, grn_array *array)
grn_table_queue_lock_init(ctx, &header->queue);
}
+grn_table_queue *
+grn_array_queue(grn_ctx *ctx, grn_array *array)
+{
+ if (grn_array_is_io_array(array)) {
+ struct grn_array_header *header;
+ header = grn_io_header(array->io);
+ return &header->queue;
+ } else {
+ return NULL;
+ }
+}
+
static grn_rc
grn_array_init(grn_ctx *ctx, grn_array *array,
const char *path, uint32_t value_size, uint32_t flags)
@@ -1023,6 +1025,13 @@ grn_array_add_to_io_array(grn_ctx *ctx, grn_array *array, void **value)
return id;
}
+void
+grn_array_clear_curr_rec(grn_ctx *ctx, grn_array *array)
+{
+ struct grn_array_header * const header = array->header;
+ header->curr_rec = GRN_ID_NIL;
+}
+
grn_id
grn_array_add(grn_ctx *ctx, grn_array *array, void **value)
{
Modified: lib/hash.h (+16 -1)
===================================================================
--- lib/hash.h 2012-06-15 14:49:48 +0900 (03f00f2)
+++ lib/hash.h 2012-06-15 16:14:16 +0900 (ded36fc)
@@ -166,7 +166,6 @@ struct _grn_array_cursor {
grn_rc grn_array_truncate(grn_ctx *ctx, grn_array *array);
grn_rc grn_array_copy_sort_key(grn_ctx *ctx, grn_array *array,
grn_table_sort_key *keys, int n_keys);
-void grn_array_queue_lock_clear(grn_ctx *ctx, grn_array *array);
/**** grn_hash ****/
@@ -279,6 +278,22 @@ grn_rc grn_hash_clear_lock(grn_ctx *ctx, grn_hash *hash);
#define GRN_HASH_SIZE(hash) (*((hash)->n_entries))
+/* grn_table_queue */
+
+typedef struct _grn_table_queue grn_table_queue;
+
+struct _grn_table_queue {
+ grn_mutex mutex;
+ grn_cond cond;
+ grn_id head;
+ grn_id tail;
+ grn_id cap;
+};
+
+void grn_array_queue_lock_clear(grn_ctx *ctx, grn_array *array);
+void grn_array_clear_curr_rec(grn_ctx *ctx, grn_array *array);
+grn_table_queue *grn_array_queue(grn_ctx *ctx, grn_array *array);
+
/* private */
typedef enum {
grn_rec_document = 0,
Modified: plugins/table/table.c (+148 -0)
===================================================================
--- plugins/table/table.c 2012-06-15 14:49:48 +0900 (51035dc)
+++ plugins/table/table.c 2012-06-15 16:14:16 +0900 (f6bb508)
@@ -317,6 +317,148 @@ command_set(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
return NULL;
}
+uint32_t
+grn_table_queue_size(grn_table_queue *queue)
+{
+ return (queue->head < queue->tail)
+ ? 2 * queue->cap + queue->head - queue->tail
+ : queue->head - queue->tail;
+}
+
+void
+grn_table_queue_head_increment(grn_table_queue *queue)
+{
+ if (queue->head == 2 * queue->cap) {
+ queue->head = 1;
+ } else {
+ queue->head++;
+ }
+}
+
+void
+grn_table_queue_tail_increment(grn_table_queue *queue)
+{
+ if (queue->tail == 2 * queue->cap) {
+ queue->tail = 1;
+ } else {
+ queue->tail++;
+ }
+}
+
+grn_id
+grn_table_queue_head(grn_table_queue *queue)
+{
+ return queue->head > queue->cap
+ ? queue->head - queue->cap
+ : queue->head;
+}
+
+grn_id
+grn_table_queue_tail(grn_table_queue *queue)
+{
+ return queue->tail > queue->cap
+ ? queue->tail - queue->cap
+ : queue->tail;
+}
+
+static grn_obj *
+command_push(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
+{
+ grn_obj *table = grn_ctx_get(ctx, GRN_TEXT_VALUE(VAR(0)), GRN_TEXT_LEN(VAR(0)));
+ if (table) {
+ switch (table->header.type) {
+ case GRN_TABLE_NO_KEY:
+ {
+ grn_array *array = (grn_array *)table;
+ grn_table_queue *queue = grn_array_queue(ctx, array);
+ if (queue) {
+ MUTEX_LOCK(queue->mutex);
+ if (grn_table_queue_head(queue) == queue->cap) {
+ grn_array_clear_curr_rec(ctx, array);
+ }
+ grn_load_(ctx, GRN_CONTENT_JSON,
+ GRN_TEXT_VALUE(VAR(0)), GRN_TEXT_LEN(VAR(0)),
+ NULL, 0,
+ GRN_TEXT_VALUE(VAR(1)), GRN_TEXT_LEN(VAR(1)),
+ NULL, 0, NULL, 0, 0);
+ if (grn_table_queue_size == queue->cap) {
+ grn_table_queue_tail_increment(queue);
+ }
+ grn_table_queue_head_increment(queue);
+ COND_SIGNAL(queue->cond);
+ MUTEX_UNLOCK(queue->mutex);
+ GRN_OUTPUT_BOOL(ctx->impl->loader.nrecords);
+ if (ctx->impl->loader.table) {
+ grn_db_touch(ctx, DB_OBJ(ctx->impl->loader.table)->db);
+ }
+ } else {
+ ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support push",
+ (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+ }
+ }
+ break;
+ default :
+ ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support push",
+ (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+ }
+ } else {
+ ERR(GRN_INVALID_ARGUMENT, "table '%.*s' does not exist.",
+ (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+ }
+ return NULL;
+}
+
+static grn_obj *
+command_pull(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
+{
+ grn_obj *table = grn_ctx_get(ctx, GRN_TEXT_VALUE(VAR(0)), GRN_TEXT_LEN(VAR(0)));
+ if (table) {
+ switch (table->header.type) {
+ case GRN_TABLE_NO_KEY:
+ {
+ grn_array *array = (grn_array *)table;
+ grn_table_queue *queue = grn_array_queue(ctx, array);
+ if (queue) {
+ MUTEX_LOCK(queue->mutex);
+ while (grn_table_queue_size(queue) == 0) {
+ if (GRN_TEXT_LEN(VAR(2))) {
+ MUTEX_UNLOCK(queue->mutex);
+ GRN_OUTPUT_BOOL(0);
+ return NULL;
+ }
+ COND_WAIT(queue->cond, queue->mutex);
+ }
+ grn_table_queue_tail_increment(queue);
+ {
+ grn_obj obj;
+ grn_obj_format format;
+ GRN_RECORD_INIT(&obj, 0, ((grn_db_obj *)table)->id);
+ GRN_OBJ_FORMAT_INIT(&format, 1, 0, 1, 0);
+ GRN_RECORD_SET(ctx, &obj, grn_table_queue_tail(queue));
+ grn_obj_columns(ctx, table, GRN_TEXT_VALUE(VAR(1)), GRN_TEXT_LEN(VAR(1)),
+ &format.columns);
+ format.flags = 0 /* GRN_OBJ_FORMAT_WITH_COLUMN_NAMES */;
+ GRN_OUTPUT_OBJ(&obj, &format);
+ GRN_OBJ_FORMAT_FIN(ctx, &format);
+ }
+ MUTEX_UNLOCK(queue->mutex);
+ } else {
+ ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support pull",
+ (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+ }
+ }
+ break;
+ default :
+ ERR(GRN_OPERATION_NOT_SUPPORTED, "table '%.*s' doesn't support pull",
+ (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+ }
+ } else {
+ ERR(GRN_INVALID_ARGUMENT, "table '%.*s' does not exist.",
+ (int)GRN_TEXT_LEN(VAR(0)), GRN_TEXT_VALUE(VAR(0)));
+ }
+ return NULL;
+}
+
grn_rc
GRN_PLUGIN_INIT(grn_ctx *ctx)
{
@@ -377,8 +519,14 @@ GRN_PLUGIN_REGISTER(grn_ctx *ctx)
DEF_VAR(vars[4], "output_columns");
DEF_VAR(vars[5], "id");
DEF_COMMAND("add", command_add, 2, vars);
+ DEF_COMMAND("push", command_push, 2, vars);
DEF_COMMAND("set", command_set, 6, vars);
+ DEF_VAR(vars[0], "table");
+ DEF_VAR(vars[1], "output_columns");
+ DEF_VAR(vars[2], "non_block");
+ DEF_COMMAND("pull", command_pull, 3, vars);
+
return ctx->rc;
}