[Groonga-commit] droonga/fluent-plugin-droonga at 1adacde [master] Call handler from inside the proxy.

Back to archive index

Daijiro MORI null+****@clear*****
Thu Aug 15 23:44:09 JST 2013


Daijiro MORI	2013-08-15 23:44:09 +0900 (Thu, 15 Aug 2013)

  New Revision: 1adacde51970a75a5d2fcb2e7b070a908707d486
  https://github.com/droonga/fluent-plugin-droonga/commit/1adacde51970a75a5d2fcb2e7b070a908707d486

  Message:
    Call handler from inside the proxy.

  Modified files:
    lib/droonga/plugin/handler_proxy.rb
    lib/droonga/proxy.rb

  Modified: lib/droonga/plugin/handler_proxy.rb (+1 -1)
===================================================================
--- lib/droonga/plugin/handler_proxy.rb    2013-08-15 13:18:22 +0900 (5ca24de)
+++ lib/droonga/plugin/handler_proxy.rb    2013-08-15 23:44:09 +0900 (c3327b0)
@@ -30,7 +30,7 @@ module Droonga
     command :proxy
 
     def proxy(request, *arguments)
-      @proxy.handle(request)
+      @proxy.handle(request, arguments)
     end
   end
 end

  Modified: lib/droonga/proxy.rb (+31 -14)
===================================================================
--- lib/droonga/proxy.rb    2013-08-15 13:18:22 +0900 (f1a5c69)
+++ lib/droonga/proxy.rb    2013-08-15 23:44:09 +0900 (99e102e)
@@ -28,7 +28,7 @@ module Droonga
       @local = Regexp.new("^#{@name}")
     end
 
-    def handle(message)
+    def handle(message, arguments)
       case message
       when Array
         handle_incoming_message(message)
@@ -44,7 +44,7 @@ module Droonga
       components = planner.components
       message = { "id" => id, "components" => components }
       destinations.each do |destination, frequency|
-        dispatch(destination, message)
+        dispatch(message, destination)
       end
     end
 
@@ -63,16 +63,16 @@ module Droonga
       collector.handle(message["input"], message["value"])
     end
 
-    def dispatch(destination, message)
+    def dispatch(message, destination)
       if local?(destination)
         handle_internal_message(message)
       else
-        post(farm_path(destination), message)
+        post(message, "to"=>farm_path(destination), "type"=>"proxy")
       end
     end
 
-    def post(route, message)
-      @worker.post(message, "to"=> route, "type"=>"proxy")
+    def post(message, destination)
+      @worker.post(message, destination)
     end
 
     def generate_id
@@ -164,7 +164,7 @@ module Droonga
               "route" => route,
               "component" => component,
               "n_of_inputs" => 0,
-              "values" => []
+              "value" => nil
             }
             tasks << task
             (component["inputs"] || [nil]).each do |input|
@@ -235,13 +235,31 @@ module Droonga
       def handle(name, value)
         tasks = @inputs[name]
         tasks.each do |task|
-          if name
-            task["values"] << value
-            task["n_of_inputs"] += 1
-          end
           component = task["component"]
+          type = component["type"]
+          args = component["args"]
+          command = component["command"]
+          if command
+            message = {
+              "task"=>task,
+              "name"=>name,
+              "value"=>value
+            }
+            #todo: add_route and n_of_expects++ if it would run asynchronously
+            @proxy.post(message, command)
+          else
+            task["value"] ||= {}
+            task["value"][name] ||= []
+            task["value"][name] << value
+          end
+          task["n_of_inputs"] += 1 if name
           return if task["n_of_inputs"] < component["n_of_expects"]
-          result = task["values"]
+          #the task is done
+          result = task["value"]
+          case type
+          when "send"
+            @proxy.post(result, args)
+          end
           component["descendants"].each do |name, indices|
             message = {
               "id" => @id,
@@ -252,13 +270,12 @@ module Droonga
               dest = @components[index]
               routes = dest["routes"]
               routes.each do |route|
-                @proxy.dispatch(route, message)
+                @proxy.dispatch(message, route)
               end
             end
           end
           @n_dones += 1
           @proxy.collectors.delete(@id) if @n_dones ==****@tasks*****
-          p****@proxy*****
         end
       end
     end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index