[Groonga-commit] droonga/droonga-engine at eb9aea2 [master] serf: extract code for "serf agent" as a class

Back to archive index

Kouhei Sutou null+****@clear*****
Mon Jan 5 16:33:46 JST 2015


Kouhei Sutou	2015-01-05 16:33:46 +0900 (Mon, 05 Jan 2015)

  New Revision: eb9aea25428718351467480fd95c2698a1c19d87
  https://github.com/droonga/droonga-engine/commit/eb9aea25428718351467480fd95c2698a1c19d87

  Message:
    serf: extract code for "serf agent" as a class
    
    "serf agent" is a special command. It runs serf as a service. Other
    command runs serf as a command.

  Added files:
    lib/droonga/serf_agent.rb
  Modified files:
    lib/droonga/command/droonga_engine.rb
    lib/droonga/command/remote.rb
    lib/droonga/serf.rb

  Modified: lib/droonga/command/droonga_engine.rb (+8 -7)
===================================================================
--- lib/droonga/command/droonga_engine.rb    2015-01-05 15:47:07 +0900 (febe71a)
+++ lib/droonga/command/droonga_engine.rb    2015-01-05 16:33:46 +0900 (0ef3fbd)
@@ -341,7 +341,7 @@ module Droonga
         end
 
         def run
-          @serf = run_serf
+          start_serf
           @service_runner = run_service
           setup_initial_on_ready
           @catalog_observer = run_catalog_observer
@@ -390,14 +390,16 @@ module Droonga
         def stop_gracefully
           @command_runner.stop
           @catalog_observer.stop
-          @serf.stop
+          @serf.leave
+          @serf_agent.stop
           @service_runner.stop_gracefully
         end
 
         def stop_immediately
           @command_runner.stop
           @catalog_observer.stop
-          @serf.stop
+          @serf.leave
+          @serf_agent.stop
           @service_runner.stop_immediately
         end
 
@@ -426,10 +428,9 @@ module Droonga
           service_runner
         end
 
-        def run_serf
-          serf = Serf.new(@loop, @configuration.engine_name)
-          serf.start
-          serf
+        def start_serf
+          @serf = Serf.new(@configuration.engine_name)
+          @serf_agent =****@serf*****_agent(@loop)
         end
 
         def run_catalog_observer

  Modified: lib/droonga/command/remote.rb (+1 -1)
===================================================================
--- lib/droonga/command/remote.rb    2015-01-05 15:47:07 +0900 (2ffd9ec)
+++ lib/droonga/command/remote.rb    2015-01-05 16:33:46 +0900 (e12b93d)
@@ -37,7 +37,7 @@ module Droonga
           @response  = {
             "log" => []
           }
-          @serf = Serf.new(nil, @serf_name)
+          @serf = Serf.new(@serf_name)
 
           @service_installation = ServiceInstallation.new
           @service_installation.ensure_using_service_base_directory

  Modified: lib/droonga/serf.rb (+31 -173)
===================================================================
--- lib/droonga/serf.rb    2015-01-05 15:47:07 +0900 (3a9d959)
+++ lib/droonga/serf.rb    2015-01-05 16:33:46 +0900 (c889be4)
@@ -13,8 +13,6 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
-require "English"
-
 require "json"
 require "coolio"
 require "open3"
@@ -24,15 +22,13 @@ require "droonga/loggable"
 require "droonga/catalog_loader"
 require "droonga/node_metadata"
 require "droonga/serf_downloader"
+require "droonga/serf_agent"
 require "droonga/line_buffer"
 require "droonga/safe_file_writer"
 require "droonga/service_installation"
 
 module Droonga
   class Serf
-    # the port must be different from droonga-http-server's agent!
-    AGENT_PORT = 7946
-
     class << self
       def path
         Droonga::Path.base + "serf"
@@ -41,61 +37,44 @@ module Droonga
 
     include Loggable
 
-    def initialize(loop, name)
-      # TODO: Don't allow nil for loop. It reduces nil checks and
-      # simplifies source code.
-      @loop = loop
+    def initialize(name)
       @name = name
-      @agent = nil
       @service_installation = ServiceInstallation.new
     end
 
-    def start
-      logger.trace("start: start")
+    def run_agent(loop)
+      logger.trace("run_agent: start")
       ensure_serf
       retry_joins = []
       detect_other_hosts.each do |other_host|
         retry_joins.push("-retry-join", other_host)
       end
-      @agent = run("agent",
-                   "-node", @name,
-                   "-bind", "#{extract_host(@name)}:#{port}",
-                   "-event-handler", "droonga-engine-serf-event-handler",
-                   "-log-level", log_level,
-                   "-tag", "type=engine",
-                   "-tag", "role=#{role}",
-                   "-tag", "cluster_id=#{cluster_id}",
-                   *retry_joins)
-      Thread.new do
-        sleep 1 # wait until the serf agent becomes running
-        update_cluster_state if****@agent*****?
+      agent = Agent.new(loop, @serf,
+                        extract_host(@name), agent_port, rpc_port,
+                        "-node", @name,
+                        "-event-handler", "droonga-engine-serf-event-handler",
+                        "-log-level", log_level,
+                        "-tag", "type=engine",
+                        "-tag", "role=#{role}",
+                        "-tag", "cluster_id=#{cluster_id}",
+                        *retry_joins)
+      agent.on_ready = lambda do
+        update_cluster_state
       end
