[Groonga-commit] groonga/groonga at 95d2b3f [master] Support streaming dump

Back to archive index

Kouhei Sutou null+****@clear*****
Tue Jan 6 23:20:50 JST 2015


Kouhei Sutou	2015-01-06 23:20:50 +0900 (Tue, 06 Jan 2015)

  New Revision: 95d2b3f12444ac351c8884556ac335fbb8dc67f1
  https://github.com/groonga/groonga/commit/95d2b3f12444ac351c8884556ac335fbb8dc67f1

  Message:
    Support streaming dump
    
    Currently, "groonga --protocol http" only supports it.

  Modified files:
    include/groonga/groonga.h
    lib/ctx.c
    lib/proc.c
    src/groonga.c

  Modified: include/groonga/groonga.h (+1 -0)
===================================================================
--- include/groonga/groonga.h    2015-01-05 23:06:34 +0900 (d1c9347)
+++ include/groonga/groonga.h    2015-01-06 23:20:50 +0900 (684cf1e)
@@ -1302,6 +1302,7 @@ GRN_API void grn_output_envelope(grn_ctx *ctx, grn_rc rc,
                                  grn_obj *head, grn_obj *body, grn_obj *foot,
                                  const char *file, int line);
 
+GRN_API void grn_ctx_output_flush(grn_ctx *ctx, int flags);
 GRN_API void grn_ctx_output_array_open(grn_ctx *ctx,
                                        const char *name, int nelements);
 GRN_API void grn_ctx_output_array_close(grn_ctx *ctx);

  Modified: lib/ctx.c (+12 -0)
===================================================================
--- lib/ctx.c    2015-01-05 23:06:34 +0900 (7de01f6)
+++ lib/ctx.c    2015-01-06 23:20:50 +0900 (565cc27)
@@ -3003,6 +3003,18 @@ grn_set_term_handler(void)
 }
 
 void
+grn_ctx_output_flush(grn_ctx *ctx, int flags)
+{
+  if (flags & GRN_CTX_QUIET) {
+    return;
+  }
+  if (!ctx->impl->output) {
+    return;
+  }
+  ctx->impl->output(ctx, 0, ctx->impl->data.ptr);
+}
+
+void
 grn_ctx_output_array_open(grn_ctx *ctx, const char *name, int nelements)
 {
   grn_output_array_open(ctx, ctx->impl->outbuf, ctx->impl->output_type,

  Modified: lib/proc.c (+6 -0)
===================================================================
--- lib/proc.c    2015-01-05 23:06:34 +0900 (359a53a)
+++ lib/proc.c    2015-01-06 23:20:50 +0900 (e13ebb9)
@@ -2629,6 +2629,8 @@ exit :
   return NULL;
 }
 
+static const size_t DUMP_FLUSH_THRESHOLD_SIZE = 256 * 1024;
+
 static void
 dump_name(grn_ctx *ctx, grn_obj *outbuf, const char *name, int name_len)
 {
@@ -2966,6 +2968,9 @@ dump_records(grn_ctx *ctx, grn_obj *outbuf, grn_obj *table)
       }
     }
     GRN_TEXT_PUTC(ctx, outbuf, ']');
