[Groonga-commit] groonga/groonga [master] Added log load function to suggest learner.

Back to archive index

null+****@clear***** null+****@clear*****
2011年 2月 17日 (木) 19:55:36 JST


Tasuku SUENAGA a.k.a. gunyarakun	2011-02-17 10:55:36 +0000 (Thu, 17 Feb 2011)

  New Revision: 5ecc3e31b01e9666628bfe6fa59e7f2dd994a1eb

  Log:
    Added log load function to suggest learner.

  Modified files:
    src/suggest/Makefile.am
    src/suggest/groonga_suggest_httpd.c
    src/suggest/groonga_suggest_learner.c
    src/suggest/util.c
    src/suggest/util.h

  Modified: src/suggest/Makefile.am (+5 -0)
===================================================================
--- src/suggest/Makefile.am    2011-02-17 03:35:07 +0000 (02f1bec)
+++ src/suggest/Makefile.am    2011-02-17 10:55:36 +0000 (787978f)
@@ -27,11 +27,13 @@ DEFAULT_INCLUDES = 				\
 groonga_suggest_learner_SOURCES = groonga_suggest_learner.c
 groonga_suggest_learner_CFLAGS =		\
 	$(AM_CFLAGS)				\
+	$(LIBEVENT_CFLAGS)			\
 	$(LIBZMQ_CFLAGS)			\
 	$(MESSAGE_PACK_CFLAGS)
 groonga_suggest_learner_LDADD =			\
 	libutil.la				\
 	$(top_builddir)/lib/libgroonga.la	\
+	$(LIBEVENT_LIBS)			\
 	$(LIBZMQ_LIBS)				\
 	$(MESSAGE_PACK_LIBS)
 
@@ -55,3 +57,6 @@ groonga_suggest_create_dataset_LDADD =		\
 	$(top_builddir)/lib/libgroonga.la
 
 libutil_la_SOURCES = util.c util.h
+libutil_la_CFLAGS =			\
+	$(AM_CFLAGS)				\
+	$(LIBEVENT_CFLAGS)

  Modified: src/suggest/groonga_suggest_httpd.c (+7 -47)
===================================================================
--- src/suggest/groonga_suggest_httpd.c    2011-02-17 03:35:07 +0000 (e46802a)
+++ src/suggest/groonga_suggest_httpd.c    2011-02-17 10:55:36 +0000 (3747d43)
@@ -79,17 +79,6 @@ static uint32_t default_max_threads = DEFAULT_MAX_THREADS;
 static volatile sig_atomic_t loop = 1;
 static grn_obj *db;
 