-      logger.trace("start: done")
-    end
-
-    def running?
-      @agent and****@agent*****?
-    end
-
-    def stop
-      logger.trace("stop: start")
-      run("leave").stop
-      @agent.stop
-      @agent = nil
-      logger.trace("stop: done")
+      agent.start
+      logger.trace("run_agent: done")
+      agent
     end
 
-    def restart
-      logger.trace("restart: start")
-      stop
-      start
-      logger.trace("restart: done")
+    def leave
+      ensure_serf
+      run_once("leave")
     end
 
     def join(*hosts)
       ensure_serf
       nodes = hosts.collect do |host|
-        "#{host}:#{port}"
+        "#{host}:#{agent_port}"
       end
       run_once("join", *nodes)
     end
@@ -203,16 +182,8 @@ module Droonga
       nil
     end
 
-    def run(command, *options)
-      process = SerfProcess.new(@loop, @serf, command,
-                                "-rpc-addr", rpc_address,
-                                *options)
-      process.start
-      process
-    end
-
     def run_once(command, *options)
-      process = SerfProcess.new(@loop, @serf, command,
+      process = SerfProcess.new(@serf, command,
                                 "-rpc-addr", rpc_address,
                                 *options)
       process.run_once
@@ -243,15 +214,19 @@ module Droonga
     end
 
     def rpc_address
-      "#{extract_host(@name)}:7373"
+      "#{extract_host(@name)}:#{rpc_port}"
+    end
+
+    def rpc_port
+      7373
     end
 
     def node_metadata
       @node_metadata ||= NodeMetadata.new
     end
 
-    def port
-      AGENT_PORT
+    def agent_port
+      Agent::PORT
     end
 
     def detect_other_hosts
@@ -272,35 +247,10 @@ module Droonga
     class SerfProcess
       include Loggable
 
-      def initialize(loop, serf, command, *options)
-        @loop = loop
+      def initialize(serf, command, *options)
         @serf = serf
         @command = command
         @options = options
-        @pid = nil
-      end
-
-      def start
-        capture_output do |output_write, error_write|
-          env = {}
-          spawn_options = {
-            :out => output_write,
-            :err => error_write,
-          }
-          @pid = spawn(env, @serf, @command, *@options, spawn_options)
-        end
-      end
-
-      def stop
-        return if****@pid*****?
-        Process.waitpid(@pid)
-        @output_io.close
-        @error_io.close
-        @pid = nil
-      end
-
-      def running?
-        not****@pid*****?
       end
 
       def run_once
@@ -311,98 +261,6 @@ module Droonga
           :status => status,
         }
       end
