Kouhei Sutou
null+****@clear*****
Sun Jul 20 17:32:43 JST 2014
Kouhei Sutou 2014-07-20 17:32:43 +0900 (Sun, 20 Jul 2014) New Revision: 0f74e87da9f8128aa6c6add8b716f15575822434 https://github.com/droonga/droonga-engine/commit/0f74e87da9f8128aa6c6add8b716f15575822434 Message: Extract child process control code Added files: lib/droonga/process_supervisor.rb lib/droonga/worker_process_agent.rb Modified files: lib/droonga/command/droonga_engine.rb lib/droonga/command/droonga_engine_service.rb Modified: lib/droonga/command/droonga_engine.rb (+18 -34) =================================================================== --- lib/droonga/command/droonga_engine.rb 2014-07-20 17:17:16 +0900 (1b286bc) +++ lib/droonga/command/droonga_engine.rb 2014-07-20 17:32:43 +0900 (ee6a06c) @@ -24,8 +24,7 @@ require "sigdump" require "droonga/path" require "droonga/serf" require "droonga/file_observer" -require "droonga/process_control_protocol" -require "droonga/line_buffer" +require "droonga/process_supervisor" module Droonga module Command @@ -351,8 +350,6 @@ module Droonga end class ServiceRunner - include ProcessControlProtocol - def initialize(raw_loop, configuration) @raw_loop = raw_loop @configuration = configuration @@ -394,16 +391,17 @@ module Droonga @pid = spawn(env, *command_line, options) control_write_in.close control_read_out.close - attach_control_write_out(control_write_out) - attach_control_read_in(control_read_in) + @supervisor = create_process_supervisor(control_read_in, + control_write_out) + @supervisor.start end def stop_gracefully - @control_write_out.write(Messages::STOP_GRACEFUL) + @supervisor.stop_gracefully end def stop_immediately - @control_write_out.write(Messages::STOP_IMMEDIATELY) + @supervisor.stop_immediately end def success? @@ -411,6 +409,17 @@ module Droonga end private + def create_process_supervisor(input, output) + supervisor = ProcessSupervisor.new(@raw_loop, input, output) + supervisor.on_ready = lambda do + on_ready + end + supervisor.on_finish = lambda do + on_finish + end + supervisor + end + def on_ready @on_ready.call if @on_ready end @@ -422,34 +431,9 @@ module Droonga def on_finish _, status = Process.waitpid2(@pid) @success = status.success? - @control_write_out.close - @control_read_in.close + @supervisor.stop on_failure unless success? end - - def attach_control_write_out(control_write_out) - @control_write_out = Coolio::IO.new(control_write_out) - @raw_loop.attach(@control_write_out) - end - - def attach_control_read_in(control_read_in) - @control_read_in = Coolio::IO.new(control_read_in) - line_buffer = LineBuffer.new - on_read = lambda do |data| - line_buffer.feed(data) do |line| - case line - when Messages::READY - on_ready - when Messages::FINISH - on_finish - end - end - end - @control_read_in.on_read do |data| - on_read.call(data) - end - @raw_loop.attach(@control_read_in) - end end end end Modified: lib/droonga/command/droonga_engine_service.rb (+16 -58) =================================================================== --- lib/droonga/command/droonga_engine_service.rb 2014-07-20 17:17:16 +0900 (024022f) +++ lib/droonga/command/droonga_engine_service.rb 2014-07-20 17:32:43 +0900 (f5d74ec) @@ -17,8 +17,7 @@ require "optparse" require "coolio" -require "droonga/process_control_protocol" -require "droonga/line_buffer" +require "droonga/worker_process_agent" require "droonga/engine" require "droonga/fluent_message_receiver" require "droonga/internal_fluent_message_receiver" @@ -34,7 +33,6 @@ module Droonga end include Loggable - include ProcessControlProtocol def initialize @engine_name = nil @@ -59,10 +57,7 @@ module Droonga logger.exception("failed to run services", $!) success = false ensure - unless @control_write_closed - control_write_io.write(Messages::FINISH) - control_write_io.close - end + shutdown_worker_process_agent end success @@ -120,7 +115,7 @@ module Droonga run_internal_message_receiver run_engine run_receiver - run_control_io + run_worker_process_agent @loop.run end @@ -159,60 +154,23 @@ module Droonga receiver.shutdown end - def run_control_io - @control_read = Coolio::IO.new(IO.new(@control_read_fd)) + def run_worker_process_agent + input = IO.new(@control_read_fd) @control_read_fd = nil - on_read = lambda do |data| - line_buffer = LineBuffer.new - line_buffer.feed(data) do |line| - case line - when Messages::STOP_GRACEFUL - stop_gracefully - when Messages::STOP_IMMEDIATELY - stop_immediately - end - end - end - @control_read.on_read do |data| - on_read.call(data) - end - read_on_close = lambda do - if @control_read - @control_read = nil - stop_immediately - end - end - @control_read.on_close do - read_on_close.call - end - @loop.attach(@control_read) - - @control_write = Coolio::IO.new(IO.new(@control_write_fd)) + output = IO.new(@control_write_fd) @control_write_fd = nil - write_on_close = lambda do - if @control_write - @control_write = nil - stop_immediately - end - @control_write_closed = true + @worker_process_agent = WorkerProcessAgent.new(@loop, input, output) + @worker_process_agent.on_stop_gracefully = lambda do + stop_gracefully end - @control_write.on_close do - write_on_close.call + @worker_process_agent.on_stop_immediately = lambda do + stop_immediately end - @loop.attach(@control_write) - - @control_write.write(Messages::READY) + @worker_process_agent.start end - def shutdown_control_io - if @control_write - @control_write, control_write = nil, @control_write - control_write.detach - end - if @control_read - @control_read, control_read = nil, @control_read - control_read.close - end + def shutdown_worker_process_agent + @worker_process_agent.stop end def create_receiver @@ -252,14 +210,14 @@ module Droonga @stopping = true shutdown_receiver @engine.stop_gracefully do - shutdown_control_io + shutdown_worker_process_agent shutdown_internal_message_receiver end end # It may be called after stop_gracefully. def stop_immediately - shutdown_control_io + shutdown_worker_process_agent shutdown_receiver if @receiver shutdown_internal_message_receiver @engine.stop_immediately Added: lib/droonga/process_supervisor.rb (+91 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/process_supervisor.rb 2014-07-20 17:32:43 +0900 (936a245) @@ -0,0 +1,91 @@ +# Copyright (C) 2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "coolio" + +require "droonga/process_control_protocol" +require "droonga/line_buffer" + +module Droonga + class ProcessSupervisor + include ProcessControlProtocol + + def initialize(loop, input, output) + @loop = loop + @input = create_input(input) + @output = create_output(output) + @on_ready = nil + @on_finish = nil + end + + def start + @loop.attach(@input) + @loop.attach(@output) + end + + def stop + @input.close + @output.close + end + + def stop_gracefully + @output.write(Messages::STOP_GRACEFUL) + end + + def stop_immediately + @output.write(Messages::STOP_IMMEDIATELY) + end + + def on_ready=(callback) + @on_ready = callback + end + + def on_finish=(callback) + @on_finish = callback + end + + private + def create_input(raw_input) + input = Coolio::IO.new(raw_input) + line_buffer = LineBuffer.new + on_read = lambda do |data| + line_buffer.feed(data) do |line| + case line + when Messages::READY + on_ready + when Messages::FINISH + on_finish + end + end + end + input.on_read do |data| + on_read.call(data) + end + input + end + + def create_output(raw_output) + Coolio::IO.new(raw_output) + end + + def on_ready + @on_ready.call if @on_ready + end + + def on_finish + @on_finish.call if @on_finish + end + end +end Added: lib/droonga/worker_process_agent.rb (+109 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/worker_process_agent.rb 2014-07-20 17:32:43 +0900 (878cdb4) @@ -0,0 +1,109 @@ +# Copyright (C) 2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "coolio" + +require "droonga/process_control_protocol" +require "droonga/line_buffer" + +module Droonga + class WorkerProcessAgent + include ProcessControlProtocol + + def initialize(loop, input, output) + @loop = loop + create_input(input) + create_output(output) + @on_ready = nil + @on_finish = nil + end + + def start + @loop.attach(@input) + @loop.attach(@output) + + @output.write(Messages::READY) + end + + def stop + if @output + @output, output = nil, @output + output.write(Messages::FINISH) + output.close + end + if @input + @input, input = nil, @input + input.close + end + end + + def on_stop_gracefully=(callback) + @on_stop_gracefully = callback + end + + def on_stop_immediately=(callback) + @on_stop_immediately = callback + end + + private + def create_input(raw_input) + @input = Coolio::IO.new(raw_input) + on_read = lambda do |data| + line_buffer = LineBuffer.new + line_buffer.feed(data) do |line| + case line + when Messages::STOP_GRACEFUL + on_stop_gracefully + when Messages::STOP_IMMEDIATELY + on_stop_immediately + end + end + end + @input.on_read do |data| + on_read.call(data) + end + on_close = lambda do + if @input + @input = nil + on_stop_immediately + end + end + @input.on_close do + on_close.call + end + end + + def create_output(raw_output) + @output = Coolio::IO.new(raw_output) + on_close = lambda do + if @output + @output = nil + on_stop_immediately + end + end + @output.on_close do + on_close.call + end + end + + def on_stop_gracefully + @on_stop_gracefully.call if @on_stop_gracefully + end + + def on_stop_immediately + @on_stop_immediately.call if @on_stop_immediately + end + end +end -------------- next part -------------- HTML����������������������������...Download