Daijiro MORI
null+****@clear*****
Wed Nov 20 15:43:32 JST 2013
Daijiro MORI 2013-11-20 15:43:32 +0900 (Wed, 20 Nov 2013) New Revision: 289c22aa5c0946874229bdbb07791f4ada6cc867 https://github.com/droonga/fluent-plugin-droonga/commit/289c22aa5c0946874229bdbb07791f4ada6cc867 Message: Divide collector.rb from dispatcher.rb. Added files: lib/droonga/collector.rb Modified files: lib/droonga/dispatcher.rb Added: lib/droonga/collector.rb (+116 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/collector.rb 2013-11-20 15:43:32 +0900 (b773bbb) @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/handler" + +module Droonga + class Collector + def initialize(id, dispatcher, components, tasks, inputs) + @id = id + @dispatcher = dispatcher + @components = components + @tasks = tasks + @n_dones = 0 + @inputs = inputs + end + + def handle(name, value) + tasks = @inputs[name] + unless tasks + #TODO: result arrived before its query + return + end + tasks.each do |task| + task["n_of_inputs"] += 1 if name + component = task["component"] + type = component["type"] + command = component["command"] || ("collector_" + type) + n_of_expects = component["n_of_expects"] + synchronous = nil + if command + # TODO: should be controllable for each command respectively. + synchronous = !n_of_expects.zero? + # TODO: check if asynchronous execution is available. + message = { + "task"=>task, + "name"=>name, + "value"=>value + } + unless synchronous + descendants = {} + component["descendants"].each do |name, indices| + descendants[name] = indices.collect do |index| + @components[index]["routes"].map do |route| + @dispatcher.farm_path(route) + end + end + end + message["descendants"] = descendants + message["id"] = @id + end + @dispatcher.deliver(@id, task["route"], message, command, synchronous) + end + return if task["n_of_inputs"] < n_of_expects + #the task is done + if synchronous + result = task["values"] + post = component["post"] + @dispatcher.post(result, post) if post + component["descendants"].each do |name, indices| + message = { + "id" => @id, + "input" => name, + "value" => result[name] + } + indices.each do |index| + @components[index]["routes"].each do |route| + @dispatcher.dispatch(message, route) + end + end + end + end + @n_dones += 1 + @dispatcher.collectors.delete(@id) if @n_dones ==****@tasks***** + end + end + end + + class CollectorHandler < Droonga::Handler + attr_reader :task, :input_name, :component, :output_values, :body, :output_names + def handle(command, request, *arguments) + return false unless request.is_a? Hash + @task = request["task"] + return false unles****@task*****_a? Hash + @component = @task["component"] + return false unles****@compo*****_a? Hash + @output_values = @task["values"] + @body = @component["body"] + @output_names = @component["outputs"] + @id = request["id"] + @value = request["value"] + @input_name = request["name"] + @descendants = request["descendants"] + invoke(command, @value, *arguments) + output if @descendants + true + end + + def prefer_synchronous?(command) + return true + end + end +end Modified: lib/droonga/dispatcher.rb (+1 -96) =================================================================== --- lib/droonga/dispatcher.rb 2013-11-20 15:21:40 +0900 (6b328ab) +++ lib/droonga/dispatcher.rb 2013-11-20 15:43:32 +0900 (9c2e97a) @@ -19,6 +19,7 @@ require 'tsort' require "droonga/handler" require "droonga/adapter" require "droonga/catalog" +require "droonga/collector" module Droonga class Dispatcher @@ -250,77 +251,6 @@ module Droonga end end end - - class Collector - def initialize(id, dispatcher, components, tasks, inputs) - @id = id - @dispatcher = dispatcher - @components = components - @tasks = tasks - @n_dones = 0 - @inputs = inputs - end - - def handle(name, value) - tasks = @inputs[name] - unless tasks - #TODO: result arrived before its query - return - end - tasks.each do |task| - task["n_of_inputs"] += 1 if name - component = task["component"] - type = component["type"] - command = component["command"] || ("collector_" + type) - n_of_expects = component["n_of_expects"] - synchronous = nil - if command - # TODO: should be controllable for each command respectively. - synchronous = !n_of_expects.zero? - # TODO: check if asynchronous execution is available. - message = { - "task"=>task, - "name"=>name, - "value"=>value - } - unless synchronous - descendants = {} - component["descendants"].each do |name, indices| - descendants[name] = indices.collect do |index| - @components[index]["routes"].map do |route| - @dispatcher.farm_path(route) - end - end - end - message["descendants"] = descendants - message["id"] = @id - end - @dispatcher.deliver(@id, task["route"], message, command, synchronous) - end - return if task["n_of_inputs"] < n_of_expects - #the task is done - if synchronous - result = task["values"] - post = component["post"] - @dispatcher.post(result, post) if post - component["descendants"].each do |name, indices| - message = { - "id" => @id, - "input" => name, - "value" => result[name] - } - indices.each do |index| - @components[index]["routes"].each do |route| - @dispatcher.dispatch(message, route) - end - end - end - end - @n_dones += 1 - @dispatcher.collectors.delete(@id) if @n_dones ==****@tasks***** - end - end - end end class DispatcherMessageHandler < Droonga::Handler @@ -343,29 +273,4 @@ module Droonga return true end end - - class CollectorHandler < Droonga::Handler - attr_reader :task, :input_name, :component, :output_values, :body, :output_names - def handle(command, request, *arguments) - return false unless request.is_a? Hash - @task = request["task"] - return false unles****@task*****_a? Hash - @component = @task["component"] - return false unles****@compo*****_a? Hash - @output_values = @task["values"] - @body = @component["body"] - @output_names = @component["outputs"] - @id = request["id"] - @value = request["value"] - @input_name = request["name"] - @descendants = request["descendants"] - invoke(command, @value, *arguments) - output if @descendants - true - end - - def prefer_synchronous?(command) - return true - end - end end -------------- next part -------------- HTML����������������������������...Download