-static uint64_t
-atouint64_t(const char *s)
-{
-  uint64_t r;
-  for (r = 0; *s; s++) {
-    r *= 10;
-    r += (*s - '0');
-  }
-  return r;
-}
-
 static int
 suggest_result(struct evbuffer *res_buf, const char *types, const char *query, const char *target_name, grn_obj *cmd_buf, grn_ctx *ctx)
 {
@@ -121,42 +110,13 @@ suggest_result(struct evbuffer *res_buf, const char *types, const char *query, c
 static int
 log_send(struct evbuffer *res_buf, thd_data *thd, struct evkeyvalq *get_args)
 {
-  uint64_t millisec = 0;
-  const char *callback = NULL, *types = NULL, *query = NULL,
-             *client_id = NULL, *target_name = NULL,
-             *learn_target_name = NULL;
-  struct evkeyval *get;
-
-  TAILQ_FOREACH(get, get_args, next) {
-    switch(get->key[0]) {
-    case 't':
-      /* TODO: check types */
-      types = get->value;
-      break;
-    case 'i':
-      client_id = get->value;
-    case 'c':
-      if (!strcmp(get->key, "callback")) {
-        callback = get->value;
-      }
-      break;
-    case 'q':
-      query = get->value;
-      break;
-    case 's':
-      millisec = atouint64_t(get->value);
-      break;
-    case 'n':
-      /* TODO: check target_name */
-      target_name = get->value;
-      break;
-    case 'l':
-      learn_target_name = get->value;
-      break;
-    default:
-      break;
-    }
-  }
+  uint64_t millisec;
+  const char *callback, *types, *query, *client_id, *target_name,
+             *learn_target_name;
+
+  parse_keyval(get_args, &query, &types, &client_id, &target_name,
+               &learn_target_name, &callback, &millisec);
+
   /* send data to learn client */
   if (thd->zmq_sock && millisec && client_id && query && learn_target_name) {
     char c;

  Modified: src/suggest/groonga_suggest_learner.c (+123 -49)
===================================================================
--- src/suggest/groonga_suggest_learner.c    2011-02-17 03:35:07 +0000 (705fb3e)
+++ src/suggest/groonga_suggest_learner.c    2011-02-17 10:55:36 +0000 (e966b9a)
@@ -24,6 +24,8 @@
 
 #include "util.h"
 
+#include <evhttp.h>
+
 #define DEFAULT_RECV_ENDPOINT "tcp://*:1234"
 #define DEFAULT_SEND_ENDPOINT "tcp://*:1235"
 #define SEND_WAIT 1000 /* 0.001sec */
@@ -82,6 +84,43 @@ load_to_groonga(grn_ctx *ctx,
   }
 }
 
+void
+load_to_multi_targets(grn_ctx *ctx,
+                      grn_obj *buf,
+                      const char *query, uint32_t query_len,
+                      const char *client_id, uint32_t client_id_len,
+                      const char *learn_target_names,
+                      uint32_t learn_target_names_len,
+                      uint64_t millisec,
+                      int submit)
+{
+  if (millisec && query && client_id && learn_target_names) {
+    unsigned int tn_len;
+    const char *tn, *tnp, *tne;
+    tn = tnp = learn_target_names;
+    tne = learn_target_names + learn_target_names_len;
+    while (tnp <= tne) {
+      if (tnp == tne || *tnp == '|') {
+        tn_len = tnp - tn;
+
+        /*
+        printf("sec: %llu query %.*s client_id: %.*s target: %.*s\n",
+          millisec,
+          query_len, query,
+          client_id_len, client_id,
+          tn_len, tn);
+        */
+        load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
+                        tn, tn_len, millisec, submit);
+
+        tn = ++tnp;
+      } else {
+        tnp++;
+      }
+    }
+  }
+}
+
 #define PACK_KEY_FROM_ID(id) \
 { \
   int _k_len; \
@@ -397,36 +436,15 @@ handle_msg(msgpack_object *obj, grn_ctx *ctx, grn_obj *buf)
         }
       }
     }
-    if (millisec && query && client_id && learn_target_names) {
-      unsigned int tn_len;
-      const char *tn, *tnp, *tne;
-      tn = tnp = learn_target_names;
-      tne = learn_target_names + learn_target_names_len;
-      while (tnp <= tne) {
-        if (tnp == tne || *tnp == '|') {
-          tn_len = tnp - tn;
-
-          /*
-          printf("sec: %llu query %.*s client_id: %.*s target: %.*s\n",
-            millisec,
-            query_len, query,
-            client_id_len, client_id,
-            tn_len, tn);
-          */
-          load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
-                          tn, tn_len, millisec, submit_flag);
-
-          tn = ++tnp;
-        } else {
-          tnp++;
-        }
-      }
-    }
+    load_to_multi_targets(ctx, buf, query, query_len,
+                          client_id, client_id_len,
+                          learn_target_names, learn_target_names_len,
+                          millisec, submit_flag);
   }
 }
 
 static void
