[Groonga-commit] droonga/fluent-plugin-droonga at 289c22a [master] Divide collector.rb from dispatcher.rb.

Back to archive index

Daijiro MORI null+****@clear*****
Wed Nov 20 15:43:32 JST 2013


Daijiro MORI	2013-11-20 15:43:32 +0900 (Wed, 20 Nov 2013)

  New Revision: 289c22aa5c0946874229bdbb07791f4ada6cc867
  https://github.com/droonga/fluent-plugin-droonga/commit/289c22aa5c0946874229bdbb07791f4ada6cc867

  Message:
    Divide collector.rb from dispatcher.rb.

  Added files:
    lib/droonga/collector.rb
  Modified files:
    lib/droonga/dispatcher.rb

  Added: lib/droonga/collector.rb (+116 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/collector.rb    2013-11-20 15:43:32 +0900 (b773bbb)
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/handler"
+
+module Droonga
+  class Collector
+    def initialize(id, dispatcher, components, tasks, inputs)
+      @id = id
+      @dispatcher = dispatcher
+      @components = components
+      @tasks = tasks
+      @n_dones = 0
+      @inputs = inputs
+    end
+
+    def handle(name, value)
+      tasks = @inputs[name]
+      unless tasks
+        #TODO: result arrived before its query
+        return
+      end
+      tasks.each do |task|
+        task["n_of_inputs"] += 1 if name
+        component = task["component"]
+        type = component["type"]
+        command = component["command"] || ("collector_" + type)
+        n_of_expects = component["n_of_expects"]
+        synchronous = nil
+        if command
+          # TODO: should be controllable for each command respectively.
+          synchronous = !n_of_expects.zero?
+          # TODO: check if asynchronous execution is available.
+          message = {
+            "task"=>task,
+            "name"=>name,
+            "value"=>value
+          }
+          unless synchronous
+            descendants = {}
+            component["descendants"].each do |name, indices|
+              descendants[name] = indices.collect do |index|
+                @components[index]["routes"].map do |route|
+                  @dispatcher.farm_path(route)
+                end
+              end
+            end
+            message["descendants"] = descendants
+            message["id"] = @id
+          end
+          @dispatcher.deliver(@id, task["route"], message, command, synchronous)
+        end
+        return if task["n_of_inputs"] < n_of_expects
+        #the task is done
+        if synchronous
+          result = task["values"]
+          post = component["post"]
+          @dispatcher.post(result, post) if post
+          component["descendants"].each do |name, indices|
+            message = {
+              "id" => @id,
+              "input" => name,
+              "value" => result[name]
+            }
+            indices.each do |index|
+              @components[index]["routes"].each do |route|
+                @dispatcher.dispatch(message, route)
+              end
+            end
+          end
+        end
+        @n_dones += 1
+        @dispatcher.collectors.delete(@id) if @n_dones ==****@tasks*****
+      end
+    end
+  end
+
+  class CollectorHandler < Droonga::Handler
+    attr_reader :task, :input_name, :component, :output_values, :body, :output_names
+    def handle(command, request, *arguments)
+      return false unless request.is_a? Hash
+      @task = request["task"]
+      return false unles****@task*****_a? Hash
+      @component = @task["component"]
+      return false unles****@compo*****_a? Hash
+      @output_values = @task["values"]
+      @body = @component["body"]
+      @output_names = @component["outputs"]
+      @id = request["id"]
+      @value = request["value"]
+      @input_name = request["name"]
+      @descendants = request["descendants"]
+      invoke(command, @value, *arguments)
+      output if @descendants
+      true
+    end
+
+    def prefer_synchronous?(command)
+      return true
+    end
+  end
+end

  Modified: lib/droonga/dispatcher.rb (+1 -96)
===================================================================
--- lib/droonga/dispatcher.rb    2013-11-20 15:21:40 +0900 (6b328ab)
+++ lib/droonga/dispatcher.rb    2013-11-20 15:43:32 +0900 (9c2e97a)
@@ -19,6 +19,7 @@ require 'tsort'
 require "droonga/handler"
 require "droonga/adapter"
 require "droonga/catalog"
+require "droonga/collector"
 
 module Droonga
   class Dispatcher
@@ -250,77 +251,6 @@ module Droonga
         end
       end
     end
-
-    class Collector
-      def initialize(id, dispatcher, components, tasks, inputs)
-        @id = id
-        @dispatcher = dispatcher
-        @components = components
-        @tasks = tasks
-        @n_dones = 0
-        @inputs = inputs
-      end
-
-      def handle(name, value)
-        tasks = @inputs[name]
-        unless tasks
-          #TODO: result arrived before its query
-          return
-        end
-        tasks.each do |task|
-          task["n_of_inputs"] += 1 if name
-          component = task["component"]
-          type = component["type"]
-          command = component["command"] || ("collector_" + type)
-          n_of_expects = component["n_of_expects"]
-          synchronous = nil
-          if command
-            # TODO: should be controllable for each command respectively.
-            synchronous = !n_of_expects.zero?
-            # TODO: check if asynchronous execution is available.
-            message = {
-              "task"=>task,
-              "name"=>name,
-              "value"=>value
-            }
-            unless synchronous
-              descendants = {}
-              component["descendants"].each do |name, indices|
-                descendants[name] = indices.collect do |index|
-                  @components[index]["routes"].map do |route|
-                    @dispatcher.farm_path(route)
-                  end
-                end
-              end
-              message["descendants"] = descendants
-              message["id"] = @id
-            end
-            @dispatcher.deliver(@id, task["route"], message, command, synchronous)
-          end
-          return if task["n_of_inputs"] < n_of_expects
-          #the task is done
-          if synchronous
-            result = task["values"]
-            post = component["post"]
-            @dispatcher.post(result, post) if post
-            component["descendants"].each do |name, indices|
-              message = {
-                "id" => @id,
-                "input" => name,
-                "value" => result[name]
-              }
-              indices.each do |index|
-                @components[index]["routes"].each do |route|
-                  @dispatcher.dispatch(message, route)
-                end
-              end
-            end
-          end
-          @n_dones += 1
-          @dispatcher.collectors.delete(@id) if @n_dones ==****@tasks*****
-        end
-      end
-    end
   end
 
   class DispatcherMessageHandler < Droonga::Handler
@@ -343,29 +273,4 @@ module Droonga
       return true
     end
   end
-
-  class CollectorHandler < Droonga::Handler
-    attr_reader :task, :input_name, :component, :output_values, :body, :output_names
-    def handle(command, request, *arguments)
-      return false unless request.is_a? Hash
-      @task = request["task"]
-      return false unles****@task*****_a? Hash
-      @component = @task["component"]
-      return false unles****@compo*****_a? Hash
-      @output_values = @task["values"]
-      @body = @component["body"]
-      @output_names = @component["outputs"]
-      @id = request["id"]
-      @value = request["value"]
-      @input_name = request["name"]
-      @descendants = request["descendants"]
-      invoke(command, @value, *arguments)
-      output if @descendants
-      true
-    end
-
-    def prefer_synchronous?(command)
-      return true
-    end
-  end
 end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index