[Groonga-commit] droonga/fluent-plugin-droonga at 135d196 [master] Extract distribution code from dispatcher

Back to archive index

Kouhei Sutou null+****@clear*****
Tue Dec 17 23:57:01 JST 2013


Kouhei Sutou	2013-12-17 23:57:01 +0900 (Tue, 17 Dec 2013)

  New Revision: 135d196ca77ae0524daeb43bf585e352d4f31dc1
  https://github.com/droonga/fluent-plugin-droonga/commit/135d196ca77ae0524daeb43bf585e352d4f31dc1

  Message:
    Extract distribution code from dispatcher

  Added files:
    lib/droonga/distribution_planner.rb
  Modified files:
    lib/droonga/dispatcher.rb
    lib/droonga/distributor.rb

  Modified: lib/droonga/dispatcher.rb (+0 -78)
===================================================================
--- lib/droonga/dispatcher.rb    2013-12-17 23:42:36 +0900 (17938c3)
+++ lib/droonga/dispatcher.rb    2013-12-17 23:57:01 +0900 (624eaf0)
@@ -116,24 +116,11 @@ module Droonga
 
     def handle(message, arguments)
       case message
-      when Array
-        handle_incoming_message(message)
       when Hash
         handle_internal_message(message)
       end
     end
 
-    def handle_incoming_message(message)
-      id = generate_id
-      planner = Planner.new(self, message)
-      destinations = planner.resolve(id)
-      components = planner.components
-      message = { "id" => id, "components" => components }
-      destinations.each do |destination, frequency|
-        dispatch(message, destination)
-      end
-    end
-
     def handle_internal_message(message)
       id = message["id"]
       collector = @collectors[id]
@@ -214,64 +201,12 @@ module Droonga
 
     class Planner
       attr_reader :components
-      class UndefinedInputError < StandardError
-        attr_reader :input
-        def initialize(input)
-          @input = input
-          super("undefined input assigned: <#{input}>")
-        end
-      end
 
-      class CyclicComponentsError < StandardError
-        attr_reader :components
-        def initialize(components)
-          @components = components
-          super("cyclic components found: <#{components}>")
-        end
-      end
-
-      include TSort
       def initialize(dispatcher, components)
         @dispatcher = dispatcher
         @components = components
       end
 
-      def resolve(id)
-        @dependency = {}
-        @components.each do |component|
-          @dependency[component] = component["inputs"]
-          next unless component["outputs"]
-          component["outputs"].each do |output|
-            @dependency[output] = [component]
-          end
-        end
-        @components = []
-        each_strongly_connected_component do |cs|
-          raise CyclicComponentsError.new(cs) if cs.size > 1
-          @components.concat(cs) unless cs.first.is_a? String
-        end
-        resolve_routes(id)
-      end
-
-      def resolve_routes(id)
-        local = [id]
-        destinations = Hash.new(0)
-        @components.each do |component|
-          dataset = component["dataset"]
-          routes =
-            if dataset
-              Droonga.catalog.get_routes(dataset, component)
-            else
-              local
-            end
-          routes.each do |route|
-            destinations[@dispatcher.farm_path(route)] += 1
-          end
-          component["routes"] = routes
-        end
-        return destinations
-      end
-
       def get_collector(id)
         resolve_descendants
         tasks = []
@@ -326,19 +261,6 @@ module Droonga
         end
         descendants
       end
-
-      def tsort_each_node(&block)
-        @dependency.each_key(&block)
-      end
-
-      def tsort_each_child(node, &block)
-        if node.is_a? String and @dependency[node].nil?
-          raise UndefinedInputError.new(node)
-        end
-        if @dependency[node]
-          @dependency[node].each(&block)
-        end
-      end
     end
   end
 end

  Added: lib/droonga/distribution_planner.rb (+96 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/distribution_planner.rb    2013-12-17 23:57:01 +0900 (f79d6fe)
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "tsort"
+
+module Droonga
+  class DistributionPlanner
+    class UndefinedInputError < StandardError
+      attr_reader :input
+      def initialize(input)
+        @input = input
+        super("undefined input assigned: <#{input}>")
+      end
+    end
+
+    class CyclicComponentsError < StandardError
+      attr_reader :components
+      def initialize(components)
+        @components = components
+        super("cyclic components found: <#{components}>")
+      end
+    end
+
+    include TSort
+
+    attr_reader :components
+    def initialize(dispatcher, components)
+      @dispatcher = dispatcher
+      @components = components
+    end
+
+    def resolve(id)
+      @dependency = {}
+      @components.each do |component|
+        @dependency[component] = component["inputs"]
+        next unless component["outputs"]
+        component["outputs"].each do |output|
+          @dependency[output] = [component]
+        end
+      end
+      @components = []
+      each_strongly_connected_component do |cs|
+        raise CyclicComponentsError.new(cs) if cs.size > 1
+        @components.concat(cs) unless cs.first.is_a? String
+      end
+      resolve_routes(id)
+    end
+
+    private
+    def resolve_routes(id)
+      local = [id]
+      destinations = Hash.new(0)
+      @components.each do |component|
+        dataset = component["dataset"]
+        routes =
+          if dataset
+            Droonga.catalog.get_routes(dataset, component)
+          else
+            local
+          end
+        routes.each do |route|
+          destinations[@dispatcher.farm_path(route)] += 1
+        end
+        component["routes"] = routes
+      end
+      return destinations
+    end
+
+    def tsort_each_node(&block)
+      @dependency.each_key(&block)
+    end
+
+    def tsort_each_child(node, &block)
+      if node.is_a? String and @dependency[node].nil?
+        raise UndefinedInputError.new(node)
+      end
+      if @dependency[node]
+        @dependency[node].each(&block)
+      end
+    end
+  end
+end

  Modified: lib/droonga/distributor.rb (+9 -1)
===================================================================
--- lib/droonga/distributor.rb    2013-12-17 23:42:36 +0900 (f802210)
+++ lib/droonga/distributor.rb    2013-12-17 23:57:01 +0900 (8bb00c2)
@@ -17,6 +17,7 @@
 
 require "droonga/pluggable"
 require "droonga/distributor_plugin"
+require "droonga/distribution_planner"
 
 module Droonga
   class Distributor
@@ -31,7 +32,14 @@ module Droonga
     end
 
     def distribute(message)
-      @dispatcher.handle_incoming_message(message)
+      id =****@dispa*****_id
+      planner = DistributionPlanner.new(@dispatcher, message)
+      destinations = planner.resolve(id)
+      components = planner.components
+      dispatch_message = { "id" => id, "components" => components }
+      destinations.each do |destination, frequency|
+        @dispatcher.dispatch(dispatch_message, destination)
+      end
     end
 
     private
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index