[Groonga-commit] droonga/fluent-plugin-droonga at e91405f [master] Extract distribution code from adapter as distributor

Back to archive index

Kouhei Sutou null+****@clear*****
Sun Nov 24 18:14:07 JST 2013


Kouhei Sutou	2013-11-24 18:14:07 +0900 (Sun, 24 Nov 2013)

  New Revision: e91405f212644651484782bf3f51fcc38e83f320
  https://github.com/droonga/fluent-plugin-droonga/commit/e91405f212644651484782bf3f51fcc38e83f320

  Message:
    Extract distribution code from adapter as distributor

  Added files:
    lib/droonga/distributor.rb
    lib/droonga/distributor_plugin.rb
    lib/droonga/plugin/distributor/crud.rb
  Copied files:
    lib/droonga/plugin/distributor/groonga.rb
      (from lib/droonga/plugin/adapter/groonga.rb)
    lib/droonga/plugin/distributor/search.rb
      (from lib/droonga/adapter.rb)
    lib/droonga/plugin/distributor/watch.rb
      (from lib/droonga/plugin/adapter/groonga.rb)
  Modified files:
    lib/droonga/adapter.rb
    lib/droonga/dispatcher.rb
    lib/droonga/executor.rb
    lib/droonga/plugin/adapter/groonga.rb
    test/unit/test_adapter.rb

  Modified: lib/droonga/adapter.rb (+1 -139)
===================================================================
--- lib/droonga/adapter.rb    2013-11-24 17:40:03 +0900 (a0246bc)
+++ lib/droonga/adapter.rb    2013-11-24 18:14:07 +0900 (137ba55)
@@ -15,145 +15,7 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/legacy_plugin"
-
 module Droonga
-  class Adapter < Droonga::LegacyPlugin
-    def scatter_all(request, key)
-      message = [{
-        "command"=> envelope["type"],
-        "dataset"=> envelope["dataset"],
-        "body"=> request,
-        "key"=> key,
-        "type"=> "scatter",
-        "replica"=> "all",
-        "post"=> true
-      }]
-      post(message, "dispatcher")
-    end
-
-    def broadcast_all(request)
-      message = [{
-        "command"=> envelope["type"],
-        "dataset"=> envelope["dataset"],
-        "body"=> request,
-        "type"=> "broadcast",
-        "replica"=> "all",
-        "post"=> true
-      }]
-      post(message, "dispatcher")
-    end
-
-    def prefer_synchronous?(command)
-      return true
-    end
-  end
-
-  class BasicAdapter < Adapter
-    Droonga::LegacyPlugin.repository.register("adapter", self)
-
-    command :table_create
-    def table_create(request)
-      unless envelope["dataset"]
-        raise "dataset must be set. FIXME: This error should return client."
-      end
-      broadcast_all(request)
-    end
-
-    command :column_create
-    def column_create(request)
-      broadcast_all(request)
-    end
-
-    command "watch.feed" => :feed
-    def feed(request)
-      broadcast_all(request)
-    end
-
-    command "watch.subscribe" => :subscribe
-    def subscribe(request)
-      broadcast_all(request)
-    end
-
-    command "watch.unsubscribe" => :unsubscribe
-    def unsubscribe(request)
-      broadcast_all(request)
-    end
-
-    command :add
-    def add(request)
-      # TODO: update events must be serialized in the primary node of replicas.
-      key = request["key"] || rand.to_s
-      scatter_all(request, key)
-    end
-
-    command :update
-    def update(request)
-      # TODO: update events must be serialized in the primary node of replicas.
-      key = request["key"] || rand.to_s
-      scatter_all(request, key)
-    end
-
-    command :reset
-    def reset(request)
-      # TODO: update events must be serialized in the primary node of replicas.
-      key = request["key"] || rand.to_s
-      scatter_all(request, key)
-    end
-
-    command :search
-    def search(request)
-      message = []
-      input_names = []
-      output_names = []
-      name_mapper = {}
-      request["queries"].each do |input_name, query|
-        output = query["output"]
-        next unless output
-        input_names << input_name
-        output_name = input_name + "_reduced"
-        output_names << output_name
-        name_mapper[output_name] = input_name
-        # TODO: offset & limit must be arranged here.
-        elements = {}
-        output["elements"].each do |element|
-          case element
-          when "count"
-            elements[element] = ["sum"]
-          when "records"
-            # TODO: must take "sortBy" section into account.
-            elements[element] = ["sort", "<"]
-          end
-        end
-        reducer = {
-          "inputs"=> [input_name],
-          "outputs"=> [output_name],
-          "type"=> "reduce",
-          "body"=> {
-            input_name=> {
-              output_name=> elements
-            }
-          }
-        }
-        message << reducer
-      end
-      gatherer = {
-        "inputs"=> output_names,
-        "type"=> "gather",
-        "body"=> name_mapper,
-        "post"=> true
-      }
-      message << gatherer
-      searcher = {
-        "dataset"=> envelope["dataset"] || request["dataset"],
-        "outputs"=> input_names,
-        "type"=> "broadcast",
-        "command"=> "search",
-        "replica"=> "random",
-        "body"=> request
-      }
-      message.push(searcher)
-      post(message, "dispatcher")
-    end
+  class Adapter
   end
 end

  Modified: lib/droonga/dispatcher.rb (+1 -1)
