Daijiro MORI
null+****@clear*****
Wed Oct 9 12:45:24 JST 2013
Daijiro MORI    2013-10-09 12:45:24 +0900 (Wed, 09 Oct 2013)

  New Revision: 646cab257a18a6a1cec7cc495d5d340a2fd08e64
  https://github.com/droonga/droonga.org/commit/646cab257a18a6a1cec7cc495d5d340a2fd08e64

  Message:
    Add push_example.

  Added files:
    push_example/ddl.grn
    push_example/events.jsons
    push_example/push_example.md
    push_example/scantest.rb
    push_example/subscriptions.jsons

  Added: push_example/ddl.grn (+50 -0) 100644
===================================================================
--- /dev/null
+++ push_example/ddl.grn    2013-10-09 12:45:24 +0900 (64073fb)
@@ -0,0 +1,50 @@
+table_create Postername TABLE_HASH_KEY ShortText
+table_create Poster TABLE_HASH_KEY ShortText
+table_create Beuser TABLE_HASH_KEY ShortText
+table_create Email TABLE_HASH_KEY ShortText
+table_create Site TABLE_HASH_KEY ShortText
+table_create Host TABLE_HASH_KEY ShortText
+table_create Category TABLE_HASH_KEY ShortText
+table_create Board TABLE_HASH_KEY ShortText
+column_create Board name COLUMN_SCALAR ShortText
+table_create Thread TABLE_PAT_KEY ShortText
+column_create Thread alive COLUMN_SCALAR Time
+column_create Thread buzz COLUMN_SCALAR Float
+column_create Thread created COLUMN_SCALAR Time
+column_create Thread lastmod COLUMN_SCALAR Time
+column_create Thread name COLUMN_SCALAR ShortText
+column_create Thread ncomments COLUMN_SCALAR Int32
+column_create Thread size COLUMN_SCALAR Int32
+column_create Thread snip COLUMN_SCALAR ShortText
+table_create Comment TABLE_NO_KEY
+column_create Comment body COLUMN_SCALAR ShortText
+column_create Comment date COLUMN_SCALAR Time
+column_create Comment num COLUMN_SCALAR Int32
+column_create Comment status COLUMN_SCALAR Int32
+table_create Term TABLE_PAT_KEY ShortText --default_tokenizer TokenBigram --normalizer NormalizerAuto
+column_create Term thread_name COLUMN_INDEX|WITH_POSITION Thread name
+column_create Term thread__key COLUMN_INDEX|WITH_POSITION Thread _key
+column_create Term comment_body COLUMN_INDEX|WITH_POSITION Comment body
+column_create Comment thread COLUMN_SCALAR Thread
+column_create Comment postername COLUMN_SCALAR Postername
+column_create Comment posterid COLUMN_SCALAR Poster
+column_create Comment email COLUMN_SCALAR Email
+column_create Comment beid COLUMN_SCALAR Beuser
+column_create Thread comment_thread COLUMN_INDEX Comment thread
+column_create Thread board COLUMN_SCALAR Board
+column_create Board thread_board COLUMN_INDEX Thread board
+column_create Board host COLUMN_SCALAR Host
+column_create Board category COLUMN_SCALAR Category
+column_create Host site COLUMN_SCALAR Site
+
+table_create User TABLE_HASH_KEY ShortText
+table_create Route TABLE_HASH_KEY ShortText
+table_create Query TABLE_HASH_KEY ShortText
+table_create Keyword TABLE_PAT_KEY ShortText --normalizer NormalizerAuto
+column_create User subscriptions COLUMN_VECTOR Query
+column_create User route COLUMN_SCALAR Route
+column_create Query keywords COLUMN_VECTOR Keyword
+column_create Query subscribers COLUMN_INDEX User subscriptions
+column_create Keyword queries COLUMN_INDEX Query keywords
+
+column_create Thread queries COLUMN_VECTOR Query

  Added: push_example/events.jsons (+4 -0) 100644
===================================================================
--- /dev/null
+++ push_example/events.jsons    2013-10-09 12:45:24 +0900 (52e4ad1)
@@ -0,0 +1,4 @@
+{"table":"Comment", "values":{"thread":"fuga/000", "body" : "abc aaa bb"}}
+{"table":"Thread", "key":"hoge/000", "values":{"name" : "abc aaa bb"}}
+{"table":"Comment", "values":{"thread":"fuga/000", "body" : "ddd eee"}}
+{"table":"Comment", "values":{"thread":"hoge/000", "body" : "fff ggg"}}

  Added: push_example/push_example.md (+85 -0) 100644
===================================================================
--- /dev/null
+++ push_example/push_example.md    2013-10-09 12:45:24 +0900 (89df98f)
@@ -0,0 +1,85 @@
+---
+title: A standalone program for evaluating how to implement the push feature
+layout: default
+---
+
+# Purpose of the standalone program
+
+Several aspects of droonga's pub/sub feature still need to be worked out,
+such as how it fits with the distribution scheme.
+Before that, however, we need a rough idea of how well the event scan feature
+(the feature that detects, among update events, the queries that users subscribe to)
+will perform when it actually runs on top of droonga's architecture.
+
+So, as a first step, I wrote a scan feature that runs standalone, without depending on droonga.
+Measuring how scan performance changes with the number and kinds of subscribed queries
+will help us predict and optimize performance later, once it is integrated with droonga.
+
+# How to use the standalone program (scantest.rb)
+
+Prepare the database.
+
+    mkdir testdb
+    groonga -n testdb/db < ddl.grn
+
+Run it.
+
+    ./scantest.rb testdb/db subscriptions.jsons events.jsons
+
+# Description of scantest.rb
+
+* The first argument is the path of the database to use.
+* The second argument is a jsons file containing subscribe events.
+* The third argument is a jsons file containing the events to be scanned.
+
+# Description of the event format
+
+The event format used here may not match the format of the pub/sub commands
+that droonga eventually adopts.
+It only takes into account what is convenient for this standalone test.
+
+## subscribe event
+
+A subscribe event is an Object containing the following elements.
+
+    user: the identifier of the subscribing user
+    route: a string indicating the destination to which detected events are delivered for the user
+    condition: the search condition to subscribe to, given in the following form.
+
+      OR condition: ["||", condition1, condition2,..]
+      AND condition: ["&&", condition1, condition2,..]
+      NOT condition: ["-", condition1, condition2,..]
+
+      The second and subsequent elements of the array can be strings or nested conditions.
+      A string is true if it matches either Thread.name or Comment.body of the update event.
+
+## Actual behavior
+
+### Contents of subscriptions.jsons
+
+    {"user":"user1", "route":"localhost:23003/output", "condition":["||", "aaa", "bbb"]}
+    {"user":"user2", "route":"localhost:23003/output", "condition":["&&", "aaa", "bbb"]}
+    {"user":"user3", "route":"localhost:23003/output", "condition":["-", "aaa", "bbb"]}
+
+Here, three users each subscribe to a different condition.
+
+### Contents of events.jsons
+
+    {"table":"Comment", "values":{"thread":"fuga/000", "body" : "abc aaa bb"}}
+    {"table":"Thread", "key":"hoge/000", "values":{"name" : "abc aaa bb"}}
+    {"table":"Comment", "values":{"thread":"fuga/000", "body" : "ddd eee"}}
+    {"table":"Comment", "values":{"thread":"hoge/000", "body" : "fff ggg"}}
+
+Line 1 is a new comment on an existing thread. It hits the conditions subscribed to by user1 and user3.
+Line 2 is an event that creates a new thread. It hits the same conditions as line 1.
+Line 3 is a new comment on an existing thread, but it hits none of the conditions.
+In line 4, the comment body hits no condition, but the thread name does.
+
+### Expected output
+
+    ["localhost:23003/output", ["user1", "user3"], {"table"=>"Comment", "values"=>{"thread"=>"fuga/000", "body"=>"abc aaa bb"}}]
+    ["localhost:23003/output", ["user1", "user3"], {"table"=>"Thread", "key"=>"hoge/000","values"=>{"name"=>"abc aaa bb"}}]
+    ["localhost:23003/output", ["user1", "user3"], {"table"=>"Comment", "values"=>{"thread"=>"hoge/000", "body"=>"fff ggg"}}]
+
+Output is grouped by route. I did it this way because I suspect that, once the publish feature is actually implemented in droonga, sending notifications in batches per route will be more efficient.
+

  Added: push_example/scantest.rb (+202 -0) 100755
===================================================================
--- /dev/null
+++ push_example/scantest.rb    2013-10-09 12:45:24 +0900 (b875deb)
@@ -0,0 +1,202 @@
+#!/usr/bin/env ruby
+
+require "groonga"
+
+#command :subscribe
+#command :unsubscribe
+#command :scan
+
+def subscribe(request)
+  user, condition, query, route = parse_request(request)
+  query_table = @context['Query']
+  query_record = query_table[query]
+  unless query_record
+    keywords = pick_keywords([], condition)
+    query_record = query_table.add(query, :keywords => keywords)
+    # todo: update Thread.queries value of corresponding threads
+  end
+  user_table = @context['User']
+  user_record = user_table[user]
+  if user_record
+    subscriptions = user_record.subscriptions.collect do |query|
+      return if query == query_record
+      query
+    end
+    subscriptions << query_record
+    user_record.subscriptions = subscriptions
+  else
+    user_table.add(user,
+                   :subscriptions => [query_record],
+                   :route => route)
+  end
+end
+
+def unsubscribe(request)
+  user, condition, query, route = parse_request(request)
+  query_table = @context['Query']
+  query = query_table[query]
+  return unless query
+  user_table = @context['User']
+  user_record = user_table[user]
+  return unless user_record
+  subscriptions = user_record.subscriptions.select do |q|
+    q != query
+  end
+  user_record.subscriptions = subscriptions
+  # todo: update Thread.queries value when subscribers no longer exist
+end
+
+def scan(request)
+  table = request["table"]
+  values = request["values"]
+  raise "invalid values" unless values.is_a? Hash
+  hits = []
+  case table
+  when "Thread"
+    return if values["name"].nil? || values["name"].empty?
+    scan_body(hits, values["name"])
+    thread_table = @context['Thread']
+    thread_record = thread_table[request["key"]]
+    if thread_record
+      thread_record.queries = hits
+    else
+      thread_table.add(request["key"],
+                       :name => values["name"],
+                       :queries => hits)
+    end
+  when "Comment"
+    scan_thread(hits, values["thread"])
+    scan_body(hits, values["body"])
+  end
+  publish(hits, request)
+end
+
+#private
+def scan_thread(hits, thread)
+  thread_table = @context['Thread']
+  thread_record = thread_table[thread]
+  return unless thread_record
+  thread_record.queries.each do |query|
+    hits << query
+  end
+end
+
+def scan_body(hits, body)
+  trimmed = body.strip
+  candidates = {}
+  @context['Keyword'].scan(trimmed).each do |keyword, word, start, length|
+    @context['Query'].select do |query|
+      query.keywords =~ keyword
+    end.each do |record|
+      candidates[record.key] ||= []
+      candidates[record.key] << keyword
+    end
+  end
+  candidates.each do |query, keywords|
+    hits << query if query_match(query, keywords)
+  end
+end
+
+def publish(hits, request)
+  routes = {}
+  hits.each do |query|
+    @context['User'].select do |user|
+      user.subscriptions =~ query
+    end.each do |user|
+      routes[user.route.key] ||= []
+      routes[user.route.key] << user.key.key
+    end
+  end
+  routes.each do |route, users|
+    p [route, users, request]
+  end
+end
+
+def query_match(query, keywords)
+  return true unless EXACT_MATCH
+  @conditions = {} unless @conditions
+  condition = @conditions[query.id]
+  unless condition
+    condition = JSON.parse(query.key)
+    @conditions[query.id] = condition
+    # CAUTION: @conditions can be huge.
+  end
+  words = {}
+  keywords.each do |keyword|
+    words[keyword.key] = true
+  end
+  eval_condition(condition, words)
+end
+
+def eval_condition(condition, words)
+  case condition
+  when Hash
+    # todo
+  when String
+    words[condition]
+  when Array
+    case condition.first
+    when "||"
+      condition[1..-1].each do |element|
+        return true if eval_condition(element, words)
+      end
+      false
+    when "&&"
+      condition[1..-1].each do |element|
+        return false unless eval_condition(element, words)
+      end
+      true
+    when "-"
+      return false unless eval_condition(condition[1], words)
+      condition[2..-1].each do |element|
+        return false if eval_condition(element, words)
+      end
+      true
+    end
+  end
+end
+
+def parse_request(request)
+  user = request["user"]
+  condition = request["condition"]
+  route = request["route"]
+  raise "invalid request" if user.nil? || user.empty? || condition.nil?
+  query = condition.to_json
+  raise "too long query" if query.size > 4095
+  [user, condition, query, route]
+end
+
+def pick_keywords(memo, condition)
+  case condition
+  when Hash
+    memo << condition["query"]
+  when String
+    memo << condition
+  when Array
+    condition[1..-1].each do |element|
+      pick_keywords(memo, element)
+    end
+  end
+  memo
+end
+
+
+########## main ##########
+
+EXACT_MATCH = true
+
+database, subscriptions, events = ARGV
+@context = Groonga::Context.new
+@context.open_database(database)
+
+if File.exists?(subscriptions)
+  open(subscriptions).each do |line|
+    subscribe(JSON.parse(line))
+  end
+end
+
+if File.exists?(events)
+  open(events).each do |line|
+    scan(JSON.parse(line))
+  end
+end

  Added: push_example/subscriptions.jsons (+3 -0) 100644
===================================================================
--- /dev/null
+++ push_example/subscriptions.jsons    2013-10-09 12:45:24 +0900 (d32e6b7)
@@ -0,0 +1,3 @@
+{"user":"user1", "route":"localhost:23003/output", "condition":["||", "aaa", "bbb"]}
+{"user":"user2", "route":"localhost:23003/output", "condition":["&&", "aaa", "bbb"]}
+{"user":"user3", "route":"localhost:23003/output", "condition":["-", "aaa", "bbb"]}
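
The `||` / `&&` / `-` condition semantics from push_example.md can be checked without a groonga database. The following is a minimal sketch, not code from the commit. It assumes that plain substring matching is an acceptable stand-in for the `Keyword` table scan, and the names `keywords_of`, `condition_true?` and `matching_users` are hypothetical helpers introduced here for illustration.

```ruby
# Subscriptions copied from subscriptions.jsons (user => condition).
SUBSCRIPTIONS = {
  "user1" => ["||", "aaa", "bbb"],
  "user2" => ["&&", "aaa", "bbb"],
  "user3" => ["-", "aaa", "bbb"],
}

# Collect every keyword string mentioned anywhere in a (possibly nested) condition.
def keywords_of(condition)
  case condition
  when String then [condition]
  when Array  then condition.drop(1).flat_map { |c| keywords_of(c) }
  else []
  end
end

# Evaluate a condition against the list of keywords found in the text.
# "-" means: the first operand holds and none of the remaining operands hold.
def condition_true?(condition, found)
  case condition
  when String
    found.include?(condition)
  when Array
    operator, *operands = condition
    case operator
    when "||" then operands.any? { |c| condition_true?(c, found) }
    when "&&" then operands.all? { |c| condition_true?(c, found) }
    when "-"
      condition_true?(operands.first, found) &&
        operands.drop(1).none? { |c| condition_true?(c, found) }
    else false
    end
  else
    false
  end
end

# Assumption: substring search replaces the Keyword table scan used by scantest.rb.
def matching_users(text)
  SUBSCRIPTIONS.select do |_user, condition|
    found = keywords_of(condition).select { |keyword| text.include?(keyword) }
    condition_true?(condition, found)
  end.keys
end

p matching_users("abc aaa bb")  # => ["user1", "user3"]
p matching_users("ddd eee")     # => []
```

The first call corresponds to lines 1 and 2 of events.jsons and the second to line 3. The sketch does not model the `Thread.queries` column, so it cannot reproduce the hit on line 4, which depends on queries stored for the thread name.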