[Groonga-commit] droonga/fluent-plugin-droonga at 2b8b956 [master] Build distributed command messages by the DistributedCommandPlanner

Back to archive index

YUKI Hiroshi null+****@clear*****
Wed Jan 29 17:49:15 JST 2014


YUKI Hiroshi	2014-01-29 17:49:15 +0900 (Wed, 29 Jan 2014)

  New Revision: 2b8b9560566fff1d686137d7469daa80a2539034
  https://github.com/droonga/fluent-plugin-droonga/commit/2b8b9560566fff1d686137d7469daa80a2539034

  Message:
    Build distributed command messages by the DistributedCommandPlanner

  Added files:
    lib/droonga/distributed_command_planner.rb
  Modified files:
    lib/droonga/distributor_plugin.rb
    lib/droonga/plugin/distributor/crud.rb

  Added: lib/droonga/distributed_command_planner.rb (+100 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/distributed_command_planner.rb    2014-01-29 17:49:15 +0900 (41cd9a8)
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+module Droonga
+  class DistributedCommandPlanner
+    attr_accessor :key
+    attr_reader :outputs
+
+    def initialize(source_message)
+      @source_message = source_message
+
+      @key = nil
+      @outputs = []
+
+      @reducers = []
+      @gatherers = []
+      @processors = []
+
+      plan_errors_handling
+    end
+
+    def messages
+      @reducers + @gatherers + @processors
+    end
+
+    def reduce(name, reducer)
+      @reducers << {
+        "type"    => "reduce",
+        "body"    => {
+          name => {
+            "#{name}_reduced" => reducer,
+          },
+        },
+        "inputs"  => [name],
+        "outputs" => ["#{name}_reduced"],
+      }
+
+      @gatherers << {
+        "type"   => "gather",
+        "body"   => {
+          "#{name}_reduced" => {
+            "output" => name,
+          },
+        },
+        "inputs" => ["#{name}_reduced"],
+        "post"   => true,
+      }
+    end
+
+    def scatter_all
+      raise MessageProcessingError.new("missing key") unless @key
+      @processors << {
+        "command" => @source_message["type"],
+        "dataset" => @source_message["dataset"],
+        "body"    => @source_message["body"],
+        "key"     => @key,
+        "type"    => "scatter",
+        "outputs" => @outputs + ["errors"],
+        "replica" => "all",
+        "post"    => true
+      }
+    end
+
+    def broadcast_all
+      @processors << {
+        "command" => @source_message["type"],
+        "dataset" => @source_message["dataset"],
+        "body"    => @source_message["body"],
+        "type"    => "broadcast",
+        "outputs" => @outputs + ["errors"],
+        "replica" => "all",
+        "post"    => true
+      }
+    end
+
+    private
+    #XXX Now, we include definitions to merge errors in the body.
+    #    However, this makes the term "errors" reserved, so plugins
+    #    cannot use their custom "errors" in the body.
+    #    This must be rewritten. 
+    def plan_errors_handling
+      @outputs << "errors"
+      reduce("errors", "type" => "sum", "limit" => -1)
+    end
+  end
+end

  Modified: lib/droonga/distributor_plugin.rb (+1 -69)
===================================================================
--- lib/droonga/distributor_plugin.rb    2014-01-29 17:18:54 +0900 (ceb77bf)
+++ lib/droonga/distributor_plugin.rb    2014-01-29 17:49:15 +0900 (b5c149d)
@@ -16,6 +16,7 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 require "droonga/plugin"
+require "droonga/distributed_command_planner"
 
 module Droonga
   class DistributorPlugin < Plugin
@@ -30,16 +31,6 @@ module Droonga
       @distributor.distribute(messages)
     end
 
-    def scatter_all(message, key)
-      messages = [reducer(message), gatherer(message), scatterer(message, key)]
-      distribute(messages)
-    end
-
-    def broadcast_all(message)
-      messages = [reducer(message), gatherer(message), broadcaster(message)]
-      distribute(messages)
-    end
-
     private
     def process_error(command, error, arguments)
       if error.is_a?(MessageProcessingError)
@@ -48,64 +39,5 @@ module Droonga
         super
       end
     end
-
-    #XXX Now, default scatterer/broadcaster/reducer/gatherer includes
-    #    definitions to merge errors in the body. However, this makes
-    #    the term "errors" reserved, so plugins cannot use their custom
-    #    "errors" in the body. This must be rewritten. 
-
-    def scatterer(message, key)
-      {
-        "command" => message["type"],
-        "dataset" => message["dataset"],
-        "body"    => message["body"],
-        "key"     => key,
-        "type"    => "scatter",
-        "outputs" => ["errors"],
-        "replica" => "all",
-        "post"    => true
-      }
-    end
-
-    def broadcaster(message)
-      {
-        "command" => message["type"],
-        "dataset" => message["dataset"],
-        "body"    => message["body"],
-        "type"    => "broadcast",
-        "outputs" => ["errors"],
-        "replica" => "all",
-        "post"    => true
-      }
-    end
-
-    def reducer(message)
-      {
-        "type"    => "reduce",
-        "body"    => {
-          "errors" => {
-            "errors_reduced" => {
-              "type" => "sum",
-              "limit" => -1,
-            },
-          },
-        },
-        "inputs"  => ["errors"],
-        "outputs" => ["errors_reduced"],
-      }
-    end
-
-    def gatherer(message)
-      {
-        "type"   => "gather",
-        "body"   => {
-          "errors_reduced" => {
-            "output" => "errors",
-          },
-        },
-        "inputs" => ["errors_reduced"],
-        "post"   => true,
-      }
-    end
   end
 end

  Modified: lib/droonga/plugin/distributor/crud.rb (+10 -31)
===================================================================
--- lib/droonga/plugin/distributor/crud.rb    2014-01-29 17:18:54 +0900 (f4c471e)
+++ lib/droonga/plugin/distributor/crud.rb    2014-01-29 17:49:15 +0900 (127371c)
@@ -23,49 +23,28 @@ module Droonga
 
     command :add
     def add(message)
-      key = message["body"]["key"] || rand.to_s
-      scatter_all(message, key)
+      scatter_all(message)
     end
 
     command :update
     def update(message)
-      key = message["body"]["key"] || rand.to_s
-      scatter_all(message, key)
+      scatter_all(message)
     end
 
     # TODO: What is this?
     command :reset
     def reset(message)
-      key = message["body"]["key"] || rand.to_s
-      scatter_all(message, key)
+      scatter_all(message)
     end
 
     private
-    def scatterer(message, key)
-      scatterer = super
-      scatterer["outputs"] << "success"
-      scatterer
-    end
-
-    def reducer(message)
-      reducer = super
-      reducer["body"]["success"] = {
-        "success_reduced" => {
-          "type" => "and",
-        },
-      }
-      reducer["inputs"] << "success"
-      reducer["outputs"] << "success_reduced"
-      reducer
-    end
-
-    def gatherer(message)
-      gatherer = super
-      gatherer["body"]["success_reduced"] = {
-        "output" => "success",
-      }
-      gatherer["inputs"] << "success_reduced"
-      gatherer
+    def scatter_all(message)
+      planner = DistributedCommandPlanner.new(message)
+      planner.key = message["body"]["key"] || rand.to_s
+      planner.outputs << "success"
+      planner.reduce("success", "type" => "and")
+      planner.scatter_all
+      distribute(planner.messages)
     end
   end
 end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index