[Groonga-commit] droonga/fluent-plugin-droonga at e4a91ef [master] Introduce step

Back to archive index

Kouhei Sutou null+****@clear*****
Thu Feb 27 23:35:23 JST 2014


Kouhei Sutou	2014-02-27 23:35:23 +0900 (Thu, 27 Feb 2014)

  New Revision: e4a91efcd82282c39b1835f7adefb2d58b38ac8b
  https://github.com/droonga/fluent-plugin-droonga/commit/e4a91efcd82282c39b1835f7adefb2d58b38ac8b

  Message:
    Introduce step
    
    It needs more works! The current implementation just provides
    define_single_step API. The internal implementation still uses old
    mechanism.

  Added files:
    lib/droonga/single_step.rb
  Copied files:
    lib/droonga/collectors.rb
      (from lib/droonga/plugin.rb)
    lib/droonga/single_step_definition.rb
      (from lib/droonga/handler.rb)
  Modified files:
    lib/droonga/dispatcher.rb
    lib/droonga/handler.rb
    lib/droonga/handler_runner.rb
    lib/droonga/planner.rb
    lib/droonga/plugin.rb
    lib/droonga/plugins/basic.rb
    lib/droonga/plugins/crud.rb
    lib/droonga/plugins/error.rb
    lib/droonga/plugins/groonga.rb
    lib/droonga/plugins/groonga/column_create.rb
    lib/droonga/plugins/groonga/table_create.rb
    lib/droonga/plugins/groonga/table_remove.rb
    lib/droonga/plugins/search.rb
    lib/droonga/plugins/watch.rb
    test/unit/plugins/crud/test_add.rb
    test/unit/plugins/groonga/test_column_create.rb
    test/unit/plugins/groonga/test_table_create.rb
    test/unit/plugins/groonga/test_table_remove.rb
    test/unit/plugins/search/test_handler.rb
    test/unit/plugins/search/test_planner.rb
    test/unit/plugins/test_groonga.rb
    test/unit/plugins/test_watch.rb
  Renamed files:
    lib/droonga/collectors/add.rb
      (from lib/droonga/plugin/metadata/planner_message.rb)
    lib/droonga/collectors/and.rb
      (from lib/droonga/plugins/groonga/schema_planer.rb)
    lib/droonga/step_runner.rb
      (from lib/droonga/planner_runner.rb)

  Copied: lib/droonga/collectors.rb (+2 -14) 73%
===================================================================
--- lib/droonga/plugin.rb    2014-02-27 17:05:10 +0900 (3928138)
+++ lib/droonga/collectors.rb    2014-02-27 23:35:23 +0900 (e1d9fee)
@@ -13,17 +13,5 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/plugin_registry"
-require "droonga/adapter"
-require "droonga/planner"
-require "droonga/handler"
-
-module Droonga
-  module Plugin
-    class << self
-      def registry
-        @@registry ||= PluginRegistry.new
-      end
-    end
-  end
-end
+require "droonga/collectors/add"
+require "droonga/collectors/and"

  Renamed: lib/droonga/collectors/add.rb (+5 -18) 65%
===================================================================
--- lib/droonga/plugin/metadata/planner_message.rb    2014-02-27 17:05:10 +0900 (4b4cd21)
+++ lib/droonga/collectors/add.rb    2014-02-27 23:35:23 +0900 (160beef)
@@ -14,24 +14,11 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 module Droonga
-  module Plugin
-    module Metadata
-      class PlannerMessage
-        def initialize(plugin_class)
-          @plugin_class = plugin_class
-        end
-
-        def pattern
-          configuration[:pattern]
-        end
-
-        def pattern=(pattern)
-          configuration[:pattern] = pattern
-        end
-
-        private
-        def configuration
-          @plugin_class.options[:message] ||= {}
+  module Collectors
+    class Add
+      class << self
+        def operator
+          "or"
         end
       end
     end

  Renamed: lib/droonga/collectors/and.rb (+5 -16) 63%
===================================================================
--- lib/droonga/plugins/groonga/schema_planer.rb    2014-02-27 17:05:10 +0900 (0b71f26)
+++ lib/droonga/collectors/and.rb    2014-02-27 23:35:23 +0900 (0c74752)
@@ -14,22 +14,11 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 module Droonga
-  module Plugins
-    module Groonga
-      class SchemaPlaner < Droonga::Planner
-        schema_commands = [
-          "table_create",
-          "table_remove",
-          "column_create",
-        ]
-        message.pattern = ["type", :in, schema_commands]
-
-        def plan(message)
-          broadcast(message,
-                    :write => true,
-                    :reduce => {
-                      "result" => "or"
-                    })
+  module Collectors
+    class And
+      class << self
+        def operator
+          "and"
         end
       end
     end

  Modified: lib/droonga/dispatcher.rb (+8 -11)
===================================================================
--- lib/droonga/dispatcher.rb    2014-02-27 17:05:10 +0900 (af8d99d)
+++ lib/droonga/dispatcher.rb    2014-02-27 23:35:23 +0900 (55fb030)
@@ -17,8 +17,8 @@ require "English"
 require "tsort"
 
 require "droonga/adapter_runner"
-require "droonga/planner_runner"
 require "droonga/collector_runner"
+require "droonga/step_runner"
 require "droonga/farm"
 require "droonga/session"
 require "droonga/replier"
@@ -59,8 +59,8 @@ module Droonga
       @farm = Farm.new(name, @catalog, @loop, :dispatcher => self)
       @forwarder = Forwarder.new(@loop)
       @replier = Replier.new(@forwarder)
-      @planner_runners = create_planner_runners
       @collector_runners = create_collector_runners
+      @step_runners = create_step_runners
     end
 
     def start