-event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
+recv_event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
 {
   grn_obj buf;
   zmq_pollitem_t items[] = {
@@ -459,6 +477,51 @@ event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
   grn_obj_unlink(ctx, &buf);
 }
 
+#define MAX_LOG_LENGTH 0x2000
+
+static void
+load_log(grn_ctx *ctx, const char *log_file_name)
+{
+  FILE *fp;
+  if ((fp = fopen(log_file_name, "r"))) {
+    grn_obj buf;
+    char line_buf[MAX_LOG_LENGTH];
+
+    GRN_TEXT_INIT(&buf, 0);
+    while (fgets(line_buf, MAX_LOG_LENGTH, fp)) {
+      if (strrchr(line_buf, '\n')) {
+        uint64_t millisec;
+        struct evkeyvalq get_args;
+        const char *callback, *types, *query, *client_id, *target_name,
+                   *learn_target_name;
+        {
+          char *uri = evhttp_decode_uri(line_buf);
+          evhttp_parse_query(uri, &get_args);
+          free(uri);
+        }
+        parse_keyval(&get_args, &query, &types, &client_id, &target_name,
+                     &learn_target_name, &callback, &millisec);
+
+        if (query && client_id && learn_target_name && millisec) {
+          load_to_multi_targets(ctx, &buf, query, strlen(query),
+                                client_id, strlen(client_id),
+                                learn_target_name, strlen(learn_target_name),
+                                millisec,
+                                types && !strcmp(types, "submit"));
+        }
+
+        evhttp_clear_headers(&get_args);
+      } else {
+        while (1) {
+          int c = fgetc(fp);
+          if (c == '\n' || c == EOF) { break; }
+        }
+      }
+    }
+    grn_obj_close(ctx, &buf);
+  }
+}
+
 static void
 usage(FILE *output)
 {
@@ -467,6 +530,7 @@ usage(FILE *output)
           "options:\n"
           "  -r <recv endpoint>: recv endpoint (default: %s)\n"
           "  -s <send endpoint>: send endpoint (default: %s)\n"
+          "  -l <logfile>      : load from log file of webserver.\n"
           "  -d                : daemonize\n",
           DEFAULT_RECV_ENDPOINT, DEFAULT_SEND_ENDPOINT);
 }
@@ -482,13 +546,14 @@ main(int argc, char **argv)
 {
   int daemon = 0;
   const char *recv_endpoint = DEFAULT_RECV_ENDPOINT,
-             *send_endpoint = DEFAULT_SEND_ENDPOINT;
+             *send_endpoint = DEFAULT_SEND_ENDPOINT,
+             *load_logfile_name = NULL;
 
   /* parse options */
   {
     int ch;
 
-    while ((ch = getopt(argc, argv, "r:s:d")) != -1) {
+    while ((ch = getopt(argc, argv, "r:s:dl:")) != -1) {
       switch(ch) {
       case 'r':
         recv_endpoint = optarg;
@@ -499,6 +564,9 @@ main(int argc, char **argv)
       case 'd':
         daemon = 1;
         break;
+      case 'l':
+        load_logfile_name = optarg;
+        break;
       }
     }
     argc -= optind; argv += optind;
@@ -525,33 +593,39 @@ main(int argc, char **argv)
     if (!(grn_db_open(ctx, argv[0]))) {
       print_error("cannot open database.");
     } else {
-      if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
-        print_error("cannot create msgpack zone.");
+      if (load_logfile_name) {
+        /* loading log mode */
+        load_log(ctx, load_logfile_name);
       } else {
-        void *zmq_ctx, *zmq_recv_sock;
-        if (!(zmq_ctx = zmq_init(1))) {
-          print_error("cannot create zmq context.");
+        /* zeromq/msgpack recv mode */
+        if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
+          print_error("cannot create msgpack zone.");
         } else {
-          if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
-            print_error("cannot create zmq_socket.");
-          } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
-            print_error("cannot bind zmq_socket.");
+          void *zmq_ctx, *zmq_recv_sock;
+          if (!(zmq_ctx = zmq_init(1))) {
+            print_error("cannot create zmq context.");
           } else {
-            send_thd_data thd;
-            zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
-            thd.db_path = argv[0];
-            thd.send_endpoint = send_endpoint;
-            thd.zmq_ctx = zmq_ctx;
-
-            if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
-              print_error("error in pthread_create() for sending datas.");
+            if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
+              print_error("cannot create zmq_socket.");
+            } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
+              print_error("cannot bind zmq_socket.");
+            } else {
+              send_thd_data thd;
+              zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
+              thd.db_path = argv[0];
+              thd.send_endpoint = send_endpoint;
+              thd.zmq_ctx = zmq_ctx;
+
+              if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
+                print_error("error in pthread_create() for sending datas.");
+              }
+              recv_event_loop(mempool, zmq_recv_sock, ctx);
+              pthread_join(thd.thd, NULL);
             }
-            event_loop(mempool, zmq_recv_sock, ctx);
-            pthread_join(thd.thd, NULL);
+            zmq_term(zmq_ctx);
           }
-          zmq_term(zmq_ctx);
+          msgpack_zone_free(mempool);
         }
-        msgpack_zone_free(mempool);
       }
     }
     grn_obj_close(ctx, grn_ctx_db(ctx));

  Modified: src/suggest/util.c (+62 -1)
