[Groonga-commit] droonga/fluent-plugin-droonga at cb63b0f [master] planner: migrate to new plugin style

Back to archive index

Kouhei Sutou null+****@clear*****
Mon Feb 17 13:04:13 JST 2014


Kouhei Sutou	2014-02-17 13:04:13 +0900 (Mon, 17 Feb 2014)

  New Revision: cb63b0fc9d0dea15df4ad484c2dd4c73b5831a95
  https://github.com/droonga/fluent-plugin-droonga/commit/cb63b0fc9d0dea15df4ad484c2dd4c73b5831a95

  Message:
    planner: migrate to new plugin style

  Added files:
    lib/droonga/planner_runner.rb
    lib/droonga/plugins/search/distributed_search_planner.rb
  Copied files:
    lib/droonga/plugin/metadata/planner_message.rb
      (from lib/droonga/plugin.rb)
    lib/droonga/plugins/groonga/schema_planer.rb
      (from lib/droonga/error.rb)
  Removed files:
    lib/droonga/plugin/planner/crud.rb
    lib/droonga/plugin/planner/distributed_search_planner.rb
    lib/droonga/plugin/planner/groonga.rb
    lib/droonga/plugin/planner/search.rb
    lib/droonga/plugin/planner/watch.rb
  Modified files:
    lib/droonga/dispatcher.rb
    lib/droonga/error.rb
    lib/droonga/planner.rb
    lib/droonga/plugin.rb
    lib/droonga/plugins/crud.rb
    lib/droonga/plugins/groonga.rb
    lib/droonga/plugins/search.rb
    lib/droonga/plugins/watch.rb
    test/unit/helper/distributed_search_planner_helper.rb
  Renamed files:
    test/unit/plugins/search/planner/test_basic.rb
      (from test/unit/plugin/planner/search_planner/test_basic.rb)
    test/unit/plugins/search/planner/test_group_by.rb
      (from test/unit/plugin/planner/search_planner/test_group_by.rb)
    test/unit/plugins/search/planner/test_output.rb
      (from test/unit/plugin/planner/search_planner/test_output.rb)
    test/unit/plugins/search/planner/test_sort_by.rb
      (from test/unit/plugin/planner/search_planner/test_sort_by.rb)
    test/unit/plugins/search/test_handler.rb
      (from test/unit/plugins/test_search.rb)
    test/unit/plugins/search/test_planner.rb
      (from test/unit/plugin/planner/test_search.rb)

  Modified: lib/droonga/dispatcher.rb (+14 -8)
===================================================================
--- lib/droonga/dispatcher.rb    2014-02-17 12:12:03 +0900 (24c8b79)
+++ lib/droonga/dispatcher.rb    2014-02-17 13:04:13 +0900 (4e25495)
@@ -17,7 +17,7 @@ require "English"
 require "tsort"
 
 require "droonga/adapter_runner"
-require "droonga/planner"
+require "droonga/planner_runner"
 require "droonga/collector"
 require "droonga/farm"
 require "droonga/session"
@@ -56,12 +56,11 @@ module Droonga
       @sessions = {}
       @current_id = 0
       @local = Regexp.new("^#{@name}")
-      @adapter_runners = create_adapter_runners
+      @adapter_runners = create_runners(AdapterRunner)
       @farm = Farm.new(name, @catalog, @loop, :dispatcher => self)
       @forwarder = Forwarder.new(@loop)
       @replier = Replier.new(@forwarder)
-      # TODO: make customizable
-      @planner = Planner.new(self, ["search", "crud", "groonga", "watch"])
+      @planner_runners = create_runners(PlannerRunner)
       # TODO: make customizable
       @collector = Collector.new(["basic", "search"])
     end
@@ -76,7 +75,9 @@ module Droonga
 
     def shutdown
       @forwarder.shutdown
-      @planner.shutdown
+      @planner_runners.each_value do |planner_runner|
+        planner_runner.shutdown
+      end
       @collector.shutdown
       @adapter_runners.each_value do |adapter_runner|
         adapter_runner.shutdown
@@ -226,9 +227,14 @@ module Droonga
       dataset = message["dataset"]
       adapter_runner = @adapter_runners[dataset]
       adapted_message = adapter_runner.adapt_input(message)
-      plan =****@plann*****(adapted_message["type"], adapted_message)
+      planner_runner = @planner_runners[dataset]
+      plan = planner_runner.plan(adapted_message)
       distributor = Distributor.new(self)
       distributor.distribute(plan)
+    rescue Droonga::UnsupportedMessageError => error
+      target_message = error.message
+      raise UnknownCommand.new(target_message["type"],
+                               target_message["dataset"])
     rescue Droonga::LegacyPluggable::UnknownPlugin => error
       raise UnknownCommand.new(error.command, message["dataset"])
     end
@@ -243,10 +249,10 @@ module Droonga
       end
     end
 
-    def create_adapter_runners
+    def create_runners(runner_class)
       runners = {}
       @catalog.datasets.each do |name, configuration|
-        runners[name] = AdapterRunner.new(self, configuration["plugins"] || [])
+        runners[name] = runner_class.new(self, configuration["plugins"] || [])
       end
       runners
     end

  Modified: lib/droonga/error.rb (+10 -0)
