[Groonga-commit] droonga/fluent-plugin-droonga at a92af0f [master] Support random broadcast

Back to archive index

YUKI Hiroshi null+****@clear*****
Wed Jan 29 18:20:06 JST 2014


YUKI Hiroshi	2014-01-29 18:20:06 +0900 (Wed, 29 Jan 2014)

  New Revision: a92af0fd9e5235da0a38b1d61f9fd292701c1b64
  https://github.com/droonga/fluent-plugin-droonga/commit/a92af0fd9e5235da0a38b1d61f9fd292701c1b64

  Message:
    Support random broadcast

  Modified files:
    lib/droonga/distributed_command_planner.rb

  Modified: lib/droonga/distributed_command_planner.rb (+53 -30)
===================================================================
--- lib/droonga/distributed_command_planner.rb    2014-01-29 17:54:55 +0900 (41cd9a8)
+++ lib/droonga/distributed_command_planner.rb    2014-01-29 18:20:06 +0900 (477ad7e)
@@ -17,13 +17,14 @@
 
 module Droonga
   class DistributedCommandPlanner
-    attr_accessor :key
+    attr_accessor :key, :dataset
     attr_reader :outputs
 
     def initialize(source_message)
       @source_message = source_message
 
       @key = nil
+      @dataset = nil
       @outputs = []
 
       @reducers = []
@@ -38,56 +39,78 @@ module Droonga
     end
 
     def reduce(name, reducer)
-      @reducers << {
-        "type"    => "reduce",
-        "body"    => {
-          name => {
-            "#{name}_reduced" => reducer,
-          },
-        },
-        "inputs"  => [name],
-        "outputs" => ["#{name}_reduced"],
-      }
-
-      @gatherers << {
-        "type"   => "gather",
-        "body"   => {
-          "#{name}_reduced" => {
-            "output" => name,
-          },
-        },
-        "inputs" => ["#{name}_reduced"],
-        "post"   => true,
-      }
+      @reducers << reducer_message("reduce", name, reducer)
+      @gatherers << gatherer_message("gather", name)
     end
 
-    def scatter_all
+    def scatter_all(body=nil)
       raise MessageProcessingError.new("missing key") unless @key
       @processors << {
         "command" => @source_message["type"],
-        "dataset" => @source_message["dataset"],
-        "body"    => @source_message["body"],
+        "dataset" => @dataset || @source_message["dataset"],
+        "body"    => body || @source_message["body"],
         "key"     => @key,
         "type"    => "scatter",
-        "outputs" => @outputs + ["errors"],
+        "outputs" => @outputs,
         "replica" => "all",
         "post"    => true
       }
     end
 
-    def broadcast_all
+    def broadcast_all(body=nil)
       @processors << {
         "command" => @source_message["type"],
-        "dataset" => @source_message["dataset"],
-        "body"    => @source_message["body"],
+        "dataset" => @dataset || @source_message["dataset"],
+        "body"    => body || @source_message["body"],
         "type"    => "broadcast",
-        "outputs" => @outputs + ["errors"],
+        "outputs" => @outputs,
         "replica" => "all",
         "post"    => true
       }
     end
 
+    def broadcast_at_random(body=nil)
+      @processors << {
+        "command" => @source_message["type"],
+        "dataset" => @dataset || @source_message["dataset"],
+        "body"    => body || @source_message["body"],
+        "type"    => "broadcast",
+        "outputs" => @outputs,
+        "replica" => "random",
+      }
+    end
+
     private
+    def reducer_message(command, name, reducer)
+      {
+        "type"    => command,
+        "body"    => {
+          name => {
+            output_name(name) => reducer,
+          },
+        },
+        "inputs"  => [name],
+        "outputs" => [output_name(name)],
+      }
+    end
+
+    def gatherer_message(command, name)
+      {
+        "type"   => command,
+        "body"   => {
+          output_name(name) => {
+            "output" => name,
+          },
+        },
+        "inputs" => [output_name(name)],
+        "post"   => true,
+      }
+    end
+
+    def output_name(name)
+      "#{name}_reduced"
+    end
+
     #XXX Now, we include definitions to merge errors in the body.
     #    However, this makes the term "errors" reserved, so plugins
     #    cannot use their custom "errors" in the body.
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index