[Groonga-commit] groonga/groonga [master] Added suggest learning systems.

Back to archive index

null+****@clear***** null+****@clear*****
2010年 12月 7日 (火) 20:25:33 JST


Tasuku SUENAGA a.k.a. gunyarakun	2010-12-07 11:25:33 +0000 (Tue, 07 Dec 2010)

  New Revision: f4d6e2af3401065a40e67523d92c6f0c8bebc757

  Log:
    Added suggest learning systems.

  Added files:
    src/suggest/Makefile
    src/suggest/suggest-httpd.c
    src/suggest/suggest-learner.c
    src/suggest/util.c

  Added: src/suggest/Makefile (+10 -0) 100644
===================================================================
--- /dev/null
+++ src/suggest/Makefile    2010-12-07 11:25:33 +0000 (ab72182)
@@ -0,0 +1,10 @@
+all: suggest-learner suggest-httpd
+
+suggest-httpd: suggest-httpd.c util.c
+	gcc -O3 -ggdb -Wall -o suggest-httpd suggest-httpd.c util.c -lzmq -levent -lmsgpack -lgroonga
+
+suggest-learner: suggest-learner.c util.c
+	gcc -O3 -ggdb -Wall -o suggest-learner suggest-learner.c util.c -lzmq -lmsgpack -lgroonga
+
+clean:
+	rm suggest-httpd suggest-learner

  Added: src/suggest/suggest-httpd.c (+686 -0) 100644