@@ -75,9 +75,6 @@ module Droonga
 
     def shutdown
       @forwarder.shutdown
-      @planner_runners.each_value do |planner_runner|
-        planner_runner.shutdown
-      end
       @collector_runners.each_value do |collector_runner|
         collector_runner.shutdown
       end
@@ -231,8 +228,8 @@ module Droonga
       dataset = message["dataset"]
       adapter_runner = @adapter_runners[dataset]
       adapted_message = adapter_runner.adapt_input(message)
-      planner_runner = @planner_runners[dataset]
-      plan = planner_runner.plan(adapted_message)
+      step_runner = @step_runners[dataset]
+      plan = step_runner.plan(message)
       distributor = Distributor.new(self)
       distributor.distribute(plan)
     rescue Droonga::UnsupportedMessageError => error
@@ -264,15 +261,15 @@ module Droonga
       end
     end
 
-    def create_planner_runners
+    def create_collector_runners
       create_runners do |configuration|
-        PlannerRunner.new(self, configuration["plugins"] || [])
+        CollectorRunner.new(configuration["plugins"] || [])
       end
     end
 
-    def create_collector_runners
+    def create_step_runners
       create_runners do |configuration|
-        CollectorRunner.new(configuration["plugins"] || [])
+        StepRunner.new(configuration["plugins"] || [])
       end
     end
 

  Modified: lib/droonga/handler.rb (+4 -2)
===================================================================
--- lib/droonga/handler.rb    2014-02-27 17:05:10 +0900 (b1fdf11)
+++ lib/droonga/handler.rb    2014-02-27 23:35:23 +0900 (170134a)
@@ -33,12 +33,14 @@ module Droonga
       end
     end
 
-    def initialize(name, context)
+    attr_reader :messenger
+    def initialize(name, context, messenger)
       @name = name
       @context = context
+      @messenger = messenger
     end
 
-    def handle(message, messenger)
+    def handle(message)
     end
   end
 end

  Modified: lib/droonga/handler_runner.rb (+8 -54)
===================================================================
--- lib/droonga/handler_runner.rb    2014-02-27 17:05:10 +0900 (d5ff029)
+++ lib/droonga/handler_runner.rb    2014-02-27 23:35:23 +0900 (e320697)
@@ -18,29 +18,10 @@ require "groonga"
 require "droonga/forwarder"
 require "droonga/handler_message"
 require "droonga/handler_messenger"
-require "droonga/handler"
+require "droonga/step_runner"
 
 module Droonga
   class HandlerRunner
-    class HandlerError < Error
-    end
-
-    class MissingMessageType < HandlerError
-      def initialize(handler_classes, dataset_name)
-        message = "[#{dataset_name}] \"message.type\" is not specified for " +
-                    "handler class(es): <#{handler_classes.inspect}>"
-        super(message)
-      end
-    end
-
-    class ConflictForSameType < HandlerError
-      def initialize(types, dataset_name)
-        message = "[#{dataset_name}] There are conflicting handlers for " +
-                    "same message type: <#{types.inspect}>"
-        super(message)
-      end
-    end
-
     def initialize(loop, options={})
       @loop = loop
       @options = options
@@ -96,42 +77,14 @@ module Droonga
       end
       $log.debug("#{self.class.name}: activating plugins for the dataset \"#{@dataset_name}\": " +
                    "#{@options[:plugins].join(", ")}")
-      @handler_classes = Handler.find_sub_classes(@options[:plugins] || [])
-      validate_handler_classes
-      $log.debug("#{self.class.name}: activated:\n#{@handler_classes.join("\n")}")
+      @step_runner = StepRunner.new(@options[:plugins] || [])
       @forwarder = Forwarder.new(@loop)
     end
 
     def find_handler_class(type)
-      @handler_classes.find do |handler_class|
-        handler_class.message.type == type
-      end
-    end
-
-    def validate_handler_classes
-      types = {}
-      missing_type_handlers = []
-
-      @handler_classes.each do |handler_class|
-        type = handler_class.message.type
-        if type.nil? or type.empty?
-          missing_type_handlers << handler_class
-          next
-        end
-        types[type] ||= []
-        types[type] << handler_class
-      end
-
-      if missing_type_handlers.size > 0
-        raise MissingMessageType.new(missing_type_handlers, @dataset_name)
-      end
-
-      types.each do |type, handler_classes|
-        types.delete(type) if handler_classes.size == 1
-      end
-      if types.size > 0
-        raise ConflictForSameType.new(types, @dataset_name)
-      end
+      step_definition = @step_runner.find(type)
+      return nil if step_definition.nil?
+      step_definition.handler_class
     end
 
     def process_type(handler_class, type, raw_message)
@@ -139,9 +92,10 @@ module Droonga
       handler_message.validate
 
       messenger = HandlerMessenger.new(@forwarder, handler_message, @options)
-      handler = handler_class.new(@name, @context)
+      handler = handler_class.new(@name, @context, messenger)
       begin
-        handler.handle(handler_message, messenger)
+        result = handler.handle(handler_message)
+        messenger.emit(result) unless result.nil?
       rescue ErrorMessage => error
         messenger.error(error.status_code, error.response_body)
       end

  Modified: lib/droonga/planner.rb (+1 -11)
===================================================================
--- lib/droonga/planner.rb    2014-02-27 17:05:10 +0900 (285581d)
+++ lib/droonga/planner.rb    2014-02-27 23:35:23 +0900 (4294675)
@@ -13,24 +13,14 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/pluggable"
-require "droonga/plugin/metadata/planner_message"
 require "droonga/distributed_command_planner"
 require "droonga/error_messages"
 
 module Droonga
   class Planner
-    extend Pluggable
     include ErrorMessages
 
