null+****@clear*****
null+****@clear*****
2011年 2月 17日 (木) 19:55:36 JST
Tasuku SUENAGA a.k.a. gunyarakun 2011-02-17 10:55:36 +0000 (Thu, 17 Feb 2011)
New Revision: 5ecc3e31b01e9666628bfe6fa59e7f2dd994a1eb
Log:
Added log load function to suggest learner.
Modified files:
src/suggest/Makefile.am
src/suggest/groonga_suggest_httpd.c
src/suggest/groonga_suggest_learner.c
src/suggest/util.c
src/suggest/util.h
Modified: src/suggest/Makefile.am (+5 -0)
===================================================================
--- src/suggest/Makefile.am 2011-02-17 03:35:07 +0000 (02f1bec)
+++ src/suggest/Makefile.am 2011-02-17 10:55:36 +0000 (787978f)
@@ -27,11 +27,13 @@ DEFAULT_INCLUDES = \
groonga_suggest_learner_SOURCES = groonga_suggest_learner.c
groonga_suggest_learner_CFLAGS = \
$(AM_CFLAGS) \
+ $(LIBEVENT_CFLAGS) \
$(LIBZMQ_CFLAGS) \
$(MESSAGE_PACK_CFLAGS)
groonga_suggest_learner_LDADD = \
libutil.la \
$(top_builddir)/lib/libgroonga.la \
+ $(LIBEVENT_LIBS) \
$(LIBZMQ_LIBS) \
$(MESSAGE_PACK_LIBS)
@@ -55,3 +57,6 @@ groonga_suggest_create_dataset_LDADD = \
$(top_builddir)/lib/libgroonga.la
libutil_la_SOURCES = util.c util.h
+libutil_la_CFLAGS = \
+ $(AM_CFLAGS) \
+ $(LIBEVENT_CFLAGS)
Modified: src/suggest/groonga_suggest_httpd.c (+7 -47)
===================================================================
--- src/suggest/groonga_suggest_httpd.c 2011-02-17 03:35:07 +0000 (e46802a)
+++ src/suggest/groonga_suggest_httpd.c 2011-02-17 10:55:36 +0000 (3747d43)
@@ -79,17 +79,6 @@ static uint32_t default_max_threads = DEFAULT_MAX_THREADS;
static volatile sig_atomic_t loop = 1;
static grn_obj *db;
-static uint64_t
-atouint64_t(const char *s)
-{
- uint64_t r;
- for (r = 0; *s; s++) {
- r *= 10;
- r += (*s - '0');
- }
- return r;
-}
-
static int
suggest_result(struct evbuffer *res_buf, const char *types, const char *query, const char *target_name, grn_obj *cmd_buf, grn_ctx *ctx)
{
@@ -121,42 +110,13 @@ suggest_result(struct evbuffer *res_buf, const char *types, const char *query, c
static int
log_send(struct evbuffer *res_buf, thd_data *thd, struct evkeyvalq *get_args)
{
- uint64_t millisec = 0;
- const char *callback = NULL, *types = NULL, *query = NULL,
- *client_id = NULL, *target_name = NULL,
- *learn_target_name = NULL;
- struct evkeyval *get;
-
- TAILQ_FOREACH(get, get_args, next) {
- switch(get->key[0]) {
- case 't':
- /* TODO: check types */
- types = get->value;
- break;
- case 'i':
- client_id = get->value;
- case 'c':
- if (!strcmp(get->key, "callback")) {
- callback = get->value;
- }
- break;
- case 'q':
- query = get->value;
- break;
- case 's':
- millisec = atouint64_t(get->value);
- break;
- case 'n':
- /* TODO: check target_name */
- target_name = get->value;
- break;
- case 'l':
- learn_target_name = get->value;
- break;
- default:
- break;
- }
- }
+ uint64_t millisec;
+ const char *callback, *types, *query, *client_id, *target_name,
+ *learn_target_name;
+
+ parse_keyval(get_args, &query, &types, &client_id, &target_name,
+ &learn_target_name, &callback, &millisec);
+
/* send data to learn client */
if (thd->zmq_sock && millisec && client_id && query && learn_target_name) {
char c;
Modified: src/suggest/groonga_suggest_learner.c (+123 -49)
===================================================================
--- src/suggest/groonga_suggest_learner.c 2011-02-17 03:35:07 +0000 (705fb3e)
+++ src/suggest/groonga_suggest_learner.c 2011-02-17 10:55:36 +0000 (e966b9a)
@@ -24,6 +24,8 @@
#include "util.h"
+#include <evhttp.h>
+
#define DEFAULT_RECV_ENDPOINT "tcp://*:1234"
#define DEFAULT_SEND_ENDPOINT "tcp://*:1235"
#define SEND_WAIT 1000 /* 0.001sec */
@@ -82,6 +84,43 @@ load_to_groonga(grn_ctx *ctx,
}
}
+/*
+ * Feeds one learning event to every target named in learn_target_names.
+ *
+ * learn_target_names holds learn_target_names_len bytes of target names
+ * separated by '|'.  load_to_groonga() is called once per segment, in order,
+ * with all other arguments passed through unchanged.  Empty segments (for
+ * example between two adjacent '|') are passed on as zero-length names.
+ *
+ * Nothing is done unless millisec is non-zero and query, client_id and
+ * learn_target_names are all non-NULL.
+ */
+void
+load_to_multi_targets(grn_ctx *ctx,
+                      grn_obj *buf,
+                      const char *query, uint32_t query_len,
+                      const char *client_id, uint32_t client_id_len,
+                      const char *learn_target_names,
+                      uint32_t learn_target_names_len,
+                      uint64_t millisec,
+                      int submit)
+{
+  const char *name_start, *cursor, *names_end;
+
+  if (!millisec || !query || !client_id || !learn_target_names) {
+    return;
+  }
+
+  name_start = learn_target_names;
+  names_end = learn_target_names + learn_target_names_len;
+  for (cursor = name_start; ; cursor++) {
+    unsigned int name_len;
+
+    /* Keep scanning until a separator or the end of the list is reached. */
+    if (cursor != names_end && *cursor != '|') {
+      continue;
+    }
+
+    name_len = cursor - name_start;
+    load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
+                    name_start, name_len, millisec, submit);
+
+    /* The segment ending at names_end is the last one. */
+    if (cursor == names_end) {
+      break;
+    }
+    name_start = cursor + 1;
+  }
+}
+
#define PACK_KEY_FROM_ID(id) \
{ \
int _k_len; \
@@ -397,36 +436,15 @@ handle_msg(msgpack_object *obj, grn_ctx *ctx, grn_obj *buf)
}
}
}
- if (millisec && query && client_id && learn_target_names) {
- unsigned int tn_len;
- const char *tn, *tnp, *tne;
- tn = tnp = learn_target_names;
- tne = learn_target_names + learn_target_names_len;
- while (tnp <= tne) {
- if (tnp == tne || *tnp == '|') {
- tn_len = tnp - tn;
-
- /*
- printf("sec: %llu query %.*s client_id: %.*s target: %.*s\n",
- millisec,
- query_len, query,
- client_id_len, client_id,
- tn_len, tn);
- */
- load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
- tn, tn_len, millisec, submit_flag);
-
- tn = ++tnp;
- } else {
- tnp++;
- }
- }
- }
+ load_to_multi_targets(ctx, buf, query, query_len,
+ client_id, client_id_len,
+ learn_target_names, learn_target_names_len,
+ millisec, submit_flag);
}
}
static void
-event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
+recv_event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
{
grn_obj buf;
zmq_pollitem_t items[] = {
@@ -459,6 +477,51 @@ event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
grn_obj_unlink(ctx, &buf);
}
+#define MAX_LOG_LENGTH 0x2000
+
+/*
+ * Replays a web server log file into the learner.
+ *
+ * Each complete line (terminated by '\n' and shorter than MAX_LOG_LENGTH) is
+ * URI-decoded, parsed as a query string and, when it carries a query, a
+ * client id, a learn target and a non-zero timestamp, handed to
+ * load_to_multi_targets().  The event is flagged as a submit when the types
+ * parameter equals "submit".
+ *
+ * Lines that do not fit in the buffer, and a final line without a trailing
+ * newline, are skipped.  A file that cannot be opened is reported through
+ * print_error() and nothing is loaded.
+ */
+static void
+load_log(grn_ctx *ctx, const char *log_file_name)
+{
+  FILE *fp;
+  grn_obj buf;
+  char line_buf[MAX_LOG_LENGTH];
+
+  if (!(fp = fopen(log_file_name, "r"))) {
+    print_error("cannot open log file.");
+    return;
+  }
+
+  GRN_TEXT_INIT(&buf, 0);
+  while (fgets(line_buf, MAX_LOG_LENGTH, fp)) {
+    char *newline = strrchr(line_buf, '\n');
+    if (newline) {
+      uint64_t millisec;
+      struct evkeyvalq get_args;
+      const char *callback, *types, *query, *client_id, *target_name,
+                 *learn_target_name;
+      char *uri;
+
+      /* Drop the line terminator so it cannot end up inside the last
+         parameter value. */
+      *newline = '\0';
+
+      /* NOTE(review): evhttp_parse_query() may decode values on its own;
+         confirm that decoding the whole line first is intended. */
+      uri = evhttp_decode_uri(line_buf);
+      if (!uri) {
+        /* get_args has not been initialized yet, so there is nothing to
+           clear for this line. */
+        continue;
+      }
+      evhttp_parse_query(uri, &get_args);
+      free(uri);
+
+      parse_keyval(&get_args, &query, &types, &client_id, &target_name,
+                   &learn_target_name, &callback, &millisec);
+
+      if (query && client_id && learn_target_name && millisec) {
+        load_to_multi_targets(ctx, &buf, query, strlen(query),
+                              client_id, strlen(client_id),
+                              learn_target_name, strlen(learn_target_name),
+                              millisec,
+                              types && !strcmp(types, "submit"));
+      }
+
+      evhttp_clear_headers(&get_args);
+    } else {
+      /* The line did not fit in line_buf: discard the rest of it. */
+      while (1) {
+        int c = fgetc(fp);
+        if (c == '\n' || c == EOF) { break; }
+      }
+    }
+  }
+  grn_obj_close(ctx, &buf);
+  fclose(fp);
+}
+
static void
usage(FILE *output)
{
@@ -467,6 +530,7 @@ usage(FILE *output)
"options:\n"
" -r <recv endpoint>: recv endpoint (default: %s)\n"
" -s <send endpoint>: send endpoint (default: %s)\n"
+ " -l <logfile> : load from log file of webserver.\n"
" -d : daemonize\n",
DEFAULT_RECV_ENDPOINT, DEFAULT_SEND_ENDPOINT);
}
@@ -482,13 +546,14 @@ main(int argc, char **argv)
{
int daemon = 0;
const char *recv_endpoint = DEFAULT_RECV_ENDPOINT,
- *send_endpoint = DEFAULT_SEND_ENDPOINT;
+ *send_endpoint = DEFAULT_SEND_ENDPOINT,
+ *load_logfile_name = NULL;
/* parse options */
{
int ch;
- while ((ch = getopt(argc, argv, "r:s:d")) != -1) {
+ while ((ch = getopt(argc, argv, "r:s:dl:")) != -1) {
switch(ch) {
case 'r':
recv_endpoint = optarg;
@@ -499,6 +564,9 @@ main(int argc, char **argv)
case 'd':
daemon = 1;
break;
+ case 'l':
+ load_logfile_name = optarg;
+ break;
}
}
argc -= optind; argv += optind;
@@ -525,33 +593,39 @@ main(int argc, char **argv)
if (!(grn_db_open(ctx, argv[0]))) {
print_error("cannot open database.");
} else {
- if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
- print_error("cannot create msgpack zone.");
+ if (load_logfile_name) {
+ /* loading log mode */
+ load_log(ctx, load_logfile_name);
} else {
- void *zmq_ctx, *zmq_recv_sock;
- if (!(zmq_ctx = zmq_init(1))) {
- print_error("cannot create zmq context.");
+ /* zeromq/msgpack recv mode */
+ if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
+ print_error("cannot create msgpack zone.");
} else {
- if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
- print_error("cannot create zmq_socket.");
- } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
- print_error("cannot bind zmq_socket.");
+ void *zmq_ctx, *zmq_recv_sock;
+ if (!(zmq_ctx = zmq_init(1))) {
+ print_error("cannot create zmq context.");
} else {
- send_thd_data thd;
- zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
- thd.db_path = argv[0];
- thd.send_endpoint = send_endpoint;
- thd.zmq_ctx = zmq_ctx;
-
- if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
- print_error("error in pthread_create() for sending datas.");
+ if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
+ print_error("cannot create zmq_socket.");
+ } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
+ print_error("cannot bind zmq_socket.");
+ } else {
+ send_thd_data thd;
+ zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
+ thd.db_path = argv[0];
+ thd.send_endpoint = send_endpoint;
+ thd.zmq_ctx = zmq_ctx;
+
+ if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
+ print_error("error in pthread_create() for sending datas.");
+ }
+ recv_event_loop(mempool, zmq_recv_sock, ctx);
+ pthread_join(thd.thd, NULL);
}
- event_loop(mempool, zmq_recv_sock, ctx);
- pthread_join(thd.thd, NULL);
+ zmq_term(zmq_ctx);
}
- zmq_term(zmq_ctx);
+ msgpack_zone_free(mempool);
}
- msgpack_zone_free(mempool);
}
}
grn_obj_close(ctx, grn_ctx_db(ctx));
Modified: src/suggest/util.c (+62 -1)
===================================================================
--- src/suggest/util.c 2011-02-17 03:35:07 +0000 (c21f4c7)
+++ src/suggest/util.c 2011-02-17 10:55:36 +0000 (5ea5ede)
@@ -15,14 +15,18 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdio.h>
+#include <string.h>
+#include <stdint.h>
#include <stdarg.h>
#include <unistd.h>
-#include <sys/types.h>
#include <sys/wait.h>
+#include <sys/queue.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include "util.h"
+
int
print_error(const char *format, ...)
{
@@ -74,3 +78,60 @@ daemonize(void)
}
return 1;
}
+
+/*
+ * Converts the leading run of ASCII decimal digits in s to a uint64_t.
+ *
+ * Parsing stops at the first character that is not '0'..'9', so trailing
+ * bytes such as a line terminator no longer corrupt the result.  Returns 0
+ * when s is NULL or does not start with a digit.  Values that do not fit in
+ * 64 bits wrap around; no overflow error is reported.
+ */
+static uint64_t
+atouint64_t(const char *s)
+{
+  uint64_t r = 0;
+
+  if (!s) {
+    return 0;
+  }
+  for (; *s >= '0' && *s <= '9'; s++) {
+    r = r * 10 + (uint64_t)(*s - '0');
+  }
+  return r;
+}
+
+/*
+ * Extracts the suggest request parameters from a parsed query string.
+ *
+ * All outputs are reset first (pointers to NULL, *millisec to 0).  Each
+ * key/value pair in get_args is then dispatched on the FIRST character of
+ * its key:
+ *   'q' -> *query              't' -> *types
+ *   'i' -> *client_id          's' -> *millisec (via atouint64_t)
+ *   'n' -> *target_name        'l' -> *learn_target_name
+ *   'c' -> *callback, only when the key is exactly "callback"
+ * Pairs with any other key are ignored.  When several pairs map to the same
+ * output, the last one in the list wins.
+ *
+ * The stored pointers are the value pointers held by get_args; nothing is
+ * copied.
+ */
+void
+parse_keyval(struct evkeyvalq *get_args,
+             const char **query, const char **types,
+             const char **client_id, const char **target_name,
+             const char **learn_target_name,
+             const char **callback,
+             uint64_t *millisec)
+{
+  struct evkeyval *get;
+
+  *query = *types = *client_id = *target_name =
+    *learn_target_name = *callback = NULL;
+  *millisec = 0;
+
+  TAILQ_FOREACH(get, get_args, next) {
+    switch(get->key[0]) {
+    case 'q':
+      *query = get->value;
+      break;
+    case 't':
+      /* TODO: check types */
+      *types = get->value;
+      break;
+    case 'i':
+      *client_id = get->value;
+      /* Explicit break: this case used to fall through into 'c', which was
+         harmless only because a key starting with 'i' is never "callback". */
+      break;
+    case 'c':
+      if (!strcmp(get->key, "callback")) {
+        *callback = get->value;
+      }
+      break;
+    case 's':
+      *millisec = atouint64_t(get->value);
+      break;
+    case 'n':
+      /* TODO: check target_name */
+      *target_name = get->value;
+      break;
+    case 'l':
+      *learn_target_name = get->value;
+      break;
+    default:
+      break;
+    }
+  }
+}
Modified: src/suggest/util.h (+10 -0)
===================================================================
--- src/suggest/util.h 2011-02-17 03:35:07 +0000 (1983481)
+++ src/suggest/util.h 2011-02-17 10:55:36 +0000 (e7e18b2)
@@ -17,7 +17,17 @@
#ifndef GRN_SUGGEST_UTIL_H
#define GRN_SUGGEST_UTIL_H
+#include <sys/queue.h>
+#include <event.h>
+#include <stdint.h>
+
int print_error(const char *format, ...);
int daemonize(void);
+void parse_keyval(struct evkeyvalq *get_args,
+ const char **query, const char **types,
+ const char **client_id, const char **target_name,
+ const char **learn_target_name,
+ const char **callback,
+ uint64_t *millisec);
#endif /* GRN_SUGGEST_UTIL_H */