Daijiro MORI
null+****@clear*****
Wed Aug 14 22:28:38 JST 2013
Daijiro MORI 2013-08-14 22:28:38 +0900 (Wed, 14 Aug 2013) New Revision: 4db3e4008a82506b5896814035e1e191ee2458aa https://github.com/groonga/fluent-plugin-droonga/commit/4db3e4008a82506b5896814035e1e191ee2458aa Message: Implement Collector::handle. Modified files: lib/droonga/proxy.rb Modified: lib/droonga/proxy.rb (+52 -11) =================================================================== --- lib/droonga/proxy.rb 2013-08-14 10:32:22 +0900 (0688388) +++ lib/droonga/proxy.rb 2013-08-14 22:28:38 +0900 (92be2cb) @@ -19,6 +19,7 @@ require 'tsort' module Droonga class Proxy + attr_reader :collectors def initialize(worker, name) @worker = worker @name = name @@ -38,7 +39,7 @@ module Droonga def handle_incoming_message(message) id = generate_id - @planner = Planner.new(message) + @planner = Planner.new(self, message) destinations =****@plann*****(id) message = { "id" => id, @@ -51,21 +52,33 @@ module Droonga def dispatch(destination, message) if destination =~ @local - @collectors[message["id"]] =****@plann*****_collector(@local) + if message["input"] + collector = @collectors[message["id"]] + collector.handle(message["input"], message["value"]) + else + collector =****@plann*****_collector(message["id"], @local) + @collectors[message["id"]] = collector + collector.handle(nil, nil) + end else - post(destination, message) + destination =~ /\A.*:\d+\/[^\.]+/ + post($&, message) end end def handle_internal_message(message) components = message["components"] - @planner = Planner.new(components) unless @planner + @planner = Planner.new(self, components) unless @planner if message["input"] # received a piece of result + collector = @collectors[message["id"]] + collector.handle(message["input"], message["value"]) else # received a query # what if @collectors[message["id"]] ? - @collectors[message["id"]] =****@plann*****_collector(@local) + collector =****@plann*****_collector(message["id"], @local) + @collectors[message["id"]] = collector + collector.handle(nil, nil) end end @@ -98,7 +111,8 @@ module Droonga end include TSort - def initialize(components) + def initialize(proxy, components) + @proxy = proxy @components = components end @@ -139,7 +153,7 @@ module Droonga return destinations end - def get_collector(local) + def get_collector(id, local) resolve_descendants tasks = [] inputs = {} @@ -149,15 +163,17 @@ module Droonga task = { "route" => route, "component" => component, - "n_of_inputs" => 0 + "n_of_inputs" => 0, + "values" => [] } + tasks << task (component["inputs"] || [nil]).each do |input| inputs[input] ||= [] inputs[input] << task end end end - Collector.new(@components, tasks, inputs) + Collector.new(id, @proxy, @components, tasks, inputs) end def resolve_descendants @@ -206,16 +222,41 @@ module Droonga end class Collector - def initialize(components, tasks, inputs) + def initialize(id, proxy, components, tasks, inputs) + @id = id + @proxy = proxy @components = components @tasks = tasks + @n_dones = 0 @inputs = inputs end def handle(name, value) tasks = @inputs[name] tasks.each do |task| - # todo + if name + task["values"] << value + task["n_of_inputs"] += 1 + end + component = task["component"] + return if task["n_of_inputs"] < component["n_of_expects"] + result = task["values"] + component["descendants"].each do |name, indices| + message = { + "id" => @id, + "input" => name, + "value" => result + } + indices.each do |index| + dest = @components[index] + routes = dest["routes"] + routes.each do |route| + @proxy.dispatch(route, message) + end + end + end + @n_dones += 1 + @proxy.collectors.delete(@id) if @n_dones ==****@tasks***** end end end -------------- next part -------------- HTML����������������������������...Download