-    class << self
-      def message
-        Plugin::Metadata::PlannerMessage.new(self)
-      end
-    end
-
-    def initialize(dispatcher)
-      @dispatcher = dispatcher
+    def initialize
     end
 
     def plan(message)

  Modified: lib/droonga/plugin.rb (+14 -0)
===================================================================
--- lib/droonga/plugin.rb    2014-02-27 17:05:10 +0900 (3928138)
+++ lib/droonga/plugin.rb    2014-02-27 23:35:23 +0900 (4357cd0)
@@ -14,9 +14,11 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 require "droonga/plugin_registry"
+require "droonga/single_step_definition"
 require "droonga/adapter"
 require "droonga/planner"
 require "droonga/handler"
+require "droonga/collectors"
 
 module Droonga
   module Plugin
@@ -25,5 +27,17 @@ module Droonga
         @@registry ||= PluginRegistry.new
       end
     end
+
+    def register(name)
+      Plugin.registry.register(name, self)
+    end
+
+    def define_single_step(&block)
+      single_step_definitions << SingleStepDefinition.new(self, &block)
+    end
+
+    def single_step_definitions
+      @single_step_definitions ||= []
+    end
   end
 end

  Modified: lib/droonga/plugins/basic.rb (+2 -1)
===================================================================
--- lib/droonga/plugins/basic.rb    2014-02-27 17:05:10 +0900 (52ccdff)
+++ lib/droonga/plugins/basic.rb    2014-02-27 23:35:23 +0900 (c371f64)
@@ -19,7 +19,8 @@ require "droonga/reducer"
 module Droonga
   module Plugins
     module Basic
-      Plugin.registry.register("basic", self)
+      extend Plugin
+      register("basic")
 
       class GatherCollector < Droonga::Collector
         message.pattern = ["task.step.type", :equal, "gather"]

  Modified: lib/droonga/plugins/crud.rb (+36 -20)
===================================================================
--- lib/droonga/plugins/crud.rb    2014-02-27 17:05:10 +0900 (b4caf52)
+++ lib/droonga/plugins/crud.rb    2014-02-27 23:35:23 +0900 (02cd5e1)
@@ -21,35 +21,36 @@ require "droonga/error_messages"
 module Droonga
   module Plugins
     module CRUD
-      Plugin.registry.register("crud", self)
+      extend Plugin
+      register("crud")
 
       class Adapter < Droonga::Adapter
         input_message.pattern  = ["type", :equal, "add"]
         output_message.pattern = ["body.success", :exist]
 
+        def adapt_input(input_message)
+          request = input_message.body
+          key = request["key"] || rand.to_s
+          values = request["values"] || {}
+          request["filter"] = values.merge("key" => key)
+        end
+
         def adapt_output(output_message)
-          success = output_message.body["success"]
-          unless success.nil?
+          if output_message.errors
+            detail = output_message.body["detail"]
+            return if detail.nil?
+            detail.delete("filter")
+            output_message.errors.each do |path, error|
+              error["body"]["detail"].delete("filter")
+            end
+          else
+            output_message.body.delete("filter")
             output_message.body = output_message.body["success"]
           end
         end
       end
 
-      class Planner < Droonga::Planner
-        message.pattern = ["type", :equal, "add"]
-
-        def plan(message)
-          scatter(message,
-                  :key => message["body"]["key"] || rand.to_s,
-                  :reduce => {
-                    "success" => "and"
-                  })
-        end
-      end
-
       class Handler < Droonga::Handler
-        message.type = "add"
-
         class MissingTableParameter < BadRequest
           def initialize
             super("\"table\" must be specified.")
@@ -83,12 +84,11 @@ module Droonga
           end
         end
 
-        def handle(message, messenger)
+        def handle(message)
           succeeded = process_add(message.request)
