Kouhei Sutou
null+****@clear*****
Tue Dec 17 23:57:01 JST 2013
Kouhei Sutou 2013-12-17 23:57:01 +0900 (Tue, 17 Dec 2013) New Revision: 135d196ca77ae0524daeb43bf585e352d4f31dc1 https://github.com/droonga/fluent-plugin-droonga/commit/135d196ca77ae0524daeb43bf585e352d4f31dc1 Message: Extract distribution code from dispatcher Added files: lib/droonga/distribution_planner.rb Modified files: lib/droonga/dispatcher.rb lib/droonga/distributor.rb Modified: lib/droonga/dispatcher.rb (+0 -78) =================================================================== --- lib/droonga/dispatcher.rb 2013-12-17 23:42:36 +0900 (17938c3) +++ lib/droonga/dispatcher.rb 2013-12-17 23:57:01 +0900 (624eaf0) @@ -116,24 +116,11 @@ module Droonga def handle(message, arguments) case message - when Array - handle_incoming_message(message) when Hash handle_internal_message(message) end end - def handle_incoming_message(message) - id = generate_id - planner = Planner.new(self, message) - destinations = planner.resolve(id) - components = planner.components - message = { "id" => id, "components" => components } - destinations.each do |destination, frequency| - dispatch(message, destination) - end - end - def handle_internal_message(message) id = message["id"] collector = @collectors[id] @@ -214,64 +201,12 @@ module Droonga class Planner attr_reader :components - class UndefinedInputError < StandardError - attr_reader :input - def initialize(input) - @input = input - super("undefined input assigned: <#{input}>") - end - end - class CyclicComponentsError < StandardError - attr_reader :components - def initialize(components) - @components = components - super("cyclic components found: <#{components}>") - end - end - - include TSort def initialize(dispatcher, components) @dispatcher = dispatcher @components = components end - def resolve(id) - @dependency = {} - @components.each do |component| - @dependency[component] = component["inputs"] - next unless component["outputs"] - component["outputs"].each do |output| - @dependency[output] = [component] - end - end - @components = [] - each_strongly_connected_component do |cs| - raise CyclicComponentsError.new(cs) if cs.size > 1 - @components.concat(cs) unless cs.first.is_a? String - end - resolve_routes(id) - end - - def resolve_routes(id) - local = [id] - destinations = Hash.new(0) - @components.each do |component| - dataset = component["dataset"] - routes = - if dataset - Droonga.catalog.get_routes(dataset, component) - else - local - end - routes.each do |route| - destinations[@dispatcher.farm_path(route)] += 1 - end - component["routes"] = routes - end - return destinations - end - def get_collector(id) resolve_descendants tasks = [] @@ -326,19 +261,6 @@ module Droonga end descendants end - - def tsort_each_node(&block) - @dependency.each_key(&block) - end - - def tsort_each_child(node, &block) - if node.is_a? String and @dependency[node].nil? - raise UndefinedInputError.new(node) - end - if @dependency[node] - @dependency[node].each(&block) - end - end end end end Added: lib/droonga/distribution_planner.rb (+96 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/distribution_planner.rb 2013-12-17 23:57:01 +0900 (f79d6fe) @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "tsort" + +module Droonga + class DistributionPlanner + class UndefinedInputError < StandardError + attr_reader :input + def initialize(input) + @input = input + super("undefined input assigned: <#{input}>") + end + end + + class CyclicComponentsError < StandardError + attr_reader :components + def initialize(components) + @components = components + super("cyclic components found: <#{components}>") + end + end + + include TSort + + attr_reader :components + def initialize(dispatcher, components) + @dispatcher = dispatcher + @components = components + end + + def resolve(id) + @dependency = {} + @components.each do |component| + @dependency[component] = component["inputs"] + next unless component["outputs"] + component["outputs"].each do |output| + @dependency[output] = [component] + end + end + @components = [] + each_strongly_connected_component do |cs| + raise CyclicComponentsError.new(cs) if cs.size > 1 + @components.concat(cs) unless cs.first.is_a? String + end + resolve_routes(id) + end + + private + def resolve_routes(id) + local = [id] + destinations = Hash.new(0) + @components.each do |component| + dataset = component["dataset"] + routes = + if dataset + Droonga.catalog.get_routes(dataset, component) + else + local + end + routes.each do |route| + destinations[@dispatcher.farm_path(route)] += 1 + end + component["routes"] = routes + end + return destinations + end + + def tsort_each_node(&block) + @dependency.each_key(&block) + end + + def tsort_each_child(node, &block) + if node.is_a? String and @dependency[node].nil? + raise UndefinedInputError.new(node) + end + if @dependency[node] + @dependency[node].each(&block) + end + end + end +end Modified: lib/droonga/distributor.rb (+9 -1) =================================================================== --- lib/droonga/distributor.rb 2013-12-17 23:42:36 +0900 (f802210) +++ lib/droonga/distributor.rb 2013-12-17 23:57:01 +0900 (8bb00c2) @@ -17,6 +17,7 @@ require "droonga/pluggable" require "droonga/distributor_plugin" +require "droonga/distribution_planner" module Droonga class Distributor @@ -31,7 +32,14 @@ module Droonga end def distribute(message) - @dispatcher.handle_incoming_message(message) + id =****@dispa*****_id + planner = DistributionPlanner.new(@dispatcher, message) + destinations = planner.resolve(id) + components = planner.components + dispatch_message = { "id" => id, "components" => components } + destinations.each do |destination, frequency| + @dispatcher.dispatch(dispatch_message, destination) + end end private -------------- next part -------------- HTML����������������������������...Download