YUKI Hiroshi
null+****@clear*****
Wed Jan 29 18:20:06 JST 2014
YUKI Hiroshi 2014-01-29 18:20:06 +0900 (Wed, 29 Jan 2014) New Revision: a92af0fd9e5235da0a38b1d61f9fd292701c1b64 https://github.com/droonga/fluent-plugin-droonga/commit/a92af0fd9e5235da0a38b1d61f9fd292701c1b64 Message: Support random broadcast Modified files: lib/droonga/distributed_command_planner.rb Modified: lib/droonga/distributed_command_planner.rb (+53 -30) =================================================================== --- lib/droonga/distributed_command_planner.rb 2014-01-29 17:54:55 +0900 (41cd9a8) +++ lib/droonga/distributed_command_planner.rb 2014-01-29 18:20:06 +0900 (477ad7e) @@ -17,13 +17,14 @@ module Droonga class DistributedCommandPlanner - attr_accessor :key + attr_accessor :key, :dataset attr_reader :outputs def initialize(source_message) @source_message = source_message @key = nil + @dataset = nil @outputs = [] @reducers = [] @@ -38,56 +39,78 @@ module Droonga end def reduce(name, reducer) - @reducers << { - "type" => "reduce", - "body" => { - name => { - "#{name}_reduced" => reducer, - }, - }, - "inputs" => [name], - "outputs" => ["#{name}_reduced"], - } - - @gatherers << { - "type" => "gather", - "body" => { - "#{name}_reduced" => { - "output" => name, - }, - }, - "inputs" => ["#{name}_reduced"], - "post" => true, - } + @reducers << reducer_message("reduce", name, reducer) + @gatherers << gatherer_message("gather", name) end - def scatter_all + def scatter_all(body=nil) raise MessageProcessingError.new("missing key") unless @key @processors << { "command" => @source_message["type"], - "dataset" => @source_message["dataset"], - "body" => @source_message["body"], + "dataset" => @dataset || @source_message["dataset"], + "body" => body || @source_message["body"], "key" => @key, "type" => "scatter", - "outputs" => @outputs + ["errors"], + "outputs" => @outputs, "replica" => "all", "post" => true } end - def broadcast_all + def broadcast_all(body=nil) @processors << { "command" => @source_message["type"], - "dataset" => @source_message["dataset"], - "body" => @source_message["body"], + "dataset" => @dataset || @source_message["dataset"], + "body" => body || @source_message["body"], "type" => "broadcast", - "outputs" => @outputs + ["errors"], + "outputs" => @outputs, "replica" => "all", "post" => true } end + def broadcast_at_random(body=nil) + @processors << { + "command" => @source_message["type"], + "dataset" => @dataset || @source_message["dataset"], + "body" => body || @source_message["body"], + "type" => "broadcast", + "outputs" => @outputs, + "replica" => "random", + } + end + private + def reducer_message(command, name, reducer) + { + "type" => command, + "body" => { + name => { + output_name(name) => reducer, + }, + }, + "inputs" => [name], + "outputs" => [output_name(name)], + } + end + + def gatherer_message(command, name) + { + "type" => command, + "body" => { + output_name(name) => { + "output" => name, + }, + }, + "inputs" => [output_name(name)], + "post" => true, + } + end + + def output_name(name) + "#{name}_reduced" + end + #XXX Now, we include definitions to merge errors in the body. # However, this makes the term "errors" reserved, so plugins # cannot use their custom "errors" in the body. -------------- next part -------------- HTML����������������������������...Download