-          outputs = {
+          {
             "success" => succeeded,
           }
-          messenger.emit(outputs)
         end
 
         private
@@ -131,6 +131,22 @@ module Droonga
           end
         end
       end
+
+      define_single_step do |step|
+        step.name = "add"
+        step.inputs = {
+          "table" => {
+            :type => :table,
+            :filter => "filter",
+          },
+        }
+        step.output = {
+          :aggregate => "success",
+        }
+        step.write = true
+        step.handler = Handler
+        step.collector = Collectors::And
+      end
     end
   end
 end

  Modified: lib/droonga/plugins/error.rb (+2 -1)
===================================================================
--- lib/droonga/plugins/error.rb    2014-02-27 17:05:10 +0900 (8ba6342)
+++ lib/droonga/plugins/error.rb    2014-02-27 23:35:23 +0900 (74165fe)
@@ -18,7 +18,8 @@ require "droonga/plugin"
 module Droonga
   module Plugins
     module Error
-      Plugin.registry.register("error", self)
+      extend Plugin
+      register("error")
 
       class Adapter < Droonga::Adapter
         output_message.pattern = ["body.errors", :exist]

  Modified: lib/droonga/plugins/groonga.rb (+9 -7)
===================================================================
--- lib/droonga/plugins/groonga.rb    2014-02-27 17:05:10 +0900 (3fb0113)
+++ lib/droonga/plugins/groonga.rb    2014-02-27 23:35:23 +0900 (9fd812e)
@@ -14,17 +14,19 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 require "droonga/plugin"
-require "droonga/plugins/groonga/generic_response"
-require "droonga/plugins/groonga/schema_planer"
-require "droonga/plugins/groonga/select"
-require "droonga/plugins/groonga/table_create"
-require "droonga/plugins/groonga/table_remove"
-require "droonga/plugins/groonga/column_create"
 
 module Droonga
   module Plugins
     module Groonga
-      Plugin.registry.register("groonga", self)
+      extend Plugin
+      register("groonga")
     end
   end
 end
+
+require "droonga/plugins/groonga/generic_response"
+require "droonga/plugins/groonga/select"
+require "droonga/plugins/groonga/table_create"
+require "droonga/plugins/groonga/table_remove"
+require "droonga/plugins/groonga/column_create"
+

  Modified: lib/droonga/plugins/groonga/column_create.rb (+9 -4)
===================================================================
--- lib/droonga/plugins/groonga/column_create.rb    2014-02-27 17:05:10 +0900 (fc702af)
+++ lib/droonga/plugins/groonga/column_create.rb    2014-02-27 23:35:23 +0900 (bd7043b)
@@ -108,15 +108,20 @@ module Droonga
         end
 
         class Handler < Droonga::Handler
-          message.type = "column_create"
           action.synchronous = true
 
-          def handle(message, messenger)
+          def handle(message)
             command = Command.new(@context)
-            outputs = command.execute(message.request)
-            messenger.emit(outputs)
+            command.execute(message.request)
           end
         end
+
+        Groonga.define_single_step do |step|
+          step.name = "column_create"
+          step.write = true
+          step.handler = Handler
+          step.collector = Collectors::Add
+        end
       end
     end
   end

  Modified: lib/droonga/plugins/groonga/table_create.rb (+9 -4)
===================================================================
--- lib/droonga/plugins/groonga/table_create.rb    2014-02-27 17:05:10 +0900 (3e2e1f0)
+++ lib/droonga/plugins/groonga/table_create.rb    2014-02-27 23:35:23 +0900 (634fb8d)
@@ -91,15 +91,20 @@ module Droonga
         end
 
         class Handler < Droonga::Handler
-          message.type = "table_create"
           action.synchronous = true
 
-          def handle(message, messenger)
+          def handle(message)
             command = Command.new(@context)
-            outputs = command.execute(message.request)
-            messenger.emit(outputs)
+            command.execute(message.request)
           end
         end
+
+        Groonga.define_single_step do |step|
+          step.name = "table_create"
+          step.write = true
+          step.handler = Handler
+          step.collector = Collectors::Add
+        end
       end
     end
   end

  Modified: lib/droonga/plugins/groonga/table_remove.rb (+9 -4)
===================================================================
--- lib/droonga/plugins/groonga/table_remove.rb    2014-02-27 17:05:10 +0900 (790db05)
+++ lib/droonga/plugins/groonga/table_remove.rb    2014-02-27 23:35:23 +0900 (fcfa14c)
@@ -42,15 +42,20 @@ module Droonga
         end
 
         class Handler < Droonga::Handler
-          message.type = "table_remove"
           action.synchronous = true
 
-          def handle(message, messenger)
+          def handle(message)
             command = Command.new(@context)
-            outputs = command.execute(message.request)
-            messenger.emit(outputs)
+            command.execute(message.request)
           end
         end
+
+        Groonga.define_single_step do |step|
+          step.name = "table_remove"
+          step.write = true
+          step.handler = Handler
+          step.collector = Collectors::Add
+        end
       end
     end
   end

  Modified: lib/droonga/plugins/search.rb (+9 -7)
===================================================================
--- lib/droonga/plugins/search.rb    2014-02-27 17:05:10 +0900 (643781d)
+++ lib/droonga/plugins/search.rb    2014-02-27 23:35:23 +0900 (10bbdc7)
@@ -20,11 +20,10 @@ require "droonga/plugins/search/distributed_search_planner"
 module Droonga
   module Plugins
     module Search
-      Plugin.registry.register("search", self)
+      extend Plugin
+      register("search")
 
       class Planner < Droonga::Planner
-        message.pattern = ["type", :equal, "search"]
-
         def plan(message)
           planner = DistributedSearchPlanner.new(message)
           planner.plan
@@ -32,9 +31,7 @@ module Droonga
       end
 
       class Handler < Droonga::Handler
-        message.type = "search"
-
-        def handle(message, messenger)
+        def handle(message)
           searcher = Droonga::Searcher.new(@context)
           values = {}
           request = message.request
@@ -42,7 +39,7 @@ module Droonga
           searcher.search(request["queries"]).each do |output, value|
             values[output] = value
           end
-          messenger.emit(values)
+          values
         end
       end
 
@@ -134,6 +131,11 @@ module Droonga
           result
         end
       end
+
+      define_single_step do |step|
+        step.name = "search"
+        step.handler = Handler
+      end
     end
   end
 end

  Modified: lib/droonga/plugins/watch.rb (+42 -27)
===================================================================
--- lib/droonga/plugins/watch.rb    2014-02-27 17:05:10 +0900 (7484279)
+++ lib/droonga/plugins/watch.rb    2014-02-27 23:35:23 +0900 (4af5480)
@@ -21,19 +21,8 @@ require "droonga/watch_schema"
 module Droonga
   module Plugins
     module Watch
-      Plugin.registry.register("watch", self)
-
-      class Planner < Droonga::Planner
-        message.pattern = ["type", :start_with, "watch."]
-
-        def plan(message)
-          broadcast(message,
-                    :write => true,
-                    :reduce => {
-                      "success" => "and"
-                    })
-        end
-      end
+      extend Plugin
+      register("watch")
 
       module SchemaCreatable
         private
@@ -76,14 +65,12 @@ module Droonga
         include SchemaCreatable
         include MessageParsable
 
-        message.type = "watch.subscribe"
-
         def initialize(*args)
           super
           ensure_schema_created # TODO: REMOVE ME
         end
 
-        def handle(message, messenger)
+        def handle(message)
           subscriber, condition, query, route = parse_message(message)
           normalized_request = {
             :subscriber => subscriber,
@@ -93,25 +80,32 @@ module Droonga
           }
           watcher = Watcher.new(@context)
           watcher.subscribe(normalized_request)
-          outputs = {
+          {
             "success" => true,
           }
-          messenger.emit(outputs)
         end
       end
 
+      define_single_step do |step|
+        step.name = "watch.subscribe"
+        step.output = {
+          :aggregate => "success"
+        }
+        step.write = true
+        step.handler = SubscribeHandler
+        step.collector = Collectors::And
+      end
+
       class UnsubscribeHandler < Droonga::Handler
         include SchemaCreatable
         include MessageParsable
 
-        message.type = "watch.unsubscribe"
-
         def initialize(*args)
           super
           ensure_schema_created # TODO: REMOVE ME
         end
 
-        def handle(message, messenger)
+        def handle(message)
           subscriber, condition, query, route = parse_message(message)
           normalized_request = {
             :subscriber => subscriber,
@@ -120,24 +114,31 @@ module Droonga
           }
           watcher = Watcher.new(@context)
           watcher.unsubscribe(normalized_request)
-          outputs = {
+          {
             "success" => true,
           }
-          messenger.emit(outputs)
         end
       end
 
+      define_single_step do |step|
+        step.name = "watch.unsubscribe"
+        step.output = {
+          :aggregate => "success"
+        }
+        step.write = true
+        step.handler = UnsubscribeHandler
+        step.collector = Collectors::And
+      end
+
       class FeedHandler < Droonga::Handler
         include SchemaCreatable
 
-        message.type = "watch.feed"
-
         def initialize(*args)
           super
           ensure_schema_created # TODO: REMOVE ME
         end
 
-        def handle(message, messenger)
+        def handle(message)
           request = message.request
           watcher = Watcher.new(@context)
           watcher.feed(:targets => request["targets"]) do |route, subscribers|
@@ -149,9 +150,16 @@ module Droonga
             messenger.forward(published_message,
                               "to" => route, "type" => "watch.publish")
           end
+          nil
         end
       end
 
+      define_single_step do |step|
+        step.name = "watch.feed"
+        step.write = true
+        step.handler = FeedHandler
+      end
+
       class SweepHandler < Droonga::Handler
         include SchemaCreatable
 
@@ -162,11 +170,18 @@ module Droonga
           ensure_schema_created # TODO: REMOVE ME
         end
 
-        def sweep(message, messenger)
+        def handle(message)
           sweeper = Sweeper.new(@context)
           sweeper.sweep_expired_subscribers
+          nil
         end
       end
+
+      define_single_step do |step|
+        step.name = "watch.sweep"
+        step.write = true
+        step.handler = SweepHandler
+      end
     end
   end
 end

  Added: lib/droonga/single_step.rb (+53 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/single_step.rb    2014-02-27 23:35:23 +0900 (3bcdd7a)
@@ -0,0 +1,53 @@
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/planner"
+require "droonga/collectors"
+
+module Droonga
+  class SingleStep
+    def initialize(definition)
+      @definition = definition
+    end
+
+    def plan(message)
+      if message["type"] == "search"
+        # XXX: workaround
+        planner = Plugins::Search::Planner.new
+        return planner.plan(message)
+      end
+
+      # XXX: Re-implement me.
+      planner = Planner.new
+      options = {}
+      options[:write] =****@defin*****?
+      collector_class =****@defin*****_class
+      if collector_class
+        reduce_key =****@defin*****[:aggregate] || "result"
+        options[:reduce] = {
+          reduce_key => collector_class.operator,
+        }
+      end
+      inputs =****@defin*****
+      if inputs.empty?
+        planner.send(:broadcast, message, options)
+      else
+        input = inputs.values.first
+        options[:key] = message["body"][input[:filter]]["key"]
+        planner.send(:scatter, message, options)
+      end
+    end
+  end
+end

  Copied: lib/droonga/single_step_definition.rb (+29 -19) 51%
===================================================================
--- lib/droonga/handler.rb    2014-02-27 17:05:10 +0900 (b1fdf11)
+++ lib/droonga/single_step_definition.rb    2014-02-27 23:35:23 +0900 (bd18939)
@@ -13,32 +13,42 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/pluggable"
-require "droonga/plugin/metadata/input_message"
-require "droonga/plugin/metadata/handler_action"
-require "droonga/error_messages"
-
 module Droonga
-  class Handler
-    extend Pluggable
-    include ErrorMessages
+  class SingleStepDefinition
+    attr_accessor :name
+    attr_accessor :handler
+    attr_accessor :collector
+    attr_writer :write
+    attr_accessor :inputs
+    attr_accessor :output
+    def initialize(plugin_module)
+      @plugin_module = plugin_module
+      @name = nil
+      @handler = nil
+      @collector = nil
+      @write = false
+      @inputs = []
+      @output = {}
+      yield(self)
+    end
 
-    class << self
-      def message
-        Plugin::Metadata::InputMessage.new(self)
-      end
+    def write?
+      @write
+    end
 
-      def action
-        Plugin::Metadata::HandlerAction.new(self)
-      end
+    def handler_class
+      resolve_class(@handler)
     end
 
-    def initialize(name, context)
-      @name = name
-      @context = context
+    def collector_class
+      resolve_class(@collector)
     end
 
-    def handle(message, messenger)
+    private
+    def resolve_class(target)
+      return nil if target.nil?
+      return target if target.is_a?(Class)
+      @plugin_module.const_get(target)
     end
   end
 end

  Renamed: lib/droonga/step_runner.rb (+23 -24) 58%
===================================================================
--- lib/droonga/planner_runner.rb    2014-02-27 17:05:10 +0900 (1c7fc5d)
+++ lib/droonga/step_runner.rb    2014-02-27 23:35:23 +0900 (9c2ad37)
@@ -13,49 +13,48 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/message_matcher"
-require "droonga/planner"
+require "droonga/plugin"
+require "droonga/single_step"
 
 module Droonga
-  class PlannerRunner
-    def initialize(dispatcher, plugins)
-      @dispatcher = dispatcher
-      @planner_classes = Planner.find_sub_classes(plugins)
+  class StepRunner
+    def initialize(plugins)
+      @definitions = {}
+      plugins.each do |name|
+        plugin = Plugin.registry[name]
+        plugin.single_step_definitions.each do |definition|
+          @definitions[definition.name] = definition
+        end
+      end
     end
 
     def shutdown
     end
 
     def plan(message)
+      type = message["type"]
       $log.trace("#{log_tag}: plan: start",
                  :dataset => message["dataset"],
-                 :type => message["type"])
-      planner_class = find_planner_class(message)
-      if planner_class.nil?
+                 :type => type)
+      definition = find(type)
+      if definition.nil?
         raise UnsupportedMessageError.new(:planner, message)
       end
-      planner = planner_class.new(@dispatcher)
-      plan = planner.plan(message)
+      step = SingleStep.new(definition)
+      plan = step.plan(message)
       $log.trace("#{log_tag}: plan: done",
-                 :steps => plan.collect {|step| step["type"]})
+                 :dataset => message["dataset"],
+                 :type => type)
       plan
     end
 
-    private
-    def find_planner_class(message)
-      @planner_classes.find do |planner_class|
-        pattern = planner_class.message.pattern
-        if pattern
-          matcher = MessageMatcher.new(pattern)
-          matcher.match?(message)
-        else
-          false
-        end
-      end
+    def find(type)
+      @definitions[type]
     end
 
+    private
     def log_tag
-      "planner-runner"
+      "step-runner"
     end
   end
 end

  Modified: test/unit/plugins/crud/test_add.rb (+12 -10)
===================================================================
--- test/unit/plugins/crud/test_add.rb    2014-02-27 17:05:10 +0900 (0498baf)
+++ test/unit/plugins/crud/test_add.rb    2014-02-27 23:35:23 +0900 (2da59a9)
@@ -37,8 +37,10 @@ class CRUDAddHandlerTest < Test::Unit::TestCase
 
   def setup_handler
     @worker = StubWorker.new
-    @handler = Droonga::Plugins::CRUD::Handler.new("name", @worker.context)
     @messenger = Droonga::Test::StubHandlerMessenger.new
+    @handler = Droonga::Plugins::CRUD::Handler.new("name",
+                                                   @worker.context,
+                                                   @messenger)
   end
 
   def teardown_handler
@@ -47,7 +49,7 @@ class CRUDAddHandlerTest < Test::Unit::TestCase
 
   def process(request)
     message = Droonga::Test::StubHandlerMessage.new(request)
-    @handler.handle(message, @messenger)
+    @handler.handle(message)
   end
 
   public
@@ -70,8 +72,8 @@ class CRUDAddHandlerTest < Test::Unit::TestCase
         "key"    => "mori",
         "values" => {},
       }
-      process(request)
-      assert_equal([SUCCESS_RESPONSE_BODY], @messenger.values)
+      response = process(request)
+      assert_equal(SUCCESS_RESPONSE_BODY, response)
       table =****@worke*****["Users"]
       assert_equal(["mori"], table.collect(&:key))
     end
@@ -82,8 +84,8 @@ class CRUDAddHandlerTest < Test::Unit::TestCase
         "key"    => "mori",
         "values" => {"country" => "japan"},
       }
