Kouhei Sutou
null+****@clear*****
Tue Dec 24 15:52:50 JST 2013
Kouhei Sutou 2013-12-24 15:52:50 +0900 (Tue, 24 Dec 2013) New Revision: 437825faee6a26bfae46b7fc74359e2cd95b9bcb https://github.com/droonga/fluent-plugin-droonga/commit/437825faee6a26bfae46b7fc74359e2cd95b9bcb Message: Move routing related codes to dispatcher from distributor Modified files: lib/droonga/dispatcher.rb lib/droonga/distribution_planner.rb lib/droonga/distributor.rb Modified: lib/droonga/dispatcher.rb (+22 -0) =================================================================== --- lib/droonga/dispatcher.rb 2013-12-24 15:03:45 +0900 (081d25e) +++ lib/droonga/dispatcher.rb 2013-12-24 15:52:50 +0900 (82a07ad) @@ -116,6 +116,28 @@ module Droonga end end + def dispatch_components(components) + id = generate_id + destinations = {} + components.each do |component| + dataset = component["dataset"] + if dataset + routes = Droonga.catalog.get_routes(dataset, component) + component["routes"] = routes + else + component["routes"] ||= [id] + end + routes = component["routes"] + routes.each do |route| + destinations[farm_path(route)] = true + end + end + dispatch_message = { "id" => id, "components" => components } + destinations.each_key do |destination| + dispatch(dispatch_message, destination) + end + end + # TODO: Use more meaningful name def process_in_farm(route, message, type, synchronous) # TODO: validate route is farm path Modified: lib/droonga/distribution_planner.rb (+4 -24) =================================================================== --- lib/droonga/distribution_planner.rb 2013-12-24 15:03:45 +0900 (f79d6fe) +++ lib/droonga/distribution_planner.rb 2013-12-24 15:52:50 +0900 (0e9254e) @@ -37,13 +37,12 @@ module Droonga include TSort - attr_reader :components def initialize(dispatcher, components) @dispatcher = dispatcher @components = components end - def resolve(id) + def plan @dependency = {} @components.each do |component| @dependency[component] = component["inputs"] @@ -52,34 +51,15 @@ module Droonga @dependency[output] = [component] end end - @components = [] + components = [] each_strongly_connected_component do |cs| raise CyclicComponentsError.new(cs) if cs.size > 1 - @components.concat(cs) unless cs.first.is_a? String + components.concat(cs) unless cs.first.is_a? String end - resolve_routes(id) + components end private - def resolve_routes(id) - local = [id] - destinations = Hash.new(0) - @components.each do |component| - dataset = component["dataset"] - routes = - if dataset - Droonga.catalog.get_routes(dataset, component) - else - local - end - routes.each do |route| - destinations[@dispatcher.farm_path(route)] += 1 - end - component["routes"] = routes - end - return destinations - end - def tsort_each_node(&block) @dependency.each_key(&block) end Modified: lib/droonga/distributor.rb (+4 -9) =================================================================== --- lib/droonga/distributor.rb 2013-12-24 15:03:45 +0900 (8bb00c2) +++ lib/droonga/distributor.rb 2013-12-24 15:52:50 +0900 (c9a7ebf) @@ -31,15 +31,10 @@ module Droonga load_plugins(options[:distributors] || ["search", "crud", "groonga", "watch"]) end - def distribute(message) - id =****@dispa*****_id - planner = DistributionPlanner.new(@dispatcher, message) - destinations = planner.resolve(id) - components = planner.components - dispatch_message = { "id" => id, "components" => components } - destinations.each do |destination, frequency| - @dispatcher.dispatch(dispatch_message, destination) - end + def distribute(components) + planner = DistributionPlanner.new(@dispatcher, components) + planned_components = planner.plan + @dispatcher.dispatch_components(planned_components) end private -------------- next part -------------- HTML����������������������������...Download