Develop and Download Open Source Software

Browse Subversion Repository

Contents of /branches/ProcessWatchImpl/Muffin/OriginalAlertApiReceiever.cs

Parent Directory Parent Directory | Revision Log Revision Log


Revision 44 - (show annotations) (download)
Tue Dec 15 16:20:54 2009 UTC (14 years, 3 months ago) by netseed
File size: 8836 byte(s)


1 using System;
2 using System.IO;
3 using System.Net.Sockets;
4 using System.Text;
5 using System.Text.RegularExpressions;
6 using System.Threading;
7 using SeedCreate.CommonLibrary.Threading;
8 using dbg = SeedCreate.CommonLibrary.DebugHelper;
9
10
11 namespace Toast.NicoNico
12 {
13 public sealed class OriginalAlertApiReceiever:IDisposable
14 {
15 private static readonly string RequestFormat =
16 "<?xml version=\"1.0\" encoding=\"utf-8\" ?> <thread thread=\"{0}\" version=\"20061206\" res_from=\"-200\"/>\0";
17
18 private static readonly Regex Splitter;
19
20
21 private class DelayController:IDisposable
22 {
23 private static readonly TimeSpan RecoverLongDelay = new TimeSpan(0, 10, 0);
24 private static readonly TimeSpan RecoverShortDelay = new TimeSpan(0, 0, 10);
25 private static readonly int RetryThreshold = 2;
26
27 private AutoResetEvent delayGate;
28 private int tryCount;
29
30 public DelayController()
31 {
32 delayGate = new AutoResetEvent(false);
33 tryCount = 0;
34 }
35
36 public void WaitOne()
37 {
38 lock (delayGate)
39 {
40 if (++tryCount > RetryThreshold)
41 {
42 delayGate.WaitOne(RecoverLongDelay);
43 }
44 else
45 {
46 delayGate.WaitOne(RecoverShortDelay);
47 }
48 }
49 }
50
51 public void ResetCount()
52 {
53 lock (delayGate)
54 {
55 tryCount = 0;
56 }
57 }
58
59 public void Dispose()
60 {
61 delayGate.Close();
62 GC.SuppressFinalize(this);
63 }
64 }
65
66 public event EventHandler<DetectProgramStartEventArgs> ProgramStarted;
67 public event EventHandler<ReceieverStatusChangedEventArgs> StatusChanged;
68
69 private Thread recieveWorker;
70 private Thread eventWorker;
71
72 private ExclusiveQueue<LiveProgram> queue;
73
74 private ReceieverStatus status;
75 private readonly object syncRoot = new object();
76 private DelayController delayGate;
77 private bool isRunning;
78
79
80 static OriginalAlertApiReceiever()
81 {
82 Splitter = new Regex("^<.*?>(?<id>.*?),(?<comunity>.*?),(?<user>.*?)<.*?>.*$");
83
84 }
85
86 public OriginalAlertApiReceiever(uint myId)
87 {
88 recieveWorker = new Thread(new ThreadStart(RecieveProcess));
89 recieveWorker.Name = "ReceieverWorker";
90 recieveWorker.IsBackground = true;
91
92 eventWorker = new Thread(new ThreadStart(EventProcess));
93 eventWorker.Name = "EventWorker";
94 eventWorker.IsBackground = true;
95
96 queue = new ExclusiveQueue<LiveProgram>();
97
98 MyId = myId;
99 status = ReceieverStatus.PreRunning;
100
101 delayGate = new DelayController();
102 }
103
104 public uint MyId
105 {
106 get;
107 private set;
108 }
109
110 public ReceieverStatus Status
111 {
112 get
113 {
114 lock (syncRoot)
115 {
116 return status;
117 }
118 }
119 }
120
121 public bool Start()
122 {
123 lock (syncRoot)
124 {
125 if (isRunning)
126 {
127 return false;
128 }
129
130
131 recieveWorker.Start();
132 eventWorker.Start();
133 isRunning = true;
134 return true;
135 }
136 }
137
138 #if DEBUG_TESTING
139 private bool IsDebug=false;
140
141 public bool DebugStart()
142 {
143 if (recieveWorker.ThreadState == ThreadState.Running)
144 {
145 return false;
146 }
147
148 IsDebug = true;
149
150 recieveWorker.Start();
151 eventWorker.Start();
152 return true;
153
154 }
155 #endif
156
157 public bool Stop()
158 {
159 lock (syncRoot)
160 {
161 if (isRunning)
162 {
163 recieveWorker.Abort();
164 recieveWorker.Join();
165 eventWorker.Interrupt();
166 eventWorker.Join();
167
168 isRunning = false;
169 return true;
170 }
171 else
172 {
173 return false;
174 }
175
176 }
177 }
178
179 private void RecieveProcess()
180 {
181 try
182 {
183 for (; ; )
184 {
185 RequestResponse response = null;
186
187 try
188 {
189 response = PreConnectControl.GetConnectInformation();
190
191 byte[] buff = Encoding.UTF8.GetBytes(string.Format(RequestFormat, response.ThreadId));
192
193 using (TcpClient Reciever = new TcpClient())
194 {
195 Reciever.SendTimeout = Constants.DefaultTimeOut;
196 Reciever.ReceiveTimeout = Constants.DefaultTimeOut;
197 Reciever.Connect(response.Address, response.Port);
198
199 using (NetworkStream str = Reciever.GetStream())
200 {
201 str.Write(buff, 0, buff.Length);
202 str.Flush();
203 buff = null;
204
205 MemoryStream memstr = new MemoryStream();
206 int charbuff = 0;
207 StreamReader rdr = new StreamReader(memstr, Encoding.UTF8);
208
209
210 for (; ; )
211 {
212 charbuff = str.ReadByte();
213
214 if (charbuff == 0)
215 {
216 //NOTE:効率化のため、あり得ないコトやってる。完全に処理系依存なので、注意すること。
217 memstr.Position = 0;
218 #if !DEBUG_TESTING
219 Inspect(rdr.ReadLine());
220 #else
221 if(IsDebug)
222 {
223 DebugInspect(rdr.ReadLine());
224 }
225 else
226 {
227 Inspect(rdr.ReadLine());
228 }
229 #endif
230 memstr.Position = 0;
231 delayGate.ResetCount();
232 }
233 else
234 {
235 memstr.WriteByte((byte)charbuff);
236 }
237 }
238 }
239 }
240 }
241 catch (ThreadAbortException)
242 {
243 break;
244 }
245 catch (ThreadInterruptedException)
246 {
247 break;
248 }
249 catch
250 {
251 try
252 {
253 delayGate.WaitOne();
254 }
255 catch
256 {
257 throw;
258 }
259 continue;
260 }
261 }
262 }
263 catch (ThreadAbortException)
264 {
265 }
266 catch (ThreadInterruptedException)
267 {
268 }
269
270 }
271
272 private void EventProcess()
273 {
274 try
275 {
276 for (; ; )
277 {
278
279 var info = queue.Dequeue();
280 if (info.ValueU)
281 {
282 OnProgramStarted(this, new DetectProgramStartEventArgs(info.ValueT));
283 }
284 }
285 }
286 catch (ThreadAbortException)
287 {
288 }
289 catch (ThreadInterruptedException)
290 {
291 }
292 }
293
294 private void Inspect(string Source)
295 {
296 if (Splitter.IsMatch(Source == null ? string.Empty : Source))
297 {
298 uint UID = 0;
299 if (uint.TryParse(Splitter.Replace(Source, "${user}"), out UID))
300 {
301 if (MyId == UID)
302 {
303
304 //Console.WriteLine(UID);
305 uint lvid = 0;
306 if (uint.TryParse(Splitter.Replace(Source, "${id}"), out lvid))
307 {
308 RaiseEventPreProcess(lvid);
309 }
310 }
311 }
312 }
313 }
314
315 #if DEBUG_TESTING
316 private int Count = 0;
317
318 private void DebugInspect(string Source)
319 {
320 if (Splitter.IsMatch(Source == null ? string.Empty : Source))
321 {
322 if (++Count < 200)
323 {
324 return;
325 }
326
327 if (Count % 50 == 0)
328 {
329 if (Splitter.IsMatch(Source == null ? string.Empty : Source))
330 {
331 uint UID = 0;
332 if (uint.TryParse(Splitter.Replace(Source, "${user}"), out UID))
333 {
334
335 //Console.WriteLine(UID);
336 uint lvid = 0;
337 if (uint.TryParse(Splitter.Replace(Source, "${id}"), out lvid))
338 {
339 RaiseEventPreProcess(lvid);
340 }
341 }
342 }
343 }
344 }
345 }
346 #endif
347
348
349 private void RaiseEventPreProcess(uint Id)
350 {
351 try
352 {
353 LiveProgram program = QueryProgramDetail.Query(Id);
354
355 Console.WriteLine("RaiseEvent:{0}\n\t{1}\n\t{2}", program.Title, program.CommunityName, program.Description);
356 Console.WriteLine();
357
358 queue.Enqueue(program);
359 }
360 catch
361 {
362 //TODO:ここにエラーレポートの処理挟む
363 }
364 }
365
366
367 private void OnProgramStarted(object sender, DetectProgramStartEventArgs e)
368 {
369 if (ProgramStarted != null)
370 {
371 ProgramStarted(sender, e);
372 }
373 }
374
375 private void OnStatusChanged(object sender, ReceieverStatusChangedEventArgs e)
376 {
377 if (StatusChanged != null)
378 {
379 StatusChanged(sender, e);
380 }
381
382 lock (syncRoot)
383 {
384 status = e.Current;
385 }
386 }
387
388 #region IDisposable メンバ
389
390 public void Dispose()
391 {
392 dbg::TinyDebugWriter.WriteLine();
393 dbg::TinyDebugWriter.WriteLine("Dispose process start");
394
395 while (!recieveWorker.Join(Constants.DefaultJoinWait))
396 {
397 try
398 {
399 recieveWorker.Abort();
400 }
401 catch (ThreadStateException)
402 {
403 //No need to anyting.
404 dbg::TinyDebugWriter.WriteLine("RecieveWorker ThreadStateException occured");
405 }
406 }
407
408 dbg::TinyDebugWriter.WriteLine("Recieveworkker aborted");
409 dbg::TinyDebugWriter.WriteLine(string.Format("RecieveWorker.IsAlive={0}", recieveWorker.IsAlive));
410 dbg::TinyDebugWriter.WriteLine();
411
412 while (!eventWorker.Join(1000))
413 {
414 eventWorker.Interrupt();
415 dbg::TinyDebugWriter.WriteLine("ThreadState:{0}", eventWorker.ThreadState.ToString());
416 }
417
418 dbg::TinyDebugWriter.WriteLine("Eventworker interrupted");
419 dbg::TinyDebugWriter.WriteLine(string.Format("RecieveWorker.IsAlive={0}", recieveWorker.IsAlive));
420 dbg::TinyDebugWriter.WriteLine();
421
422
423 delayGate.Dispose();
424 queue.Dispose();
425
426 GC.SuppressFinalize(this);
427 }
428
429 #endregion
430 }
431 }

Back to OSDN">Back to OSDN
ViewVC Help
Powered by ViewVC 1.1.26