-      process(request)
-      assert_equal([SUCCESS_RESPONSE_BODY], @messenger.values)
+      response = process(request)
+      assert_equal(SUCCESS_RESPONSE_BODY, response)
       table =****@worke*****["Users"]
       assert_equal(["japan"], table.collect(&:country))
     end
@@ -147,8 +149,8 @@ class CRUDAddHandlerTest < Test::Unit::TestCase
         "table"  => "Books",
         "values" => {},
       }
-      process(request)
-      assert_equal([SUCCESS_RESPONSE_BODY], @messenger.values)
+      response = process(request)
+      assert_equal(SUCCESS_RESPONSE_BODY, response)
       table =****@worke*****["Books"]
       assert_equal([nil], table.collect(&:title))
     end
@@ -158,8 +160,8 @@ class CRUDAddHandlerTest < Test::Unit::TestCase
         "table"  => "Books",
         "values" => {"title" => "CSS"},
       }
-      process(request)
-      assert_equal([SUCCESS_RESPONSE_BODY], @messenger.values)
+      response = process(request)
+      assert_equal(SUCCESS_RESPONSE_BODY, response)
       table =****@worke*****["Books"]
       assert_equal(["CSS"], table.collect(&:title))
     end

  Modified: test/unit/plugins/groonga/test_column_create.rb (+14 -7)