===================================================================
--- lib/droonga/error.rb    2014-02-17 12:12:03 +0900 (143a386)
+++ lib/droonga/error.rb    2014-02-17 13:04:13 +0900 (39f8a49)
@@ -31,4 +31,14 @@ module Droonga
       super(message)
     end
   end
+
+  # TODO: Move to common file for runners
+  class UnsupportedMessageError < Error
+    attr_reader :phase, :message
+    def initialize(phase, message)
+      @phase = phase
+      @message = message
+      super("[#{@phase}] Unsupported message: #{@message.inspect}")
+    end
+  end
 end

  Modified: lib/droonga/planner.rb (+26 -9)
===================================================================
--- lib/droonga/planner.rb    2014-02-17 12:12:03 +0900 (03d009e)
+++ lib/droonga/planner.rb    2014-02-17 13:04:13 +0900 (6485e50)
@@ -13,25 +13,42 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/legacy_pluggable"
-require "droonga/planner_plugin"
+require "droonga/pluggable"
+require "droonga/plugin/metadata/planner_message"
+require "droonga/distributed_command_planner"
 
 module Droonga
   class Planner
-    include LegacyPluggable
+    extend Pluggable
 
-    def initialize(dispatcher, plugins)
+    class << self
+      def message
+        Plugin::Metadata::PlannerMessage.new(self)
+      end
+    end
+
+    def initialize(dispatcher)
       @dispatcher = dispatcher
-      load_plugins(plugins)
+    end
+
+    def plan(message)
+      raise NotImplemented, "#{self.class.name}\##{__method__} must implement."
     end
 
     private
-    def instantiate_plugin(name)
-      PlannerPlugin.repository.instantiate(name, self)
+    def scatter(message, options={})
+      planner = DistributedCommandPlanner.new(message)
+      planner.scatter
+      planner.key = options[:key]
+      planner.reduce(options[:reduce])
+      planner.plan
     end
 
