[Groonga-commit] groonga/groonga at f3d825b [master] groonga-http: support request_timeout

Back to archive index

Kouhei Sutou null+****@clear*****
Sun Apr 3 01:15:29 JST 2016


Kouhei Sutou	2016-04-03 01:15:29 +0900 (Sun, 03 Apr 2016)

  New Revision: f3d825b42af41172715a145ffa2557c12b7cf21b
  https://github.com/groonga/groonga/commit/f3d825b42af41172715a145ffa2557c12b7cf21b

  Message:
    groonga-http: support request_timeout

  Modified files:
    lib/grn_ctx.h
    src/groonga.c

  Modified: lib/grn_ctx.h (+5 -0)
===================================================================
--- lib/grn_ctx.h    2016-04-03 01:07:40 +0900 (93bb595)
+++ lib/grn_ctx.h    2016-04-03 01:15:29 +0900 (e33a9c0)
@@ -520,9 +520,14 @@ extern grn_timeval grn_starttime;
 #ifndef GRN_TIMEVAL_STR_FORMAT
 #define GRN_TIMEVAL_STR_FORMAT "%04d-%02d-%02d %02d:%02d:%02d.%06d"
 #endif /* GRN_TIMEVAL_STR_FORMAT */
+#define GRN_TIMEVAL_TO_MSEC(timeval)                    \
+  (((timeval)->tv_sec * GRN_TIME_MSEC_PER_SEC) +        \
+   ((timeval)->tv_nsec / GRN_TIME_NSEC_PER_MSEC))
 #define GRN_TIME_NSEC_PER_SEC 1000000000
 #define GRN_TIME_NSEC_PER_SEC_F 1000000000.0
+#define GRN_TIME_NSEC_PER_MSEC 1000000
 #define GRN_TIME_NSEC_PER_USEC (GRN_TIME_NSEC_PER_SEC / GRN_TIME_USEC_PER_SEC)
+#define GRN_TIME_MSEC_PER_SEC 1000
 #define GRN_TIME_NSEC_TO_USEC(nsec) ((nsec) / GRN_TIME_NSEC_PER_USEC)
 #define GRN_TIME_USEC_TO_NSEC(usec) ((usec) * GRN_TIME_NSEC_PER_USEC)
 

  Modified: src/groonga.c (+246 -20)
===================================================================
--- src/groonga.c    2016-04-03 01:07:40 +0900 (a209136)
+++ src/groonga.c    2016-04-03 01:15:29 +0900 (c95c2df)
@@ -122,6 +122,25 @@ grn_rc_to_exit_code(grn_rc rc)
   }
 }
 
+static void
+break_accept_event_loop(grn_ctx *ctx)
+{
+  grn_com *client;
+  const char *address;
+
+  if (strcmp(bind_address, "0.0.0.0") == 0) {
+    address = "127.0.0.1";
+  } else if (strcmp(bind_address, "::") == 0) {
+    address = "::1";
+    } else {
+    address = bind_address;
+  }
+  client = grn_com_copen(ctx, NULL, address, port);
+  if (client) {
+    grn_com_close(ctx, client);
+  }
+}
+
 #ifdef GRN_WITH_LIBEDIT
 #include <locale.h>
 #include <histedit.h>
@@ -525,6 +544,229 @@ groonga_set_thread_limit(uint32_t new_limit, void *data)
   }
 }
 
