Kouhei Sutou
null+****@clear*****
Mon Mar 17 18:25:05 JST 2014
Kouhei Sutou 2014-03-17 18:25:05 +0900 (Mon, 17 Mar 2014) New Revision: 693d18c8fc7cbd5d89ecdaa7fb5fdf9047457b9a https://github.com/droonga/fluent-plugin-droonga/commit/693d18c8fc7cbd5d89ecdaa7fb5fdf9047457b9a Message: Don't write to socket from non loop.run thread Modified files: lib/droonga/fluent_message_sender.rb Modified: lib/droonga/fluent_message_sender.rb (+46 -9) =================================================================== --- lib/droonga/fluent_message_sender.rb 2014-03-17 17:11:25 +0900 (3c40ffb) +++ lib/droonga/fluent_message_sender.rb 2014-03-17 18:25:05 +0900 (6b26618) @@ -15,6 +15,8 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +require "thread" + require "cool.io" require "droonga/loggable" @@ -28,32 +30,40 @@ module Droonga @loop = loop @host = host @port = port - @connected = false + @socket = nil + @buffer = [] + @write_mutex = Mutex.new end def start logger.trace("start: start") - connect + start_writer logger.trace("start: done") end def shutdown logger.trace("shutdown: start") - @socket.close unles****@socke*****? + shutdown_writer + shutdown_socket logger.trace("shutdown: done") end def send(tag, data) logger.trace("send: start") - connect unless @connected fluent_message = [tag, Time.now.to_i, data] packed_fluent_message = MessagePackPacker.pack(fluent_message) - @socket.write(packed_fluent_message) - @loop.break_current_loop + @write_mutex.synchronize do + @buffer << packed_fluent_message + @writer.signal + end logger.trace("send: done") end private + def connected? + not****@socke*****? + end + def connect logger.trace("connect: start") @@ -62,16 +72,15 @@ module Droonga end log_connect = lambda do logger.trace("connected to #{@host}:#{@port}") - @connected = true end log_failed = lambda do logger.error("failed to connect to #{@host}:#{@port}") + @socket = nil end on_close = lambda do - @connected = false + @socket = nil end - @connected = false @socket = Coolio::TCPSocket.connect(@host, @port) @socket.on_write_complete do log_write_complete.call @@ -90,6 +99,34 @@ module Droonga logger.trace("connect: done") end + def shutdown_socket + return unless connected? + @socket.close unles****@socke*****? + end + + def start_writer + @writer = Coolio::AsyncWatcher.new + + on_signal = lambda do + @write_mutex.synchronize do + connect unless connected? + @buffer.each do |data| + @socket.write(data) + end + @buffer.clear + end + end + @writer.on_signal do + on_signal.call + end + + @loop.attach(@writer) + end + + def shutdown_writer + @writer.detach + end + def log_tag "[#{Process.ppid}][#{Process.pid}] fluent-message-sender" end -------------- next part -------------- HTML����������������������������...Download