===================================================================
--- lib/droonga/dispatcher.rb    2013-11-24 17:40:03 +0900 (33082ac)
+++ lib/droonga/dispatcher.rb    2013-11-24 18:14:07 +0900 (6efac4e)
@@ -33,7 +33,7 @@ module Droonga
       @collectors = {}
       @current_id = 0
       @local = Regexp.new("^#{@name}")
-      plugins = ["collector"] + (Droonga.catalog.option("plugins")||[]) + ["adapter"]
+      plugins = ["collector"] + (Droonga.catalog.option("plugins")||[])
       plugins.each do |plugin|
         @worker.add_legacy_plugin(plugin)
       end

  Added: lib/droonga/distributor.rb (+74 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/distributor.rb    2013-11-24 18:14:07 +0900 (e4eaf1a)
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/distributor_plugin"
+
+module Droonga
+  class Distributor
+    def initialize(executor, options={})
+      @executor = executor
+      @plugins = []
+      @options = options
+      # TODO: don't put the default distributions
+      load_plugins(options[:distributors] || ["search", "crud", "groonga", "watch"])
+    end
+
+    def shutdown
+      $log.trace("#{log_tag}: shutdown: start")
+      @plugins.each do |plugin|
+        plugins.shutdown
+      end
+      $log.trace("#{log_tag}: shutdown: done")
+    end
+
+    def distribute(envelope)
+      command = envelope["type"]
+      plugin = find_plugin(command)
+      if plugin.nil?
+        raise "unknown distributor plugin: <#{command}>: " +
+                "TODO: improve error hndling"
+      end
+      plugin.process(envelope)
+    end
+
+    def post(message)
+      @executor.post(message, "dispatcher")
+    end
+
+    private
+    def load_plugins(plugin_names)
+      plugin_names.each do |plugin_name|
+        add_plugin(plugin_name)
+      end
+    end
+
+    def add_plugin(name)
+      plugin = DistributorPlugin.repository.instantiate(name, self)
+      @plugins << plugin
+    end
+
+    def find_plugin(command)
+      @plugins.find do |plugin|
+        plugin.processable?(command)
+      end
+    end
+
+    def log_tag
+      "[#{Process.pid}] distributor"
+    end
+  end
+end

  Added: lib/droonga/distributor_plugin.rb (+98 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/distributor_plugin.rb    2013-11-24 18:14:07 +0900 (66cfe53)
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/plugin_repository"
+
+module Droonga
+  class DistributorPlugin
+    @@repository = PluginRepository.new
+
+    class << self
+      def inherited(sub_class)
+        super
+        sub_class.instance_variable_set(:@command_mapper, CommandMapper.new)
+      end
+
+      def repository
+        @@repository
+      end
+
+      def command(name_or_map)
+        @command_mapper.register(name_or_map)
+      end
+
+      def method_name(command)
+        @command_mapper[command]
+      end
+
+      def processable?(command)
+        not method_name(command).nil?
+      end
+    end
+
+    def initialize(distributor)
+      @distributor = distributor
+    end
+
+    # TODO: consider better name
+    def post(message)
+      @distributor.post(message)
+    end
+
+    def shutdown
+    end
+
+    def processable?(command)
+      self.class.processable?(command)
+    end
+
+    def process(envelope, *arguments)
+      command = envelope["type"]
+      __send__(self.class.method_name(command), envelope, *arguments)
+    rescue => exception
+      Logger.error("error while processing #{command}",
+                   envelope: envelope,
+                   arguments: arguments,
+                   exception: exception)
+    end
+
+    def scatter_all(envelope, key)
+      message = [{
+        "command"=> envelope["type"],
+        "dataset"=> envelope["dataset"],
+        "body"=> envelope["body"],
+        "key"=> key,
+        "type"=> "scatter",
+        "replica"=> "all",
+        "post"=> true
+      }]
+      post(message)
+    end
+
+    def broadcast_all(envelope)
+      distirubte_message = [{
+        "command"=> envelope["type"],
+        "dataset"=> envelope["dataset"],
+        "body"=> envelope["body"],
+        "type"=> "broadcast",
+        "replica"=> "all",
+        "post"=> true
+      }]
+      post(distirubte_message)
+    end
+  end
+end

  Modified: lib/droonga/executor.rb (+6 -0)
===================================================================
--- lib/droonga/executor.rb    2013-11-24 17:40:03 +0900 (1a27725)
+++ lib/droonga/executor.rb    2013-11-24 18:14:07 +0900 (da74bd9)
@@ -22,6 +22,7 @@ require "groonga"
 require "droonga/legacy_plugin"
 require "droonga/plugin_loader"
 require "droonga/dispatcher"
+require "droonga/distributor"
 
 module Droonga
   class Executor
@@ -42,6 +43,7 @@ module Droonga
 
     def shutdown
       $log.trace("#{log_tag}: shutdown: start")
+      @distributor.shutdown
       @legacy_plugins.each do |legacy_plugin|
         legacy_plugin.shutdown
       end
@@ -122,6 +124,9 @@ module Droonga
           legacy_plugin.handle(command, body, *arguments)
           $log.trace("#{log_tag}: post_or_push: handle: done: <#{command}>",
                      :plugin => legacy_plugin.class)
+        else
+          @distributor.distribute(envelope.merge("type" => command,
+                                                 "body" => body))
         end
       end
       add_route(route) if route
@@ -198,6 +203,7 @@ module Droonga
         @context = Groonga::Context.new
         @database =****@conte*****_database(@database_name)
       end
+      @distributor = Distributor.new(self, @options)
       add_legacy_plugin("dispatcher_message")
     end
 

  Modified: lib/droonga/plugin/adapter/groonga.rb (+2 -2)
===================================================================
--- lib/droonga/plugin/adapter/groonga.rb    2013-11-24 17:40:03 +0900 (635c09e)
+++ lib/droonga/plugin/adapter/groonga.rb    2013-11-24 18:14:07 +0900 (ef96ac1)
@@ -15,10 +15,10 @@
 
 require "groonga"
 
-require "droonga/adapter"
+require "droonga/legacy_plugin"
 
 module Droonga
-  class GroongaAdapter < Droonga::Adapter
+  class GroongaAdapter < Droonga::LegacyPlugin
     # TODO: AdapterPlugin or something should be defined to avoid conflicts.
     Droonga::LegacyPlugin.repository.register("select", self)
     command :select

  Added: lib/droonga/plugin/distributor/crud.rb (+46 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/plugin/distributor/crud.rb    2013-11-24 18:14:07 +0900 (e89f670)
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/distributor_plugin"
+
+module Droonga
+  class CRUDDistributor < Droonga::DistributorPlugin
+    repository.register("crud", self)
+
+    command :add
+    def add(envelope)
+      # TODO: update events must be serialized in the primary node of replicas.
+      key = envelope["body"]["key"] || rand.to_s
+      scatter_all(envelope, key)
+    end
+
+    command :update
+    def update(envelope)
+      # TODO: update events must be serialized in the primary node of replicas.
+      key = envelope["body"]["key"] || rand.to_s
+      scatter_all(envelope, key)
+    end
+
+    # TODO: What is this?
+    command :reset
+    def reset(envelope)
+      # TODO: update events must be serialized in the primary node of replicas.
+      key = envelope["body"]["key"] || rand.to_s
+      scatter_all(envelope, key)
+    end
+  end
+end

  Copied: lib/droonga/plugin/distributor/groonga.rb (+15 -18) 53%
===================================================================
--- lib/droonga/plugin/adapter/groonga.rb    2013-11-24 17:40:03 +0900 (635c09e)
+++ lib/droonga/plugin/distributor/groonga.rb    2013-11-24 18:14:07 +0900 (54d54cb)
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
 # Copyright (C) 2013 Droonga Project
 #
 # This library is free software; you can redistribute it and/or
@@ -13,28 +15,23 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "groonga"
-
-require "droonga/adapter"
+require "droonga/distributor_plugin"
 
 module Droonga
-  class GroongaAdapter < Droonga::Adapter
-    # TODO: AdapterPlugin or something should be defined to avoid conflicts.
-    Droonga::LegacyPlugin.repository.register("select", self)
-    command :select
-    def select(select_request)
-      command = Select.new
-      search_request = command.convert_request(select_request)
-      add_route("select_response")
-      post(search_request, "search")
+  class GroongaDistributor < Droonga::DistributorPlugin
+    repository.register("groonga", self)
+
+    command :table_create
+    def table_create(envelope)
+      unless envelope["dataset"]
+        raise "dataset must be set. FIXME: This error should return client."
+      end
+      broadcast_all(envelope)
     end
 
-    command :select_response
-    def select_response(search_response)
-      command = Select.new
-      emit(command.convert_response(search_response))
+    command :column_create
+    def column_create(envelope)
+      broadcast_all(envelope)
     end
   end
 end
-
-require "droonga/plugin/adapter/groonga/select"

  Copied: lib/droonga/plugin/distributor/search.rb (+6 -85) 51%
===================================================================
--- lib/droonga/adapter.rb    2013-11-24 17:40:03 +0900 (a0246bc)
+++ lib/droonga/plugin/distributor/search.rb    2013-11-24 18:14:07 +0900 (97d2973)
@@ -15,98 +15,19 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/legacy_plugin"
+require "droonga/distributor_plugin"
 
 module Droonga
-  class Adapter < Droonga::LegacyPlugin
-    def scatter_all(request, key)
-      message = [{
-        "command"=> envelope["type"],
-        "dataset"=> envelope["dataset"],
-        "body"=> request,
-        "key"=> key,
-        "type"=> "scatter",
-        "replica"=> "all",
-        "post"=> true
-      }]
-      post(message, "dispatcher")
-    end
-
-    def broadcast_all(request)
-      message = [{
-        "command"=> envelope["type"],
-        "dataset"=> envelope["dataset"],
-        "body"=> request,
-        "type"=> "broadcast",
-        "replica"=> "all",
-        "post"=> true
-      }]
-      post(message, "dispatcher")
-    end
-
-    def prefer_synchronous?(command)
-      return true
-    end
-  end
-
-  class BasicAdapter < Adapter
-    Droonga::LegacyPlugin.repository.register("adapter", self)
-
-    command :table_create
-    def table_create(request)
-      unless envelope["dataset"]
-        raise "dataset must be set. FIXME: This error should return client."
-      end
-      broadcast_all(request)
-    end
-
-    command :column_create
-    def column_create(request)
-      broadcast_all(request)
-    end
-
-    command "watch.feed" => :feed
-    def feed(request)
-      broadcast_all(request)
-    end
-
-    command "watch.subscribe" => :subscribe
-    def subscribe(request)
-      broadcast_all(request)
-    end
-
-    command "watch.unsubscribe" => :unsubscribe
-    def unsubscribe(request)
-      broadcast_all(request)
-    end
-
-    command :add
-    def add(request)
-      # TODO: update events must be serialized in the primary node of replicas.
-      key = request["key"] || rand.to_s
-      scatter_all(request, key)
-    end
-
-    command :update
-    def update(request)
-      # TODO: update events must be serialized in the primary node of replicas.
-      key = request["key"] || rand.to_s
-      scatter_all(request, key)
-    end
-
-    command :reset
-    def reset(request)
-      # TODO: update events must be serialized in the primary node of replicas.
-      key = request["key"] || rand.to_s
-      scatter_all(request, key)
-    end
+  class SearchDistributor < Droonga::DistributorPlugin
+    repository.register("search", self)
 
     command :search
-    def search(request)
+    def search(envelope)
       message = []
       input_names = []
       output_names = []
       name_mapper = {}
+      request = envelope["body"]
       request["queries"].each do |input_name, query|
         output = query["output"]
         next unless output
@@ -153,7 +74,7 @@ module Droonga
         "body"=> request
       }
       message.push(searcher)
-      post(message, "dispatcher")
+      post(message)
     end
   end
 end

  Copied: lib/droonga/plugin/distributor/watch.rb (+17 -18) 53%
===================================================================
--- lib/droonga/plugin/adapter/groonga.rb    2013-11-24 17:40:03 +0900 (635c09e)
+++ lib/droonga/plugin/distributor/watch.rb    2013-11-24 18:14:07 +0900 (07c5571)
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+#
 # Copyright (C) 2013 Droonga Project
 #
 # This library is free software; you can redistribute it and/or
@@ -13,28 +15,25 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "groonga"
-
-require "droonga/adapter"
+require "droonga/distributor_plugin"
 
 module Droonga
-  class GroongaAdapter < Droonga::Adapter
-    # TODO: AdapterPlugin or something should be defined to avoid conflicts.
-    Droonga::LegacyPlugin.repository.register("select", self)
-    command :select
-    def select(select_request)
-      command = Select.new
-      search_request = command.convert_request(select_request)
-      add_route("select_response")
-      post(search_request, "search")
+  class WatchDistributor < Droonga::DistributorPlugin
+    repository.register("watch", self)
+
+    command "watch.feed" => :feed
+    def feed(envelope)
+      broadcast_all(envelope)
     end
 
-    command :select_response
-    def select_response(search_response)
-      command = Select.new
-      emit(command.convert_response(search_response))
+    command "watch.subscribe" => :subscribe
+    def subscribe(envelope)
+      broadcast_all(envelope)
+    end
+
+    command "watch.unsubscribe" => :unsubscribe
+    def unsubscribe(envelope)
+      broadcast_all(envelope)
     end
   end
 end
-
-require "droonga/plugin/adapter/groonga/select"

  Modified: test/unit/test_adapter.rb (+9 -9)
===================================================================
--- test/unit/test_adapter.rb    2013-11-24 17:40:03 +0900 (29e807f)
+++ test/unit/test_adapter.rb    2013-11-24 18:14:07 +0900 (4f93455)
@@ -17,15 +17,15 @@ require "droonga/adapter"
 
 class AdapterTest < Test::Unit::TestCase
   class AdaptTest < self
-    class GroongaAdapter < Droonga::Adapter
-      command :select
-      def select(request)
-        post(:search) do |response|
-          # do nothing
-        end
-        :selected
-      end
-    end
+    # class GroongaAdapter < Droonga::Adapter
+    #   command :select
+    #   def select(request)
+    #     post(:search) do |response|
+    #       # do nothing
+    #     end
+    #     :selected
+    #   end
+    # end
 
     def setup
       omit("Pending")
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index