+typedef struct {
+  grn_mutex mutex;
+  grn_ctx ctx;
+  grn_pat *entries;
+  uint64_t earliest_unix_time_msec;
+} request_timer_data;
+static request_timer_data the_request_timer_data;
+
+static void *
+request_timer_register(const char *request_id,
+                       unsigned int request_id_size,
+                       double timeout,
+                       void *user_data)
+{
+  request_timer_data *data = user_data;
+  grn_id id = GRN_ID_NIL;
+
+  {
+    grn_ctx *ctx = &(data->ctx);
+    grn_bool is_first_timer;
+    grn_timeval tv;
+    uint64_t timeout_unix_time_msec;
+    void *value;
+
+    MUTEX_LOCK(data->mutex);
+    is_first_timer = (grn_pat_size(ctx, data->entries) == 0);
+    grn_timeval_now(ctx, &tv);
+    timeout_unix_time_msec = GRN_TIMEVAL_TO_MSEC(&tv) + (timeout * 1000);
+    while (GRN_TRUE) {
+      int added;
+      id = grn_pat_add(ctx, data->entries,
+                       &timeout_unix_time_msec, sizeof(uint64_t),
+                       &value, &added);
+      if (added != 0) {
+        break;
+      }
+      timeout_unix_time_msec++;
+    }
+    grn_memcpy(value, &request_id_size, sizeof(unsigned int));
+    grn_memcpy(((uint8_t *)value) + sizeof(unsigned int),
+               request_id, request_id_size);
+    if (data->earliest_unix_time_msec == 0 ||
+        data->earliest_unix_time_msec > timeout_unix_time_msec) {
+      data->earliest_unix_time_msec = timeout_unix_time_msec;
+    }
+    if (is_first_timer) {
+      break_accept_event_loop(ctx);
+    }
+    MUTEX_UNLOCK(data->mutex);
+  }
+
+  return (void *)(uint64_t)id;
+}
+
+static void
+request_timer_unregister(void *timer_id,
+                         void *user_data)
+{
+  request_timer_data *data = user_data;
+  grn_id id = (grn_id)(uint64_t)timer_id;
+
+  {
+    grn_ctx *ctx = &(data->ctx);
+    uint64_t timeout_unix_time_msec;
+    int key_size;
+
+    MUTEX_LOCK(data->mutex);
+    key_size = grn_pat_get_key(ctx,
+                               data->entries,
+                               id,
+                               &timeout_unix_time_msec,
+                               sizeof(uint64_t));
+    if (key_size > 0) {
+      grn_pat_delete_by_id(ctx, data->entries, id, NULL);
+      if (data->earliest_unix_time_msec >= timeout_unix_time_msec) {
+        data->earliest_unix_time_msec = 0;
+      }
+    }
+    MUTEX_UNLOCK(data->mutex);
+  }
+}
+
+static void
+request_timer_fin(void *user_data)
+{
+  request_timer_data *data = user_data;
+
+  {
+    grn_ctx *ctx = &(data->ctx);
+    grn_pat_close(ctx, data->entries);
+    grn_ctx_fin(ctx);
+    MUTEX_FIN(data->mutex);
+  }
+}
+
+static void
+request_timer_init(void)
+{
+  static grn_request_timer timer;
+  request_timer_data *data = &the_request_timer_data;
+  grn_ctx *ctx;
+
+  MUTEX_INIT(data->mutex);
+  ctx = &(data->ctx);
+  grn_ctx_init(ctx, 0);
+  data->entries = grn_pat_create(ctx,
+                                 NULL,
+                                 sizeof(uint64_t),
+                                 GRN_TABLE_MAX_KEY_SIZE,
+                                 GRN_OBJ_KEY_UINT);
+  data->earliest_unix_time_msec = 0;
+
+  timer.user_data = data;
+  timer.register_func = request_timer_register;
+  timer.unregister_func = request_timer_unregister;
+  timer.fin_func = request_timer_fin;
+
+  grn_request_timer_set(&timer);
+}
+
+static grn_bool
+request_timer_ensure_earliest_unix_time_msec(void)
+{
+  request_timer_data *data = &the_request_timer_data;
+  grn_ctx *ctx;
+  grn_pat_cursor *cursor;
+
+  if (data->earliest_unix_time_msec > 0) {
+    return GRN_TRUE;
+  }
+
+  ctx = &(data->ctx);
+  cursor = grn_pat_cursor_open(ctx, data->entries,
+                               NULL, 0,
+                               NULL, 0,
+                               0, 1, GRN_CURSOR_ASCENDING);
+  if (!cursor) {
+    return GRN_FALSE;
+  }
+  while (grn_pat_cursor_next(ctx, cursor) != GRN_ID_NIL) {
+    void *key;
+    uint64_t timeout_unix_time_msec;
+
+    grn_pat_cursor_get_key(ctx, cursor, &key);
+    timeout_unix_time_msec = *(uint64_t *)key;
+    data->earliest_unix_time_msec = timeout_unix_time_msec;
+    break;
+  }
+  grn_pat_cursor_close(ctx, cursor);
+
+  return data->earliest_unix_time_msec > 0;
+}
+
+static int
+request_timer_get_poll_timeout(void)
+{
+  request_timer_data *data = &the_request_timer_data;
+  int timeout = 1000;
+  grn_ctx *ctx;
+  grn_timeval tv;
+
+  MUTEX_LOCK(data->mutex);
+  ctx = &(data->ctx);
+  if (grn_pat_size(ctx, data->entries) == 0) {
+    goto exit;
+  }
+
+  if (!request_timer_ensure_earliest_unix_time_msec()) {
+    goto exit;
+  }
+
+  grn_timeval_now(ctx, &tv);
+  timeout = data->earliest_unix_time_msec - GRN_TIMEVAL_TO_MSEC(&tv);
+  if (timeout < 0) {
+    timeout = 0;
+  } else if (timeout > 1000) {
+    timeout = 1000;
+  }
+
+exit :
+  MUTEX_UNLOCK(data->mutex);
+
+  return timeout;
+}
+
+static void
+request_timer_process_timeout(void)
+{
+  request_timer_data *data = &the_request_timer_data;
+  grn_ctx *ctx;
+  grn_timeval tv;
+  uint64_t max;
+  grn_pat_cursor *cursor;
+
+  ctx = &(data->ctx);
+  if (grn_pat_size(ctx, data->entries) == 0) {
+    return;
+  }
+
+  grn_timeval_now(ctx, &tv);
+  max = GRN_TIMEVAL_TO_MSEC(&tv);
+  cursor = grn_pat_cursor_open(ctx, data->entries,
+                               NULL, 0,
+                               &max, sizeof(uint64_t),
+                               0, -1, GRN_CURSOR_ASCENDING);
+  if (!cursor) {
+    return;
+  }
+
+  grn_id id;
+  while ((id = grn_pat_cursor_next(ctx, cursor)) != GRN_ID_NIL) {
+    void *value;
+    const char *request_id;
+    unsigned int request_id_size;
+
+    grn_pat_cursor_get_value(ctx, cursor, &value);
+    request_id_size = *((unsigned int *)value);
+    request_id = (const char *)(((uint8_t *)value) + sizeof(unsigned int));
+    grn_request_canceler_cancel(request_id, request_id_size);
+  }
+  grn_pat_cursor_close(ctx, cursor);
+}
+
 static void
 reset_ready_notify_pipe(void)
 {
@@ -658,7 +900,9 @@ daemonize(void)
 static void
 run_server_loop(grn_ctx *ctx, grn_com_event *ev)
 {
-  while (!grn_com_event_poll(ctx, ev, 1000) && grn_gctx.stat != GRN_CTX_QUIT) {
+  request_timer_init();
+  while (!grn_com_event_poll(ctx, ev, request_timer_get_poll_timeout()) &&
+         grn_gctx.stat != GRN_CTX_QUIT) {
     grn_edge *edge;
     while ((edge = (grn_edge *)grn_com_queue_deque(ctx, &ctx_old))) {
       grn_obj *msg;
@@ -674,6 +918,7 @@ run_server_loop(grn_ctx *ctx, grn_com_event *ev)
       }
       grn_edges_delete(ctx, edge);
     }
+    request_timer_process_timeout();
     /* todo : log stat */
   }
   for (;;) {
@@ -2126,25 +2371,6 @@ check_rlimit_nofile(grn_ctx *ctx)
 #endif /* WIN32 */
 }
 
-static void
-break_accept_event_loop(grn_ctx *ctx)
-{
-  grn_com *client;
-  const char *address;
-
-  if (strcmp(bind_address, "0.0.0.0") == 0) {
-    address = "127.0.0.1";
-  } else if (strcmp(bind_address, "::") == 0) {
-    address = "::1";
-    } else {
-    address = bind_address;
-  }
-  client = grn_com_copen(ctx, NULL, address, port);
-  if (client) {
-    grn_com_close(ctx, client);
-  }
-}
-
 static grn_thread_func_result CALLBACK
 h_worker(void *arg)
 {
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index