===================================================================
--- test/unit/plugins/groonga/test_column_create.rb    2014-02-27 17:05:10 +0900 (3bf410d)
+++ test/unit/plugins/groonga/test_column_create.rb    2014-02-27 23:35:23 +0900 (7c4f9f1)
@@ -16,16 +16,20 @@
 class ColumnCreateTest < GroongaHandlerTest
   def create_handler
     Droonga::Plugins::Groonga::ColumnCreate::Handler.new("droonga",
-                                                         @handler.context)
+                                                         @handler.context,
+                                                         @messages)
   end
 
   def test_success
     Groonga::Schema.define(:context => @context) do |schema|
       schema.create_table("Books", :type => :hash)
     end
-    process(:column_create,
-            {"table" => "Books", "name" => "title", "type" => "ShortText"})
-    response =****@messe*****
+    message = {
+      "table" => "Books",
+      "name"  => "title",
+      "type"  => "ShortText",
+    }
+    response = process(:column_create, message)
     assert_valid_output(response)
     response = response["result"]
     assert_equal(
@@ -35,9 +39,12 @@ class ColumnCreateTest < GroongaHandlerTest
   end
 
   def test_unknown_table
-    process(:column_create,
-            {"table" => "Unknown", "name" => "title", "type" => "ShortText"})
-    response =****@messe*****
+    message = {
+      "table" => "Unknown",
+      "name"  => "title",
+      "type"  => "ShortText",
+    }
+    response = process(:column_create, message)
     assert_valid_output(response)
     response = response["result"]
     assert_equal(

  Modified: test/unit/plugins/groonga/test_table_create.rb (+4 -5)
===================================================================
--- test/unit/plugins/groonga/test_table_create.rb    2014-02-27 17:05:10 +0900 (90a2273)
+++ test/unit/plugins/groonga/test_table_create.rb    2014-02-27 23:35:23 +0900 (b17d7df)
@@ -16,12 +16,12 @@
 class TableCreateTest < GroongaHandlerTest
   def create_handler
     Droonga::Plugins::Groonga::TableCreate::Handler.new("droonga",
-                                                        @handler.context)
+                                                        @handler.context,
+                                                        @messenger)
   end
 
   def test_success
-    process(:table_create, {"name" => "Books"})
-    response =****@messe*****
+    response = process(:table_create, {"name" => "Books"})
     assert_valid_output(response)
     response = response["result"]
     assert_equal(
@@ -31,8 +31,7 @@ class TableCreateTest < GroongaHandlerTest
   end
 
   def test_failure
-    process(:table_create, {})
-    response =****@messe*****
+    response = process(:table_create, {})
     assert_valid_output(response)
     response = response["result"]
     assert_equal(

  Modified: test/unit/plugins/groonga/test_table_remove.rb (+4 -5)
===================================================================
--- test/unit/plugins/groonga/test_table_remove.rb    2014-02-27 17:05:10 +0900 (264d074)
+++ test/unit/plugins/groonga/test_table_remove.rb    2014-02-27 23:35:23 +0900 (ff14d83)
@@ -16,7 +16,8 @@
 class TableRemoveTest < GroongaHandlerTest
   def create_handler
     Droonga::Plugins::Groonga::TableRemove::Handler.new("droonga",
-                                                        @handler.context)
+                                                        @handler.context,
+                                                        @messenger)
   end
 
   def setup
@@ -27,8 +28,7 @@ class TableRemoveTest < GroongaHandlerTest
   end
 
   def test_success
-    process(:table_remove, {"name" => "Books"})
-    response =****@messe*****
+    response = process(:table_remove, {"name" => "Books"})
     assert_valid_output(response)
     response = response["result"]
     assert_equal(
@@ -40,8 +40,7 @@ class TableRemoveTest < GroongaHandlerTest
   end
 
   def test_failure
-    process(:table_remove, {})
-    response =****@messe*****
+    response = process(:table_remove, {})
     assert_valid_output(response)
     response = response["result"]
     assert_equal(

  Modified: test/unit/plugins/search/test_handler.rb (+6 -5)
===================================================================
--- test/unit/plugins/search/test_handler.rb    2014-02-27 17:05:10 +0900 (b70b613)
+++ test/unit/plugins/search/test_handler.rb    2014-02-27 23:35:23 +0900 (d1faaad)
@@ -1,4 +1,4 @@
-# Copyright (C) 2013 Droonga Project
+# Copyright (C) 2013-2014 Droonga Project
 #
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -29,8 +29,10 @@ class SearchHandlerTest < Test::Unit::TestCase
 
   def setup_plugin
     @handler = Droonga::Test::StubHandler.new
-    @plugin = Droonga::Plugins::Search::Handler.new("droonga", @handler.context)
     @messenger = Droonga::Test::StubHandlerMessenger.new
+    @plugin = Droonga::Plugins::Search::Handler.new("droonga",
+                                                    @handler.context,
+                                                    @messenger)
   end
 
   def teardown_plugin
@@ -41,8 +43,7 @@ class SearchHandlerTest < Test::Unit::TestCase
   private
   def search(request, headers={})
     message = Droonga::Test::StubHandlerMessage.new(request, headers)
-    @plugin.handle(message, @messenger)
-    results_to_result_set(@messenger.values.first)
+    results_to_result_set(@plugin.handle(message))
   end
 
   def results_to_result_set(results)
@@ -51,7 +52,7 @@ class SearchHandlerTest < Test::Unit::TestCase
       result_set[name] = normalize_result(result)
     end
     result_set
-  end
+   end
 
   def normalize_result(result)
     result["startTime"] = start_time if result["startTime"]

  Modified: test/unit/plugins/search/test_planner.rb (+1 -1)
===================================================================
--- test/unit/plugins/search/test_planner.rb    2014-02-27 17:05:10 +0900 (bbff123)
+++ test/unit/plugins/search/test_planner.rb    2014-02-27 23:35:23 +0900 (08cd30c)
@@ -19,7 +19,7 @@ class SearchPlannerTest < Test::Unit::TestCase
   def setup
     setup_database
     @planner = Droonga::Test::StubPlanner.new
-    @plugin = Droonga::Plugins::Search::Planner.new(@planner)
+    @plugin = Droonga::Plugins::Search::Planner.new
   end
 
   def teardown

  Modified: test/unit/plugins/test_groonga.rb (+2 -2)
===================================================================
--- test/unit/plugins/test_groonga.rb    2014-02-27 17:05:10 +0900 (972f84a)
+++ test/unit/plugins/test_groonga.rb    2014-02-27 23:35:23 +0900 (8ef5d86)
@@ -1,4 +1,4 @@
-# Copyright (C) 2013 Droonga Project
+# Copyright (C) 2013-2014 Droonga Project
 #
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -45,7 +45,7 @@ class GroongaHandlerTest < Test::Unit::TestCase
   def process(command, request)
     message = Droonga::Test::StubHandlerMessage.new(request)
     handler = create_handler
-    handler.handle(message, @messenger)
+    handler.handle(message)
   end
 
   NORMALIZED_START_TIME = Time.parse("2013-07-11T16:04:28+0900").to_i

  Modified: test/unit/plugins/test_watch.rb (+24 -21)
===================================================================
--- test/unit/plugins/test_watch.rb    2014-02-27 17:05:10 +0900 (ebfd9ec)
+++ test/unit/plugins/test_watch.rb    2014-02-27 23:35:23 +0900 (9552623)
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 #
-# Copyright (C) 2013 Droonga Project
+# Copyright (C) 2013-2014 Droonga Project
 #
 # This library is free software; you can redistribute it and/or
 # modify it under the terms of the GNU Lesser General Public
@@ -45,13 +45,15 @@ class WatchHandlerTest < Test::Unit::TestCase
 
   def process(command, request, headers={})
     message = Droonga::Test::StubHandlerMessage.new(request, headers)
-    create_plugin.handle(message, @messenger)
+    create_plugin.handle(message)
   end
 
   public
   class SubscribeTest < self
     def create_plugin
-      Droonga::Plugins::Watch::SubscribeHandler.new("droonga", @handler.context)
+      Droonga::Plugins::Watch::SubscribeHandler.new("droonga",
+                                                    @handler.context,
+                                                    @messenger)
     end
 
     def test_subscribe
@@ -60,8 +62,8 @@ class WatchHandlerTest < Test::Unit::TestCase
         "condition" => "たいやき",
         "subscriber" => "localhost"
       }
-      process(:subscribe, request)
-      assert_equal([SUCCESS_RESULT], @messenger.values)
+      response = process(:subscribe, request)
+      assert_equal(SUCCESS_RESULT, response)
 
       assert_equal(
         ["localhost:23003/output"],
@@ -74,8 +76,8 @@ class WatchHandlerTest < Test::Unit::TestCase
         "condition" => "たいやき",
         "subscriber" => "localhost"
       }
-      process(:subscribe, request, "from" => "localhost:23004/output")
-      assert_equal([SUCCESS_RESULT], @messenger.values)
+      response = process(:subscribe, request, "from" => "localhost:23004/output")
+      assert_equal(SUCCESS_RESULT, response)
 
       assert_equal(
         ["localhost:23004/output"],
@@ -89,8 +91,8 @@ class WatchHandlerTest < Test::Unit::TestCase
         "subscriber" => "localhost",
         "route" => "localhost:23003/output"
       }
-      process(:subscribe, request, "from" => "localhost:23004/output")
-      assert_equal([SUCCESS_RESULT], @messenger.values)
+      response = process(:subscribe, request, "from" => "localhost:23004/output")
+      assert_equal(SUCCESS_RESULT, response)
 
       assert_equal(
         ["localhost:23003/output"],
@@ -116,7 +118,8 @@ class WatchHandlerTest < Test::Unit::TestCase
 
     def create_plugin
       Droonga::Plugins::Watch::UnsubscribeHandler.new("droonga",
-                                                      @handler.context)
+                                                      @handler.context,
+                                                      @messenger)
     end
 
     def test_unsubscribe
@@ -125,8 +128,8 @@ class WatchHandlerTest < Test::Unit::TestCase
         "condition" => "たいやき",
         "subscriber" => "localhost"
       }
-      process(:unsubscribe, request)
-      assert_equal([SUCCESS_RESULT], @messenger.values)
+      response = process(:unsubscribe, request)
+      assert_equal(SUCCESS_RESULT, response)
     end
 
     private
@@ -136,9 +139,8 @@ class WatchHandlerTest < Test::Unit::TestCase
         "condition" => "たいやき",
         "subscriber" => "localhost"
       }
-      process(:subscribe, request)
-      assert_equal([SUCCESS_RESULT], @messenger.values)
-      @messenger.values.clear
+      response = process(:subscribe, request)
+      assert_equal(SUCCESS_RESULT, response)
     end
   end
 
@@ -150,7 +152,8 @@ class WatchHandlerTest < Test::Unit::TestCase
 
     def create_plugin
       Droonga::Plugins::Watch::FeedHandler.new("droonga",
-                                               @handler.context)
+                                               @handler.context,
+                                               @messenger)
     end
 
     def test_feed_match
@@ -181,8 +184,7 @@ class WatchHandlerTest < Test::Unit::TestCase
           "text" => "たこやきおいしいです"
         }
       }
-      process(:feed, request)
-      assert_equal([], @messenger.messages)
+      assert_nil(process(:feed, request))
     end
 
     private
@@ -195,9 +197,10 @@ class WatchHandlerTest < Test::Unit::TestCase
       message = Droonga::Test::StubHandlerMessage.new(request, {})
       subscribe_handler =
         Droonga::Plugins::Watch::SubscribeHandler.new("droonga",
-                                                      @handler.context)
-      subscribe_handler.handle(message, @messenger)
-      assert_equal([SUCCESS_RESULT], @messenger.values)
+                                                      @handler.context,
+                                                      @messenger)
+      response = subscribe_handler.handle(message)
+      assert_equal(SUCCESS_RESULT, response)
     end
   end
 end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index