Daijiro MORI
null+****@clear*****
Mon Aug 19 15:05:50 JST 2013
Daijiro MORI 2013-08-19 15:05:50 +0900 (Mon, 19 Aug 2013) New Revision: 34cae7731529f07badfc6917e0e77ce85aedbda3 https://github.com/droonga/fluent-plugin-droonga/commit/34cae7731529f07badfc6917e0e77ce85aedbda3 Message: Support asynchronous command execution. Modified files: lib/droonga/proxy.rb lib/droonga/worker.rb Modified: lib/droonga/proxy.rb (+58 -17) =================================================================== --- lib/droonga/proxy.rb 2013-08-18 15:06:09 +0900 (435b140) +++ lib/droonga/proxy.rb 2013-08-19 15:05:50 +0900 (eb9a974) @@ -240,31 +240,51 @@ module Droonga component = task["component"] type = component["type"] command = component["command"] || ("proxy_" + type) + n_of_expects = component["n_of_expects"] + synchronous = nil if command + # TODO: should be controllable for each command respectively. + synchronous = !n_of_expects.zero? message = { "task"=>task, "name"=>name, "value"=>value } - #todo: add_route and n_of_expects++ if it would run asynchronously - @proxy.post(message, command) + unless synchronous + descendants = {} + component["descendants"].each do |name, indices| + descendants[name] = indices.collect do |index| + dest = @components[index] + dest["routes"].map do |route| + @proxy.farm_path(route) + end + end + end + message["descendants"] = descendants + message["id"] = @id + end + @proxy.post(message, + "type" => command, + "synchronous"=> synchronous) end - return if task["n_of_inputs"] < component["n_of_expects"] + return if task["n_of_inputs"] < n_of_expects #the task is done - result = task["values"] - post = component["post"] - @proxy.post(result, post) if post - component["descendants"].each do |name, indices| - message = { - "id" => @id, - "input" => name, - "value" => result[name] - } - indices.each do |index| - dest = @components[index] - routes = dest["routes"] - routes.each do |route| - @proxy.dispatch(message, route) + if synchronous + result = task["values"] + post = component["post"] + @proxy.post(result, post) if post + component["descendants"].each do |name, indices| + message = { + "id" => @id, + "input" => name, + "value" => result[name] + } + indices.each do |index| + dest = @components[index] + routes = dest["routes"] + routes.each do |route| + @proxy.dispatch(message, route) + end end end end @@ -301,7 +321,10 @@ module Droonga @output_names = @component["outputs"] @body = @component["body"] @output_values = @task["values"] + @descendants = request["descendants"] + @id = request["id"] super(command, request["value"], *arguments) + output if @descendants end def emit(value, name = nil) @@ -316,6 +339,24 @@ module Droonga @task["values"][name] = value end + def output + result = @task["values"] + post = component["post"] + post(result, post) if post + @descendants.each do |name, dests| + message = { + "id" => @id, + "input" => name, + "value" => result[name] + } + dests.each do |routes| + routes.each do |route| + post(message, "to"=>route, "type"=>"proxy") + end + end + end + end + def prefer_synchronous?(command) return true end Modified: lib/droonga/worker.rb (+3 -3) =================================================================== --- lib/droonga/worker.rb 2013-08-18 15:06:09 +0900 (2f04d6f) +++ lib/droonga/worker.rb 2013-08-19 15:05:50 +0900 (ae9110a) @@ -152,9 +152,9 @@ module Droonga return unless output if command message = envelope - message[:body] = body - message[:type] = command - message[:arguments] = arguments + message["body"] = body + message["type"] = command + message["arguments"] = arguments else message = { inReplyTo: envelope["id"], -------------- next part -------------- HTML����������������������������...Download