Kouhei Sutou
null+****@clear*****
Mon Feb 17 13:04:13 JST 2014
Kouhei Sutou 2014-02-17 13:04:13 +0900 (Mon, 17 Feb 2014) New Revision: cb63b0fc9d0dea15df4ad484c2dd4c73b5831a95 https://github.com/droonga/fluent-plugin-droonga/commit/cb63b0fc9d0dea15df4ad484c2dd4c73b5831a95 Message: planner: migrate to new plugin style Added files: lib/droonga/planner_runner.rb lib/droonga/plugins/search/distributed_search_planner.rb Copied files: lib/droonga/plugin/metadata/planner_message.rb (from lib/droonga/plugin.rb) lib/droonga/plugins/groonga/schema_planer.rb (from lib/droonga/error.rb) Removed files: lib/droonga/plugin/planner/crud.rb lib/droonga/plugin/planner/distributed_search_planner.rb lib/droonga/plugin/planner/groonga.rb lib/droonga/plugin/planner/search.rb lib/droonga/plugin/planner/watch.rb Modified files: lib/droonga/dispatcher.rb lib/droonga/error.rb lib/droonga/planner.rb lib/droonga/plugin.rb lib/droonga/plugins/crud.rb lib/droonga/plugins/groonga.rb lib/droonga/plugins/search.rb lib/droonga/plugins/watch.rb test/unit/helper/distributed_search_planner_helper.rb Renamed files: test/unit/plugins/search/planner/test_basic.rb (from test/unit/plugin/planner/search_planner/test_basic.rb) test/unit/plugins/search/planner/test_group_by.rb (from test/unit/plugin/planner/search_planner/test_group_by.rb) test/unit/plugins/search/planner/test_output.rb (from test/unit/plugin/planner/search_planner/test_output.rb) test/unit/plugins/search/planner/test_sort_by.rb (from test/unit/plugin/planner/search_planner/test_sort_by.rb) test/unit/plugins/search/test_handler.rb (from test/unit/plugins/test_search.rb) test/unit/plugins/search/test_planner.rb (from test/unit/plugin/planner/test_search.rb) Modified: lib/droonga/dispatcher.rb (+14 -8) =================================================================== --- lib/droonga/dispatcher.rb 2014-02-17 12:12:03 +0900 (24c8b79) +++ lib/droonga/dispatcher.rb 2014-02-17 13:04:13 +0900 (4e25495) @@ -17,7 +17,7 @@ require "English" require "tsort" require "droonga/adapter_runner" -require "droonga/planner" +require "droonga/planner_runner" require "droonga/collector" require "droonga/farm" require "droonga/session" @@ -56,12 +56,11 @@ module Droonga @sessions = {} @current_id = 0 @local = Regexp.new("^#{@name}") - @adapter_runners = create_adapter_runners + @adapter_runners = create_runners(AdapterRunner) @farm = Farm.new(name, @catalog, @loop, :dispatcher => self) @forwarder = Forwarder.new(@loop) @replier = Replier.new(@forwarder) - # TODO: make customizable - @planner = Planner.new(self, ["search", "crud", "groonga", "watch"]) + @planner_runners = create_runners(PlannerRunner) # TODO: make customizable @collector = Collector.new(["basic", "search"]) end @@ -76,7 +75,9 @@ module Droonga def shutdown @forwarder.shutdown - @planner.shutdown + @planner_runners.each_value do |planner_runner| + planner_runner.shutdown + end @collector.shutdown @adapter_runners.each_value do |adapter_runner| adapter_runner.shutdown @@ -226,9 +227,14 @@ module Droonga dataset = message["dataset"] adapter_runner = @adapter_runners[dataset] adapted_message = adapter_runner.adapt_input(message) - plan =****@plann*****(adapted_message["type"], adapted_message) + planner_runner = @planner_runners[dataset] + plan = planner_runner.plan(adapted_message) distributor = Distributor.new(self) distributor.distribute(plan) + rescue Droonga::UnsupportedMessageError => error + target_message = error.message + raise UnknownCommand.new(target_message["type"], + target_message["dataset"]) rescue Droonga::LegacyPluggable::UnknownPlugin => error raise UnknownCommand.new(error.command, message["dataset"]) end @@ -243,10 +249,10 @@ module Droonga end end - def create_adapter_runners + def create_runners(runner_class) runners = {} @catalog.datasets.each do |name, configuration| - runners[name] = AdapterRunner.new(self, configuration["plugins"] || []) + runners[name] = runner_class.new(self, configuration["plugins"] || []) end runners end Modified: lib/droonga/error.rb (+10 -0) =================================================================== --- lib/droonga/error.rb 2014-02-17 12:12:03 +0900 (143a386) +++ lib/droonga/error.rb 2014-02-17 13:04:13 +0900 (39f8a49) @@ -31,4 +31,14 @@ module Droonga super(message) end end + + # TODO: Move to common file for runners + class UnsupportedMessageError < Error + attr_reader :phase, :message + def initialize(phase, message) + @phase = phase + @message = message + super("[#{@phase}] Unsupported message: #{@message.inspect}") + end + end end Modified: lib/droonga/planner.rb (+26 -9) =================================================================== --- lib/droonga/planner.rb 2014-02-17 12:12:03 +0900 (03d009e) +++ lib/droonga/planner.rb 2014-02-17 13:04:13 +0900 (6485e50) @@ -13,25 +13,42 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/legacy_pluggable" -require "droonga/planner_plugin" +require "droonga/pluggable" +require "droonga/plugin/metadata/planner_message" +require "droonga/distributed_command_planner" module Droonga class Planner - include LegacyPluggable + extend Pluggable - def initialize(dispatcher, plugins) + class << self + def message + Plugin::Metadata::PlannerMessage.new(self) + end + end + + def initialize(dispatcher) @dispatcher = dispatcher - load_plugins(plugins) + end + + def plan(message) + raise NotImplemented, "#{self.class.name}\##{__method__} must implement." end private - def instantiate_plugin(name) - PlannerPlugin.repository.instantiate(name, self) + def scatter(message, options={}) + planner = DistributedCommandPlanner.new(message) + planner.scatter + planner.key = options[:key] + planner.reduce(options[:reduce]) + planner.plan end - def log_tag - "[#{Process.pid}] planner" + def broadcast(message, options={}) + planner = DistributedCommandPlanner.new(message) + planner.broadcast(:write => options[:write]) + planner.reduce(options[:reduce]) + planner.plan end end end Added: lib/droonga/planner_runner.rb (+61 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/planner_runner.rb 2014-02-17 13:04:13 +0900 (c0d9467) @@ -0,0 +1,61 @@ +# Copyright (C) 2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/message_matcher" +require "droonga/planner" + +module Droonga + class PlannerRunner + def initialize(dispatcher, plugins) + @dispatcher = dispatcher + @planner_classes = Planner.find_sub_classes(plugins) + end + + def shutdown + end + + def plan(message) + $log.trace("#{log_tag}: plan: start", + :dataset => message["dataset"], + :type => message["type"]) + planner_class = find_planner_class(message) + if planner_class.nil? + raise UnsupportedMessageError.new(:planner, message) + end + planner = planner_class.new(@dispatcher) + plan = planner.plan(message) + $log.trace("#{log_tag}: plan: done", + :steps => plan.collect {|step| step["type"]}) + plan + end + + private + def find_planner_class(message) + @planner_classes.find do |planner_class| + pattern = planner_class.message.pattern + if pattern + matcher = MessageMatcher.new(pattern) + matcher.match?(message) + else + false + end + end + end + + def log_tag + "adapter-runner" + end + end +end Modified: lib/droonga/plugin.rb (+1 -0) =================================================================== --- lib/droonga/plugin.rb 2014-02-17 12:12:03 +0900 (9f1d085) +++ lib/droonga/plugin.rb 2014-02-17 13:04:13 +0900 (3928138) @@ -15,6 +15,7 @@ require "droonga/plugin_registry" require "droonga/adapter" +require "droonga/planner" require "droonga/handler" module Droonga Copied: lib/droonga/plugin/metadata/planner_message.rb (+31 -7) 53% =================================================================== --- lib/droonga/plugin.rb 2014-02-17 12:12:03 +0900 (9f1d085) +++ lib/droonga/plugin/metadata/planner_message.rb 2014-02-17 13:04:13 +0900 (63fc1e9) @@ -13,15 +13,39 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/plugin_registry" -require "droonga/adapter" -require "droonga/handler" - module Droonga module Plugin - class << self - def registry - @@registry ||= PluginRegistry.new + module Metadata + class PlannerMessage + def initialize(plugin_class) + @plugin_class = plugin_class + end + + def pattern + configuration[:pattern] || fallback_pattern + end + + def pattern=(pattern) + configuration[:pattern] = pattern + end + + def type + configuration[:type] + end + + def type=(type) + configuration[:type] = type + end + + private + def configuration + @plugin_class.options[:message] ||= {} + end + + def fallback_pattern + return nil if type.nil? + ["type", :equal, type] + end end end end Deleted: lib/droonga/plugin/planner/crud.rb (+0 -49) 100644 =================================================================== --- lib/droonga/plugin/planner/crud.rb 2014-02-17 12:12:03 +0900 (c569f64) +++ /dev/null @@ -1,49 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013-2014 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/planner_plugin" - -module Droonga - class CRUDPlanner < Droonga::PlannerPlugin - repository.register("crud", self) - - command :add - def add(message) - scatter(message) - end - - command :update - def update(message) - scatter(message) - end - - # TODO: What is this? - command :reset - def reset(message) - scatter(message) - end - - private - def scatter(message) - super(message, - :key => message["body"]["key"] || rand.to_s, - :reduce => { - "success" => "and" - }) - end - end -end Deleted: lib/droonga/plugin/planner/distributed_search_planner.rb (+0 -393) 100644 =================================================================== --- lib/droonga/plugin/planner/distributed_search_planner.rb 2014-02-17 12:12:03 +0900 (3800a51) +++ /dev/null @@ -1,393 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013-2014 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/searcher" -require "droonga/distributed_command_planner" - -module Droonga - class DistributedSearchPlanner < DistributedCommandPlanner - def initialize(search_request_message) - super - - @request = @source_message["body"] - raise NoQuery.new unless @request - - @request = Marshal.load(Marshal.dump(@request)) - @queries = @request["queries"] - end - - def plan - raise Searcher::NoQuery.new if****@queri*****? or****@queri*****? - - Searcher::QuerySorter.validate_dependencies(@queries) - - ensure_unifiable! - - @queries.each do |input_name, query| - transform_query(input_name, query) - end - - @dataset = @source_message["dataset"] || @request["dataset"] - broadcast(:body => @request) - - super - end - - private - UNLIMITED = -1 - - def reduce_command - "search_reduce" - end - - def gather_command - "search_gather" - end - - def ensure_unifiable! - @queries.each do |name, query| - if unifiable?(name) and query["output"] - query["output"]["unifiable"] = true - end - end - end - - def unifiable?(name) - query = @queries[name] - return true if query["groupBy"] - name = query["source"] - return false unles****@queri*****?(name) - unifiable?(name) - end - - def transform_query(input_name, query) - output = query["output"] - - # Skip reducing phase for a result with no output. - if output.nil? or - output["elements"].nil? or - (!output["elements"].include?("count") and - !output["elements"].include?("records")) - return - end - - transformer = QueryTransformer.new(query) - - elements = transformer.mappers - mapper = {} - mapper["elements"] = elements unless elements.empty? - reduce(input_name => { :reduce => transformer.reducers, - :gather => mapper }) - end - - class QueryTransformer - attr_reader :reducers, :mappers - - def initialize(query) - @query = query - @output = @query["output"] - @reducers = {} - @mappers = {} - @output_records = true - transform! - end - - def transform! - # The collector module supports only "simple" format search results. - # So we have to override the format and restore it on the gathering - # phase. - @records_format = @output["format"] || "simple" - if @output["format"] and @output["format"] != "simple" - @output["format"] = "simple" - end - - @sort_keys = @query["sortBy"] || [] - @sort_keys = @sort_keys["keys"] || [] if @sort_keys.is_a?(Hash) - - calculate_offset_and_limit! - build_count_mapper_and_reducer! - build_records_mapper_and_reducer! - end - - def calculate_offset_and_limit! - @original_sort_offset = sort_offset - @original_output_offset = output_offset - @original_sort_limit = sort_limit - @original_output_limit = output_limit - - calculate_sort_offset! - calculate_output_offset! - - # We have to calculate limit based on offset. - # <A, B = limited integer (0...MAXINT)> - # | sort limit | output limit | => | worker's sort limit | worker's output limit | final limit | - # ============================= ==================================================================== - # | UNLIMITED | UNLIMITED | => | UNLIMITED | UNLIMITED | UNLIMITED | - # | UNLIMITED | B | => | final_offset + B | final_offset + B | B | - # | A | UNLIMITED | => | final_offset + A | final_offset + A | A | - # | A | B | => | final_offset + max(A, B) | final_offset + min(A, B)| min(A, B) | - - # XXX final_limit and final_offset calculated in many times - - @records_offset = final_offset - @records_limit = final_limit - - updated_sort_limit = nil - updated_output_limit = nil - if final_limit == UNLIMITED - updated_output_limit = UNLIMITED - else - if rich_sort? - updated_sort_limit = final_offset + [sort_limit, output_limit].max - end - updated_output_limit = final_offset + final_limit - end - - if updated_sort_limit and updated_sort_limit != @query["sortBy"]["limit"] - @query["sortBy"]["limit"] = updated_sort_limit - end - if updated_output_limit and @output["limit"] and updated_output_limit != @output["limit"] - @output["limit"] = updated_output_limit - end - end - - def calculate_sort_offset! - # Offset for workers must be zero, because we have to apply "limit" and - # "offset" on the last gathering phase instead of each reducing phase. - if rich_sort? - @query["sortBy"]["offset"] = 0 - end - end - - def sort_offset - if rich_sort? - @query["sortBy"]["offset"] || 0 - else - 0 - end - end - - def output_offset - @output["offset"] || 0 - end - - def sort_limit - if rich_sort? - @query["sortBy"]["limit"] || UNLIMITED - else - UNLIMITED - end - end - - def output_limit - @output["limit"] || 0 - end - - def calculate_output_offset! - @output["offset"] = 0 if have_records? and @output["offset"] - end - - def final_offset - @original_sort_offset + @original_output_offset - end - - def final_limit - if @original_sort_limit == UNLIMITED and @original_output_limit == UNLIMITED - UNLIMITED - else - if @original_sort_limit == UNLIMITED - @original_output_limit - elsif @original_output_limit == UNLIMITED - @original_sort_limit - else - [@original_sort_limit, @original_output_limit].min - end - end - end - - def have_records? - @output["elements"].include?("records") - end - - def rich_sort? - @query["sortBy"].is_a?(Hash) - end - - def unifiable? - @output["unifiable"] - end - - def build_count_mapper_and_reducer! - return unless @output["elements"].include?("count") - - @reducers["count"] = { - "type" => "sum", - } - if unifiable? - @query["sortBy"]["limit"] = -1 if @query["sortBy"].is_a?(Hash) - @output["limit"] = -1 - mapper = { - "target" => "records", - } - unless @output["elements"].include?("records") - @records_limit = -1 - @output["elements"] << "records" - @output["attributes"] ||= ["_key"] - @output_records = false - end - @mappers["count"] = mapper - end - end - - def build_records_mapper_and_reducer! - # Skip reducing phase for a result with no record output. - return if !@output["elements"].include?("records") || @records_limit.zero? - - # Append sort key attributes to the list of output attributes - # temporarily, for the reducing phase. After all extra columns - # are removed on the gathering phase. - final_attributes = output_attribute_names - update_output_attributes! - - @reducers["records"] = build_records_reducer - - mapper = {} - if @output_records - mapper["format"] = @records_format unless @records_format == "simple" - mapper["attributes"] = final_attributes unless final_attributes.empty? - mapper["offset"] = @records_offset unless @records_offset.zero? - mapper["limit"] = @records_limit unless @records_limit.zero? - else - mapper["no_output"] = true - end - @mappers["records"] = mapper - end - - def output_attribute_names - attributes = @output["attributes"] || [] - if attributes.is_a?(Hash) - attributes.keys - else - attributes.collect do |attribute| - if attribute.is_a?(Hash) - attribute["label"] || attribute["source"] - else - attribute - end - end - end - end - - def update_output_attributes! - @output["attributes"] = array_style_attributes - @output["attributes"] += sort_attribute_names - if unifiable? and !source_column_names.include?("_key") - @output["attributes"] << "_key" - end - end - - def array_style_attributes - attributes = @output["attributes"] || [] - if attributes.is_a?(Hash) - attributes.keys.collect do |key| - attribute = attributes[key] - case attribute - when String - { - "label" => key, - "source" => attribute, - } - when Hash - attribute["label"] = key - attribute - end - end - else - attributes - end - end - - def source_column_names - attributes = @output["attributes"] || [] - if attributes.is_a?(Hash) - attributes_hash = attributes - attributes = [] - attributes_hash.each do |key, attribute| - attributes << attribute["source"] || key - end - attributes - else - attributes.collect do |attribute| - if attribute.is_a?(Hash) - attribute["source"] || attribute["label"] - else - attribute - end - end - end - end - - def sort_attribute_names - sort_attributes = @sort_keys.collect do |key| - key = key[1..-1] if key[0] == "-" - key - end - attributes = source_column_names - sort_attributes.reject! do |attribute| - attributes.include?(attribute) - end - sort_attributes - end - - ASCENDING_OPERATOR = "<" - DESCENDING_OPERATOR = ">" - - def build_records_reducer - attributes = source_column_names - key_column_index = attributes.index("_key") - - operators = @sort_keys.collect do |sort_key| - operator = ASCENDING_OPERATOR - if sort_key[0] == "-" - operator = DESCENDING_OPERATOR - sort_key = sort_key[1..-1] - end - { - "operator" => operator, - "column" => attributes.index(sort_key), - } - end - - reducer = { - "type" => "sort", - "operators" => operators, - } - if unifiable? and !key_column_index.nil? - reducer["key_column"] = key_column_index - end - - # On the reducing phase, we apply only "limit". We cannot apply - # "offset" on this phase because the collector merges a pair of - # results step by step even if there are three or more results. - # Instead, we apply "offset" on the gathering phase. - reducer["limit"] = @output["limit"] - - reducer - end - end - end -end Deleted: lib/droonga/plugin/planner/groonga.rb (+0 -54) 100644 =================================================================== --- lib/droonga/plugin/planner/groonga.rb 2014-02-17 12:12:03 +0900 (665a5be) +++ /dev/null @@ -1,54 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/planner_plugin" - -module Droonga - class GroongaPlanner < Droonga::PlannerPlugin - repository.register("groonga", self) - - command :table_create - def table_create(message) - unless message["dataset"] - raise "dataset must be set. FIXME: This error should return client." - end - broadcast(message) - end - - command :table_remove - def table_remove(message) - unless message["dataset"] - raise "dataset must be set. FIXME: This error should return client." - end - broadcast(message) - end - - command :column_create - def column_create(message) - broadcast(message) - end - - private - def broadcast(message) - super(message, - :write => true, - :reduce => { - "result" => "or" - }) - end - end -end Deleted: lib/droonga/plugin/planner/search.rb (+0 -31) 100644 =================================================================== --- lib/droonga/plugin/planner/search.rb 2014-02-17 12:12:03 +0900 (fba7010) +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013-2014 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/planner_plugin" -require "droonga/plugin/planner/distributed_search_planner" - -module Droonga - class SearchPlanner < Droonga::PlannerPlugin - repository.register("search", self) - - command :search - def search(message) - planner = DistributedSearchPlanner.new(message) - planner.plan - end - end -end Deleted: lib/droonga/plugin/planner/watch.rb (+0 -53) 100644 =================================================================== --- lib/droonga/plugin/planner/watch.rb 2014-02-17 12:12:03 +0900 (8e87ed1) +++ /dev/null @@ -1,53 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/planner_plugin" - -module Droonga - class WatchPlanner < Droonga::PlannerPlugin - repository.register("watch", self) - - command "watch.feed" => :feed - def feed(message) - broadcast(message) - end - - command "watch.subscribe" => :subscribe - def subscribe(message) - broadcast(message) - end - - command "watch.unsubscribe" => :unsubscribe - def unsubscribe(message) - broadcast(message) - end - - command "watch.sweep" => :sweep - def sweep(message) - broadcast(message) - end - - private - def broadcast(message) - super(message, - :write => true, - :reduce => { - "success" => "and" - }) - end - end -end Modified: lib/droonga/plugins/crud.rb (+12 -0) =================================================================== --- lib/droonga/plugins/crud.rb 2014-02-17 12:12:03 +0900 (316930b) +++ lib/droonga/plugins/crud.rb 2014-02-17 13:04:13 +0900 (01513ee) @@ -35,6 +35,18 @@ module Droonga end end + class Planner < Droonga::Planner + message.type = "add" + + def plan(message) + scatter(message, + :key => message["body"]["key"] || rand.to_s, + :reduce => { + "success" => "and" + }) + end + end + class Handler < Droonga::Handler message.type = "add" Modified: lib/droonga/plugins/groonga.rb (+1 -0) =================================================================== --- lib/droonga/plugins/groonga.rb 2014-02-17 12:12:03 +0900 (9cd6b7e) +++ lib/droonga/plugins/groonga.rb 2014-02-17 13:04:13 +0900 (3fb0113) @@ -15,6 +15,7 @@ require "droonga/plugin" require "droonga/plugins/groonga/generic_response" +require "droonga/plugins/groonga/schema_planer" require "droonga/plugins/groonga/select" require "droonga/plugins/groonga/table_create" require "droonga/plugins/groonga/table_remove" Copied: lib/droonga/plugins/groonga/schema_planer.rb (+16 -13) 62% =================================================================== --- lib/droonga/error.rb 2014-02-17 12:12:03 +0900 (143a386) +++ lib/droonga/plugins/groonga/schema_planer.rb 2014-02-17 13:04:13 +0900 (0b71f26) @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # Copyright (C) 2014 Droonga Project # # This library is free software; you can redistribute it and/or @@ -16,19 +14,24 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA module Droonga - class Error < StandardError - end - - class MultiplexError < Error - attr_reader :errors + module Plugins + module Groonga + class SchemaPlaner < Droonga::Planner + schema_commands = [ + "table_create", + "table_remove", + "column_create", + ] + message.pattern = ["type", :in, schema_commands] - def initialize(errors=[]) - @errors = errors - error_messages =****@error***** do |error| - error.message + def plan(message) + broadcast(message, + :write => true, + :reduce => { + "result" => "or" + }) + end end - message = error_messages.sort.join("\n-----------------------\n") - super(message) end end end Modified: lib/droonga/plugins/search.rb (+10 -0) =================================================================== --- lib/droonga/plugins/search.rb 2014-02-17 12:12:03 +0900 (84bffc1) +++ lib/droonga/plugins/search.rb 2014-02-17 13:04:13 +0900 (6d82c0c) @@ -15,12 +15,22 @@ require "droonga/plugin" require "droonga/searcher" +require "droonga/plugins/search/distributed_search_planner" module Droonga module Plugins module Search Plugin.registry.register("search", self) + class Planner < Droonga::Planner + message.type = "search" + + def plan(message) + planner = DistributedSearchPlanner.new(message) + planner.plan + end + end + class Handler < Droonga::Handler message.type = "search" Added: lib/droonga/plugins/search/distributed_search_planner.rb (+398 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/plugins/search/distributed_search_planner.rb 2014-02-17 13:04:13 +0900 (7c08a2d) @@ -0,0 +1,398 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013-2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/searcher" +require "droonga/distributed_command_planner" + +module Droonga + module Plugins + module Search + class DistributedSearchPlanner < DistributedCommandPlanner + def initialize(search_request_message) + super + + @request = @source_message["body"] + raise NoQuery.new unless @request + + @request = Marshal.load(Marshal.dump(@request)) + @queries = @request["queries"] + end + + def plan + raise Searcher::NoQuery.new if****@queri*****? or****@queri*****? + + Searcher::QuerySorter.validate_dependencies(@queries) + + ensure_unifiable! + + @queries.each do |input_name, query| + transform_query(input_name, query) + end + + @dataset = @source_message["dataset"] || @request["dataset"] + broadcast(:body => @request) + + super + end + + private + UNLIMITED = -1 + + def reduce_command + "search_reduce" + end + + def gather_command + "search_gather" + end + + def ensure_unifiable! + @queries.each do |name, query| + if unifiable?(name) and query["output"] + query["output"]["unifiable"] = true + end + end + end + + def unifiable?(name) + query = @queries[name] + return true if query["groupBy"] + name = query["source"] + return false unles****@queri*****?(name) + unifiable?(name) + end + + def transform_query(input_name, query) + output = query["output"] + + # Skip reducing phase for a result with no output. + if output.nil? or + output["elements"].nil? or + (!output["elements"].include?("count") and + !output["elements"].include?("records")) + return + end + + transformer = QueryTransformer.new(query) + + elements = transformer.mappers + mapper = {} + mapper["elements"] = elements unless elements.empty? + reduce(input_name => { :reduce => transformer.reducers, + :gather => mapper }) + end + + class QueryTransformer + attr_reader :reducers, :mappers + + def initialize(query) + @query = query + @output = @query["output"] + @reducers = {} + @mappers = {} + @output_records = true + transform! + end + + def transform! + # The collector module supports only "simple" format search results. + # So we have to override the format and restore it on the gathering + # phase. + @records_format = @output["format"] || "simple" + if @output["format"] and @output["format"] != "simple" + @output["format"] = "simple" + end + + @sort_keys = @query["sortBy"] || [] + @sort_keys = @sort_keys["keys"] || [] if @sort_keys.is_a?(Hash) + + calculate_offset_and_limit! + build_count_mapper_and_reducer! + build_records_mapper_and_reducer! + end + + def calculate_offset_and_limit! + @original_sort_offset = sort_offset + @original_output_offset = output_offset + @original_sort_limit = sort_limit + @original_output_limit = output_limit + + calculate_sort_offset! + calculate_output_offset! + + # We have to calculate limit based on offset. + # <A, B = limited integer (0...MAXINT)> + # | sort limit | output limit | => | worker's sort limit | worker's output limit | final limit | + # ============================= ==================================================================== + # | UNLIMITED | UNLIMITED | => | UNLIMITED | UNLIMITED | UNLIMITED | + # | UNLIMITED | B | => | final_offset + B | final_offset + B | B | + # | A | UNLIMITED | => | final_offset + A | final_offset + A | A | + # | A | B | => | final_offset + max(A, B) | final_offset + min(A, B)| min(A, B) | + + # XXX final_limit and final_offset calculated in many times + + @records_offset = final_offset + @records_limit = final_limit + + updated_sort_limit = nil + updated_output_limit = nil + if final_limit == UNLIMITED + updated_output_limit = UNLIMITED + else + if rich_sort? + updated_sort_limit = final_offset + [sort_limit, output_limit].max + end + updated_output_limit = final_offset + final_limit + end + + if updated_sort_limit and updated_sort_limit != @query["sortBy"]["limit"] + @query["sortBy"]["limit"] = updated_sort_limit + end + if updated_output_limit and @output["limit"] and updated_output_limit != @output["limit"] + @output["limit"] = updated_output_limit + end + end + + def calculate_sort_offset! + # Offset for workers must be zero, because we have to apply "limit" and + # "offset" on the last gathering phase instead of each reducing phase. + if rich_sort? + @query["sortBy"]["offset"] = 0 + end + end + + def sort_offset + if rich_sort? + @query["sortBy"]["offset"] || 0 + else + 0 + end + end + + def output_offset + @output["offset"] || 0 + end + + def sort_limit + if rich_sort? + @query["sortBy"]["limit"] || UNLIMITED + else + UNLIMITED + end + end + + def output_limit + @output["limit"] || 0 + end + + def calculate_output_offset! + @output["offset"] = 0 if have_records? and @output["offset"] + end + + def final_offset + @original_sort_offset + @original_output_offset + end + + def final_limit + if @original_sort_limit == UNLIMITED and + @original_output_limit == UNLIMITED + UNLIMITED + else + if @original_sort_limit == UNLIMITED + @original_output_limit + elsif @original_output_limit == UNLIMITED + @original_sort_limit + else + [@original_sort_limit, @original_output_limit].min + end + end + end + + def have_records? + @output["elements"].include?("records") + end + + def rich_sort? + @query["sortBy"].is_a?(Hash) + end + + def unifiable? + @output["unifiable"] + end + + def build_count_mapper_and_reducer! + return unless @output["elements"].include?("count") + + @reducers["count"] = { + "type" => "sum", + } + if unifiable? + @query["sortBy"]["limit"] = -1 if @query["sortBy"].is_a?(Hash) + @output["limit"] = -1 + mapper = { + "target" => "records", + } + unless @output["elements"].include?("records") + @records_limit = -1 + @output["elements"] << "records" + @output["attributes"] ||= ["_key"] + @output_records = false + end + @mappers["count"] = mapper + end + end + + def build_records_mapper_and_reducer! + # Skip reducing phase for a result with no record output. + return if !@output["elements"].include?("records") || @records_limit.zero? + + # Append sort key attributes to the list of output attributes + # temporarily, for the reducing phase. After all extra columns + # are removed on the gathering phase. + final_attributes = output_attribute_names + update_output_attributes! + + @reducers["records"] = build_records_reducer + + mapper = {} + if @output_records + mapper["format"] = @records_format unless @records_format == "simple" + mapper["attributes"] = final_attributes unless final_attributes.empty? + mapper["offset"] = @records_offset unless @records_offset.zero? + mapper["limit"] = @records_limit unless @records_limit.zero? + else + mapper["no_output"] = true + end + @mappers["records"] = mapper + end + + def output_attribute_names + attributes = @output["attributes"] || [] + if attributes.is_a?(Hash) + attributes.keys + else + attributes.collect do |attribute| + if attribute.is_a?(Hash) + attribute["label"] || attribute["source"] + else + attribute + end + end + end + end + + def update_output_attributes! + @output["attributes"] = array_style_attributes + @output["attributes"] += sort_attribute_names + if unifiable? and !source_column_names.include?("_key") + @output["attributes"] << "_key" + end + end + + def array_style_attributes + attributes = @output["attributes"] || [] + if attributes.is_a?(Hash) + attributes.keys.collect do |key| + attribute = attributes[key] + case attribute + when String + { + "label" => key, + "source" => attribute, + } + when Hash + attribute["label"] = key + attribute + end + end + else + attributes + end + end + + def source_column_names + attributes = @output["attributes"] || [] + if attributes.is_a?(Hash) + attributes_hash = attributes + attributes = [] + attributes_hash.each do |key, attribute| + attributes << attribute["source"] || key + end + attributes + else + attributes.collect do |attribute| + if attribute.is_a?(Hash) + attribute["source"] || attribute["label"] + else + attribute + end + end + end + end + + def sort_attribute_names + sort_attributes = @sort_keys.collect do |key| + key = key[1..-1] if key[0] == "-" + key + end + attributes = source_column_names + sort_attributes.reject! do |attribute| + attributes.include?(attribute) + end + sort_attributes + end + + ASCENDING_OPERATOR = "<" + DESCENDING_OPERATOR = ">" + + def build_records_reducer + attributes = source_column_names + key_column_index = attributes.index("_key") + + operators = @sort_keys.collect do |sort_key| + operator = ASCENDING_OPERATOR + if sort_key[0] == "-" + operator = DESCENDING_OPERATOR + sort_key = sort_key[1..-1] + end + { + "operator" => operator, + "column" => attributes.index(sort_key), + } + end + + reducer = { + "type" => "sort", + "operators" => operators, + } + if unifiable? and !key_column_index.nil? + reducer["key_column"] = key_column_index + end + + # On the reducing phase, we apply only "limit". We cannot apply + # "offset" on this phase because the collector merges a pair of + # results step by step even if there are three or more results. + # Instead, we apply "offset" on the gathering phase. + reducer["limit"] = @output["limit"] + + reducer + end + end + end + end + end +end Modified: lib/droonga/plugins/watch.rb (+12 -0) =================================================================== --- lib/droonga/plugins/watch.rb 2014-02-17 12:12:03 +0900 (4b1482b) +++ lib/droonga/plugins/watch.rb 2014-02-17 13:04:13 +0900 (7484279) @@ -23,6 +23,18 @@ module Droonga module Watch Plugin.registry.register("watch", self) + class Planner < Droonga::Planner + message.pattern = ["type", :start_with, "watch."] + + def plan(message) + broadcast(message, + :write => true, + :reduce => { + "success" => "and" + }) + end + end + module SchemaCreatable private def ensure_schema_created Modified: test/unit/helper/distributed_search_planner_helper.rb (+2 -2) =================================================================== --- test/unit/helper/distributed_search_planner_helper.rb 2014-02-17 12:12:03 +0900 (b81b61b) +++ test/unit/helper/distributed_search_planner_helper.rb 2014-02-17 13:04:13 +0900 (fcd96f1) @@ -13,11 +13,11 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/plugin/planner/distributed_search_planner" +require "droonga/plugins/search/distributed_search_planner" module DistributedSearchPlannerHelper def plan(search_request) - planner = Droonga::DistributedSearchPlanner.new(search_request) + planner = Droonga::Plugins::Search::DistributedSearchPlanner.new(search_request) planner.plan end Renamed: test/unit/plugins/search/planner/test_basic.rb (+0 -0) 100% =================================================================== Renamed: test/unit/plugins/search/planner/test_group_by.rb (+0 -0) 100% =================================================================== Renamed: test/unit/plugins/search/planner/test_output.rb (+0 -0) 100% =================================================================== Renamed: test/unit/plugins/search/planner/test_sort_by.rb (+0 -0) 100% =================================================================== Renamed: test/unit/plugins/search/test_handler.rb (+0 -0) 100% =================================================================== Renamed: test/unit/plugins/search/test_planner.rb (+3 -3) 96% =================================================================== --- test/unit/plugin/planner/test_search.rb 2014-02-17 12:12:03 +0900 (9f0f625) +++ test/unit/plugins/search/test_planner.rb 2014-02-17 13:04:13 +0900 (bbff123) @@ -13,13 +13,13 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/plugin/planner/search" +require "droonga/plugins/search" class SearchPlannerTest < Test::Unit::TestCase def setup setup_database @planner = Droonga::Test::StubPlanner.new - @plugin = Droonga::SearchPlanner.new(@planner) + @plugin = Droonga::Plugins::Search::Planner.new(@planner) end def teardown @@ -56,7 +56,7 @@ class SearchPlannerTest < Test::Unit::TestCase }, } - @planner.distribute(@plugin.process("search", envelope)) + @planner.distribute(@plugin.plan(envelope)) message = [] -------------- next part -------------- HTML����������������������������...Download