Kouhei Sutou
null+****@clear*****
Tue May 27 12:34:15 JST 2014
Kouhei Sutou 2014-05-27 12:34:15 +0900 (Tue, 27 May 2014) New Revision: aec8070a13b59224b7218ec0aafa23759f585bec https://github.com/droonga/droonga-engine/commit/aec8070a13b59224b7218ec0aafa23759f585bec Message: Support graceful shutdown and restart Modified files: lib/droonga/command/droonga_engine.rb lib/droonga/engine.rb lib/droonga/engine_state.rb Modified: lib/droonga/command/droonga_engine.rb (+107 -68) =================================================================== --- lib/droonga/command/droonga_engine.rb 2014-05-26 18:54:42 +0900 (99dd550) +++ lib/droonga/command/droonga_engine.rb 2014-05-27 12:34:15 +0900 (7d62cce) @@ -213,8 +213,7 @@ module Droonga service_runner = nil trap(:INT) do serf.shutdown if serf - service_runner.stop_immedieate if service_runner - raw_loop.stop + service_runner.stop_immediately if service_runner end trap(Signals::GRACEFUL_STOP) do serf.shutdown if serf @@ -222,8 +221,7 @@ module Droonga end trap(Signals::IMMEDIATE_STOP) do serf.shutdown if serf - service_runner.stop_immediate if service_runner - raw_loop.stop + service_runner.stop_immediately if service_runner end trap(Signals::GRACEFUL_RESTART) do old_service_runner = service_runner @@ -235,7 +233,7 @@ module Droonga trap(Signals::IMMEDIATE_RESTART) do old_service_runner = service_runner service_runner = run_service(raw_loop) - old_service_runner.stop_immediate + old_service_runner.stop_immediately end serf = run_serf(raw_loop) @@ -284,11 +282,8 @@ module Droonga @on_ready = callback end - def on_finish=(callback) - @on_finish = callback - end - def run + control_write_in, control_write_out = IO.pipe control_read_in, control_read_out = IO.pipe listen_fd =****@confi*****_socket.fileno heartbeat_fd =****@confi*****_socket.fileno @@ -299,12 +294,14 @@ module Droonga "#{$0}-service", "--listen-fd", listen_fd.to_s, "--heartbeat-fd", heartbeat_fd.to_s, + "--control-read-fd", control_write_in.fileno.to_s, "--control-write-fd", control_read_out.fileno.to_s, *@configuration.to_command_line, ] options = { listen_fd => listen_fd, heartbeat_fd => heartbeat_fd, + control_write_in => control_write_in, control_read_out => control_read_out, } if @log_output @@ -312,16 +309,18 @@ module Droonga options[:err] = @log_output end @pid = spawn(env, *command_line, options) + control_write_in.close control_read_out.close + attach_control_write_out(control_write_out) attach_control_read_in(control_read_in) end def stop_graceful - stop(Signals::GRACEFUL_STOP) + @control_write_out.write("stop-graceful\n") end - def stop_immedieate - stop(Signals::IMMEDIATE_STOP) + def stop_immediately + @control_write_out.write("stop-immediately\n") end def success? @@ -329,28 +328,22 @@ module Droonga end private - def stop(signal) - return if****@pid*****? - - pid = @pid - Process.kill(signal, pid) - @pid = nil - @stop_timer = Coolio::TimerWatcher.new(0.5, true) - on_timer = lambda do - _, status = Process.waitpid2(pid, Process::WNOHANG) - if status - @success = status.success? - @stop_timer.detach - end - end - @stop_timer.on_timer do - on_timer.call - end - @raw_loop.attach(@stop_timer) + def on_ready + @on_ready.call if @on_ready + end + def on_finish + _, status = Process.waitpid2(@pid) + @success = status.success? + @control_write_out.close @control_read_in.close end + def attach_control_write_out(control_write_out) + @control_write_out = Coolio::IO.new(control_write_out) + @raw_loop.attach(@control_write_out) + end + def attach_control_read_in(control_read_in) @control_read_in = Coolio::IO.new(control_read_in) on_read = lambda do |data| @@ -358,7 +351,9 @@ module Droonga data.each_line do |line| case line when "ready\n" - @on_ready.call if @on_ready + on_ready + when "finish\n" + on_finish end end end @@ -382,25 +377,40 @@ module Droonga @engine_name = nil @listen_fd = nil @heartbeat_fd = nil - @contrtol_fd = nil + @contrtol_read_fd = nil + @contrtol_write_fd = nil + @contrtol_write_closed = false end def run(command_line_arguments) + create_new_process_group + parse_command_line_arguments!(command_line_arguments) PluginLoader.load_all + control_write_io = IO.new(@control_write_fd) begin run_services rescue logger.exception("failed to run services", $!) ensure - shutdown_services + unless @control_write_closed + control_write_io.write("finish\n") + control_write_io.close + end end true end private + def create_new_process_group + begin + Process.setsid + rescue SystemCallError, NotImplementedError + end + end + def parse_command_line_arguments!(command_line_arguments) parser = OptionParser.new add_internal_options(parser) @@ -422,6 +432,10 @@ module Droonga "Use FD as the heartbeat file descriptor") do |fd| @heartbeat_fd = fd end + parser.on("--control-read-fd=FD", Integer, + "Use FD to read control messages from the service") do |fd| + @control_read_fd = fd + end parser.on("--control-write-fd=FD", Integer, "Use FD to write control messages from the service") do |fd| @control_write_fd = fd @@ -433,27 +447,18 @@ module Droonga end def run_services + @stopping = false @engine = nil @receiver = nil - raw_loop = Coolio::Loop.default - @loop = EventLoop.new(raw_loop) + @loop = Coolio::Loop.default run_internal_message_receiver run_engine run_receiver - setup_signals run_control_io @loop.run end - def shutdown_services - shutdown_control_io - shutdown_receiver - shutdown_internal_message_receiver - shutdown_engine - @loop = nil - end - def run_internal_message_receiver @internal_message_receiver = create_internal_message_receiver host, port = @internal_message_receiver.start @@ -478,12 +483,6 @@ module Droonga @engine.start end - def shutdown_engine - return if****@engin*****? - @engine, engine = nil, @engine - engine.shutdown - end - def run_receiver @receiver = create_receiver @receiver.start @@ -496,17 +495,59 @@ module Droonga end def run_control_io + @control_read = Coolio::IO.new(IO.new(@control_read_fd)) + @control_read_fd = nil + on_read = lambda do |data| + # TODO: should buffer data to handle half line received case + data.each_line do |line| + case line + when "stop-graceful\n" + stop_graceful + when "stop-immediately\n" + stop_immediately + end + end + end + @control_read.on_read do |data| + on_read.call(data) + end + read_on_close = lambda do + if @control_read + @control_read = nil + stop_immediately + end + end + @control_read.on_close do + read_on_close.call + end + @loop.attach(@control_read) + @control_write = Coolio::IO.new(IO.new(@control_write_fd)) @control_write_fd = nil + write_on_close = lambda do + if @control_write + @control_write = nil + stop_immediately + end + @control_write_closed = true + end + @control_write.on_close do + write_on_close.call + end @loop.attach(@control_write) @control_write.write("ready\n") end def shutdown_control_io - return if @control_write.nil? - @control_write, control_write = nil, @control_write - control_write.close + if @control_write + @control_write, control_write = nil, @control_write + control_write.detach + end + if @control_read + @control_read, control_read = nil, @control_read + control_read.close + end end def create_receiver @@ -541,27 +582,25 @@ module Droonga @engine.process(message) end - def setup_signals - trap(Signals::GRACEFUL_STOP) do - stop_graceful - end - trap(Signals::IMMEDIATE_STOP) do - stop_immediate - end - trap(:INT) do - stop_immediate - trap(:INT, "DEFAULT") + def stop_graceful + return if @stopping + @stopping = true + shutdown_receiver + @engine.stop_graceful do + shutdown_control_io + shutdown_internal_message_receiver end end - def stop_graceful + # It may be called after stop_graceful. + def stop_immediately + shutdown_control_io + shutdown_receiver if @receiver + shutdown_internal_message_receiver + @engine.stop_immediately @loop.stop end - def stop_immediate - shutdown_services - end - def log_tag "service" end Modified: lib/droonga/engine.rb (+22 -3) =================================================================== --- lib/droonga/engine.rb 2014-05-26 18:54:42 +0900 (d1177f6) +++ lib/droonga/engine.rb 2014-05-27 12:34:15 +0900 (662eb6e) @@ -59,14 +59,33 @@ module Droonga logger.trace("start: done") end - def shutdown - logger.trace("shutdown: start") + def stop_graceful + logger.trace("stop_graceful: start") + @catalog_observer.stop + @live_nodes_list_observer.stop + on_finish = lambda do + output_last_processed_timestamp + @dispatcher.shutdown + @state.shutdown + yield + end + if****@state*****_session? + @state.on_finish = on_finish + else + on_finish.call + end + logger.trace("stop_graceful: done") + end + + # It may be called after stop_graceful. + def stop_immediately + logger.trace("stop_immediately: start") output_last_processed_timestamp @catalog_observer.stop @live_nodes_list_observer.stop @dispatcher.shutdown @state.shutdown - logger.trace("shutdown: done") + logger.trace("stop_immediately: done") end def process(message) Modified: lib/droonga/engine_state.rb (+9 -0) =================================================================== --- lib/droonga/engine_state.rb 2014-05-26 18:54:42 +0900 (985bd1c) +++ lib/droonga/engine_state.rb 2014-05-27 12:34:15 +0900 (8a57c3a) @@ -31,6 +31,7 @@ module Droonga attr_reader :internal_name attr_reader :forwarder attr_reader :replier + attr_accessor :on_finish def initialize(loop, name, internal_name) @loop = loop @name = name @@ -39,6 +40,7 @@ module Droonga @current_id = 0 @forwarder = Forwarder.new(@loop) @replier = Replier.new(@forwarder) + @on_finish = nil end def start @@ -86,6 +88,13 @@ module Droonga def unregister_session(id) @sessions.delete(id) + unless have_session? + @on_finish.call if @on_finish + end + end + + def have_session? + not****@sessi*****? end private -------------- next part -------------- HTML����������������������������...Download