[Groonga-commit] groonga/fluent-plugin-droonga at 4db3e40 [master] Implement Collector::handle.

Back to archive index

Daijiro MORI null+****@clear*****
Wed Aug 14 22:28:38 JST 2013


Daijiro MORI	2013-08-14 22:28:38 +0900 (Wed, 14 Aug 2013)

  New Revision: 4db3e4008a82506b5896814035e1e191ee2458aa
  https://github.com/groonga/fluent-plugin-droonga/commit/4db3e4008a82506b5896814035e1e191ee2458aa

  Message:
    Implement Collector::handle.

  Modified files:
    lib/droonga/proxy.rb

  Modified: lib/droonga/proxy.rb (+52 -11)
===================================================================
--- lib/droonga/proxy.rb    2013-08-14 10:32:22 +0900 (0688388)
+++ lib/droonga/proxy.rb    2013-08-14 22:28:38 +0900 (92be2cb)
@@ -19,6 +19,7 @@ require 'tsort'
 
 module Droonga
   class Proxy
+    attr_reader :collectors
     def initialize(worker, name)
       @worker = worker
       @name = name
@@ -38,7 +39,7 @@ module Droonga
 
     def handle_incoming_message(message)
       id = generate_id
-      @planner = Planner.new(message)
+      @planner = Planner.new(self, message)
       destinations =****@plann*****(id)
       message = {
         "id" => id,
@@ -51,21 +52,33 @@ module Droonga
 
     def dispatch(destination, message)
       if destination =~ @local
-        @collectors[message["id"]] =****@plann*****_collector(@local)
+        if message["input"]
+          collector = @collectors[message["id"]]
+          collector.handle(message["input"], message["value"])
+        else
+          collector =****@plann*****_collector(message["id"], @local)
+          @collectors[message["id"]] = collector
+          collector.handle(nil, nil)
+        end
       else
-        post(destination, message)
+        destination =~ /\A.*:\d+\/[^\.]+/
+        post($&, message)
       end
     end
 
     def handle_internal_message(message)
       components = message["components"]
-      @planner = Planner.new(components) unless @planner
+      @planner = Planner.new(self, components) unless @planner
       if message["input"]
         # received a piece of result
+        collector = @collectors[message["id"]]
+        collector.handle(message["input"], message["value"])
       else
         # received a query
         # what if @collectors[message["id"]] ?
-        @collectors[message["id"]] =****@plann*****_collector(@local)
+        collector =****@plann*****_collector(message["id"], @local)
+        @collectors[message["id"]] = collector
+        collector.handle(nil, nil)
       end
     end
 
@@ -98,7 +111,8 @@ module Droonga
       end
 
       include TSort
-      def initialize(components)
+      def initialize(proxy, components)
+        @proxy = proxy
         @components = components
       end
 
@@ -139,7 +153,7 @@ module Droonga
         return destinations
       end
 
-      def get_collector(local)
+      def get_collector(id, local)
         resolve_descendants
         tasks = []
         inputs = {}
@@ -149,15 +163,17 @@ module Droonga
             task = {
               "route" => route,
               "component" => component,
-              "n_of_inputs" => 0
+              "n_of_inputs" => 0,
+              "values" => []
             }
+            tasks << task
             (component["inputs"] || [nil]).each do |input|
               inputs[input] ||= []
               inputs[input] << task
             end
           end
         end
-        Collector.new(@components, tasks, inputs)
+        Collector.new(id, @proxy, @components, tasks, inputs)
       end
 
       def resolve_descendants
@@ -206,16 +222,41 @@ module Droonga
     end
 
     class Collector
-      def initialize(components, tasks, inputs)
+      def initialize(id, proxy, components, tasks, inputs)
+        @id = id
+        @proxy = proxy
         @components = components
         @tasks = tasks
+        @n_dones = 0
         @inputs = inputs
       end
 
       def handle(name, value)
         tasks = @inputs[name]
         tasks.each do |task|
-          # todo
+          if name
+            task["values"] << value
+            task["n_of_inputs"] += 1
+          end
+          component = task["component"]
+          return if task["n_of_inputs"] < component["n_of_expects"]
+          result = task["values"]
+          component["descendants"].each do |name, indices|
+            message = {
+              "id" => @id,
+              "input" => name,
+              "value" => result
+            }
+            indices.each do |index|
+              dest = @components[index]
+              routes = dest["routes"]
+              routes.each do |route|
+                @proxy.dispatch(route, message)
+              end
+            end
+          end
+          @n_dones += 1
+          @proxy.collectors.delete(@id) if @n_dones ==****@tasks*****
         end
       end
     end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index