Daijiro MORI
null+****@clear*****
Tue Mar 26 18:59:40 JST 2013
Daijiro MORI 2013-03-26 18:59:40 +0900 (Tue, 26 Mar 2013) New Revision: 1cc96e1b64f3bda3648793387c1cc15b576e63f1 https://github.com/groonga/fluent-plugin-droonga/commit/1cc96e1b64f3bda3648793387c1cc15b576e63f1 Message: Follow the newest replyTo field specification Modified files: lib/fluent/plugin/out_droonga.rb Modified: lib/fluent/plugin/out_droonga.rb (+51 -20) =================================================================== --- lib/fluent/plugin/out_droonga.rb 2013-03-01 14:06:00 +0900 (c3a0e56) +++ lib/fluent/plugin/out_droonga.rb 2013-03-26 18:59:40 +0900 (b6f6cf1) @@ -15,8 +15,7 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "socket" -require "msgpack" +require "fluent-logger" require "droonga/worker" module Fluent @@ -36,8 +35,8 @@ module Fluent def shutdown super @worker.shutdown - @outputs.each do |dest, socket| - socket.close + @outputs.each do |dest, output| + output[:logger].close if output[:logger] end end @@ -54,26 +53,58 @@ module Fluent exec(tag, time, record) end - def exec(tag, time, record) - result =****@worke*****_message(record) - if record["replyTo"] - post(record["replyTo"], tag, { - inReplyTo: record["id"], - type: (record["type"] || "") + ".result", - body: result - }) + def get_output(event) + receiver = event["replyTo"] + return nil unless receiver + unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ + raise "format: hostname:port/tag(?params)" + end + host = $1 + port = $2 + tag = $3 + params = $4 + + host_port = "#{host}:#{port}" + @outputs[host_port] ||= {} + output = @outputs[host_port] + + has_connection_id = (not params.nil? \ + and params =~ /[\?&;]connection_id=([^&;]+)/) + if output[:logger].nil? or has_connection_id + connection_id = $1 + if not has_connection_id or output[:connection_id] != connection_id + output[:connection_id] = connection_id + logger = Fluent::Logger::FluentLogger.new(tag, :host => host, + :port=> port.to_i) + # output[:logger] should be closed if it exists beforehand? + output[:logger] = logger + end end + + has_client_session_id = (not params.nil? \ + and params =~ /[\?&;]client_session_id=([^&;]+)/) + if has_client_session_id + client_session_id = $1 + # some generic way to handle client_session_id is expected + end + + output[:logger] end - def post(dest, tag, result) - unless @outputs[dest] - host, port = dest.split(/:/, 2) - port = Integer(port) - socket = TCPSocket.new(host, port) - @outputs[dest] = socket + def exec(tag, time, record) + result =****@worke*****_message(record) + output = get_output(record) + if output + response = { + inReplyTo: record["id"], + statusCode: 200, + type: (record["type"] || "") + ".result", + body: { + result: result + } + } + output.post("message", response) end - data = {"tag" => tag, "data" => result}.to_msgpack - @outputs[dest].write(data) end end end -------------- next part -------------- HTML����������������������������...Download