[Groonga-commit] droonga/fluent-plugin-droonga at 693d18c [master] Don't write to socket from non loop.run thread

Back to archive index

Kouhei Sutou null+****@clear*****
Mon Mar 17 18:25:05 JST 2014


Kouhei Sutou	2014-03-17 18:25:05 +0900 (Mon, 17 Mar 2014)

  New Revision: 693d18c8fc7cbd5d89ecdaa7fb5fdf9047457b9a
  https://github.com/droonga/fluent-plugin-droonga/commit/693d18c8fc7cbd5d89ecdaa7fb5fdf9047457b9a

  Message:
    Don't write to socket from non loop.run thread

  Modified files:
    lib/droonga/fluent_message_sender.rb

  Modified: lib/droonga/fluent_message_sender.rb (+46 -9)
===================================================================
--- lib/droonga/fluent_message_sender.rb    2014-03-17 17:11:25 +0900 (3c40ffb)
+++ lib/droonga/fluent_message_sender.rb    2014-03-17 18:25:05 +0900 (6b26618)
@@ -15,6 +15,8 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
+require "thread"
+
 require "cool.io"
 
 require "droonga/loggable"
@@ -28,32 +30,40 @@ module Droonga
       @loop = loop
       @host = host
       @port = port
-      @connected = false
+      @socket = nil
+      @buffer = []
+      @write_mutex = Mutex.new
     end
 
     def start
       logger.trace("start: start")
-      connect
+      start_writer
       logger.trace("start: done")
     end
 
     def shutdown
       logger.trace("shutdown: start")
-      @socket.close unles****@socke*****?
+      shutdown_writer
+      shutdown_socket
       logger.trace("shutdown: done")
     end
 
     def send(tag, data)
       logger.trace("send: start")
-      connect unless @connected
       fluent_message = [tag, Time.now.to_i, data]
       packed_fluent_message = MessagePackPacker.pack(fluent_message)
-      @socket.write(packed_fluent_message)
-      @loop.break_current_loop
+      @write_mutex.synchronize do
+        @buffer << packed_fluent_message
+        @writer.signal
+      end
       logger.trace("send: done")
     end
 
     private
+    def connected?
+      not****@socke*****?
+    end
+
     def connect
       logger.trace("connect: start")
 
@@ -62,16 +72,15 @@ module Droonga
       end
       log_connect = lambda do
         logger.trace("connected to #{@host}:#{@port}")
-        @connected = true
       end
       log_failed = lambda do
         logger.error("failed to connect to #{@host}:#{@port}")
+        @socket = nil
       end
       on_close = lambda do
-        @connected = false
+        @socket = nil
       end
 
-      @connected = false
       @socket = Coolio::TCPSocket.connect(@host, @port)
       @socket.on_write_complete do
         log_write_complete.call
@@ -90,6 +99,34 @@ module Droonga
       logger.trace("connect: done")
     end
 
+    def shutdown_socket
+      return unless connected?
+      @socket.close unles****@socke*****?
+    end
+
+    def start_writer
+      @writer = Coolio::AsyncWatcher.new
+
+      on_signal = lambda do
+        @write_mutex.synchronize do
+          connect unless connected?
+          @buffer.each do |data|
+            @socket.write(data)
+          end
+          @buffer.clear
+        end
+      end
+      @writer.on_signal do
+        on_signal.call
+      end
+
+      @loop.attach(@writer)
+    end
+
+    def shutdown_writer
+      @writer.detach
+    end
+
     def log_tag
       "[#{Process.ppid}][#{Process.pid}] fluent-message-sender"
     end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index