[Groonga-commit] droonga/droonga-engine at 4f184cf [master] Implement supervisor system

Back to archive index

Kouhei Sutou null+****@clear*****
Sun Jul 20 19:18:04 JST 2014


Kouhei Sutou	2014-07-20 19:18:04 +0900 (Sun, 20 Jul 2014)

  New Revision: 4f184cfefa24651402afa7bf7e964d3d4a3d9905
  https://github.com/droonga/droonga-engine/commit/4f184cfefa24651402afa7bf7e964d3d4a3d9905

  Message:
    Implement supervisor system
    
    Because serverengine uses signal based start/stop mechanism.
    
    The mechanism is difficult to implement graceful stop/restart. Because
    we can't get stopped notification from worker processes. We need to
    create a pipe to get notifications from worker processes by ourselves.
    
    And serverengine uses the original logging mechanism. It is different
    format with on droonga-engine.
    
    The new supervisor system uses pipe to communicate with worker
    processes. It can be got notifications from workers. We can implement
    graceful stop/restart.
    
    TODO:
    
      * Support auto-restart worker process when worker process is crashed.
        * Heartbeat should be implemented.
      * Support service ready notification for drntest.

  Added files:
    bin/droonga-engine-worker
    lib/droonga/command/droonga_engine_worker.rb
    lib/droonga/supervisor.rb
  Removed files:
    lib/droonga/server.rb
    lib/droonga/worker.rb
  Modified files:
    droonga-engine.gemspec
    lib/droonga/slice.rb

  Added: bin/droonga-engine-worker (+20 -0) 100755
===================================================================
--- /dev/null
+++ bin/droonga-engine-worker    2014-07-20 19:18:04 +0900 (ab3c1d8)
@@ -0,0 +1,20 @@
+#!/usr/bin/env ruby
+#
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/command/droonga_engine_worker"
+
+exit(Droonga::Command::DroongaEngineWorker.run(ARGV))

  Modified: droonga-engine.gemspec (+0 -1)
===================================================================
--- droonga-engine.gemspec    2014-07-20 19:07:20 +0900 (11e7017)
+++ droonga-engine.gemspec    2014-07-20 19:18:04 +0900 (43039ed)
@@ -39,7 +39,6 @@ Gem::Specification.new do |gem|
   gem.add_dependency "groonga-command-parser"
   gem.add_dependency "json"
   gem.add_dependency "cool.io"