-    def log_tag
-      "[#{Process.pid}] planner"
+    def broadcast(message, options={})
+      planner = DistributedCommandPlanner.new(message)
+      planner.broadcast(:write => options[:write])
+      planner.reduce(options[:reduce])
+      planner.plan
     end
   end
 end

  Added: lib/droonga/planner_runner.rb (+61 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/planner_runner.rb    2014-02-17 13:04:13 +0900 (c0d9467)
@@ -0,0 +1,61 @@
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/message_matcher"
+require "droonga/planner"
+
+module Droonga
+  class PlannerRunner
+    def initialize(dispatcher, plugins)
+      @dispatcher = dispatcher
+      @planner_classes = Planner.find_sub_classes(plugins)
+    end
+
+    def shutdown
+    end
+
+    def plan(message)
+      $log.trace("#{log_tag}: plan: start",
+                 :dataset => message["dataset"],
+                 :type => message["type"])
+      planner_class = find_planner_class(message)
+      if planner_class.nil?
+        raise UnsupportedMessageError.new(:planner, message)
+      end
+      planner = planner_class.new(@dispatcher)
+      plan = planner.plan(message)
+      $log.trace("#{log_tag}: plan: done",
+                 :steps => plan.collect {|step| step["type"]})
+      plan
+    end
+
+    private
+    def find_planner_class(message)
+      @planner_classes.find do |planner_class|
+        pattern = planner_class.message.pattern
+        if pattern
+          matcher = MessageMatcher.new(pattern)
+          matcher.match?(message)
+        else
+          false
+        end
+      end
+    end
+
+    def log_tag
+      "adapter-runner"
+    end
+  end
+end

  Modified: lib/droonga/plugin.rb (+1 -0)
===================================================================
--- lib/droonga/plugin.rb    2014-02-17 12:12:03 +0900 (9f1d085)
+++ lib/droonga/plugin.rb    2014-02-17 13:04:13 +0900 (3928138)
@@ -15,6 +15,7 @@
 
 require "droonga/plugin_registry"
 require "droonga/adapter"
+require "droonga/planner"
 require "droonga/handler"
 
 module Droonga

  Copied: lib/droonga/plugin/metadata/planner_message.rb (+31 -7) 53%
===================================================================
--- lib/droonga/plugin.rb    2014-02-17 12:12:03 +0900 (9f1d085)
+++ lib/droonga/plugin/metadata/planner_message.rb    2014-02-17 13:04:13 +0900 (63fc1e9)
@@ -13,15 +13,39 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/plugin_registry"
-require "droonga/adapter"
-require "droonga/handler"
-
 module Droonga
   module Plugin
-    class << self
-      def registry
-        @@registry ||= PluginRegistry.new
+    module Metadata
+      class PlannerMessage
+        def initialize(plugin_class)
+          @plugin_class = plugin_class
+        end
+
+        def pattern
+          configuration[:pattern] || fallback_pattern
+        end
+
+        def pattern=(pattern)
+          configuration[:pattern] = pattern
+        end
+
+        def type
+          configuration[:type]
+        end
+
+        def type=(type)
+          configuration[:type] = type
+        end
+
+        private
+        def configuration
+          @plugin_class.options[:message] ||= {}
+        end
+
+        def fallback_pattern
+          return nil if type.nil?
+          ["type", :equal, type]
+        end
       end
     end
   end

  Deleted: lib/droonga/plugin/planner/crud.rb (+0 -49) 100644
===================================================================
--- lib/droonga/plugin/planner/crud.rb    2014-02-17 12:12:03 +0900 (c569f64)
+++ /dev/null
@@ -1,49 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013-2014 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/planner_plugin"
-
-module Droonga
-  class CRUDPlanner < Droonga::PlannerPlugin
-    repository.register("crud", self)
-
-    command :add
-    def add(message)
-      scatter(message)
-    end
-
-    command :update
-    def update(message)
-      scatter(message)
-    end
-
-    # TODO: What is this?
-    command :reset
-    def reset(message)
-      scatter(message)
-    end
-
-    private
-    def scatter(message)
-      super(message,
-            :key => message["body"]["key"] || rand.to_s,
-            :reduce => {
-              "success" => "and"
-            })
-    end
-  end
-end

  Deleted: lib/droonga/plugin/planner/distributed_search_planner.rb (+0 -393) 100644
===================================================================
--- lib/droonga/plugin/planner/distributed_search_planner.rb    2014-02-17 12:12:03 +0900 (3800a51)
+++ /dev/null
@@ -1,393 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013-2014 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/searcher"
-require "droonga/distributed_command_planner"
-
-module Droonga
-  class DistributedSearchPlanner < DistributedCommandPlanner
-    def initialize(search_request_message)
-      super
-
-      @request = @source_message["body"]
-      raise NoQuery.new unless @request
-
-      @request = Marshal.load(Marshal.dump(@request))
-      @queries = @request["queries"]
-    end
-
-    def plan
-      raise Searcher::NoQuery.new if****@queri*****? or****@queri*****?
-
-      Searcher::QuerySorter.validate_dependencies(@queries)
-
-      ensure_unifiable!
-
-      @queries.each do |input_name, query|
-        transform_query(input_name, query)
-      end
-
-      @dataset = @source_message["dataset"] || @request["dataset"]
-      broadcast(:body => @request)
-      
-      super
-    end
-
-    private
-    UNLIMITED = -1
-
-    def reduce_command
-      "search_reduce"
-    end
-
-    def gather_command
-      "search_gather"
-    end
-
-    def ensure_unifiable!
-      @queries.each do |name, query|
-        if unifiable?(name) and query["output"]
-          query["output"]["unifiable"] = true
-        end
-      end
-    end
-
-    def unifiable?(name)
-      query = @queries[name]
-      return true if query["groupBy"]
-      name = query["source"]
-      return false unles****@queri*****?(name)
-      unifiable?(name)
-    end
-
-    def transform_query(input_name, query)
-      output = query["output"]
-
-      # Skip reducing phase for a result with no output.
-      if output.nil? or
-           output["elements"].nil? or
-           (!output["elements"].include?("count") and
-            !output["elements"].include?("records"))
-        return
-      end
-
-      transformer = QueryTransformer.new(query)
-
-      elements = transformer.mappers
-      mapper = {}
-      mapper["elements"] = elements unless elements.empty?
-      reduce(input_name => { :reduce => transformer.reducers,
-                             :gather => mapper })
-    end
-
-    class QueryTransformer
-      attr_reader :reducers, :mappers
-
-      def initialize(query)
-        @query = query
-        @output = @query["output"]
-        @reducers = {}
-        @mappers = {}
-        @output_records = true
-        transform!
-      end
-
-      def transform!
-        # The collector module supports only "simple" format search results.
-        # So we have to override the format and restore it on the gathering
-        # phase.
-        @records_format = @output["format"] || "simple"
-        if @output["format"] and @output["format"] != "simple"
-          @output["format"] = "simple"
-        end
-
-        @sort_keys = @query["sortBy"] || []
-        @sort_keys = @sort_keys["keys"] || [] if @sort_keys.is_a?(Hash)
-
-        calculate_offset_and_limit!
-        build_count_mapper_and_reducer!
-        build_records_mapper_and_reducer!
-      end
-
-      def calculate_offset_and_limit!
-        @original_sort_offset = sort_offset
-        @original_output_offset = output_offset
-        @original_sort_limit = sort_limit
-        @original_output_limit = output_limit
-
-        calculate_sort_offset!
-        calculate_output_offset!
-
-        # We have to calculate limit based on offset.
-        # <A, B = limited integer (0...MAXINT)>
-        # | sort limit | output limit | => | worker's sort limit      | worker's output limit   | final limit |
-        # =============================    ====================================================================
-        # | UNLIMITED  | UNLIMITED    | => | UNLIMITED                | UNLIMITED               | UNLIMITED   |
-        # | UNLIMITED  | B            | => | final_offset + B         | final_offset + B        | B           |
-        # | A          | UNLIMITED    | => | final_offset + A         | final_offset + A        | A           |
-        # | A          | B            | => | final_offset + max(A, B) | final_offset + min(A, B)| min(A, B)   |
-
-        # XXX final_limit and final_offset calculated in many times
-
-        @records_offset = final_offset
-        @records_limit = final_limit
-
-        updated_sort_limit = nil
-        updated_output_limit = nil
-        if final_limit == UNLIMITED
-          updated_output_limit = UNLIMITED
-        else
-          if rich_sort?
-            updated_sort_limit = final_offset + [sort_limit, output_limit].max
-          end
-          updated_output_limit = final_offset + final_limit
-        end
-
-        if updated_sort_limit and updated_sort_limit != @query["sortBy"]["limit"]
-          @query["sortBy"]["limit"] = updated_sort_limit
-        end
-        if updated_output_limit and @output["limit"] and updated_output_limit != @output["limit"]
-          @output["limit"] = updated_output_limit
-        end
-      end
-
-      def calculate_sort_offset!
-        # Offset for workers must be zero, because we have to apply "limit" and
-        # "offset" on the last gathering phase instead of each reducing phase.
-        if rich_sort?
-          @query["sortBy"]["offset"] = 0
-        end
-      end
-
-      def sort_offset
-        if rich_sort?
-          @query["sortBy"]["offset"] || 0
-        else
-          0
-        end
-      end
-
-      def output_offset
-        @output["offset"] || 0
-      end
-
-      def sort_limit
-        if rich_sort?
-          @query["sortBy"]["limit"] || UNLIMITED
-        else
-          UNLIMITED
-        end
-      end
-
-      def output_limit
-        @output["limit"] || 0
-      end
-
-      def calculate_output_offset!
-        @output["offset"] = 0 if have_records? and @output["offset"]
-      end
-
-      def final_offset
-        @original_sort_offset + @original_output_offset
-      end
-
-      def final_limit
-        if @original_sort_limit == UNLIMITED and @original_output_limit == UNLIMITED
-          UNLIMITED
-        else
-          if @original_sort_limit == UNLIMITED
-            @original_output_limit
-          elsif @original_output_limit == UNLIMITED
-            @original_sort_limit
-          else
-            [@original_sort_limit, @original_output_limit].min
-          end
-        end
-      end
-
-      def have_records?
-        @output["elements"].include?("records")
-      end
-
-      def rich_sort?
-        @query["sortBy"].is_a?(Hash)
-      end
-
-      def unifiable?
-        @output["unifiable"]
-      end
-
-      def build_count_mapper_and_reducer!
-        return unless @output["elements"].include?("count")
-
-        @reducers["count"] = {
-          "type" => "sum",
-        }
-        if unifiable?
-          @query["sortBy"]["limit"] = -1 if @query["sortBy"].is_a?(Hash)
-          @output["limit"] = -1
-          mapper = {
-            "target" => "records",
-          }
-          unless @output["elements"].include?("records")
-            @records_limit = -1
-            @output["elements"] << "records"
-            @output["attributes"] ||= ["_key"]
-            @output_records = false
-          end
-          @mappers["count"] = mapper
-        end
-      end
-
-      def build_records_mapper_and_reducer!
-        # Skip reducing phase for a result with no record output.
-        return if !@output["elements"].include?("records") || @records_limit.zero?
-
-        # Append sort key attributes to the list of output attributes
-        # temporarily, for the reducing phase. After all extra columns
-        # are removed on the gathering phase.
-        final_attributes = output_attribute_names
-        update_output_attributes!
-
-        @reducers["records"] = build_records_reducer
-
-        mapper = {}
-        if @output_records
-          mapper["format"]     = @records_format unless @records_format == "simple"
-          mapper["attributes"] = final_attributes unless final_attributes.empty?
-          mapper["offset"]     = @records_offset unless @records_offset.zero?
-          mapper["limit"]      = @records_limit unless @records_limit.zero?
-        else
-          mapper["no_output"] = true
-        end
-        @mappers["records"] = mapper
-      end
-
-      def output_attribute_names
-        attributes = @output["attributes"] || []
-        if attributes.is_a?(Hash)
-          attributes.keys
-        else
-          attributes.collect do |attribute|
-            if attribute.is_a?(Hash)
-              attribute["label"] || attribute["source"]
-            else
-              attribute
-            end
-          end
-        end
-      end
-
-      def update_output_attributes!
-        @output["attributes"] = array_style_attributes
-        @output["attributes"] += sort_attribute_names
-        if unifiable? and !source_column_names.include?("_key")
-          @output["attributes"] << "_key"
-        end
-      end
-
-      def array_style_attributes
-        attributes = @output["attributes"] || []
-        if attributes.is_a?(Hash)
-          attributes.keys.collect do |key|
-            attribute = attributes[key]
-            case attribute
-            when String
-              {
-                "label"  => key,
-                "source" => attribute,
-              }
-            when Hash
-              attribute["label"] = key
-              attribute
-            end
-          end
-        else
-          attributes
-        end
-      end
-
-      def source_column_names
-        attributes = @output["attributes"] || []
-        if attributes.is_a?(Hash)
-          attributes_hash = attributes
-          attributes = []
-          attributes_hash.each do |key, attribute|
-            attributes << attribute["source"] || key
-          end
-          attributes
-        else
-          attributes.collect do |attribute|
-            if attribute.is_a?(Hash)
-              attribute["source"] || attribute["label"]
-            else
-              attribute
-            end
-          end
-        end
-      end
-
-      def sort_attribute_names
-        sort_attributes = @sort_keys.collect do |key|
-          key = key[1..-1] if key[0] == "-"
-          key
-        end
-        attributes = source_column_names
-        sort_attributes.reject! do |attribute|
-          attributes.include?(attribute)
-        end
-        sort_attributes
-      end
-
-      ASCENDING_OPERATOR = "<"
-      DESCENDING_OPERATOR = ">"
-
-      def build_records_reducer
-        attributes = source_column_names
-        key_column_index = attributes.index("_key")
-
-        operators = @sort_keys.collect do |sort_key|
-          operator = ASCENDING_OPERATOR
-          if sort_key[0] == "-"
-            operator = DESCENDING_OPERATOR
-            sort_key = sort_key[1..-1]
-          end
-          {
-            "operator" => operator,
-            "column"   => attributes.index(sort_key),
-          }
-        end
-
-        reducer = {
-          "type"      => "sort",
-          "operators" => operators,
-        }
-        if unifiable? and !key_column_index.nil?
-          reducer["key_column"] = key_column_index
-        end
-
-        # On the reducing phase, we apply only "limit". We cannot apply
-        # "offset" on this phase because the collector merges a pair of
-        # results step by step even if there are three or more results.
-        # Instead, we apply "offset" on the gathering phase.
-        reducer["limit"] = @output["limit"]
-
-        reducer
-      end
-    end
-  end
-end

  Deleted: lib/droonga/plugin/planner/groonga.rb (+0 -54) 100644
===================================================================
--- lib/droonga/plugin/planner/groonga.rb    2014-02-17 12:12:03 +0900 (665a5be)
+++ /dev/null
@@ -1,54 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/planner_plugin"
-
-module Droonga
-  class GroongaPlanner < Droonga::PlannerPlugin
-    repository.register("groonga", self)
-
-    command :table_create
-    def table_create(message)
-      unless message["dataset"]
-        raise "dataset must be set. FIXME: This error should return client."
-      end
-      broadcast(message)
-    end
-
-    command :table_remove
-    def table_remove(message)
-      unless message["dataset"]
-        raise "dataset must be set. FIXME: This error should return client."
-      end
-      broadcast(message)
-    end
-
-    command :column_create
-    def column_create(message)
-      broadcast(message)
-    end
-
-    private
-    def broadcast(message)
-      super(message,
-            :write => true,
-            :reduce => {
-              "result" => "or"
-            })
-    end
-  end
-end

  Deleted: lib/droonga/plugin/planner/search.rb (+0 -31) 100644
===================================================================
--- lib/droonga/plugin/planner/search.rb    2014-02-17 12:12:03 +0900 (fba7010)
+++ /dev/null
@@ -1,31 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013-2014 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/planner_plugin"
-require "droonga/plugin/planner/distributed_search_planner"
-
-module Droonga
-  class SearchPlanner < Droonga::PlannerPlugin
-    repository.register("search", self)
-
-    command :search
-    def search(message)
-      planner = DistributedSearchPlanner.new(message)
-      planner.plan
-    end
-  end
-end

  Deleted: lib/droonga/plugin/planner/watch.rb (+0 -53) 100644
===================================================================
--- lib/droonga/plugin/planner/watch.rb    2014-02-17 12:12:03 +0900 (8e87ed1)
+++ /dev/null
@@ -1,53 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/planner_plugin"
-
-module Droonga
-  class WatchPlanner < Droonga::PlannerPlugin
-    repository.register("watch", self)
-
-    command "watch.feed" => :feed
-    def feed(message)
-      broadcast(message)
-    end
-
-    command "watch.subscribe" => :subscribe
-    def subscribe(message)
-      broadcast(message)
-    end
-
-    command "watch.unsubscribe" => :unsubscribe
-    def unsubscribe(message)
-      broadcast(message)
-    end
-
-    command "watch.sweep" => :sweep
-    def sweep(message)
-      broadcast(message)
-    end
-
-    private
-    def broadcast(message)
-      super(message,
-            :write => true,
-            :reduce => {
-              "success" => "and"
-            })
-    end
-  end
-end

  Modified: lib/droonga/plugins/crud.rb (+12 -0)
===================================================================
--- lib/droonga/plugins/crud.rb    2014-02-17 12:12:03 +0900 (316930b)
+++ lib/droonga/plugins/crud.rb    2014-02-17 13:04:13 +0900 (01513ee)
@@ -35,6 +35,18 @@ module Droonga
         end
       end
 
+      class Planner < Droonga::Planner
+        message.type = "add"
+
+        def plan(message)
+          scatter(message,
+                  :key => message["body"]["key"] || rand.to_s,
+                  :reduce => {
+                    "success" => "and"
+                  })
+        end
+      end
+
       class Handler < Droonga::Handler
         message.type = "add"
 

  Modified: lib/droonga/plugins/groonga.rb (+1 -0)
===================================================================
--- lib/droonga/plugins/groonga.rb    2014-02-17 12:12:03 +0900 (9cd6b7e)
+++ lib/droonga/plugins/groonga.rb    2014-02-17 13:04:13 +0900 (3fb0113)
@@ -15,6 +15,7 @@
 
 require "droonga/plugin"
 require "droonga/plugins/groonga/generic_response"
+require "droonga/plugins/groonga/schema_planer"
 require "droonga/plugins/groonga/select"
 require "droonga/plugins/groonga/table_create"
 require "droonga/plugins/groonga/table_remove"

  Copied: lib/droonga/plugins/groonga/schema_planer.rb (+16 -13) 62%
===================================================================
--- lib/droonga/error.rb    2014-02-17 12:12:03 +0900 (143a386)
+++ lib/droonga/plugins/groonga/schema_planer.rb    2014-02-17 13:04:13 +0900 (0b71f26)
@@ -1,5 +1,3 @@
-# -*- coding: utf-8 -*-
-#
 # Copyright (C) 2014 Droonga Project
 #
 # This library is free software; you can redistribute it and/or
@@ -16,19 +14,24 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 module Droonga
-  class Error < StandardError
-  end
-
-  class MultiplexError < Error
-    attr_reader :errors
+  module Plugins
+    module Groonga
+      class SchemaPlaner < Droonga::Planner
+        schema_commands = [
+          "table_create",
+          "table_remove",
+          "column_create",
+        ]
+        message.pattern = ["type", :in, schema_commands]
 
-    def initialize(errors=[])
-      @errors = errors
-      error_messages =****@error***** do |error|
-        error.message
+        def plan(message)
+          broadcast(message,
+                    :write => true,
+                    :reduce => {
+                      "result" => "or"
+                    })
+        end
       end
-      message = error_messages.sort.join("\n-----------------------\n")
-      super(message)
     end
   end
 end

  Modified: lib/droonga/plugins/search.rb (+10 -0)
===================================================================
--- lib/droonga/plugins/search.rb    2014-02-17 12:12:03 +0900 (84bffc1)
+++ lib/droonga/plugins/search.rb    2014-02-17 13:04:13 +0900 (6d82c0c)
@@ -15,12 +15,22 @@
 
 require "droonga/plugin"
 require "droonga/searcher"
+require "droonga/plugins/search/distributed_search_planner"
 
 module Droonga
   module Plugins
     module Search
       Plugin.registry.register("search", self)
 
+      class Planner < Droonga::Planner
+        message.type = "search"
+
+        def plan(message)
+          planner = DistributedSearchPlanner.new(message)
+          planner.plan
+        end
+      end
+
       class Handler < Droonga::Handler
         message.type = "search"
 

  Added: lib/droonga/plugins/search/distributed_search_planner.rb (+398 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/plugins/search/distributed_search_planner.rb    2014-02-17 13:04:13 +0900 (7c08a2d)
@@ -0,0 +1,398 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013-2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/searcher"
+require "droonga/distributed_command_planner"
+
+module Droonga
+  module Plugins
+    module Search
+      class DistributedSearchPlanner < DistributedCommandPlanner
+        def initialize(search_request_message)
+          super
+
+          @request = @source_message["body"]
+          raise NoQuery.new unless @request
+
+          @request = Marshal.load(Marshal.dump(@request))
+          @queries = @request["queries"]
+        end
+
+        def plan
+          raise Searcher::NoQuery.new if****@queri*****? or****@queri*****?
+
+          Searcher::QuerySorter.validate_dependencies(@queries)
+
+          ensure_unifiable!
+
+          @queries.each do |input_name, query|
+            transform_query(input_name, query)
+          end
+
+          @dataset = @source_message["dataset"] || @request["dataset"]
+          broadcast(:body => @request)
+
+          super
+        end
+
+        private
+        UNLIMITED = -1
+
+        def reduce_command
+          "search_reduce"
+        end
+
+        def gather_command
+          "search_gather"
+        end
+
+        def ensure_unifiable!
+          @queries.each do |name, query|
+            if unifiable?(name) and query["output"]
+              query["output"]["unifiable"] = true
+            end
+          end
+        end
+
+        def unifiable?(name)
+          query = @queries[name]
+          return true if query["groupBy"]
+          name = query["source"]
+          return false unles****@queri*****?(name)
+          unifiable?(name)
+        end
+
+        def transform_query(input_name, query)
+          output = query["output"]
+
+          # Skip reducing phase for a result with no output.
+          if output.nil? or
+              output["elements"].nil? or
+              (!output["elements"].include?("count") and
+              !output["elements"].include?("records"))
+            return
+          end
+
+          transformer = QueryTransformer.new(query)
+
+          elements = transformer.mappers
+          mapper = {}
+          mapper["elements"] = elements unless elements.empty?
+          reduce(input_name => { :reduce => transformer.reducers,
+                                 :gather => mapper })
+        end
+
+        class QueryTransformer
+          attr_reader :reducers, :mappers
+
+          def initialize(query)
+            @query = query
+            @output = @query["output"]
+            @reducers = {}
+            @mappers = {}
+            @output_records = true
+            transform!
+          end
+
+          def transform!
+            # The collector module supports only "simple" format search results.
+            # So we have to override the format and restore it on the gathering
+            # phase.
+            @records_format = @output["format"] || "simple"
+            if @output["format"] and @output["format"] != "simple"
+              @output["format"] = "simple"
+            end
+
+            @sort_keys = @query["sortBy"] || []
+            @sort_keys = @sort_keys["keys"] || [] if @sort_keys.is_a?(Hash)
+
+            calculate_offset_and_limit!
+            build_count_mapper_and_reducer!
+            build_records_mapper_and_reducer!
+          end
+
+          def calculate_offset_and_limit!
+            @original_sort_offset = sort_offset
+            @original_output_offset = output_offset
+            @original_sort_limit = sort_limit
+            @original_output_limit = output_limit
+
+            calculate_sort_offset!
+            calculate_output_offset!
+
+            # We have to calculate limit based on offset.
+            # <A, B = limited integer (0...MAXINT)>
+            # | sort limit | output limit | => | worker's sort limit      | worker's output limit   | final limit |
+            # =============================    ====================================================================
+            # | UNLIMITED  | UNLIMITED    | => | UNLIMITED                | UNLIMITED               | UNLIMITED   |
+            # | UNLIMITED  | B            | => | final_offset + B         | final_offset + B        | B           |
+            # | A          | UNLIMITED    | => | final_offset + A         | final_offset + A        | A           |
+            # | A          | B            | => | final_offset + max(A, B) | final_offset + min(A, B)| min(A, B)   |
+
+            # XXX final_limit and final_offset calculated in many times
+
+            @records_offset = final_offset
+            @records_limit = final_limit
+
+            updated_sort_limit = nil
+            updated_output_limit = nil
+            if final_limit == UNLIMITED
+              updated_output_limit = UNLIMITED
+            else
+              if rich_sort?
+                updated_sort_limit = final_offset + [sort_limit, output_limit].max
+              end
+              updated_output_limit = final_offset + final_limit
+            end
+
+            if updated_sort_limit and updated_sort_limit != @query["sortBy"]["limit"]
+              @query["sortBy"]["limit"] = updated_sort_limit
+            end
+            if updated_output_limit and @output["limit"] and updated_output_limit != @output["limit"]
+              @output["limit"] = updated_output_limit
+            end
+          end
+
+          def calculate_sort_offset!
+            # Offset for workers must be zero, because we have to apply "limit" and
+            # "offset" on the last gathering phase instead of each reducing phase.
+            if rich_sort?
+              @query["sortBy"]["offset"] = 0
+            end
+          end
+
+          def sort_offset
+            if rich_sort?
+              @query["sortBy"]["offset"] || 0
+            else
+              0
+            end
+          end
+
+          def output_offset
+            @output["offset"] || 0
+          end
+
+          def sort_limit
+            if rich_sort?
+              @query["sortBy"]["limit"] || UNLIMITED
+            else
+              UNLIMITED
+            end
+          end
+
+          def output_limit
+            @output["limit"] || 0
+          end
+
+          def calculate_output_offset!
+            @output["offset"] = 0 if have_records? and @output["offset"]
+          end
+
+          def final_offset
+            @original_sort_offset + @original_output_offset
+          end
+
+          def final_limit
+            if @original_sort_limit == UNLIMITED and
+                @original_output_limit == UNLIMITED
+              UNLIMITED
+            else
+              if @original_sort_limit == UNLIMITED
+                @original_output_limit
+              elsif @original_output_limit == UNLIMITED
+                @original_sort_limit
+              else
+                [@original_sort_limit, @original_output_limit].min
+              end
+            end
+          end
+
+          def have_records?
+            @output["elements"].include?("records")
+          end
+
+          def rich_sort?
+            @query["sortBy"].is_a?(Hash)
+          end
+
+          def unifiable?
+            @output["unifiable"]
+          end
+
+          def build_count_mapper_and_reducer!
+            return unless @output["elements"].include?("count")
+
+            @reducers["count"] = {
+              "type" => "sum",
+            }
+            if unifiable?
+              @query["sortBy"]["limit"] = -1 if @query["sortBy"].is_a?(Hash)
+              @output["limit"] = -1
+              mapper = {
+                "target" => "records",
+              }
+              unless @output["elements"].include?("records")
+                @records_limit = -1
+                @output["elements"] << "records"
+                @output["attributes"] ||= ["_key"]
+                @output_records = false
+              end
+              @mappers["count"] = mapper
+            end
+          end
+
+          def build_records_mapper_and_reducer!
+            # Skip reducing phase for a result with no record output.
+            return if !@output["elements"].include?("records") || @records_limit.zero?
+
+            # Append sort key attributes to the list of output attributes
+            # temporarily, for the reducing phase. After all extra columns
+            # are removed on the gathering phase.
+            final_attributes = output_attribute_names
+            update_output_attributes!
+
+            @reducers["records"] = build_records_reducer
+
+            mapper = {}
+            if @output_records
+              mapper["format"]     = @records_format unless @records_format == "simple"
+              mapper["attributes"] = final_attributes unless final_attributes.empty?
+              mapper["offset"]     = @records_offset unless @records_offset.zero?
+              mapper["limit"]      = @records_limit unless @records_limit.zero?
+            else
+              mapper["no_output"] = true
+            end
+            @mappers["records"] = mapper
+          end
+
+          def output_attribute_names
+            attributes = @output["attributes"] || []
+            if attributes.is_a?(Hash)
+              attributes.keys
+            else
+              attributes.collect do |attribute|
+                if attribute.is_a?(Hash)
+                  attribute["label"] || attribute["source"]
+                else
+                  attribute
+                end
+              end
+            end
+          end
+
+          def update_output_attributes!
+            @output["attributes"] = array_style_attributes
+            @output["attributes"] += sort_attribute_names
+            if unifiable? and !source_column_names.include?("_key")
+              @output["attributes"] << "_key"
+            end
+          end
+
+          def array_style_attributes
+            attributes = @output["attributes"] || []
+            if attributes.is_a?(Hash)
+              attributes.keys.collect do |key|
+                attribute = attributes[key]
+                case attribute
+                when String
+                  {
+                    "label"  => key,
+                    "source" => attribute,
+                  }
+                when Hash
+                  attribute["label"] = key
+              attribute
+                end
+              end
+            else
+              attributes
+            end
+          end
+
+          def source_column_names
+            attributes = @output["attributes"] || []
+            if attributes.is_a?(Hash)
+              attributes_hash = attributes
+              attributes = []
+              attributes_hash.each do |key, attribute|
+                attributes << attribute["source"] || key
+              end
+              attributes
+            else
+              attributes.collect do |attribute|
+                if attribute.is_a?(Hash)
+                  attribute["source"] || attribute["label"]
+                else
+                  attribute
+                end
+              end
+            end
+          end
+
+          def sort_attribute_names
+            sort_attributes = @sort_keys.collect do |key|
+              key = key[1..-1] if key[0] == "-"
+              key
+            end
+            attributes = source_column_names
+            sort_attributes.reject! do |attribute|
+              attributes.include?(attribute)
+            end
+            sort_attributes
+          end
+
+          ASCENDING_OPERATOR = "<"
+          DESCENDING_OPERATOR = ">"
+
+          def build_records_reducer
+            attributes = source_column_names
+            key_column_index = attributes.index("_key")
+
+            operators = @sort_keys.collect do |sort_key|
+              operator = ASCENDING_OPERATOR
+              if sort_key[0] == "-"
+                operator = DESCENDING_OPERATOR
+                sort_key = sort_key[1..-1]
+              end
+              {
+                "operator" => operator,
+                "column"   => attributes.index(sort_key),
+              }
+            end
+
+            reducer = {
+              "type"      => "sort",
+              "operators" => operators,
+            }
+            if unifiable? and !key_column_index.nil?
+              reducer["key_column"] = key_column_index
+            end
+
+            # On the reducing phase, we apply only "limit". We cannot apply
+            # "offset" on this phase because the collector merges a pair of
+            # results step by step even if there are three or more results.
+            # Instead, we apply "offset" on the gathering phase.
+            reducer["limit"] = @output["limit"]
+
+            reducer
+          end
+        end
+      end
+    end
+  end
+end

  Modified: lib/droonga/plugins/watch.rb (+12 -0)
===================================================================
--- lib/droonga/plugins/watch.rb    2014-02-17 12:12:03 +0900 (4b1482b)
+++ lib/droonga/plugins/watch.rb    2014-02-17 13:04:13 +0900 (7484279)
@@ -23,6 +23,18 @@ module Droonga
     module Watch
       Plugin.registry.register("watch", self)
 
+      class Planner < Droonga::Planner
+        message.pattern = ["type", :start_with, "watch."]
+
+        def plan(message)
+          broadcast(message,
+                    :write => true,
+                    :reduce => {
+                      "success" => "and"
+                    })
+        end
+      end
+
       module SchemaCreatable
         private
         def ensure_schema_created

  Modified: test/unit/helper/distributed_search_planner_helper.rb (+2 -2)
===================================================================
--- test/unit/helper/distributed_search_planner_helper.rb    2014-02-17 12:12:03 +0900 (b81b61b)
+++ test/unit/helper/distributed_search_planner_helper.rb    2014-02-17 13:04:13 +0900 (fcd96f1)
@@ -13,11 +13,11 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/plugin/planner/distributed_search_planner"
+require "droonga/plugins/search/distributed_search_planner"
 
 module DistributedSearchPlannerHelper
   def plan(search_request)
-    planner = Droonga::DistributedSearchPlanner.new(search_request)
+    planner = Droonga::Plugins::Search::DistributedSearchPlanner.new(search_request)
     planner.plan
   end
 

  Renamed: test/unit/plugins/search/planner/test_basic.rb (+0 -0) 100%
===================================================================

  Renamed: test/unit/plugins/search/planner/test_group_by.rb (+0 -0) 100%
===================================================================

  Renamed: test/unit/plugins/search/planner/test_output.rb (+0 -0) 100%
===================================================================

  Renamed: test/unit/plugins/search/planner/test_sort_by.rb (+0 -0) 100%
===================================================================

  Renamed: test/unit/plugins/search/test_handler.rb (+0 -0) 100%
===================================================================

  Renamed: test/unit/plugins/search/test_planner.rb (+3 -3) 96%
===================================================================
--- test/unit/plugin/planner/test_search.rb    2014-02-17 12:12:03 +0900 (9f0f625)
+++ test/unit/plugins/search/test_planner.rb    2014-02-17 13:04:13 +0900 (bbff123)
@@ -13,13 +13,13 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/plugin/planner/search"
+require "droonga/plugins/search"
 
 class SearchPlannerTest < Test::Unit::TestCase
   def setup
     setup_database
     @planner = Droonga::Test::StubPlanner.new
-    @plugin = Droonga::SearchPlanner.new(@planner)
+    @plugin = Droonga::Plugins::Search::Planner.new(@planner)
   end
 
   def teardown
@@ -56,7 +56,7 @@ class SearchPlannerTest < Test::Unit::TestCase
       },
     }
 
-    @planner.distribute(@plugin.process("search", envelope))
+    @planner.distribute(@plugin.plan(envelope))
 
     message = []
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index