Daijiro MORI
null+****@clear*****
Thu Aug 15 23:44:09 JST 2013
Daijiro MORI 2013-08-15 23:44:09 +0900 (Thu, 15 Aug 2013) New Revision: 1adacde51970a75a5d2fcb2e7b070a908707d486 https://github.com/droonga/fluent-plugin-droonga/commit/1adacde51970a75a5d2fcb2e7b070a908707d486 Message: Call handler from inside the proxy. Modified files: lib/droonga/plugin/handler_proxy.rb lib/droonga/proxy.rb Modified: lib/droonga/plugin/handler_proxy.rb (+1 -1) =================================================================== --- lib/droonga/plugin/handler_proxy.rb 2013-08-15 13:18:22 +0900 (5ca24de) +++ lib/droonga/plugin/handler_proxy.rb 2013-08-15 23:44:09 +0900 (c3327b0) @@ -30,7 +30,7 @@ module Droonga command :proxy def proxy(request, *arguments) - @proxy.handle(request) + @proxy.handle(request, arguments) end end end Modified: lib/droonga/proxy.rb (+31 -14) =================================================================== --- lib/droonga/proxy.rb 2013-08-15 13:18:22 +0900 (f1a5c69) +++ lib/droonga/proxy.rb 2013-08-15 23:44:09 +0900 (99e102e) @@ -28,7 +28,7 @@ module Droonga @local = Regexp.new("^#{@name}") end - def handle(message) + def handle(message, arguments) case message when Array handle_incoming_message(message) @@ -44,7 +44,7 @@ module Droonga components = planner.components message = { "id" => id, "components" => components } destinations.each do |destination, frequency| - dispatch(destination, message) + dispatch(message, destination) end end @@ -63,16 +63,16 @@ module Droonga collector.handle(message["input"], message["value"]) end - def dispatch(destination, message) + def dispatch(message, destination) if local?(destination) handle_internal_message(message) else - post(farm_path(destination), message) + post(message, "to"=>farm_path(destination), "type"=>"proxy") end end - def post(route, message) - @worker.post(message, "to"=> route, "type"=>"proxy") + def post(message, destination) + @worker.post(message, destination) end def generate_id @@ -164,7 +164,7 @@ module Droonga "route" => route, "component" => component, "n_of_inputs" => 0, - "values" => [] + "value" => nil } tasks << task (component["inputs"] || [nil]).each do |input| @@ -235,13 +235,31 @@ module Droonga def handle(name, value) tasks = @inputs[name] tasks.each do |task| - if name - task["values"] << value - task["n_of_inputs"] += 1 - end component = task["component"] + type = component["type"] + args = component["args"] + command = component["command"] + if command + message = { + "task"=>task, + "name"=>name, + "value"=>value + } + #todo: add_route and n_of_expects++ if it would run asynchronously + @proxy.post(message, command) + else + task["value"] ||= {} + task["value"][name] ||= [] + task["value"][name] << value + end + task["n_of_inputs"] += 1 if name return if task["n_of_inputs"] < component["n_of_expects"] - result = task["values"] + #the task is done + result = task["value"] + case type + when "send" + @proxy.post(result, args) + end component["descendants"].each do |name, indices| message = { "id" => @id, @@ -252,13 +270,12 @@ module Droonga dest = @components[index] routes = dest["routes"] routes.each do |route| - @proxy.dispatch(route, message) + @proxy.dispatch(message, route) end end end @n_dones += 1 @proxy.collectors.delete(@id) if @n_dones ==****@tasks***** - p****@proxy***** end end end -------------- next part -------------- HTML����������������������������... Download