===================================================================
--- src/suggest/util.c    2011-02-17 03:35:07 +0000 (c21f4c7)
+++ src/suggest/util.c    2011-02-17 10:55:36 +0000 (5ea5ede)
@@ -15,14 +15,18 @@
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
 #include <stdio.h>
+#include <string.h>
+#include <stdint.h>
 #include <stdarg.h>
 #include <unistd.h>
-#include <sys/types.h>
 #include <sys/wait.h>
+#include <sys/queue.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#include "util.h"
+
 int
 print_error(const char *format, ...)
 {
@@ -74,3 +78,60 @@ daemonize(void)
   }
   return 1;
 }
+
+static uint64_t
+atouint64_t(const char *s)
+{
+  uint64_t r;
+  for (r = 0; *s; s++) {
+    r *= 10;
+    r += (*s - '0');
+  }
+  return r;
+}
+
+void
+parse_keyval(struct evkeyvalq *get_args,
+             const char **query, const char **types,
+             const char **client_id, const char **target_name,
+             const char **learn_target_name,
+             const char **callback,
+             uint64_t *millisec)
+{
+  struct evkeyval *get;
+
+  *query = *types = *client_id = *target_name =
+    *learn_target_name = *callback = NULL;
+  *millisec = 0;
+
+  TAILQ_FOREACH(get, get_args, next) {
+    switch(get->key[0]) {
+    case 'q':
+      *query = get->value;
+      break;
+    case 't':
+      /* TODO: check types */
+      *types = get->value;
+      break;
+    case 'i':
+      *client_id = get->value;
+    case 'c':
+      if (!strcmp(get->key, "callback")) {
+        *callback = get->value;
+      }
+      break;
+    case 's':
+      *millisec = atouint64_t(get->value);
+      break;
+    case 'n':
+      /* TODO: check target_name */
+      *target_name = get->value;
+      break;
+    case 'l':
+      *learn_target_name = get->value;
+      break;
+    default:
+      break;
+    }
+  }
+}

  Modified: src/suggest/util.h (+10 -0)
===================================================================
--- src/suggest/util.h    2011-02-17 03:35:07 +0000 (1983481)
+++ src/suggest/util.h    2011-02-17 10:55:36 +0000 (e7e18b2)
@@ -17,7 +17,17 @@
 #ifndef GRN_SUGGEST_UTIL_H
 #define GRN_SUGGEST_UTIL_H
 
+#include <sys/queue.h>
+#include <event.h>
+#include <stdint.h>
+
 int print_error(const char *format, ...);
 int daemonize(void);
+void parse_keyval(struct evkeyvalq *get_args,
+                  const char **query, const char **types,
+                  const char **client_id, const char **target_name,
+                  const char **learn_target_name,
+                  const char **callback,
+                  uint64_t *millisec);
 
 #endif /* GRN_SUGGEST_UTIL_H */




Groonga-commit メーリングリストの案内
Back to archive index