YUKI Hiroshi
null+****@clear*****
Mon Feb 3 11:28:32 JST 2014
YUKI Hiroshi 2014-02-03 11:28:32 +0900 (Mon, 03 Feb 2014) New Revision: 41ab9194101a2b1c29d7bf508cb1cfbf1268a0a4 https://github.com/droonga/fluent-plugin-droonga/commit/41ab9194101a2b1c29d7bf508cb1cfbf1268a0a4 Message: Simplify API of DistributedCommandPlanner Old Model: planner = DistributedCommandPlanner.new(message) planner.outputs << "name" planner.reduce("name", <reducer>) planner.broadcast_all planner.messages New Model: planner = DistributedCommandPlanner.new(message) planner.broadcast(:write => true) planner.reduce("name", <reducer>) planner.plan planner.messages Modified files: lib/droonga/distributed_command_planner.rb lib/droonga/plugin/distributor/crud.rb lib/droonga/plugin/distributor/distributed_search_planner.rb lib/droonga/plugin/distributor/groonga.rb lib/droonga/plugin/distributor/search.rb lib/droonga/plugin/distributor/watch.rb test/unit/helper/distributed_search_planner_helper.rb Modified: lib/droonga/distributed_command_planner.rb (+22 -24) =================================================================== --- lib/droonga/distributed_command_planner.rb 2014-02-03 12:41:14 +0900 (9430f7c) +++ lib/droonga/distributed_command_planner.rb 2014-02-03 11:28:32 +0900 (bd069b7) @@ -18,7 +18,7 @@ module Droonga class DistributedCommandPlanner attr_accessor :key, :dataset - attr_reader :outputs + attr_reader :messages def initialize(source_message) @source_message = source_message @@ -29,55 +29,49 @@ module Droonga @reducers = [] @gatherers = [] - @processors = [] + @processor = nil + + @messages = [] plan_errors_handling end - def messages - unified_reducers + unified_gatherers + @processors + def plan + @messages = unified_reducers + unified_gatherers + [fixed_processor] end def reduce(name, reducer) @reducers << reducer_message("reduce", name, reducer) @gatherers << gatherer_message("gather", name) + @outputs << output_name(name) end - def scatter_all(body=nil) + def scatter(body=nil) raise MessageProcessingError.new("missing key") unless @key - @processors << { + @processor = { "command" => @source_message["type"], "dataset" => @dataset || @source_message["dataset"], "body" => body || @source_message["body"], "key" => @key, "type" => "scatter", - "outputs" => @outputs, "replica" => "all", "post" => true } end - def broadcast_all(body=nil) - @processors << { + def broadcast(body=nil, options={}) + processor = { "command" => @source_message["type"], "dataset" => @dataset || @source_message["dataset"], "body" => body || @source_message["body"], "type" => "broadcast", - "outputs" => @outputs, - "replica" => "all", - "post" => true - } - end - - def broadcast_at_random(body=nil) - @processors << { - "command" => @source_message["type"], - "dataset" => @dataset || @source_message["dataset"], - "body" => body || @source_message["body"], - "type" => "broadcast", - "outputs" => @outputs, - "replica" => "random", + "replica" => "random" } + if options[:write] + processor["replica"] = "all" + processor["post"] = true + end + @processor = processor end private @@ -112,6 +106,11 @@ module Droonga unified_gatherers.values end + def fixed_processor + @processor["outputs"] = @outputs + @processor + end + def reducer_message(command, name, reducer) { "type" => command, @@ -147,7 +146,6 @@ module Droonga # cannot use their custom "errors" in the body. # This must be rewritten. def plan_errors_handling - @outputs << "errors" reduce("errors", "type" => "sum", "limit" => -1) end end Modified: lib/droonga/plugin/distributor/crud.rb (+2 -2) =================================================================== --- lib/droonga/plugin/distributor/crud.rb 2014-02-03 12:41:14 +0900 (127371c) +++ lib/droonga/plugin/distributor/crud.rb 2014-02-03 11:28:32 +0900 (e910e92) @@ -41,9 +41,9 @@ module Droonga def scatter_all(message) planner = DistributedCommandPlanner.new(message) planner.key = message["body"]["key"] || rand.to_s - planner.outputs << "success" + planner.scatter planner.reduce("success", "type" => "and") - planner.scatter_all + planner.plan distribute(planner.messages) end end Modified: lib/droonga/plugin/distributor/distributed_search_planner.rb (+2 -4) =================================================================== --- lib/droonga/plugin/distributor/distributed_search_planner.rb 2014-02-03 12:41:14 +0900 (99a76e7) +++ lib/droonga/plugin/distributor/distributed_search_planner.rb 2014-02-03 11:28:32 +0900 (36194b6) @@ -27,7 +27,7 @@ module Droonga @queries = @request["queries"] end - def build_messages + def plan Searcher::QuerySorter.validate_dependencies(@queries) ensure_unifiable! @@ -37,7 +37,7 @@ module Droonga end @dataset = @source_message["dataset"] || @request["dataset"] - broadcast_at_random(@request) + broadcast(@request) end private @@ -70,8 +70,6 @@ module Droonga return end - @outputs << input_name - transformer = QueryTransformer.new(query) elements = transformer.mappers Modified: lib/droonga/plugin/distributor/groonga.rb (+2 -2) =================================================================== --- lib/droonga/plugin/distributor/groonga.rb 2014-02-03 12:41:14 +0900 (c91f833) +++ lib/droonga/plugin/distributor/groonga.rb 2014-02-03 11:28:32 +0900 (e41e76b) @@ -45,9 +45,9 @@ module Droonga private def broadcast_all(message) planner = DistributedCommandPlanner.new(message) - planner.outputs << "result" + planner.broadcast(:write => true) planner.reduce("result", "type" => "or") - planner.broadcast_all + planner.plan distribute(planner.messages) end end Modified: lib/droonga/plugin/distributor/search.rb (+1 -1) =================================================================== --- lib/droonga/plugin/distributor/search.rb 2014-02-03 12:41:14 +0900 (02139ca) +++ lib/droonga/plugin/distributor/search.rb 2014-02-03 11:28:32 +0900 (a7e0504) @@ -25,7 +25,7 @@ module Droonga command :search def search(message) planner = DistributedSearchPlanner.new(message) - planner.build_messages + planner.plan distribute(planner.messages) end end Modified: lib/droonga/plugin/distributor/watch.rb (+2 -2) =================================================================== --- lib/droonga/plugin/distributor/watch.rb 2014-02-03 12:41:14 +0900 (31dd2f9) +++ lib/droonga/plugin/distributor/watch.rb 2014-02-03 11:28:32 +0900 (19a0894) @@ -44,9 +44,9 @@ module Droonga private def broadcast_all(message) planner = DistributedCommandPlanner.new(message) - planner.outputs << "success" + planner.broadcast(:write => true) planner.reduce("success", "type" => "or") - planner.broadcast_all + planner.plan distribute(planner.messages) end end Modified: test/unit/helper/distributed_search_planner_helper.rb (+1 -1) =================================================================== --- test/unit/helper/distributed_search_planner_helper.rb 2014-02-03 12:41:14 +0900 (4c300c0) +++ test/unit/helper/distributed_search_planner_helper.rb 2014-02-03 11:28:32 +0900 (c5356d8) @@ -18,7 +18,7 @@ require "droonga/plugin/distributor/distributed_search_planner" module DistributedSearchPlannerHelper def plan(search_request) planner = Droonga::DistributedSearchPlanner.new(search_request) - planner.build_messages + planner.plan planner.messages end -------------- next part -------------- HTML����������������������������...Download