-
-      private
-      def capture_output
-        result = nil
-        output_read, output_write = IO.pipe
-        error_read, error_write = IO.pipe
-
-        begin
-          result = yield(output_write, error_write)
-        rescue
-          output_read.close  unless output_read.closed?
-          output_write.close unless output_write.closed?
-          error_read.close   unless error_read.closed?
-          error_write.close  unless error_write.closed?
-          raise
-        end
-
-        output_line_buffer = LineBuffer.new
-        on_read_output = lambda do |data|
-          on_standard_output(output_line_buffer, data)
-        end
-        @output_io = Coolio::IO.new(output_read)
-        @output_io.on_read do |data|
-          on_read_output.call(data)
-        end
-        # TODO: Don't allow nil for loop. It reduces nil checks and
-        # simplifies source code.
-        @loop.attach(@output_io) if @loop
-
-        error_line_buffer = LineBuffer.new
-        on_read_error = lambda do |data|
-          on_error_output(error_line_buffer, data)
-        end
-        @error_io = Coolio::IO.new(error_read)
-        @error_io.on_read do |data|
-          on_read_error.call(data)
-        end
-        # TODO: Don't allow nil for loop. It reduces nil checks and
-        # simplifies source code.
-        @loop.attach(@error_io) if @loop
-
-        result
-      end
-
-      def on_standard_output(line_buffer, data)
-        line_buffer.feed(data) do |line|
-          line = line.chomp
-          case line
-          when /\A==> /
-            content = $POSTMATCH
-            logger.info(content)
-          when /\A    /
-            content = $POSTMATCH
-            case content
-            when /\A(\d{4})\/(\d{2})\/(\d{2}) (\d{2}):(\d{2}):(\d{2}) \[(\w+)\] /
-              year, month, day = $1, $2, $3
-              hour, minute, second = $4, $5, $6
-              level = $7
-              content = $POSTMATCH
-              level = normalize_level(level)
-              logger.send(level, content)
-            else
-              logger.info(content)
-            end
-          else
-            logger.info(line)
-          end
-        end
-      end
-
-      def normalize_level(level)
-        level = level.downcase
-        case level
-        when "err"
-          "error"
-        else
-          level
-        end
-      end
-
-      def on_error_output(line_buffer, data)
-        line_buffer.feed(data) do |line|
-          line = line.chomp
-          logger.error(line.gsub(/\A==> /, ""))
-        end
-      end
-
-      def log_tag
-        tag = "serf"
-        tag << "[#{@pid}]" if @pid
-        tag
-      end
     end
   end
 end

  Added: lib/droonga/serf_agent.rb (+196 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/serf_agent.rb    2015-01-05 16:33:46 +0900 (b011098)
@@ -0,0 +1,196 @@
+# Copyright (C) 2014-2015 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+
+require "English"
+
+require "coolio"
+
+require "droonga/loggable"
+
+module Droonga
+  class Serf
+    class Agent
+      # the port must be different from droonga-http-server's agent!
+      PORT = 7946
+
+      include Loggable
+
+      MAX_N_READ_CHECKS = 10
+
+      attr_writer :on_ready
+      def initialize(loop, serf, host, bind_port, rpc_port, *options)
+        @loop = loop
+        @serf = serf
+        @host = host
+        @bind_port = bind_port
+        @rpc_port = rpc_port
+        @options = options
+        @pid = nil
+        @on_ready = nil
+        @n_ready_checks = 0
+      end
+
+      def start
+        capture_output do |output_write, error_write|
+          env = {}
+          spawn_options = {
+            :out => output_write,
+            :err => error_write,
+          }
+          @pid = spawn(env, @serf, "agent",
+                       "-bind", "#{@host}:#{@bind_port}",
+                       "-rpc-addr", "#{@host}:#{@rpc_port}",
+                       *@options, spawn_options)
+        end
+        start_ready_check
+      end
+
+      def stop
+        return if****@pid*****?
+        Process.waitpid(@pid)
+        @output_io.close
+        @error_io.close
+        @pid = nil
+      end
+
+      def running?
+        not****@pid*****?
+      end
+
+      private
+      def capture_output
+        result = nil
+        output_read, output_write = IO.pipe
+        error_read, error_write = IO.pipe
+
+        begin
+          result = yield(output_write, error_write)
+        rescue
+          output_read.close  unless output_read.closed?
+          output_write.close unless output_write.closed?
+          error_read.close   unless error_read.closed?
+          error_write.close  unless error_write.closed?
+          raise
+        end
+
+        output_line_buffer = LineBuffer.new
+        on_read_output = lambda do |data|
+          on_standard_output(output_line_buffer, data)
+        end
+        @output_io = Coolio::IO.new(output_read)
+        @output_io.on_read do |data|
+          on_read_output.call(data)
+        end
+        @loop.attach(@output_io)
+
+        error_line_buffer = LineBuffer.new
+        on_read_error = lambda do |data|
+          on_error_output(error_line_buffer, data)
+        end
+        @error_io = Coolio::IO.new(error_read)
+        @error_io.on_read do |data|
+          on_read_error.call(data)
+        end
+        @loop.attach(@error_io)
+
+        result
+      end
+
+      def on_standard_output(line_buffer, data)
+        line_buffer.feed(data) do |line|
+          line = line.chomp
+          case line
+          when /\A==> /
+            content = $POSTMATCH
+            logger.info(content)
+          when /\A    /
+            content = $POSTMATCH
+            case content
+            when /\A(\d{4})\/(\d{2})\/(\d{2}) (\d{2}):(\d{2}):(\d{2}) \[(\w+)\] /
+              year, month, day = $1, $2, $3
+              hour, minute, second = $4, $5, $6
+              level = $7
+              content = $POSTMATCH
+              level = normalize_level(level)
+              logger.send(level, content)
+            else
+              logger.info(content)
+            end
+          else
+            logger.info(line)
+          end
+        end
+      end
+
+      def normalize_level(level)
+        level = level.downcase
+        case level
+        when "err"
+          "error"
+        else
+          level
+        end
+      end
+
+      def on_error_output(line_buffer, data)
+        line_buffer.feed(data) do |line|
+          line = line.chomp
+          logger.error(line.gsub(/\A==> /, ""))
+        end
+      end
+
+      def start_ready_check
+        @n_ready_checks += 1
+
+        checker = Coolio::TCPSocket.connect(@host, @bind_port)
+
+        on_connect = lambda do
+          @on_ready.call if @on_ready
+          checker.close
+        end
+        checker.on_connect do
+          on_connect.call
+        end
+
+        on_connect_failed = lambda do
+          if @n_ready_checks >= MAX_N_READ_CHECKS
+            # TODO: @on_fail.call if @on_fail
+          else
+            timer = Coolio::TimerWatcher.new(1)
+            on_timer = lambda do
+              start_ready_check
+              timer.detach
+            end
+            timer.on_timer do
+              on_timer.call
+            end
+            @loop.attach(timer)
+          end
+        end
+        checker.on_connect_failed do
+          on_connect_failed.call
+        end
+
+        @loop.attach(checker)
+      end
+
+      def log_tag
+        tag = "serf-agent"
+        tag << "[#{@pid}]" if @pid
+        tag
+      end
+    end
+  end
+end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index