Kouhei Sutou
null+****@clear*****
Mon Jan 5 16:33:46 JST 2015
Kouhei Sutou 2015-01-05 16:33:46 +0900 (Mon, 05 Jan 2015) New Revision: eb9aea25428718351467480fd95c2698a1c19d87 https://github.com/droonga/droonga-engine/commit/eb9aea25428718351467480fd95c2698a1c19d87 Message: serf: extract code for "serf agent" as a class "serf agent" is a special command. It runs serf as a service. Other command runs serf as a command. Added files: lib/droonga/serf_agent.rb Modified files: lib/droonga/command/droonga_engine.rb lib/droonga/command/remote.rb lib/droonga/serf.rb Modified: lib/droonga/command/droonga_engine.rb (+8 -7) =================================================================== --- lib/droonga/command/droonga_engine.rb 2015-01-05 15:47:07 +0900 (febe71a) +++ lib/droonga/command/droonga_engine.rb 2015-01-05 16:33:46 +0900 (0ef3fbd) @@ -341,7 +341,7 @@ module Droonga end def run - @serf = run_serf + start_serf @service_runner = run_service setup_initial_on_ready @catalog_observer = run_catalog_observer @@ -390,14 +390,16 @@ module Droonga def stop_gracefully @command_runner.stop @catalog_observer.stop - @serf.stop + @serf.leave + @serf_agent.stop @service_runner.stop_gracefully end def stop_immediately @command_runner.stop @catalog_observer.stop - @serf.stop + @serf.leave + @serf_agent.stop @service_runner.stop_immediately end @@ -426,10 +428,9 @@ module Droonga service_runner end - def run_serf - serf = Serf.new(@loop, @configuration.engine_name) - serf.start - serf + def start_serf + @serf = Serf.new(@configuration.engine_name) + @serf_agent =****@serf*****_agent(@loop) end def run_catalog_observer Modified: lib/droonga/command/remote.rb (+1 -1) =================================================================== --- lib/droonga/command/remote.rb 2015-01-05 15:47:07 +0900 (2ffd9ec) +++ lib/droonga/command/remote.rb 2015-01-05 16:33:46 +0900 (e12b93d) @@ -37,7 +37,7 @@ module Droonga @response = { "log" => [] } - @serf = Serf.new(nil, @serf_name) + @serf = Serf.new(@serf_name) @service_installation = ServiceInstallation.new @service_installation.ensure_using_service_base_directory Modified: lib/droonga/serf.rb (+31 -173) =================================================================== --- lib/droonga/serf.rb 2015-01-05 15:47:07 +0900 (3a9d959) +++ lib/droonga/serf.rb 2015-01-05 16:33:46 +0900 (c889be4) @@ -13,8 +13,6 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -require "English" - require "json" require "coolio" require "open3" @@ -24,15 +22,13 @@ require "droonga/loggable" require "droonga/catalog_loader" require "droonga/node_metadata" require "droonga/serf_downloader" +require "droonga/serf_agent" require "droonga/line_buffer" require "droonga/safe_file_writer" require "droonga/service_installation" module Droonga class Serf - # the port must be different from droonga-http-server's agent! - AGENT_PORT = 7946 - class << self def path Droonga::Path.base + "serf" @@ -41,61 +37,44 @@ module Droonga include Loggable - def initialize(loop, name) - # TODO: Don't allow nil for loop. It reduces nil checks and - # simplifies source code. - @loop = loop + def initialize(name) @name = name - @agent = nil @service_installation = ServiceInstallation.new end - def start - logger.trace("start: start") + def run_agent(loop) + logger.trace("run_agent: start") ensure_serf retry_joins = [] detect_other_hosts.each do |other_host| retry_joins.push("-retry-join", other_host) end - @agent = run("agent", - "-node", @name, - "-bind", "#{extract_host(@name)}:#{port}", - "-event-handler", "droonga-engine-serf-event-handler", - "-log-level", log_level, - "-tag", "type=engine", - "-tag", "role=#{role}", - "-tag", "cluster_id=#{cluster_id}", - *retry_joins) - Thread.new do - sleep 1 # wait until the serf agent becomes running - update_cluster_state if****@agent*****? + agent = Agent.new(loop, @serf, + extract_host(@name), agent_port, rpc_port, + "-node", @name, + "-event-handler", "droonga-engine-serf-event-handler", + "-log-level", log_level, + "-tag", "type=engine", + "-tag", "role=#{role}", + "-tag", "cluster_id=#{cluster_id}", + *retry_joins) + agent.on_ready = lambda do + update_cluster_state end - logger.trace("start: done") - end - - def running? - @agent and****@agent*****? - end - - def stop - logger.trace("stop: start") - run("leave").stop - @agent.stop - @agent = nil - logger.trace("stop: done") + agent.start + logger.trace("run_agent: done") + agent end - def restart - logger.trace("restart: start") - stop - start - logger.trace("restart: done") + def leave + ensure_serf + run_once("leave") end def join(*hosts) ensure_serf nodes = hosts.collect do |host| - "#{host}:#{port}" + "#{host}:#{agent_port}" end run_once("join", *nodes) end @@ -203,16 +182,8 @@ module Droonga nil end - def run(command, *options) - process = SerfProcess.new(@loop, @serf, command, - "-rpc-addr", rpc_address, - *options) - process.start - process - end - def run_once(command, *options) - process = SerfProcess.new(@loop, @serf, command, + process = SerfProcess.new(@serf, command, "-rpc-addr", rpc_address, *options) process.run_once @@ -243,15 +214,19 @@ module Droonga end def rpc_address - "#{extract_host(@name)}:7373" + "#{extract_host(@name)}:#{rpc_port}" + end + + def rpc_port + 7373 end def node_metadata @node_metadata ||= NodeMetadata.new end - def port - AGENT_PORT + def agent_port + Agent::PORT end def detect_other_hosts @@ -272,35 +247,10 @@ module Droonga class SerfProcess include Loggable - def initialize(loop, serf, command, *options) - @loop = loop + def initialize(serf, command, *options) @serf = serf @command = command @options = options - @pid = nil - end - - def start - capture_output do |output_write, error_write| - env = {} - spawn_options = { - :out => output_write, - :err => error_write, - } - @pid = spawn(env, @serf, @command, *@options, spawn_options) - end - end - - def stop - return if****@pid*****? - Process.waitpid(@pid) - @output_io.close - @error_io.close - @pid = nil - end - - def running? - not****@pid*****? end def run_once @@ -311,98 +261,6 @@ module Droonga :status => status, } end - - private - def capture_output - result = nil - output_read, output_write = IO.pipe - error_read, error_write = IO.pipe - - begin - result = yield(output_write, error_write) - rescue - output_read.close unless output_read.closed? - output_write.close unless output_write.closed? - error_read.close unless error_read.closed? - error_write.close unless error_write.closed? - raise - end - - output_line_buffer = LineBuffer.new - on_read_output = lambda do |data| - on_standard_output(output_line_buffer, data) - end - @output_io = Coolio::IO.new(output_read) - @output_io.on_read do |data| - on_read_output.call(data) - end - # TODO: Don't allow nil for loop. It reduces nil checks and - # simplifies source code. - @loop.attach(@output_io) if @loop - - error_line_buffer = LineBuffer.new - on_read_error = lambda do |data| - on_error_output(error_line_buffer, data) - end - @error_io = Coolio::IO.new(error_read) - @error_io.on_read do |data| - on_read_error.call(data) - end - # TODO: Don't allow nil for loop. It reduces nil checks and - # simplifies source code. - @loop.attach(@error_io) if @loop - - result - end - - def on_standard_output(line_buffer, data) - line_buffer.feed(data) do |line| - line = line.chomp - case line - when /\A==> / - content = $POSTMATCH - logger.info(content) - when /\A / - content = $POSTMATCH - case content - when /\A(\d{4})\/(\d{2})\/(\d{2}) (\d{2}):(\d{2}):(\d{2}) \[(\w+)\] / - year, month, day = $1, $2, $3 - hour, minute, second = $4, $5, $6 - level = $7 - content = $POSTMATCH - level = normalize_level(level) - logger.send(level, content) - else - logger.info(content) - end - else - logger.info(line) - end - end - end - - def normalize_level(level) - level = level.downcase - case level - when "err" - "error" - else - level - end - end - - def on_error_output(line_buffer, data) - line_buffer.feed(data) do |line| - line = line.chomp - logger.error(line.gsub(/\A==> /, "")) - end - end - - def log_tag - tag = "serf" - tag << "[#{@pid}]" if @pid - tag - end end end end Added: lib/droonga/serf_agent.rb (+196 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/serf_agent.rb 2015-01-05 16:33:46 +0900 (b011098) @@ -0,0 +1,196 @@ +# Copyright (C) 2014-2015 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +require "English" + +require "coolio" + +require "droonga/loggable" + +module Droonga + class Serf + class Agent + # the port must be different from droonga-http-server's agent! + PORT = 7946 + + include Loggable + + MAX_N_READ_CHECKS = 10 + + attr_writer :on_ready + def initialize(loop, serf, host, bind_port, rpc_port, *options) + @loop = loop + @serf = serf + @host = host + @bind_port = bind_port + @rpc_port = rpc_port + @options = options + @pid = nil + @on_ready = nil + @n_ready_checks = 0 + end + + def start + capture_output do |output_write, error_write| + env = {} + spawn_options = { + :out => output_write, + :err => error_write, + } + @pid = spawn(env, @serf, "agent", + "-bind", "#{@host}:#{@bind_port}", + "-rpc-addr", "#{@host}:#{@rpc_port}", + *@options, spawn_options) + end + start_ready_check + end + + def stop + return if****@pid*****? + Process.waitpid(@pid) + @output_io.close + @error_io.close + @pid = nil + end + + def running? + not****@pid*****? + end + + private + def capture_output + result = nil + output_read, output_write = IO.pipe + error_read, error_write = IO.pipe + + begin + result = yield(output_write, error_write) + rescue + output_read.close unless output_read.closed? + output_write.close unless output_write.closed? + error_read.close unless error_read.closed? + error_write.close unless error_write.closed? + raise + end + + output_line_buffer = LineBuffer.new + on_read_output = lambda do |data| + on_standard_output(output_line_buffer, data) + end + @output_io = Coolio::IO.new(output_read) + @output_io.on_read do |data| + on_read_output.call(data) + end + @loop.attach(@output_io) + + error_line_buffer = LineBuffer.new + on_read_error = lambda do |data| + on_error_output(error_line_buffer, data) + end + @error_io = Coolio::IO.new(error_read) + @error_io.on_read do |data| + on_read_error.call(data) + end + @loop.attach(@error_io) + + result + end + + def on_standard_output(line_buffer, data) + line_buffer.feed(data) do |line| + line = line.chomp + case line + when /\A==> / + content = $POSTMATCH + logger.info(content) + when /\A / + content = $POSTMATCH + case content + when /\A(\d{4})\/(\d{2})\/(\d{2}) (\d{2}):(\d{2}):(\d{2}) \[(\w+)\] / + year, month, day = $1, $2, $3 + hour, minute, second = $4, $5, $6 + level = $7 + content = $POSTMATCH + level = normalize_level(level) + logger.send(level, content) + else + logger.info(content) + end + else + logger.info(line) + end + end + end + + def normalize_level(level) + level = level.downcase + case level + when "err" + "error" + else + level + end + end + + def on_error_output(line_buffer, data) + line_buffer.feed(data) do |line| + line = line.chomp + logger.error(line.gsub(/\A==> /, "")) + end + end + + def start_ready_check + @n_ready_checks += 1 + + checker = Coolio::TCPSocket.connect(@host, @bind_port) + + on_connect = lambda do + @on_ready.call if @on_ready + checker.close + end + checker.on_connect do + on_connect.call + end + + on_connect_failed = lambda do + if @n_ready_checks >= MAX_N_READ_CHECKS + # TODO: @on_fail.call if @on_fail + else + timer = Coolio::TimerWatcher.new(1) + on_timer = lambda do + start_ready_check + timer.detach + end + timer.on_timer do + on_timer.call + end + @loop.attach(timer) + end + end + checker.on_connect_failed do + on_connect_failed.call + end + + @loop.attach(checker) + end + + def log_tag + tag = "serf-agent" + tag << "[#{@pid}]" if @pid + tag + end + end + end +end -------------- next part -------------- HTML����������������������������...Download