-  gem.add_dependency "serverengine"
   gem.add_dependency "droonga-message-pack-packer", ">= 1.0.1"
   gem.add_dependency "faraday"
   gem.add_dependency "faraday_middleware"

  Added: lib/droonga/command/droonga_engine_worker.rb (+231 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/command/droonga_engine_worker.rb    2014-07-20 19:18:04 +0900 (9462f99)
@@ -0,0 +1,231 @@
+# Copyright (C) 2013-2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "optparse"
+require "fileutils"
+
+require "coolio"
+
+require "droonga/job_receiver"
+require "droonga/plugin_loader"
+require "droonga/worker_process_agent"
+require "droonga/handler_runner"
+
+module Droonga
+  module Command
+    class DroongaEngineWorker
+      class << self
+        def run(command_line_arguments)
+          new.run(command_line_arguments)
+        end
+      end
+
+      include Loggable
+
+      def initialize
+        @job_queue_socket_path = nil
+        @contrtol_read_fd = nil
+        @contrtol_write_fd = nil
+        @pid_file_path = nil
+        @dataset = nil
+        @database_path = nil
+        @plugins = []
+        @worker_process_agent = nil
+      end
+
+      def run(command_line_arguments)
+        create_new_process_group
+
+        parse_command_line_arguments!(command_line_arguments)
+        PluginLoader.load_all
+
+        write_pid_file do
+          run_main_loop
+        end
+      end
+
+      private
+      def create_new_process_group
+        begin
+          Process.setsid
+        rescue SystemCallError, NotImplementedError
+        end
+      end
+
+      def parse_command_line_arguments!(command_line_arguments)
+        parser = OptionParser.new
+        add_internal_options(parser)
+        parser.parse!(command_line_arguments)
+      end
+
+      def add_internal_options(parser)
+        parser.separator("")
+        parser.separator("Internal:")
+        parser.on("--job-queue-socket-path=PATH",
+                  "Read jobs from PATH") do |path|
+          @job_queue_socket_path = Pathname.new(path)
+        end
+        parser.on("--control-read-fd=FD", Integer,
+                  "Use FD to read control messages from the service") do |fd|
+          @control_read_fd = fd
+        end
+        parser.on("--control-write-fd=FD", Integer,
+                  "Use FD to write control messages from the service") do |fd|
+          @control_write_fd = fd
+        end
+        parser.on("--pid-file=PATH",
+                  "Put PID to PATH") do |path|
+          @pid_file_path = Pathname.new(path)
+        end
+        parser.on("--dataset=DATASET",
+                  "Process DATASET") do |dataset|
+          @dataset = dataset
+        end
+        parser.on("--database-path=PATH",
+                  "Use database at PATH") do |path|
+          @database_path = Pathname.new(path)
+        end
+        parser.on("--plugins=PLUGIN1,PLUGIN2,...", Array,
+                  "Use PLUGINs") do |plugins|
+          @plugins = plugins
+        end
+      end
+
+      def write_pid_file
+        if @pid_file_path
+          @pid_file_path.open("w") do |file|
+            file.puts(Process.pid)
+          end
+          begin
+            yield
+          ensure
+            FileUtils.rm_f(@pid_file_path.to_s)
+          end
+        else
+          yield
+        end
+      end
+
+      def run_main_loop
+        begin
+          start
+          true
+        rescue
+          logger.exception("failed while running", $!)
+          false
+        ensure
+          stop_worker_process_agent
+        end
+      end
+
+      def start
+        @stopping = false
+        @loop = Coolio::Loop.default
+
+        start_forwarder
+        start_handler_runner
+        start_job_receiver
+        start_worker_process_agent
+
+        @loop.run
+      end
+
+      def stop_gracefully
+        return if @stopping
+        @stopping = true
+
+        stop_worker_process_agent
+        stop_job_receiver
+        stop_handler_runner
+        stop_forwarder
+      end
+
+      # It may be called after stop_gracefully.
+      def stop_immediately
+        stop_gracefully
+        @loop.stop
+      end
+
+      def start_forwarder
+        @forwarder = Forwarder.new(@loop)
+        @forwarder.start
+      end
+
+      def stop_forwarder
+        @forwarder.shutdown
+      end
+
+      def start_handler_runner
+        options = {
+          :forwarder => @forwarder,
+          :dataset   => @dataset,
+          :database  => @database_path.to_s,
+          :plugins   => @plugins,
+        }
+        @handler_runner = HandlerRunner.new(@loop, options)
+        @handler_runner.start
+      end
+
+      def stop_handler_runner
+        @handler_runner.shutdown
+      end
+
+      def start_job_receiver
+        @job_receiver = create_job_receiver
+        @job_receiver.start
+      end
+
+      def create_job_receiver
+        JobReceiver.new(@loop, @job_queue_socket_path.to_s) do |message|
+          process(message)
+        end
+      end
+
+      def process(message)
+        logger.trace("process: start")
+        @handler_runner.process(message)
+        logger.trace("process: done")
+      end
+
+      def stop_job_receiver
+        @job_receiver.shutdown
+      end
+
+      def start_worker_process_agent
+        input = IO.new(@control_read_fd)
+        @control_read_fd = nil
+        output = IO.new(@control_write_fd)
+        @control_write_fd = nil
+        @worker_process_agent = WorkerProcessAgent.new(@loop, input, output)
+        @worker_process_agent.on_stop_gracefully = lambda do
+          stop_gracefully
+        end
+        @worker_process_agent.on_stop_immediately = lambda do
+          stop_immediately
+        end
+        @worker_process_agent.start
+      end
+
+      def stop_worker_process_agent
+        return if @worker_process_agent.nil?
+        @worker_process_agent.stop
+      end
+
+      def log_tag
+        "[#{Process.ppid}][#{Process.pid}] worker"
+      end
+    end
+  end
+end

  Deleted: lib/droonga/server.rb (+0 -45) 100644
===================================================================
--- lib/droonga/server.rb    2014-07-20 19:07:20 +0900 (7885299)
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/logger"
-
-module Droonga
-  module Server
-    def before_run
-      Droonga.logger.trace("#{log_tag}: before_run: start")
-      config[:job_pusher].close
-      config.delete(:job_pusher)
-      Droonga.logger.trace("#{log_tag}: before_run: done")
-    end
-
-    def after_run
-      Droonga.logger.trace("#{log_tag}: after_run: start")
-      Droonga.logger.trace("#{log_tag}: after_run: done")
-    end
-
-    def stop(stop_gracefully)
-      Droonga.logger.trace("#{log_tag}: stop: start")
-      super(stop_gracefully)
-      Droonga.logger.trace("#{log_tag}: stop: done")
-    end
-
-    private
-    def log_tag
-      "[#{Process.ppid}][#{Process.pid}] server"
-    end
-  end
-end

  Modified: lib/droonga/slice.rb (+19 -29)
===================================================================
--- lib/droonga/slice.rb    2014-07-20 19:07:20 +0900 (76f6f4f)
+++ lib/droonga/slice.rb    2014-07-20 19:18:04 +0900 (2b8520e)
@@ -13,11 +13,8 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "serverengine"
-
 require "droonga/loggable"
-require "droonga/server"
-require "droonga/worker"
+require "droonga/supervisor"
 require "droonga/event_loop"
 require "droonga/job_pusher"
 require "droonga/processor"
@@ -32,7 +29,8 @@ module Droonga
       @loop = loop
       @options = options
       @n_workers = @options[:n_workers] || 0
-      @job_pusher = JobPusher.new(@loop, @options[:database])
+      @database_path = @options[:database]
+      @job_pusher = JobPusher.new(@loop, @database_path)
       @processor = Processor.new(@loop, @job_pusher, @options)
       @supervisor = nil
     end
@@ -41,7 +39,7 @@ module Droonga
       ensure_database
       @processor.start
       @job_pusher.start
-      start_supervisor if @n_workers > 0
+      start_supervisor
     end
 
     def shutdown
@@ -63,14 +61,13 @@ module Droonga
       enforce_umask
       context = Groonga::Context.new
       begin
-        database_path = @options[:database]
-        if File.exist?(database_path)
-          context.open_database(database_path) do
+        if File.exist?(@database_path)
+          context.open_database(@database_path) do
             apply_schema(context)
           end
         else
-          FileUtils.mkdir_p(File.dirname(database_path))
-          context.create_database(database_path) do
+          FileUtils.mkdir_p(File.dirname(@database_path))
+          context.create_database(@database_path) do
             apply_schema(context)
           end
         end
@@ -89,28 +86,21 @@ module Droonga
     end
 
     def start_supervisor
-      @supervisor = ServerEngine::Supervisor.new(Server, Worker) do
-        force_options = {
-          :worker_type   => "process",
-          :workers       => @options[:n_workers],
-          :log_level     => logger.level,
-          :server_process_name => "Server[#{@options[:database]}] #$0",
-          :worker_process_name => "Worker[#{@options[:database]}] #$0",
-          :job_receive_socket_path => @job_pusher.socket_path,
-          :job_pusher => @job_pusher,
-        }
-        @options.merge(force_options)
-      end
-      @supervisor_thread = Thread.new do
-        @supervisor.main
-      end
+      return if @n_workers.zero?
+
+      config = Supervisor::WorkerConfiguration.new
+      config.name = @options[:name]
+      config.dataset = @dataset
+      config.database_path = @database_path
+      config.plugins = @options[:plugins]
+      config.job_pusher = @job_pusher
+      @supervisor = Supervisor.new(@loop, @n_workers, config)
+      @supervisor.start
     end
 
     def shutdown_supervisor
       logger.trace("supervisor: shutdown: start")
-      @supervisor.stop(true)
-      logger.trace("supervisor: shutdown: stopped")
-      @supervisor_thread.join
+      @supervisor.stop_gracefully
       logger.trace("supervisor: shutdown: done")
     end
 

  Added: lib/droonga/supervisor.rb (+161 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/supervisor.rb    2014-07-20 19:18:04 +0900 (4c1f3d1)
@@ -0,0 +1,161 @@
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/loggable"
+require "droonga/process_supervisor"
+
+module Droonga
+  class Supervisor
+    include Loggable
+
+    def initialize(loop, n_workers, config)
+      @loop = loop
+      @n_workers = n_workers
+      @config = config
+    end
+
+    def start
+      @worker_runners = @n_workers.times.collect do |i|
+        worker_runner = WorkerRunner.new(@loop, i, @config)
+        worker_runner.start
+        # TODO: support auto re-run
+        worker_runner
+      end
+    end
+
+    def stop_gracefully
+      @worker_runners.each do |worker_runner|
+        worker_runner.stop_gracefully
+      end
+    end
+
+    def stop_immediately
+      @worker_runners.each do |worker_runner|
+        worker_runner.stop_immediately
+      end
+    end
+
+    private
+    def log_tag
+      "[#{Process.pid}] supervisor"
+    end
+
+    class WorkerConfiguration
+      attr_accessor :name
+      attr_accessor :dataset
+      attr_accessor :database_path
+      attr_accessor :plugins
+      attr_accessor :job_pusher
+      def initialize
+        @name = nil
+        @dataset = nil
+        @database_path = nil
+        @plugins = []
+        @job_pusher = nil
+      end
+    end
+
+    class WorkerRunner
+      include Loggable
+
+      attr_writer :on_ready
+      attr_writer :on_failure
+      def initialize(loop, id, config)
+        @loop = loop
+        @id = id
+        @config = config
+        @on_ready = nil
+        @on_failure = nil
+      end
+
+      def start
+        control_write_in, control_write_out = IO.pipe
+        control_read_in, control_read_out = IO.pipe
+        env = {}
+        command_line = [
+          RbConfig.ruby,
+          "-S",
+          "droonga-engine-worker",
+          "--control-read-fd", control_write_in.fileno.to_s,
+          "--control-write-fd", control_read_out.fileno.to_s,
+          "--job-queue-socket-path", @config.job_pusher.socket_path.to_s,
+          "--pid-file", pid_path.to_s,
+          "--dataset", @config.dataset.name,
+          "--database-path", @config.database_path.to_s,
+          "--plugins", @config.plugins.join(","),
+        ]
+        options = {
+          control_write_in => control_write_in,
+          control_read_out => control_read_out,
+        }
+        @pid = spawn(env, *command_line, options)
+        control_write_in.close
+        control_read_out.close
+        @supervisor = create_process_supervisor(control_read_in,
+                                                control_write_out)
+        @supervisor.start
+      end
+
+      def stop_gracefully
+        @supervisor.stop_gracefully
+      end
+
+      def stop_immediately
+        @supervisor.stop_immediately
+      end
+
+      def success?
+        @success
+      end
+
+      private
+      def pid_path
+        @config.database_path + "droonga-worker-#{@id}.pid"
+      end
+
+      def create_process_supervisor(input, output)
+        supervisor = ProcessSupervisor.new(@loop, input, output)
+        supervisor.on_ready = lambda do
+          on_ready
+        end
+        supervisor.on_finish = lambda do
+          on_finish
+        end
+        supervisor
+      end
+
+      def on_ready
+        @on_ready.call if @on_ready
+      end
+
+      def on_failure
+        # TODO: log
+        @on_failure.call if @on_failure
+      end
+
+      def on_finish
+        _, status = Process.waitpid2(@pid)
+        @success = status.success?
+        @supervisor.stop
+        on_failure unless success?
+      end
+
+      private
+      def log_tag
+        "[#{Process.pid}] worker-runner"
+      end
+    end
+  end
+end

  Deleted: lib/droonga/worker.rb (+0 -66) 100644
===================================================================
--- lib/droonga/worker.rb    2014-07-20 19:07:20 +0900 (4cb95ae)
+++ /dev/null
@@ -1,66 +0,0 @@
-# Copyright (C) 2013-2014 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/event_loop"
-require "droonga/handler_runner"
-require "droonga/job_receiver"
-
-module Droonga
-  module Worker
-    def initialize
-      @raw_loop = Coolio::Loop.new
-      @loop = EventLoop.new(@raw_loop)
-      @forwarder = Forwarder.new(@loop)
-      @handler_runner = HandlerRunner.new(@loop,
-                                          config.merge(:dispatcher => nil,
-                                                       :engine_state => nil,
-                                                       :forwarder => @forwarder))
-      receive_socket_path = config[:job_receive_socket_path]
-      @job_receiver = JobReceiver.new(@loop, receive_socket_path) do |message|
-        process(message)
-      end
-    end
-
-    def run
-      Droonga.logger.trace("#{log_tag}: run: start")
-      @forwarder.start
-      @handler_runner.start
-      @job_receiver.start
-      @raw_loop.run
-      @handler_runner.shutdown
-      @forwarder.shutdown
-      Droonga.logger.trace("#{log_tag}: run: done")
-    end
-
-    def stop
-      Droonga.logger.trace("#{log_tag}: stop: start")
-      @job_receiver.shutdown
-      @raw_loop.stop
-      @loop.break_current_loop
-      Droonga.logger.trace("#{log_tag}: stop: done")
-    end
-
-    private
-    def process(message)
-      Droonga.logger.trace("#{log_tag}: process: start")
-      @handler_runner.process(message)
-      Droonga.logger.trace("#{log_tag}: process: done")
-    end
-
-    def log_tag
-      "[#{Process.ppid}][#{Process.pid}] worker"
-    end
-  end
-end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index