Kouhei Sutou
null+****@clear*****
Sun Jul 20 19:18:04 JST 2014
Kouhei Sutou 2014-07-20 19:18:04 +0900 (Sun, 20 Jul 2014) New Revision: 4f184cfefa24651402afa7bf7e964d3d4a3d9905 https://github.com/droonga/droonga-engine/commit/4f184cfefa24651402afa7bf7e964d3d4a3d9905 Message: Implement supervisor system Because serverengine uses signal based start/stop mechanism. The mechanism is difficult to implement graceful stop/restart. Because we can't get stopped notification from worker processes. We need to create a pipe to get notifications from worker processes by ourselves. And serverengine uses the original logging mechanism. It is different format with on droonga-engine. The new supervisor system uses pipe to communicate with worker processes. It can be got notifications from workers. We can implement graceful stop/restart. TODO: * Support auto-restart worker process when worker process is crashed. * Heartbeat should be implemented. * Support service ready notification for drntest. Added files: bin/droonga-engine-worker lib/droonga/command/droonga_engine_worker.rb lib/droonga/supervisor.rb Removed files: lib/droonga/server.rb lib/droonga/worker.rb Modified files: droonga-engine.gemspec lib/droonga/slice.rb Added: bin/droonga-engine-worker (+20 -0) 100755 =================================================================== --- /dev/null +++ bin/droonga-engine-worker 2014-07-20 19:18:04 +0900 (ab3c1d8) @@ -0,0 +1,20 @@ +#!/usr/bin/env ruby +# +# Copyright (C) 2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/command/droonga_engine_worker" + +exit(Droonga::Command::DroongaEngineWorker.run(ARGV)) Modified: droonga-engine.gemspec (+0 -1) =================================================================== --- droonga-engine.gemspec 2014-07-20 19:07:20 +0900 (11e7017) +++ droonga-engine.gemspec 2014-07-20 19:18:04 +0900 (43039ed) @@ -39,7 +39,6 @@ Gem::Specification.new do |gem| gem.add_dependency "groonga-command-parser" gem.add_dependency "json" gem.add_dependency "cool.io" - gem.add_dependency "serverengine" gem.add_dependency "droonga-message-pack-packer", ">= 1.0.1" gem.add_dependency "faraday" gem.add_dependency "faraday_middleware" Added: lib/droonga/command/droonga_engine_worker.rb (+231 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/command/droonga_engine_worker.rb 2014-07-20 19:18:04 +0900 (9462f99) @@ -0,0 +1,231 @@ +# Copyright (C) 2013-2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "optparse" +require "fileutils" + +require "coolio" + +require "droonga/job_receiver" +require "droonga/plugin_loader" +require "droonga/worker_process_agent" +require "droonga/handler_runner" + +module Droonga + module Command + class DroongaEngineWorker + class << self + def run(command_line_arguments) + new.run(command_line_arguments) + end + end + + include Loggable + + def initialize + @job_queue_socket_path = nil + @contrtol_read_fd = nil + @contrtol_write_fd = nil + @pid_file_path = nil + @dataset = nil + @database_path = nil + @plugins = [] + @worker_process_agent = nil + end + + def run(command_line_arguments) + create_new_process_group + + parse_command_line_arguments!(command_line_arguments) + PluginLoader.load_all + + write_pid_file do + run_main_loop + end + end + + private + def create_new_process_group + begin + Process.setsid + rescue SystemCallError, NotImplementedError + end + end + + def parse_command_line_arguments!(command_line_arguments) + parser = OptionParser.new + add_internal_options(parser) + parser.parse!(command_line_arguments) + end + + def add_internal_options(parser) + parser.separator("") + parser.separator("Internal:") + parser.on("--job-queue-socket-path=PATH", + "Read jobs from PATH") do |path| + @job_queue_socket_path = Pathname.new(path) + end + parser.on("--control-read-fd=FD", Integer, + "Use FD to read control messages from the service") do |fd| + @control_read_fd = fd + end + parser.on("--control-write-fd=FD", Integer, + "Use FD to write control messages from the service") do |fd| + @control_write_fd = fd + end + parser.on("--pid-file=PATH", + "Put PID to PATH") do |path| + @pid_file_path = Pathname.new(path) + end + parser.on("--dataset=DATASET", + "Process DATASET") do |dataset| + @dataset = dataset + end + parser.on("--database-path=PATH", + "Use database at PATH") do |path| + @database_path = Pathname.new(path) + end + parser.on("--plugins=PLUGIN1,PLUGIN2,...", Array, + "Use PLUGINs") do |plugins| + @plugins = plugins + end + end + + def write_pid_file + if @pid_file_path + @pid_file_path.open("w") do |file| + file.puts(Process.pid) + end + begin + yield + ensure + FileUtils.rm_f(@pid_file_path.to_s) + end + else + yield + end + end + + def run_main_loop + begin + start + true + rescue + logger.exception("failed while running", $!) + false + ensure + stop_worker_process_agent + end + end + + def start + @stopping = false + @loop = Coolio::Loop.default + + start_forwarder + start_handler_runner + start_job_receiver + start_worker_process_agent + + @loop.run + end + + def stop_gracefully + return if @stopping + @stopping = true + + stop_worker_process_agent + stop_job_receiver + stop_handler_runner + stop_forwarder + end + + # It may be called after stop_gracefully. + def stop_immediately + stop_gracefully + @loop.stop + end + + def start_forwarder + @forwarder = Forwarder.new(@loop) + @forwarder.start + end + + def stop_forwarder + @forwarder.shutdown + end + + def start_handler_runner + options = { + :forwarder => @forwarder, + :dataset => @dataset, + :database => @database_path.to_s, + :plugins => @plugins, + } + @handler_runner = HandlerRunner.new(@loop, options) + @handler_runner.start + end + + def stop_handler_runner + @handler_runner.shutdown + end + + def start_job_receiver + @job_receiver = create_job_receiver + @job_receiver.start + end + + def create_job_receiver + JobReceiver.new(@loop, @job_queue_socket_path.to_s) do |message| + process(message) + end + end + + def process(message) + logger.trace("process: start") + @handler_runner.process(message) + logger.trace("process: done") + end + + def stop_job_receiver + @job_receiver.shutdown + end + + def start_worker_process_agent + input = IO.new(@control_read_fd) + @control_read_fd = nil + output = IO.new(@control_write_fd) + @control_write_fd = nil + @worker_process_agent = WorkerProcessAgent.new(@loop, input, output) + @worker_process_agent.on_stop_gracefully = lambda do + stop_gracefully + end + @worker_process_agent.on_stop_immediately = lambda do + stop_immediately + end + @worker_process_agent.start + end + + def stop_worker_process_agent + return if @worker_process_agent.nil? + @worker_process_agent.stop + end + + def log_tag + "[#{Process.ppid}][#{Process.pid}] worker" + end + end + end +end Deleted: lib/droonga/server.rb (+0 -45) 100644 =================================================================== --- lib/droonga/server.rb 2014-07-20 19:07:20 +0900 (7885299) +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/logger" - -module Droonga - module Server - def before_run - Droonga.logger.trace("#{log_tag}: before_run: start") - config[:job_pusher].close - config.delete(:job_pusher) - Droonga.logger.trace("#{log_tag}: before_run: done") - end - - def after_run - Droonga.logger.trace("#{log_tag}: after_run: start") - Droonga.logger.trace("#{log_tag}: after_run: done") - end - - def stop(stop_gracefully) - Droonga.logger.trace("#{log_tag}: stop: start") - super(stop_gracefully) - Droonga.logger.trace("#{log_tag}: stop: done") - end - - private - def log_tag - "[#{Process.ppid}][#{Process.pid}] server" - end - end -end Modified: lib/droonga/slice.rb (+19 -29) =================================================================== --- lib/droonga/slice.rb 2014-07-20 19:07:20 +0900 (76f6f4f) +++ lib/droonga/slice.rb 2014-07-20 19:18:04 +0900 (2b8520e) @@ -13,11 +13,8 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "serverengine" - require "droonga/loggable" -require "droonga/server" -require "droonga/worker" +require "droonga/supervisor" require "droonga/event_loop" require "droonga/job_pusher" require "droonga/processor" @@ -32,7 +29,8 @@ module Droonga @loop = loop @options = options @n_workers = @options[:n_workers] || 0 - @job_pusher = JobPusher.new(@loop, @options[:database]) + @database_path = @options[:database] + @job_pusher = JobPusher.new(@loop, @database_path) @processor = Processor.new(@loop, @job_pusher, @options) @supervisor = nil end @@ -41,7 +39,7 @@ module Droonga ensure_database @processor.start @job_pusher.start - start_supervisor if @n_workers > 0 + start_supervisor end def shutdown @@ -63,14 +61,13 @@ module Droonga enforce_umask context = Groonga::Context.new begin - database_path = @options[:database] - if File.exist?(database_path) - context.open_database(database_path) do + if File.exist?(@database_path) + context.open_database(@database_path) do apply_schema(context) end else - FileUtils.mkdir_p(File.dirname(database_path)) - context.create_database(database_path) do + FileUtils.mkdir_p(File.dirname(@database_path)) + context.create_database(@database_path) do apply_schema(context) end end @@ -89,28 +86,21 @@ module Droonga end def start_supervisor - @supervisor = ServerEngine::Supervisor.new(Server, Worker) do - force_options = { - :worker_type => "process", - :workers => @options[:n_workers], - :log_level => logger.level, - :server_process_name => "Server[#{@options[:database]}] #$0", - :worker_process_name => "Worker[#{@options[:database]}] #$0", - :job_receive_socket_path => @job_pusher.socket_path, - :job_pusher => @job_pusher, - } - @options.merge(force_options) - end - @supervisor_thread = Thread.new do - @supervisor.main - end + return if @n_workers.zero? + + config = Supervisor::WorkerConfiguration.new + config.name = @options[:name] + config.dataset = @dataset + config.database_path = @database_path + config.plugins = @options[:plugins] + config.job_pusher = @job_pusher + @supervisor = Supervisor.new(@loop, @n_workers, config) + @supervisor.start end def shutdown_supervisor logger.trace("supervisor: shutdown: start") - @supervisor.stop(true) - logger.trace("supervisor: shutdown: stopped") - @supervisor_thread.join + @supervisor.stop_gracefully logger.trace("supervisor: shutdown: done") end Added: lib/droonga/supervisor.rb (+161 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/supervisor.rb 2014-07-20 19:18:04 +0900 (4c1f3d1) @@ -0,0 +1,161 @@ +# Copyright (C) 2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "droonga/loggable" +require "droonga/process_supervisor" + +module Droonga + class Supervisor + include Loggable + + def initialize(loop, n_workers, config) + @loop = loop + @n_workers = n_workers + @config = config + end + + def start + @worker_runners = @n_workers.times.collect do |i| + worker_runner = WorkerRunner.new(@loop, i, @config) + worker_runner.start + # TODO: support auto re-run + worker_runner + end + end + + def stop_gracefully + @worker_runners.each do |worker_runner| + worker_runner.stop_gracefully + end + end + + def stop_immediately + @worker_runners.each do |worker_runner| + worker_runner.stop_immediately + end + end + + private + def log_tag + "[#{Process.pid}] supervisor" + end + + class WorkerConfiguration + attr_accessor :name + attr_accessor :dataset + attr_accessor :database_path + attr_accessor :plugins + attr_accessor :job_pusher + def initialize + @name = nil + @dataset = nil + @database_path = nil + @plugins = [] + @job_pusher = nil + end + end + + class WorkerRunner + include Loggable + + attr_writer :on_ready + attr_writer :on_failure + def initialize(loop, id, config) + @loop = loop + @id = id + @config = config + @on_ready = nil + @on_failure = nil + end + + def start + control_write_in, control_write_out = IO.pipe + control_read_in, control_read_out = IO.pipe + env = {} + command_line = [ + RbConfig.ruby, + "-S", + "droonga-engine-worker", + "--control-read-fd", control_write_in.fileno.to_s, + "--control-write-fd", control_read_out.fileno.to_s, + "--job-queue-socket-path", @config.job_pusher.socket_path.to_s, + "--pid-file", pid_path.to_s, + "--dataset", @config.dataset.name, + "--database-path", @config.database_path.to_s, + "--plugins", @config.plugins.join(","), + ] + options = { + control_write_in => control_write_in, + control_read_out => control_read_out, + } + @pid = spawn(env, *command_line, options) + control_write_in.close + control_read_out.close + @supervisor = create_process_supervisor(control_read_in, + control_write_out) + @supervisor.start + end + + def stop_gracefully + @supervisor.stop_gracefully + end + + def stop_immediately + @supervisor.stop_immediately + end + + def success? + @success + end + + private + def pid_path + @config.database_path + "droonga-worker-#{@id}.pid" + end + + def create_process_supervisor(input, output) + supervisor = ProcessSupervisor.new(@loop, input, output) + supervisor.on_ready = lambda do + on_ready + end + supervisor.on_finish = lambda do + on_finish + end + supervisor + end + + def on_ready + @on_ready.call if @on_ready + end + + def on_failure + # TODO: log + @on_failure.call if @on_failure + end + + def on_finish + _, status = Process.waitpid2(@pid) + @success = status.success? + @supervisor.stop + on_failure unless success? + end + + private + def log_tag + "[#{Process.pid}] worker-runner" + end + end + end +end Deleted: lib/droonga/worker.rb (+0 -66) 100644 =================================================================== --- lib/droonga/worker.rb 2014-07-20 19:07:20 +0900 (4cb95ae) +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright (C) 2013-2014 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/event_loop" -require "droonga/handler_runner" -require "droonga/job_receiver" - -module Droonga - module Worker - def initialize - @raw_loop = Coolio::Loop.new - @loop = EventLoop.new(@raw_loop) - @forwarder = Forwarder.new(@loop) - @handler_runner = HandlerRunner.new(@loop, - config.merge(:dispatcher => nil, - :engine_state => nil, - :forwarder => @forwarder)) - receive_socket_path = config[:job_receive_socket_path] - @job_receiver = JobReceiver.new(@loop, receive_socket_path) do |message| - process(message) - end - end - - def run - Droonga.logger.trace("#{log_tag}: run: start") - @forwarder.start - @handler_runner.start - @job_receiver.start - @raw_loop.run - @handler_runner.shutdown - @forwarder.shutdown - Droonga.logger.trace("#{log_tag}: run: done") - end - - def stop - Droonga.logger.trace("#{log_tag}: stop: start") - @job_receiver.shutdown - @raw_loop.stop - @loop.break_current_loop - Droonga.logger.trace("#{log_tag}: stop: done") - end - - private - def process(message) - Droonga.logger.trace("#{log_tag}: process: start") - @handler_runner.process(message) - Droonga.logger.trace("#{log_tag}: process: done") - end - - def log_tag - "[#{Process.ppid}][#{Process.pid}] worker" - end - end -end -------------- next part -------------- HTML����������������������������...Download