[Groonga-commit] droonga/fluent-plugin-droonga at 34cae77 [master] Support asynchronous command execution.

Back to archive index

Daijiro MORI null+****@clear*****
Mon Aug 19 15:05:50 JST 2013


Daijiro MORI	2013-08-19 15:05:50 +0900 (Mon, 19 Aug 2013)

  New Revision: 34cae7731529f07badfc6917e0e77ce85aedbda3
  https://github.com/droonga/fluent-plugin-droonga/commit/34cae7731529f07badfc6917e0e77ce85aedbda3

  Message:
    Support asynchronous command execution.

  Modified files:
    lib/droonga/proxy.rb
    lib/droonga/worker.rb

  Modified: lib/droonga/proxy.rb (+58 -17)
===================================================================
--- lib/droonga/proxy.rb    2013-08-18 15:06:09 +0900 (435b140)
+++ lib/droonga/proxy.rb    2013-08-19 15:05:50 +0900 (eb9a974)
@@ -240,31 +240,51 @@ module Droonga
           component = task["component"]
           type = component["type"]
           command = component["command"] || ("proxy_" + type)
+          n_of_expects = component["n_of_expects"]
+          synchronous = nil
           if command
+            # TODO: should be controllable for each command respectively.
+            synchronous = !n_of_expects.zero?
             message = {
               "task"=>task,
               "name"=>name,
               "value"=>value
             }
-            #todo: add_route and n_of_expects++ if it would run asynchronously
-            @proxy.post(message, command)
+            unless synchronous
+              descendants = {}
+              component["descendants"].each do |name, indices|
+                descendants[name] = indices.collect do |index|
+                  dest = @components[index]
+                  dest["routes"].map do |route|
+                    @proxy.farm_path(route)
+                  end
+                end
+              end
+              message["descendants"] = descendants
+              message["id"] = @id
+            end
+            @proxy.post(message,
+                        "type" => command,
+                        "synchronous"=> synchronous)
           end
-          return if task["n_of_inputs"] < component["n_of_expects"]
+          return if task["n_of_inputs"] < n_of_expects
           #the task is done
-          result = task["values"]
-          post = component["post"]
-          @proxy.post(result, post) if post
-          component["descendants"].each do |name, indices|
-            message = {
-              "id" => @id,
-              "input" => name,
-              "value" => result[name]
-            }
-            indices.each do |index|
-              dest = @components[index]
-              routes = dest["routes"]
-              routes.each do |route|
-                @proxy.dispatch(message, route)
+          if synchronous
+            result = task["values"]
+            post = component["post"]
+            @proxy.post(result, post) if post
+            component["descendants"].each do |name, indices|
+              message = {
+                "id" => @id,
+                "input" => name,
+                "value" => result[name]
+              }
+              indices.each do |index|
+                dest = @components[index]
+                routes = dest["routes"]
+                routes.each do |route|
+                  @proxy.dispatch(message, route)
+                end
               end
             end
           end
@@ -301,7 +321,10 @@ module Droonga
       @output_names = @component["outputs"]
       @body = @component["body"]
       @output_values = @task["values"]
+      @descendants = request["descendants"]
+      @id = request["id"]
       super(command, request["value"], *arguments)
+      output if @descendants
     end
 
     def emit(value, name = nil)
@@ -316,6 +339,24 @@ module Droonga
       @task["values"][name] = value
     end
 
+    def output
+      result = @task["values"]
+      post = component["post"]
+      post(result, post) if post
+      @descendants.each do |name, dests|
+        message = {
+          "id" => @id,
+          "input" => name,
+          "value" => result[name]
+        }
+        dests.each do |routes|
+          routes.each do |route|
+            post(message, "to"=>route, "type"=>"proxy")
+          end
+        end
+      end
+    end
+
     def prefer_synchronous?(command)
       return true
     end

  Modified: lib/droonga/worker.rb (+3 -3)
===================================================================
--- lib/droonga/worker.rb    2013-08-18 15:06:09 +0900 (2f04d6f)
+++ lib/droonga/worker.rb    2013-08-19 15:05:50 +0900 (ae9110a)
@@ -152,9 +152,9 @@ module Droonga
       return unless output
       if command
         message = envelope
-        message[:body] = body
-        message[:type] = command
-        message[:arguments] = arguments
+        message["body"] = body
+        message["type"] = command
+        message["arguments"] = arguments
       else
         message = {
           inReplyTo: envelope["id"],
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index