+    if (GRN_TEXT_LEN(outbuf) >= DUMP_FLUSH_THRESHOLD_SIZE) {
+      grn_ctx_output_flush(ctx, 0);
+    }
   }
   GRN_TEXT_PUTS(ctx, outbuf, "\n]\n");
   GRN_TEXT_PUT(ctx, outbuf, GRN_TEXT_VALUE(&delete_commands),
@@ -3217,6 +3222,7 @@ proc_dump(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
   ctx->impl->output_type = GRN_CONTENT_NONE;
   ctx->impl->mime_type = "text/x-groonga-command-list";
   dump_schema(ctx, outbuf);
+  grn_ctx_output_flush(ctx, 0);
   /* To update index columns correctly, we first create the whole schema, then
      load non-derivative records, while skipping records of index columns. That
      way, groonga will silently do the job of updating index columns for us. */

  Modified: src/groonga.c (+217 -77)
===================================================================
--- src/groonga.c    2015-01-05 23:06:34 +0900 (da3f853)
+++ src/groonga.c    2015-01-06 23:20:50 +0900 (e27bd72)
@@ -649,109 +649,245 @@ start_service(grn_ctx *ctx, const char *db_path,
 
 typedef struct {
   grn_msg *msg;
+  grn_bool in_body;
+  grn_bool is_chunked;
 } ht_context;
 
 static void
-h_output(grn_ctx *ctx, int flags, void *arg)
+h_output_set_header(grn_ctx *ctx, grn_obj *header,
+                    grn_rc rc, long long int content_length)
 {
-  grn_rc expr_rc = ctx->rc;
-  ht_context *hc = (ht_context *)arg;
-  grn_sock fd = hc->msg->u.fd;
-  grn_obj header, head, foot, *outbuf = ctx->impl->outbuf;
-  grn_bool should_return_body = (hc->msg->header.qtype == 'G');
-  if (!(flags & GRN_CTX_TAIL)) { return; }
-  GRN_TEXT_INIT(&header, 0);
-  GRN_TEXT_INIT(&head, 0);
-  GRN_TEXT_INIT(&foot, 0);
-  output_envelope(ctx, expr_rc, &head, outbuf, &foot);
-  switch (expr_rc) {
+  switch (rc) {
   case GRN_SUCCESS :
-    GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 200 OK\r\n");
+    GRN_TEXT_SETS(ctx, header, "HTTP/1.1 200 OK\r\n");
     break;
   case GRN_INVALID_ARGUMENT :
   case GRN_SYNTAX_ERROR :
-    GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 400 Bad Request\r\n");
+    GRN_TEXT_SETS(ctx, header, "HTTP/1.1 400 Bad Request\r\n");
     break;
   case GRN_NO_SUCH_FILE_OR_DIRECTORY :
-    GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 404 Not Found\r\n");
+    GRN_TEXT_SETS(ctx, header, "HTTP/1.1 404 Not Found\r\n");
     break;
   default :
-    GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 500 Internal Server Error\r\n");
+    GRN_TEXT_SETS(ctx, header, "HTTP/1.1 500 Internal Server Error\r\n");
     break;
   }
-  GRN_TEXT_PUTS(ctx, &header, "Connection: close\r\n");
-  GRN_TEXT_PUTS(ctx, &header, "Content-Type: ");
-  GRN_TEXT_PUTS(ctx, &header, grn_ctx_get_mime_type(ctx));
-  GRN_TEXT_PUTS(ctx, &header, "\r\nContent-Length: ");
-  grn_text_lltoa(ctx, &header,
-                 GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot));
-  GRN_TEXT_PUTS(ctx, &header, "\r\n\r\n");
-  {
-    ssize_t ret, len;
+  GRN_TEXT_PUTS(ctx, header, "Content-Type: ");
+  GRN_TEXT_PUTS(ctx, header, grn_ctx_get_mime_type(ctx));
+  GRN_TEXT_PUTS(ctx, header, "\r\n");
+  if (content_length >= 0) {
+    GRN_TEXT_PUTS(ctx, header, "Connection: close\r\n");
+    GRN_TEXT_PUTS(ctx, header, "Content-Length: ");
+    grn_text_lltoa(ctx, header, content_length);
+    GRN_TEXT_PUTS(ctx, header, "\r\n");
+  } else {
+    GRN_TEXT_PUTS(ctx, header, "Transfer-Encoding: chunked\r\n");
+  }
+  GRN_TEXT_PUTS(ctx, header, "\r\n");
+}
+
+static void
+h_output_send(grn_ctx *ctx, grn_sock fd,
+              grn_obj *header, grn_obj *head, grn_obj *body, grn_obj *foot)
+{
+  ssize_t ret;
+  ssize_t len = 0;
 #ifdef WIN32
-    int n_buffers;
-    WSABUF wsabufs[4];
-    wsabufs[0].buf = GRN_TEXT_VALUE(&header);
-    wsabufs[0].len = GRN_TEXT_LEN(&header);
-    n_buffers = 1;
-    len = GRN_TEXT_LEN(&header);
-    if (should_return_body) {
-      wsabufs[1].buf = GRN_TEXT_VALUE(&head);
-      wsabufs[1].len = GRN_TEXT_LEN(&head);
-      wsabufs[2].buf = GRN_TEXT_VALUE(outbuf);
-      wsabufs[2].len = GRN_TEXT_LEN(outbuf);
-      wsabufs[3].buf = GRN_TEXT_VALUE(&foot);
-      wsabufs[3].len = GRN_TEXT_LEN(&foot);
-      n_buffers += 3;
-      len += GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot);
-    }
-    {
-      DWORD sent;
-      if (WSASend(fd, wsabufs, n_buffers, &sent, 0, NULL, NULL) == SOCKET_ERROR) {
-        SERR("WSASend");
-      }
-      ret = sent;
+  int n_buffers = 0;
+  WSABUF wsabufs[4];
+  if (header) {
+    wsabufs[n_buffers].buf = GRN_TEXT_VALUE(header);
+    wsabufs[n_buffers].len = GRN_TEXT_LEN(header);
+    len += GRN_TEXT_LEN(header);
+    n_buffers++;
+  }
+  if (head) {
+    wsabufs[n_buffers].buf = GRN_TEXT_VALUE(head);
+    wsabufs[n_buffers].len = GRN_TEXT_LEN(head);
+    len += GRN_TEXT_LEN(head);
+    n_buffers++;
+  }
+  if (body) {
+    wsabufs[n_buffers].buf = GRN_TEXT_VALUE(body);
+    wsabufs[n_buffers].len = GRN_TEXT_LEN(body);
+    len += GRN_TEXT_LEN(body);
+    n_buffers++;
+  }
+  if (foot) {
+    wsabufs[n_buffers].buf = GRN_TEXT_VALUE(foot);
+    wsabufs[n_buffers].len = GRN_TEXT_LEN(foot);
+    len += GRN_TEXT_LEN(foot);
+    n_buffers++;
+  }
+  {
+    DWORD sent;
+    if (WSASend(fd, wsabufs, n_buffers, &sent, 0, NULL, NULL) == SOCKET_ERROR) {
+      SERR("WSASend");
     }
+    ret = sent;
+  }
 #else /* WIN32 */
-    struct iovec msg_iov[4];
-    struct msghdr msg;
-    msg.msg_name = NULL;
-    msg.msg_namelen = 0;
-    msg.msg_iov = msg_iov;
-    msg.msg_iovlen = 1;
-    msg.msg_control = NULL;
-    msg.msg_controllen = 0;
-    msg.msg_flags = 0;
-    msg_iov[0].iov_base = GRN_TEXT_VALUE(&header);
-    msg_iov[0].iov_len = GRN_TEXT_LEN(&header);
-    len = GRN_TEXT_LEN(&header);
-    if (should_return_body) {
-      msg_iov[1].iov_base = GRN_TEXT_VALUE(&head);
-      msg_iov[1].iov_len = GRN_TEXT_LEN(&head);
-      msg_iov[2].iov_base = GRN_TEXT_VALUE(outbuf);
-      msg_iov[2].iov_len = GRN_TEXT_LEN(outbuf);
-      msg_iov[3].iov_base = GRN_TEXT_VALUE(&foot);
-      msg_iov[3].iov_len = GRN_TEXT_LEN(&foot);
-      msg.msg_iovlen += 3;
-      len += GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot);
-    }
-    if ((ret = sendmsg(fd, &msg, MSG_NOSIGNAL)) == -1) {
-      SERR("sendmsg");
-    }
+  struct iovec msg_iov[4];
+  struct msghdr msg;
+  msg.msg_name = NULL;
+  msg.msg_namelen = 0;
+  msg.msg_iov = msg_iov;
+  msg.msg_iovlen = 0;
+  msg.msg_control = NULL;
+  msg.msg_controllen = 0;
+  msg.msg_flags = 0;
+
+  if (header) {
+    msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(header);
+    msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(header);
+    len += GRN_TEXT_LEN(header);
+    msg.msg_iovlen++;
+  }
+  if (head) {
+    msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(head);
+    msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(head);
+    len += GRN_TEXT_LEN(head);
+    msg.msg_iovlen++;
+  }
+  if (body) {
+    msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(body);
+    msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(body);
+    len += GRN_TEXT_LEN(body);
+    msg.msg_iovlen++;
+  }
+  if (foot) {
+    msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(foot);
+    msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(foot);
+    len += GRN_TEXT_LEN(foot);
+    msg.msg_iovlen++;
+  }
+  if ((ret = sendmsg(fd, &msg, MSG_NOSIGNAL)) == -1) {
+    SERR("sendmsg");
+  }
 #endif /* WIN32 */
-    if (ret != len) {
-      GRN_LOG(&grn_gctx, GRN_LOG_NOTICE,
-              "couldn't send all data (%" GRN_FMT_LLD "/%" GRN_FMT_LLD ")",
-              (long long int)ret, (long long int)len);
+  if (ret != len) {
+    GRN_LOG(&grn_gctx, GRN_LOG_NOTICE,
+            "couldn't send all data (%" GRN_FMT_LLD "/%" GRN_FMT_LLD ")",
+            (long long int)ret, (long long int)len);
+  }
+}
+
+static void
+h_output_raw(grn_ctx *ctx, int flags, ht_context *hc)
+{
+  grn_rc expr_rc = ctx->rc;
+  grn_sock fd = hc->msg->u.fd;
+  grn_obj header_;
+  grn_obj head_;
+  grn_obj body_;
+  grn_obj foot_;
+  grn_obj *header = NULL;
+  grn_obj *head = NULL;
+  grn_obj *body = NULL;
+  grn_obj *foot = NULL;
+  char *chunk = NULL;
+  unsigned int chunk_size = 0;
+  int recv_flags;
+  grn_bool is_last_message = (flags & GRN_CTX_TAIL);
+
+  GRN_TEXT_INIT(&header_, 0);
+  GRN_TEXT_INIT(&head_, 0);
+  GRN_TEXT_INIT(&body_, GRN_OBJ_DO_SHALLOW_COPY);
+  GRN_TEXT_INIT(&foot_, 0);
+
+  grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
+  GRN_TEXT_SET(ctx, &body_, chunk, chunk_size);
+
+  if (!hc->in_body) {
+    if (is_last_message) {
+      h_output_set_header(ctx, &header_, expr_rc, GRN_TEXT_LEN(&body_));
+      hc->is_chunked = GRN_FALSE;
+    } else {
+      h_output_set_header(ctx, &header_, expr_rc, -1);
+      hc->is_chunked = GRN_TRUE;
+    }
+    header = &header_;
+    hc->in_body = GRN_TRUE;
+  }
+
+  if (GRN_TEXT_LEN(&body_) > 0) {
+    if (hc->is_chunked) {
+      grn_text_printf(ctx, &head_,
+                      "%x\r\n", (unsigned int)GRN_TEXT_LEN(&body_));
+      head = &head_;
+      GRN_TEXT_PUTS(ctx, &foot_, "\r\n");
+      foot = &foot_;
+    }
+    body = &body_;
+  }
+
+  if (is_last_message) {
+    if (hc->is_chunked) {
+      GRN_TEXT_PUTS(ctx, &foot_, "0\r\n");
+      GRN_TEXT_PUTS(ctx, &foot_, "Connection: close\r\n");
+      GRN_TEXT_PUTS(ctx, &foot_, "\r\n");
+      foot = &foot_;
     }
   }
-  GRN_BULK_REWIND(outbuf);
+
+  h_output_send(ctx, fd, header, head, body, foot);
+
+  GRN_OBJ_FIN(ctx, &foot_);
+  GRN_OBJ_FIN(ctx, &body_);
+  GRN_OBJ_FIN(ctx, &head_);
+  GRN_OBJ_FIN(ctx, &header_);
+}
+
+static void
+h_output_typed(grn_ctx *ctx, int flags, ht_context *hc)
+{
+  grn_rc expr_rc = ctx->rc;
+  grn_sock fd = hc->msg->u.fd;
+  grn_obj header, head, body, foot;
+  char *chunk = NULL;
+  unsigned int chunk_size = 0;
+  int recv_flags;
+  grn_bool should_return_body = (hc->msg->header.qtype == 'G');
+
+  if (!(flags & GRN_CTX_TAIL)) { return; }
+
+  GRN_TEXT_INIT(&header, 0);
+  GRN_TEXT_INIT(&head, 0);
+  GRN_TEXT_INIT(&body, GRN_OBJ_DO_SHALLOW_COPY);
+  GRN_TEXT_INIT(&foot, 0);
+
+  grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
+  GRN_TEXT_SET(ctx, &body, chunk, chunk_size);
+
+  output_envelope(ctx, expr_rc, &head, &body, &foot);
+  h_output_set_header(ctx, &header, expr_rc,
+                      GRN_TEXT_LEN(&head) +
+                      GRN_TEXT_LEN(&body) +
+                      GRN_TEXT_LEN(&foot));
+  if (should_return_body) {
+    h_output_send(ctx, fd, &header, &head, &body, &foot);
+  } else {
+    h_output_send(ctx, fd, &header, NULL, NULL, NULL);
+  }
   GRN_OBJ_FIN(ctx, &foot);
+  GRN_OBJ_FIN(ctx, &body);
   GRN_OBJ_FIN(ctx, &head);
   GRN_OBJ_FIN(ctx, &header);
 }
 
 static void
+h_output(grn_ctx *ctx, int flags, void *arg)
+{
+  ht_context *hc = (ht_context *)arg;
+
+  if (grn_ctx_get_output_type(ctx) == GRN_CONTENT_NONE) {
+    h_output_raw(ctx, flags, hc);
+  } else {
+    h_output_typed(ctx, flags, hc);
+  }
+}
+
+static void
 do_htreq_get(grn_ctx *ctx, grn_msg *msg)
 {
   char *path = NULL;
@@ -982,6 +1118,8 @@ do_htreq_post(grn_ctx *ctx, grn_msg *msg)
   if (ctx->rc != GRN_SUCCESS) {
     ht_context context;
     context.msg = msg;
+    context.in_body = GRN_FALSE;
+    context.is_chunked = GRN_FALSE;
     h_output(ctx, GRN_CTX_TAIL, &context);
     return;
   }
@@ -1698,6 +1836,8 @@ h_worker(void *arg)
     nfthreads--;
     MUTEX_UNLOCK(q_mutex);
     hc.msg = (grn_msg *)msg;
+    hc.in_body = GRN_FALSE;
+    hc.is_chunked = GRN_FALSE;
     do_htreq(ctx, (grn_msg *)msg);
     MUTEX_LOCK(q_mutex);
   } while (nfthreads < max_nfthreads && grn_gctx.stat != GRN_CTX_QUIT);
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index