===================================================================
--- /dev/null
+++ src/suggest/suggest-httpd.c    2010-12-07 11:25:33 +0000 (40b463a)
@@ -0,0 +1,686 @@
+/* -*- c-basic-offset: 2 -*- */
+/* Copyright(C) 2010- Brazil
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License version 2.1 as published by the Free Software Foundation.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+#include <stdio.h>
+#include <signal.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <err.h>
+
+#include <fcntl.h>
+#include <sys/queue.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/resource.h>
+
+#include <zmq.h>
+#include <event.h>
+#include <evhttp.h>
+#include <msgpack.h>
+#include <groonga/groonga.h>
+#include <pthread.h>
+
+#define DEFAULT_PORT 8080
+#define DEFAULT_MAX_THREADS 8
+
+int print_error(const char *format, ...);
+grn_rc grn_ctx_close(grn_ctx *ctx);
+
+#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
+
+#define LISTEN_BACKLOG 756
+#define MIN_MAX_FDS 2048
+
+typedef struct {
+  grn_ctx *ctx;
+  grn_obj *db;
+  void *zmq_sock;
+  grn_obj cmd_buf;
+  pthread_t thd;
+  struct event_base *base;
+  struct evhttp *httpd;
+  struct event pulse;
+} thd_data;
+
+typedef struct {
+  const char *db_path;
+  const char *recv_endpoint;
+  pthread_t thd;
+  void *zmq_ctx;
+} recv_thd_data;
+
+#define CMD_BUF_SIZE 1024
+
+static uint32_t default_max_threads = DEFAULT_MAX_THREADS;
+static volatile sig_atomic_t loop = 1;
+static grn_obj *db;
+
+static uint64_t
+atouint64_t(const char *s)
+{
+  uint64_t r;
+  for (r = 0; *s; s++) {
+    r *= 10;
+    r += (*s - '0');
+  }
+  return r;
+}
+
+static int
+suggest_result(struct evbuffer *res_buf, const char *types, const char *query, const char *target_name, grn_obj *cmd_buf, grn_ctx *ctx)
+{
+  if (types && query) {
+    GRN_BULK_REWIND(cmd_buf);
+    GRN_TEXT_PUTS(ctx, cmd_buf, "/d/suggest?table=item_");
+    grn_text_urlenc(ctx, cmd_buf, target_name, strlen(target_name));
+    GRN_TEXT_PUTS(ctx, cmd_buf, "&column=kana&types=");
+    grn_text_urlenc(ctx, cmd_buf, types, strlen(types));
+    GRN_TEXT_PUTS(ctx, cmd_buf, "&query=");
+    grn_text_urlenc(ctx, cmd_buf, query, strlen(query));
+    {
+      char *res;
+      int flags;
+      unsigned int res_len;
+
+      grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), 0);
+      grn_ctx_recv(ctx, &res, &res_len, &flags);
+
+      evbuffer_add(res_buf, res, res_len);
+      return res_len;
+    }
+  } else {
+    evbuffer_add(res_buf, "{}", 2);
+    return 2;
+  }
+}
+
+static int
+log_send(struct evbuffer *res_buf, thd_data *thd, struct evkeyvalq *get_args)
+{
+  uint64_t millisec = 0;
+  const char *callback = NULL, *types = NULL, *query = NULL,
+             *client_id = NULL, *target_name = NULL,
+             *learn_target_name = NULL;
+  struct evkeyval *get;
+
+  TAILQ_FOREACH(get, get_args, next) {
+    switch(get->key[0]) {
+    case 't':
+      /* TODO: check types */
+      types = get->value;
+      break;
+    case 'i':
+      client_id = get->value;
+    case 'c':
+      if (!strcmp(get->key, "callback")) {
+        callback = get->value;
+      }
+      break;
+    case 'q':
+      query = get->value;
+      break;
+    case 's':
+      millisec = atouint64_t(get->value);
+      break;
+    case 'n':
+      /* TODO: check target_name */
+      target_name = get->value;
+      break;
+    case 'l':
+      learn_target_name = get->value;
+      break;
+    default:
+      break;
+    }
+  }
+  /* send data to learn client */
+  if (thd->zmq_sock && millisec && client_id && query && learn_target_name) {
+    char c;
+    size_t l;
+    msgpack_packer pk;
+    msgpack_sbuffer sbuf;
+    int cnt, submit_flag = 0;
+
+    msgpack_sbuffer_init(&sbuf);
+    msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
+
+    cnt = 4;
+    if (types && !strcmp(types, "submit")) {
+      cnt++;
+      types = NULL;
+      submit_flag = 1;
+    }
+    msgpack_pack_map(&pk, cnt);
+
+    c = 'i';
+    msgpack_pack_raw(&pk, 1);
+    msgpack_pack_raw_body(&pk, &c, 1);
+    l = strlen(client_id);
+    msgpack_pack_raw(&pk, l);
+    msgpack_pack_raw_body(&pk, client_id, l);
+
+    c = 'q';
+    msgpack_pack_raw(&pk, 1);
+    msgpack_pack_raw_body(&pk, &c, 1);
+    l = strlen(query);
+    msgpack_pack_raw(&pk, l);
+    msgpack_pack_raw_body(&pk, query, l);
+
+    c = 's';
+    msgpack_pack_raw(&pk, 1);
+    msgpack_pack_raw_body(&pk, &c, 1);
+    msgpack_pack_uint64(&pk, millisec);
+
+    c = 'l';
+    msgpack_pack_raw(&pk, 1);
+    msgpack_pack_raw_body(&pk, &c, 1);
+    l = strlen(learn_target_name);
+    msgpack_pack_raw(&pk, l);
+    msgpack_pack_raw_body(&pk, learn_target_name, l);
+
+    if (submit_flag) {
+      c = 't';
+      msgpack_pack_raw(&pk, 1);
+      msgpack_pack_raw_body(&pk, &c, 1);
+      msgpack_pack_true(&pk);
+    }
+    {
+      zmq_msg_t msg;
+      if (!zmq_msg_init_size(&msg, sbuf.size)) {
+        memcpy((void *)zmq_msg_data(&msg), sbuf.data, sbuf.size);
+        if (zmq_send(thd->zmq_sock, &msg, 0)) {
+          print_error("zmq_send() error");
+        }
+        zmq_msg_close(&msg);
+      }
+    }
+    msgpack_sbuffer_destroy(&sbuf);
+  }
+  /* make result */
+  {
+    int content_length;
+    if (callback) {
+      content_length = strlen(callback);
+      evbuffer_add(res_buf, callback, content_length);
+      evbuffer_add(res_buf, "(", 1);
+      content_length += suggest_result(res_buf, types, query, target_name, &(thd->cmd_buf), thd->ctx) + 3;
+      evbuffer_add(res_buf, ");", 2);
+    } else {
+      content_length = suggest_result(res_buf, types, query, target_name, &(thd->cmd_buf), thd->ctx) + 3;
+    }
+    return content_length;
+  }
+}
+
+static void
+cleanup_httpd_thread(thd_data *thd) {
+  if (thd->httpd) {
+    evhttp_free(thd->httpd);
+  }
+  if (thd->zmq_sock) {
+    zmq_close(thd->zmq_sock);
+  }
+  grn_obj_unlink(thd->ctx, &(thd->cmd_buf));
+  if (thd->ctx) {
+    grn_ctx_close(thd->ctx);
+  }
+  event_base_free(thd->base);
+}
+
+static void
+generic_handler(struct evhttp_request *req, void *arg)
+{
+  struct evkeyvalq args;
+  thd_data *thd = arg;
+
+  if (!loop) {
+    event_base_loopexit(thd->base, NULL);
+    return;
+  }
+  if (!req->uri) { return; }
+
+  {
+    char *uri = evhttp_decode_uri(req->uri);
+    evhttp_parse_query(uri, &args);
+    free(uri);
+  }
+
+  {
+    struct evbuffer *res_buf;
+    if (!(res_buf = evbuffer_new())) {
+      err(1, "failed to create response buffer");
+    }
+
+    evhttp_add_header(req->output_headers,
+      "Content-Type", "text/javascript; charset=UTF-8");
+    evhttp_add_header(req->output_headers, "Connection", "close");
+
+    {
+      int content_length = log_send(res_buf, thd, &args);
+      if (content_length >= 0) {
+        char num_buf[16];
+        snprintf(num_buf, 16, "%d", content_length);
+        evhttp_add_header(req->output_headers, "Content-Length", num_buf);
+      }
+    }
+    evhttp_send_reply(req, HTTP_OK, "OK", res_buf);
+    evbuffer_free(res_buf);
+  }
+  evhttp_clear_headers(&args);
+}
+
+static int
+bind_socket(int port)
+{
+  int nfd;
+  if ((nfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    print_error("cannot open socket for http.");
+    return -1;
+  } else {
+    int r, one = 1;
+    struct sockaddr_in addr;
+
+    r = setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(int));
+    memset(&addr, 0, sizeof(addr));
+    addr.sin_family = AF_INET;
+    addr.sin_addr.s_addr = INADDR_ANY;
+    addr.sin_port = htons(port);
+
+    if ((r = bind(nfd, (struct sockaddr *)&addr, sizeof(addr))) < 0) {
+      print_error("cannot bind socket for http.");
+      return r;
+    }
+    if ((r = listen(nfd, LISTEN_BACKLOG)) < 0) {
+      print_error("cannot listen socket for http.");
+      return r;
+    }
+    if ((r = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, r | O_NONBLOCK) < 0 ) {
+      print_error("cannot fcntl socket for http.");
+      return -1;
+    }
+    return nfd;
+  }
+}
+
+static void
+signal_handler(int sig)
+{
+  loop = 0;
+}
+
+void
+timeout_handler(int fd, short events, void *arg) {
+  thd_data *thd = arg;
+  if (!loop) {
+    event_base_loopexit(thd->base, NULL);
+  } else {
+    struct timeval tv = {1, 0};
+    evtimer_add(&(thd->pulse), &tv);
+  }
+}
+
+static void *
+dispatch(void *arg)
+{
+  event_base_dispatch((struct event_base *)arg);
+  return NULL;
+}
+
+grn_rc grn_text_ulltoa(grn_ctx *ctx, grn_obj *buf, unsigned long long int i);
+
+static void
+msgpack2json(msgpack_object *o, grn_ctx *ctx, grn_obj *buf)
+{
+  switch (o->type) {
+  case MSGPACK_OBJECT_POSITIVE_INTEGER:
+    grn_text_ulltoa(ctx, buf, o->via.u64);
+    break;
+  case MSGPACK_OBJECT_RAW:
+    grn_text_esc(ctx, buf, o->via.raw.ptr, o->via.raw.size);
+    break;
+  case MSGPACK_OBJECT_ARRAY:
+    GRN_TEXT_PUTC(ctx, buf, '[');
+    {
+      int i;
+      for (i = 0; i < o->via.array.size; i++) {
+        msgpack2json(o->via.array.ptr, ctx, buf);
+      }
+    }
+    GRN_TEXT_PUTC(ctx, buf, ']');
+    break;
+  case MSGPACK_OBJECT_DOUBLE:
+    grn_text_ftoa(ctx, buf, o->via.dec);
+    break;
+  default:
+    print_error("cannot handle this msgpack type.");
+  }
+}
+
+static void
+load_from_learner(msgpack_object *o, grn_ctx *ctx, grn_obj *cmd_buf)
+{
+  if (o->type == MSGPACK_OBJECT_MAP && o->via.map.size) {
+    msgpack_object_kv *kv;
+    kv = &(o->via.map.ptr[0]);
+    if (kv->key.type == MSGPACK_OBJECT_RAW && kv->key.via.raw.size == 6 &&
+        !memcmp(kv->key.via.raw.ptr, CONST_STR_LEN("target"))) {
+      if (kv->val.type == MSGPACK_OBJECT_RAW) {
+        int i;
+        GRN_BULK_REWIND(cmd_buf);
+        GRN_TEXT_PUTS(ctx, cmd_buf, "load --table ");
+        GRN_TEXT_PUT(ctx, cmd_buf, kv->val.via.raw.ptr, kv->val.via.raw.size);
+        grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
+        grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE);
+        if (kv->val.via.raw.size > 5) {
+          if (!memcmp(kv->val.via.raw.ptr, CONST_STR_LEN("item_")) ||
+              !memcmp(kv->val.via.raw.ptr, CONST_STR_LEN("pair_"))) {
+            char delim = '{';
+            GRN_BULK_REWIND(cmd_buf);
+            for (i = 1; i < o->via.map.size; i++) {
+              GRN_TEXT_PUTC(ctx, cmd_buf, delim);
+              kv = &(o->via.map.ptr[i]);
+              msgpack2json(&(kv->key), ctx, cmd_buf);
+              GRN_TEXT_PUTC(ctx, cmd_buf, ':');
+              msgpack2json(&(kv->val), ctx, cmd_buf);
+              delim = ',';
+            }
+            GRN_TEXT_PUTC(ctx, cmd_buf, '}');
+            /* printf("msg: %.*s\n", GRN_TEXT_LEN(cmd_buf), GRN_TEXT_VALUE(cmd_buf)); */
+            grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
+          }
+        }
+        grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
+        {
+          char *res;
+          int flags;
+          unsigned int res_len;
+          grn_ctx_recv(ctx, &res, &res_len, &flags);
+        }
+      }
+    }
+  }
+}
+
+static void
+recv_handler(grn_ctx *ctx, void *zmq_recv_sock, msgpack_zone *mempool, grn_obj *cmd_buf)
+{
+  zmq_msg_t msg;
+
+  if (zmq_msg_init(&msg)) {
+    print_error("cannot init zmq message.");
+  } else {
+    if (zmq_recv(zmq_recv_sock, &msg, 0)) {
+      print_error("cannot recv zmq message.");
+    } else {
+      msgpack_object obj;
+      msgpack_unpack_return ret;
+
+      ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
+      if (MSGPACK_UNPACK_SUCCESS == ret) {
+        load_from_learner(&obj, ctx, cmd_buf);
+      } else {
+        print_error("invalid recv data.");
+      }
+      msgpack_zone_clear(mempool);
+    }
+    zmq_msg_close(&msg);
+  }
+}
+
+static void *
+recv_from_learner(void *arg)
+{
+  void *zmq_recv_sock;
+  recv_thd_data *thd = arg;
+
+  if ((zmq_recv_sock = zmq_socket(thd->zmq_ctx, ZMQ_SUB))) {
+    if (!zmq_connect(zmq_recv_sock, thd->recv_endpoint)) {
+      grn_ctx ctx;
+      if (!grn_ctx_init(&ctx, 0)) {
+        if ((!grn_ctx_use(&ctx, db)/*grn_db_open(&ctx, thd->db_path)*/)) {
+          msgpack_zone *mempool;
+          if ((mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
+            grn_obj cmd_buf;
+            zmq_pollitem_t items[] = {
+              { zmq_recv_sock, 0, ZMQ_POLLIN, 0}
+            };
+            GRN_TEXT_INIT(&cmd_buf, 0);
+            zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
+            while (loop) {
+              zmq_poll(items, 1, 10000);
+              if (items[0].revents & ZMQ_POLLIN) {
+                recv_handler(&ctx, zmq_recv_sock, mempool, &cmd_buf);
+              }
+            }
+            grn_obj_unlink(&ctx, &cmd_buf);
+            msgpack_zone_free(mempool);
+          } else {
+            print_error("cannot create msgpack zone.");
+          }
+          /* db_close */
+        } else {
+          print_error("error in grn_db_open() on recv thread.");
+        }
+        grn_ctx_fin(&ctx);
+      } else {
+        print_error("error in grn_ctx_init() on recv thread.");
+      }
+    } else {
+      print_error("cannot create recv zmq_socket.");
+    }
+  } else {
+    print_error("cannot connect zmq_socket.");
+  }
+  return NULL;
+}
+
+static int
+serve_threads(int nthreads, int port, const char *db_path, void *zmq_ctx,
+              const char *send_endpoint, const char *recv_endpoint)
+{
+  int i, nfd;
+  if ((nfd = bind_socket(port)) < 0) {
+    print_error("cannot bind socket. please check port number with netstat.");
+    return -1;
+  }
+
+  thd_data thds[nthreads];
+  for (i = 0; i < nthreads; i++) {
+    memset(&thds[i], 0, sizeof(thds[i]));
+    if (!(thds[i].base = event_init())) {
+      print_error("error in event_init() on thread %d.", i);
+    } else {
+      if (!(thds[i].httpd = evhttp_new(thds[i].base))) {
+        print_error("error in evhttp_new() on thread %d.", i);
+      } else {
+        int r;
+        if ((r = evhttp_accept_socket(thds[i].httpd, nfd))) {
+          print_error("error in evhttp_accept_socket() on thread %d.", i);
+        } else {
+          if (send_endpoint) {
+            if (!(thds[i].zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB))) {
+              print_error("cannot create zmq_socket.");
+            } else if (zmq_connect(thds[i].zmq_sock, send_endpoint)) {
+              print_error("cannot connect zmq_socket.");
+              zmq_close(thds[i].zmq_sock);
+              thds[i].zmq_sock = NULL;
+            }
+          } else {
+            thds[i].zmq_sock = NULL;
+          }
+          if (!(thds[i].ctx = grn_ctx_open(0))) {
+            print_error("error in grn_ctx_open() on thread %d.", i);
+          } else if (grn_ctx_use(thds[i].ctx, db)
+/*!(thds[i].db = grn_db_open(thds[i].ctx, db_path))*/) {
+            print_error("error in grn_db_open() on thread %d.", i);
+          } else {
+            GRN_TEXT_INIT(&(thds[i].cmd_buf), 0);
+            evhttp_set_gencb(thds[i].httpd, generic_handler, &thds[i]);
+            evhttp_set_timeout(thds[i].httpd, 10);
+            {
+              struct timeval tv = {1, 0};
+              evtimer_set(&(thds[i].pulse), timeout_handler, &thds[i]);
+              evtimer_add(&(thds[i].pulse), &tv);
+            }
+            if ((r = pthread_create(&(thds[i].thd), NULL, dispatch, thds[i].base))) {
+              print_error("error in pthread_create() on thread %d.", i);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /* recv thread from learner */
+  if (recv_endpoint) {
+    recv_thd_data rthd;
+    rthd.db_path = db_path;
+    rthd.recv_endpoint = recv_endpoint;
+    rthd.zmq_ctx = zmq_ctx;
+
+    if (pthread_create(&(rthd.thd), NULL, recv_from_learner, &rthd)) {
+      print_error("error in pthread_create() on thread %d.", i);
+    }
+    pthread_join(rthd.thd, NULL);
+  } else {
+    while (loop) { sleep(1000); }
+  }
+
+  /* join all httpd thread */
+  for (i = 0; i < nthreads; i++) {
+    if (thds[i].thd) {
+      pthread_join(thds[i].thd, NULL);
+    }
+    cleanup_httpd_thread(&(thds[i]));
+  }
+  return 0;
+}
+
+static uint32_t
+get_core_number(void)
+{
+#ifdef ACTUALLY_GET_CORE_NUMBER
+#ifdef _SC_NPROCESSORS_CONF
+  return sysconf(_SC_NPROCESSORS_CONF);
+#else /* _SC_NPROCESSORS_CONF */
+  int n_processors;
+  size_t length = sizeof(n_processors);
+  int mib[] = {CTL_HW, HW_NCPU};
+  if (sysctl(mib, sizeof(mib) / sizeof(mib[0]),
+             &n_processors, &length, NULL, 0) == 0 &&
+      length == sizeof(n_processors) &&
+      0 < n_processors) {
+    return n_processors;
+  } else {
+    return 1;
+  }
+#endif /* _SC_NPROCESSORS_CONF */
+#endif /* ACTUALLY_GET_CORE_NUMBER */
+  return 0;
+}
+
+static void
+usage(FILE *output)
+{
+  fprintf(output,
+          "Usage: suggest-httpd [options...] db_path\n"
+          "db_path:\n"
+          "  specify groonga database path which is used for suggestion.\n"
+          "options:\n"
+          "  -p <port number>   : http server port number (default: %d)\n"
+          "  -c <thread number> : server thread number (default: %d)\n"
+          "  -s <send endpoint> : send endpoint (ex. tcp://example.com:1234)\n"
+          "  -r <recv endpoint> : recv endpoint (ex. tcp://example.com:1235)\n",
+          DEFAULT_PORT, default_max_threads);
+}
+
+int
+main(int argc, char **argv)
+{
+  int port_no = DEFAULT_PORT;
+  const char *send_endpoint = NULL, *recv_endpoint = NULL;
+
+  /* check environment */
+  {
+    struct rlimit rlim;
+    if (!getrlimit(RLIMIT_NOFILE, &rlim)) {
+      if (rlim.rlim_max < MIN_MAX_FDS) {
+        print_error("too small max fds. `ulimit -n`");
+        return -1;
+      }
+      rlim.rlim_cur = rlim.rlim_cur;
+      setrlimit(RLIMIT_NOFILE, &rlim);
+    }
+  }
+  if (!(default_max_threads = get_core_number())) {
+    default_max_threads = DEFAULT_MAX_THREADS;
+  }
+
+  /* parse options */
+  {
+    int ch;
+    extern char *optarg;
+    extern int optind, opterr;
+
+    while ((ch = getopt(argc, argv, "c:p:s:r:")) != -1) {
+      switch(ch) {
+      case 'c':
+        default_max_threads = atoi(optarg);
+        break;
+      case 'p':
+        port_no = atoi(optarg);
+        break;
+      case 's':
+        send_endpoint = optarg;
+        break;
+      case 'r':
+        recv_endpoint = optarg;
+        break;
+      }
+    }
+    argc -= optind; argv += optind;
+  }
+
+  /* main */
+  if (argc != 1) {
+    usage(stderr);
+  } else {
+    grn_ctx ctx;
+    void *zmq_ctx;
+    grn_init();
+    grn_ctx_init(&ctx, 0);
+    if (!(db = grn_db_open(&ctx, argv[0]))) {
+      /* error! */
+    }
+    if (!(zmq_ctx = zmq_init(1))) {
+      print_error("cannot create zmq context.");
+    } else {
+      signal(SIGTERM, signal_handler);
+      signal(SIGINT, signal_handler);
+      signal(SIGQUIT, signal_handler);
+
+      serve_threads(default_max_threads, port_no, argv[0], zmq_ctx, send_endpoint, recv_endpoint);
+      zmq_term(zmq_ctx);
+    }
+    grn_ctx_fin(&ctx);
+    grn_fin();
+  }
+  return 0;
+}

  Added: src/suggest/suggest-learner.c (+542 -0) 100644
===================================================================
--- /dev/null
+++ src/suggest/suggest-learner.c    2010-12-07 11:25:33 +0000 (e8a9941)
@@ -0,0 +1,542 @@
+/* -*- c-basic-offset: 2 -*- */
+/* Copyright(C) 2010- Brazil
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License version 2.1 as published by the Free Software Foundation.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+#include <zmq.h>
+#include <stdio.h>
+#include <signal.h>
+#include <unistd.h>
+#include <msgpack.h>
+#include <pthread.h>
+#include <groonga/groonga.h>
+
+#define DEFAULT_RECV_ENDPOINT "tcp://*:1234"
+#define DEFAULT_SEND_ENDPOINT "tcp://*:1235"
+
+int print_error(const char *format, ...);
+
+#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
+
+typedef struct {
+  const char *db_path;
+  const char *send_endpoint;
+  pthread_t thd;
+  void *zmq_ctx;
+} send_thd_data;
+
+static volatile sig_atomic_t loop = 1;
+
+static void
+load_to_groonga(grn_ctx *ctx,
+                grn_obj *buf,
+                const char *query, uint32_t query_len,
+                const char *client_id, uint32_t client_id_len,
+                const char *learn_target_name, uint32_t learn_target_name_len,
+                uint64_t millisec,
+                int submit)
+{
+  GRN_BULK_REWIND(buf);
+  GRN_TEXT_PUTS(ctx, buf, "load --table event_");
+  GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
+  GRN_TEXT_PUTS(ctx, buf, " --each 'suggest_preparer(_id,type,item,sequence,time,pair_");
+  GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
+  GRN_TEXT_PUTS(ctx, buf, ")'");
+  grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE);
+  grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE);
+
+  GRN_BULK_REWIND(buf);
+  GRN_TEXT_PUTS(ctx, buf, "{\"item\":");
+  grn_text_esc(ctx, buf, query, query_len);
+  GRN_TEXT_PUTS(ctx, buf, ",\"sequence\":");
+  grn_text_esc(ctx, buf, client_id, client_id_len);
+  GRN_TEXT_PUTS(ctx, buf, ",\"time\":");
+  grn_text_ftoa(ctx, buf, (double)millisec / 1000);
+  if (submit) {
+    GRN_TEXT_PUTS(ctx, buf, ",\"type\":\"submit\"}");
+  } else {
+    GRN_TEXT_PUTS(ctx, buf, "}");
+  }
+  /* printf("%.*s\n", GRN_TEXT_LEN(buf), GRN_TEXT_VALUE(buf)); */
+  grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE);
+
+  grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
+
+  {
+    char *res;
+    int flags;
+    unsigned int res_len;
+    grn_ctx_recv(ctx, &res, &res_len, &flags);
+  }
+}
+
+#define PACK_KEY_FROM_ID(id) \
+{ \
+  int _k_len; \
+  char _k_buf[GRN_TABLE_MAX_KEY_SIZE]; \
+  _k_len = grn_table_get_key(ctx, ref_table, (id), _k_buf, GRN_TABLE_MAX_KEY_SIZE); \
+  msgpack_pack_raw(&pk, _k_len); \
+  msgpack_pack_raw_body(&pk, _k_buf, _k_len); \
+}
+
+#define PACK_MAP_ITEM(col_name) \
+{ \
+  grn_obj _v; \
+  msgpack_pack_raw(&pk, sizeof(#col_name) - 1); \
+  msgpack_pack_raw_body(&pk, CONST_STR_LEN(#col_name)); \
+  switch (col_##col_name->header.type) { \
+  case GRN_COLUMN_FIX_SIZE: \
+    GRN_VALUE_FIX_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
+    break; \
+  case GRN_COLUMN_VAR_SIZE: \
+    if ((col_##col_name->header.flags & GRN_OBJ_COLUMN_TYPE_MASK) == GRN_OBJ_COLUMN_VECTOR) { \
+      GRN_VALUE_FIX_SIZE_INIT(&_v, GRN_OBJ_VECTOR, grn_obj_get_range(ctx, col_##col_name)); \
+    } else { \
+      GRN_VALUE_VAR_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
+    } \
+    break; \
+  } \
+  grn_obj_get_value(ctx, col_##col_name, rec_id, &_v); \
+ \
+  switch (_v.header.type) { \
+  case GRN_BULK: \
+    switch (_v.header.domain) { \
+    case GRN_DB_SHORT_TEXT: \
+      msgpack_pack_raw(&pk, GRN_TEXT_LEN(&_v)); \
+      msgpack_pack_raw_body(&pk, GRN_TEXT_VALUE(&_v), GRN_TEXT_LEN(&_v)); \
+      break; \
+    case GRN_DB_INT32: \
+      msgpack_pack_int32(&pk, GRN_INT32_VALUE(&_v)); \
+      break; \
+    case GRN_DB_UINT32: \
+      msgpack_pack_uint32(&pk, GRN_UINT32_VALUE(&_v)); \
+      break; \
+    case GRN_DB_TIME: \
+      msgpack_pack_double(&pk, (double)GRN_TIME_VALUE(&_v) / GRN_TIME_USEC_PER_SEC); \
+      break; \
+    default: /* ref. to ShortText key */ \
+      PACK_KEY_FROM_ID(GRN_RECORD_VALUE(&_v)); \
+    } \
+    break; \
+  case GRN_UVECTOR: /* ref.s to ShortText key */ \
+    { \
+      grn_id *_idv = (grn_id *)GRN_BULK_HEAD(&_v), *_idve = (grn_id *)GRN_BULK_CURR(&_v); \
+      msgpack_pack_array(&pk, _idve - _idv); \
+      for (; _idv < _idve; _idv++) { \
+        PACK_KEY_FROM_ID(*_idv); \
+      } \
+    } \
+    break; \
+  default: \
+    print_error("invalid groonga object type(%d) for msgpack.", _v.header.type); \
+    msgpack_pack_nil(&pk); \
+    break; \
+  } \
+  grn_obj_close(ctx, &_v); \
+}
+
+static int
+zmq_send_to_httpd(void *zmq_send_sock, void *data, size_t size)
+{
+  zmq_msg_t msg;
+  if (!zmq_msg_init_size(&msg, size)) {
+    memcpy((void *)zmq_msg_data(&msg), data, size);
+    if (zmq_send(zmq_send_sock, &msg, 0)) {
+      print_error("zmq_send() error");
+      return -1;
+    }
+    zmq_msg_close(&msg);
+  } else {
+    print_error("zmq_msg_init_size() error");
+  }
+  return 0;
+}
+
+static void
+send_handler(void *zmq_send_sock, grn_ctx *ctx)
+{
+  grn_table_cursor *cur;
+  if ((cur = grn_table_cursor_open(ctx, grn_ctx_db(ctx), NULL, 0, NULL, 0,
+       0, -1, 0))) {
+    grn_id table_id;
+    while (loop && (table_id = grn_table_cursor_next(ctx, cur)) != GRN_ID_NIL) {
+      grn_obj *table;
+      if ((table = grn_ctx_at(ctx, table_id))) {
+        int name_len;
+        char name_buf[GRN_TABLE_MAX_KEY_SIZE];
+
+        name_len = grn_obj_name(ctx, table, name_buf,
+                                GRN_TABLE_MAX_KEY_SIZE);
+
+        if (name_len > 5) {
+          if (table->header.type == GRN_TABLE_PAT_KEY &&
+              !memcmp(name_buf, CONST_STR_LEN("item_"))) {
+            /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
+            grn_obj *ref_table;
+            grn_table_cursor *tc;
+            grn_obj *col_last, *col_kana, *col_freq, *col_freq2,
+                    *col_buzz, *col_boost;
+
+            col_kana = grn_obj_column(ctx, table, CONST_STR_LEN("kana"));
+            col_freq = grn_obj_column(ctx, table, CONST_STR_LEN("freq"));
+            col_last = grn_obj_column(ctx, table, CONST_STR_LEN("last"));
+            col_boost = grn_obj_column(ctx, table, CONST_STR_LEN("boost"));
+            col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
+            col_buzz = grn_obj_column(ctx, table, CONST_STR_LEN("buzz"));
+
+            ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_kana));
+
+            if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
+                                            0, 0, -1, 0))) {
+              grn_id rec_id;
+              while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
+                     != GRN_ID_NIL) {
+                char *key;
+                size_t key_len;
+                msgpack_packer pk;
+                msgpack_sbuffer sbuf;
+
+                msgpack_sbuffer_init(&sbuf);
+                msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
+
+                msgpack_pack_map(&pk, 8);
+
+                /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
+                msgpack_pack_raw(&pk, 6);
+                msgpack_pack_raw_body(&pk, CONST_STR_LEN("target"));
+                msgpack_pack_raw(&pk, name_len);
+                msgpack_pack_raw_body(&pk, name_buf, name_len);
+
+                msgpack_pack_raw(&pk, 4);
+                msgpack_pack_raw_body(&pk, CONST_STR_LEN("_key"));
+                key_len = grn_table_cursor_get_key(ctx, tc, (void **)&key);
+                msgpack_pack_raw(&pk, key_len);
+                msgpack_pack_raw_body(&pk, key, key_len);
+
+                PACK_MAP_ITEM(last);
+                PACK_MAP_ITEM(kana);
+                PACK_MAP_ITEM(freq);
+                PACK_MAP_ITEM(freq2);
+                PACK_MAP_ITEM(buzz);
+                PACK_MAP_ITEM(boost);
+
+                zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
+
+                msgpack_sbuffer_destroy(&sbuf);
+              }
+              grn_table_cursor_close(ctx, tc);
+            }
+          } else if (table->header.type == GRN_TABLE_HASH_KEY &&
+                     !memcmp(name_buf, CONST_STR_LEN("pair_"))) {
+            grn_obj *ref_table;
+            grn_table_cursor *tc;
+            grn_obj *col_pre, *col_post, *col_freq0, *col_freq1, *col_freq2;
+
+            col_pre = grn_obj_column(ctx, table, CONST_STR_LEN("pre"));
+            col_post = grn_obj_column(ctx, table, CONST_STR_LEN("post"));
+            col_freq0 = grn_obj_column(ctx, table, CONST_STR_LEN("freq0"));
+            col_freq1 = grn_obj_column(ctx, table, CONST_STR_LEN("freq1"));
+            col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
+
+            ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_pre));
+
+            if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
+                                            0, 0, -1, 0))) {
+              grn_id rec_id;
+              while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
+                     != GRN_ID_NIL) {
+                uint64_t *key;
+                msgpack_packer pk;
+                msgpack_sbuffer sbuf;
+
+                /* skip freq0 == 0 && freq1 == 0 && freq2 == 0 */
+                {
+                  grn_obj f;
+                  grn_obj_get_value(ctx, col_freq0, rec_id, &f);
+                  if (!GRN_INT32_VALUE(&f)) {
+                    grn_obj_get_value(ctx, col_freq1, rec_id, &f);
+                    if (!GRN_INT32_VALUE(&f)) {
+                      grn_obj_get_value(ctx, col_freq2, rec_id, &f);
+                      if (!GRN_INT32_VALUE(&f)) { continue; }
+                    }
+                  }
+                }
+
+                /* make pair_* message */
+                msgpack_sbuffer_init(&sbuf);
+                msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
+
+                msgpack_pack_map(&pk, 7);
+                /* ["_key","UInt64"],["pre","item_all"],["post","item_all"],["freq2","Int32"],["freq1","Int32"],["freq0","Int32"] */
+
+                msgpack_pack_raw(&pk, 6);
+                msgpack_pack_raw_body(&pk, CONST_STR_LEN("target"));
+                msgpack_pack_raw(&pk, name_len);
+                msgpack_pack_raw_body(&pk, name_buf, name_len);
+
+                msgpack_pack_raw(&pk, 4);
+                msgpack_pack_raw_body(&pk, CONST_STR_LEN("_key"));
+                grn_table_cursor_get_key(ctx, tc, (void **)&key);
+                msgpack_pack_uint64(&pk, *key);
+
+                PACK_MAP_ITEM(pre);
+                PACK_MAP_ITEM(post);
+                PACK_MAP_ITEM(freq0);
+                PACK_MAP_ITEM(freq1);
+                PACK_MAP_ITEM(freq2);
+
+                zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
+
+                msgpack_sbuffer_destroy(&sbuf);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+
+static void *
+send_to_httpd(void *arg)
+{
+  send_thd_data *thd = arg;
+  void *zmq_send_sock;
+  if ((zmq_send_sock = zmq_socket(thd->zmq_ctx, ZMQ_PUB))) {
+    if (!zmq_bind(zmq_send_sock, thd->send_endpoint)) {
+      grn_ctx ctx;
+      if (!(grn_ctx_init(&ctx, 0))) {
+        if ((grn_db_open(&ctx, thd->db_path))) {
+          while (loop) {
+            send_handler(zmq_send_sock, &ctx);
+          }
+        } else {
+          print_error("error in grn_db_open() on send thread.");
+        }
+        grn_ctx_fin(&ctx);
+      } else {
+        print_error("error in grn_ctx_init() on send thread.");
+      }
+    } else {
+      print_error("cannot bind zmq_socket.");
+    }
+  } else {
+    print_error("cannot create zmq_socket.");
+  }
+  return NULL;
+}
+
+static void
+handle_msg(msgpack_object *obj, grn_ctx *ctx, grn_obj *buf)
+{
+  int submit_flag = 0;
+  uint64_t millisec = 0;
+  const char *query = NULL,
+             *client_id = NULL, *learn_target_names = NULL;
+  uint32_t query_len, client_id_len, learn_target_names_len;
+  if (obj->type == MSGPACK_OBJECT_MAP) {
+    int i;
+    for (i = 0; i < obj->via.map.size; i++) {
+      msgpack_object_kv *kv;
+      kv = &(obj->via.map.ptr[i]);
+      if (kv->key.type == MSGPACK_OBJECT_RAW && kv->key.via.raw.size) {
+        switch (kv->key.via.raw.ptr[0]) {
+        case 'i':
+          if (kv->val.type == MSGPACK_OBJECT_RAW) {
+            client_id_len = kv->val.via.raw.size;
+            client_id = kv->val.via.raw.ptr;
+          }
+          break;
+        case 'q':
+          if (kv->val.type == MSGPACK_OBJECT_RAW) {
+            query_len = kv->val.via.raw.size;
+            query = kv->val.via.raw.ptr;
+          }
+          break;
+        case 'l':
+          if (kv->val.type == MSGPACK_OBJECT_RAW) {
+            learn_target_names_len = kv->val.via.raw.size;
+            learn_target_names = kv->val.via.raw.ptr;
+          }
+          break;
+        case 's':
+          if (kv->val.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
+            millisec = kv->val.via.u64;
+          }
+          break;
+        case 't':
+          if (kv->val.type == MSGPACK_OBJECT_BOOLEAN) {
+            submit_flag = (kv->val.via.boolean ? 1 : 0);
+          }
+          break;
+        default:
+          break;
+        }
+      }
+    }
+    if (millisec && query && client_id && learn_target_names) {
+      unsigned int tn_len;
+      const char *tn, *tnp, *tne;
+      tn = tnp = learn_target_names;
+      tne = learn_target_names + learn_target_names_len;
+      while (tnp <= tne) {
+        if (tnp == tne || *tnp == '|') {
+          tn_len = tnp - tn;
+
+          /*
+          printf("sec: %llu query %.*s client_id: %.*s target: %.*s\n",
+            millisec,
+            query_len, query,
+            client_id_len, client_id,
+            tn_len, tn);
+          */
+          load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
+                          tn, tn_len, millisec, submit_flag);
+
+          tn = ++tnp;
+        } else {
+          tnp++;
+        }
+      }
+    }
+  }
+}
+
+static void
+event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
+{
+  grn_obj buf;
+  GRN_TEXT_INIT(&buf, 0);
+  zmq_pollitem_t items[] = {
+    { zmq_sock, 0, ZMQ_POLLIN, 0}
+  };
+  while (loop) {
+    zmq_poll(items, 1, 10000);
+    if (items[0].revents & ZMQ_POLLIN) { /* always true */
+      zmq_msg_t msg;
+      if (zmq_msg_init(&msg)) {
+        print_error("cannot init zmq message.");
+      } else {
+        if (zmq_recv(zmq_sock, &msg, 0)) {
+          print_error("cannot recv zmq message.");
+        } else {
+          msgpack_object obj;
+          msgpack_unpack_return ret;
+          ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
+          if (MSGPACK_UNPACK_SUCCESS == ret) {
+            /* msgpack_object_print(stdout, obj); */
+            handle_msg(&obj, ctx, &buf);
+          }
+          msgpack_zone_clear(mempool);
+        }
+        zmq_msg_close(&msg);
+      }
+    }
+  }
+  grn_obj_unlink(ctx, &buf);
+}
+
+static void
+usage(FILE *output)
+{
+  fprintf(output,
+          "Usage: suggest-learner [options...] db_path\n"
+          "options:\n"
+          "  -r <recv endpoint>: recv endpoint (default: %s)\n"
+          "  -s <send endpoint>: send endpoint (default: %s)\n",
+          DEFAULT_RECV_ENDPOINT, DEFAULT_SEND_ENDPOINT);
+}
+
+static void
+signal_handler(int sig)
+{
+  loop = 0;
+}
+
+int
+main(int argc, char **argv)
+{
+  const char *recv_endpoint = DEFAULT_RECV_ENDPOINT,
+             *send_endpoint = DEFAULT_SEND_ENDPOINT;
+
+  /* parse options */
+  {
+    int ch;
+    extern char *optarg;
+    extern int optind, opterr;
+
+    while ((ch = getopt(argc, argv, "r:s:")) != -1) {
+      switch(ch) {
+      case 'r':
+        recv_endpoint = optarg;
+        break;
+      case 's':
+        send_endpoint = optarg;
+        break;
+      }
+    }
+    argc -= optind; argv += optind;
+  }
+
+  /* main */
+  if (argc != 1) {
+    usage(stderr);
+  } else {
+    grn_ctx *ctx;
+    grn_init();
+    msgpack_zone *mempool;
+
+    signal(SIGTERM, signal_handler);
+    signal(SIGINT, signal_handler);
+    signal(SIGQUIT, signal_handler);
+
+    ctx = grn_ctx_open(0);
+    if (!(grn_db_open(ctx, argv[0]))) {
+      print_error("cannot open database.");
+    } else {
+      if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
+        print_error("cannot create msgpack zone.");
+      } else {
+        void *zmq_ctx, *zmq_recv_sock;
+        if (!(zmq_ctx = zmq_init(1))) {
+          print_error("cannot create zmq context.");
+        } else {
+          if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
+            print_error("cannot create zmq_socket.");
+          } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
+            print_error("cannot bind zmq_socket.");
+          } else {
+            send_thd_data thd;
+            zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
+            thd.db_path = argv[0];
+            thd.send_endpoint = send_endpoint;
+            thd.zmq_ctx = zmq_ctx;
+
+            if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
+              print_error("error in pthread_create() for sending datas.");
+            }
+            event_loop(mempool, zmq_recv_sock, ctx);
+            pthread_join(thd.thd, NULL);
+          }
+          zmq_term(zmq_ctx);
+        }
+        msgpack_zone_free(mempool);
+      }
+    }
+    grn_obj_close(ctx, grn_ctx_db(ctx));
+    grn_ctx_fin(ctx);
+    grn_fin();
+  }
+  return 0;
+}

  Added: src/suggest/util.c (+17 -0) 100644
===================================================================
--- /dev/null
+++ src/suggest/util.c    2010-12-07 11:25:33 +0000 (10c820c)
@@ -0,0 +1,17 @@
+#include <stdio.h>
+#include <stdarg.h>
+
+int
+print_error(const char *format, ...)
+{
+  int r;
+  va_list l;
+
+  va_start(l, format);
+  vfprintf(stderr, format, l);
+  r = fprintf(stderr, "\n");
+  fflush(stderr);
+  va_end(l);
+
+  return r;
+}




Groonga-commit メーリングリストの案内
Back to archive index