• R/O
  • SSH

Commit

Frequently used words (click to add to your profile)

javac++androidlinuxc#windowsobjective-ccocoa誰得qtpythonphprubygameguibathyscaphec計画中(planning stage)翻訳omegatframeworktwitterdomtestvb.netdirectxゲームエンジンbtronarduinopreviewer

High performance Asterisk AudioSocket server implementation in asynchronous Python.


Commit MetaInfo

Revisioncbeb3946c013a77a14a2f80152594e496628c9b6 (tree)
Time2023-03-10 03:12:14
AuthorSsor
CommiterSsor

Log Message

Initial commit.

Change Summary

Incremental Difference

diff -r 000000000000 -r cbeb3946c013 audiosocket.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/audiosocket.py Thu Mar 09 12:12:14 2023 -0600
@@ -0,0 +1,236 @@
1+import asyncio
2+
3+
4+__all__ = [
5+ "HANGUP_CALL_MESSAGE",
6+ "start_server",
7+ "AsteriskFrameForwardError",
8+ "AsteriskMemoryAllocError",
9+ "AudioSocketReadError"
10+]
11+
12+__version__ = 1.0
13+
14+
15+HANGUP_CALL_MESSAGE = b""
16+"""Can be returned from the callback function passed to `start_server` to
17+manually hangup the call represented by the value of the `uuid` parameter.
18+See documentation for `start_server()` for more information.
19+"""
20+
21+
22+class AsteriskFrameForwardError(Exception):
23+
24+ """Raised when the AudioSocket Asterisk module is unable forward a an audio
25+ frame.
26+ """
27+
28+ pass
29+
30+
31+class AsteriskMemoryAllocError(Exception):
32+
33+ """Raised when the AudioSocket Asterisk module is unable to allocate memory.
34+ """
35+
36+ pass
37+
38+
39+class AudioSocketReadError(Exception):
40+
41+ """Raised when an unexpected header or payload size is received from
42+ Asterisk.
43+ """
44+
45+
46+# AudioSocket message kinds (silence and hangup are never sent by Asterisk):
47+_KIND_HANGUP = 0x00
48+_KIND_UUID = 0x01
49+_KIND_SILENCE = 0x02
50+_KIND_AUDIO = 0x10
51+_KIND_AUDIO_AS_BYTES = b"\x10"
52+_KIND_ERROR = 0xff
53+
54+
55+# AudioSocket errors that can occur on Asterisk's end (technically call hangup
56+# is another, but we don't want to treat it as such on our end):
57+_ERROR_FRAME_FORWARD = 0x02
58+_ERROR_MEMEORY_ALLOC = 0x04
59+
60+
61+async def start_server(on_audio_callback,
62+ on_exception_callback,
63+ host = None,
64+ port = None,
65+ **kwargs):
66+
67+ """Returns an `asyncio.Server` instance using the AudioSocket protocol has
68+ its protocol factory. This function behaves just like
69+ `asyncio.start_server()`, meaning the `host`, `port` and keyword arguments
70+ accepted by that function have the same behavior with this one. Usually,
71+ `await server_forever()` is called on the returned server instance to start
72+ accepting connections.
73+
74+ The two callback arguments are expected to have the following function
75+ signatures.
76+
77+ `on_audio_callback`, called any time audio data from the connected peer is
78+ received. It is expected to be a function that accepts the following
79+ arguments:
80+
81+ - `uuid`: The universally unique ID that identifies this specific
82+ call's audio, provided as a hexdecimal string.
83+
84+ - `peer_name`: A tuple consisting of the IP address and port number of the
85+ remote host the audio is being sent from.
86+
87+ - `audio`: A `bytearray` instance containing the received audio data.
88+ An empty `bytearray` instance (`len(audio) == 0`)
89+ indicates the call hung up and no more audio will
90+ be received. Audio is either encoded in 8KHz, 16-bit
91+ mono PCM (when using the standalone dialplan applcation), or
92+ whatever audio codec was decided upon during call setup
93+ (when using Dial() application). If this argument is empty
94+ (has a length of 0), the call has been hung up and will not
95+ generate any more audio.
96+
97+ - Any extra keyword arguments are passed along to
98+ `asyncio.loop.create_server()`.
99+
100+ To send audio back to Asterisk, a bytes-like object must be returned by
101+ this callback. Audio must be sent back in chunks of 65,536 bytes or less
102+ (the size must be able to fit into a 16-bit unsigned integer). This audio must
103+ always be encoded as 8KHz, 16-bit mono PCM, regardless of the codec in use
104+ for the call.
105+
106+ Returning `audiosocket.HANGUP_CALL_MESSAGE` (or an empty `bytearray` instance)
107+ will request that the call represented by the value of the `uuid` parameter
108+ be hungup.
109+
110+ `on_exception_callback`, called any time an exception relating to the
111+ connected peer is raised. It is expected to be a function that accepts the
112+ following arguments:
113+
114+ - `uuid`: The universally unique ID that identifies the specific
115+ call which caused the exception.
116+
117+ - `peer_name`: A tuple consisting of the IP address and port number of the
118+ remote host the exception-causing call came from.
119+
120+ - `error`: An instance of the exception that occurred.
121+ """
122+
123+ def factory():
124+ protocol = _AudioSocketProtocol(on_audio_callback, on_exception_callback)
125+ return protocol
126+
127+ loop = asyncio.get_running_loop()
128+ return await loop.create_server(factory, host, port, **kwargs)
129+
130+
131+class _AudioSocketProtocol(asyncio.BufferedProtocol):
132+
133+ def __init__(self, on_audio, on_exception):
134+ super().__init__()
135+
136+ self._on_audio = on_audio
137+ self._on_exception = on_exception
138+ self._transport = None
139+ self._peer_name = None
140+ self._uuid = ""
141+ self._active_kind = None
142+ self._paused = False
143+ self._buffer = None
144+ self._write_spillover_buffer = []
145+ self._next_read_size = 3
146+
147+
148+ def _convey_or_raise_exception(self, exception):
149+ if callable(self._on_exception):
150+ self._on_exception(self._uuid, self._peer_name, exception)
151+ else:
152+ raise exception
153+
154+
155+ def connection_made(self, transport):
156+ self._peer_name = transport.get_extra_info("peername")
157+ self._transport = transport
158+
159+
160+ def connection_lost(self, exception):
161+ self._on_audio(self._uuid, self._peer_name, bytearray(0))
162+ if exception:
163+ self._convey_or_raise_exception(exception)
164+
165+
166+ def pause_writing(self):
167+ self._paused = True
168+
169+
170+ def resume_writing(self):
171+ self._paused = False
172+
173+
174+ def get_buffer(self, size_hint):
175+ self._buffer = bytearray(self._next_read_size)
176+ return self._buffer
177+
178+
179+ def buffer_updated(self, byte_count):
180+ if self._active_kind == None:
181+
182+ if byte_count != 3:
183+ self._convey_or_raise_exception(AudioSocketReadError(msg))
184+ return
185+
186+ self._active_kind = self._buffer[0]
187+ # TODO: Convey error on unknown message kind?
188+ self._next_read_size = int.from_bytes(self._buffer[1:3],
189+ byteorder = "big")
190+ return
191+
192+ if byte_count != self._next_read_size:
193+ msg = "Expected {} byte payload, but received {}.".format(
194+ self._next_read_size,
195+ byte_count
196+ )
197+ self._convey_or_raise_exception(AudioSocketReadError(msg))
198+ self._active_kind = None
199+
200+
201+ if self._active_kind == _KIND_AUDIO:
202+ to_send = self._on_audio(self._uuid, self._peer_name, self._buffer)
203+
204+ if to_send != None:
205+ if len(to_send) == 0:
206+ self._transport.write(b"\x00\x00\x00")
207+ self._transport.close()
208+ return
209+
210+ if self._paused:
211+ self._write_spillover_buffer.insert(0, to_send)
212+ return
213+
214+ if self._write_spillover_buffer:
215+ self._write_spillover_buffer.insert(0, to_send)
216+ to_send = self._write_spillover_buffer.pop()
217+
218+ audio_size = len(to_send).to_bytes(length = 2, byteorder = "big")
219+ to_send = _KIND_AUDIO_AS_BYTES + audio_size + to_send
220+ self._transport.write(to_send)
221+
222+ elif self._active_kind == _KIND_UUID:
223+ self._uuid = self._buffer[:byte_count].hex()
224+
225+ elif self._active_kind == _KIND_ERROR:
226+ if self._buffer[0] == _ERROR_FRAME_FORWARD:
227+ raise AsteriskFrameForwardError()
228+ elif self._buffer[0] == _ERROR_MEMEORY_ALLOC:
229+ raise AsteriskMemoryAllocError()
230+
231+ self._active_kind = None
232+ self._next_read_size = 3
233+
234+
235+ def eof_received(self):
236+ return False
diff -r 000000000000 -r cbeb3946c013 examples/google_stt_realtime_transcription.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/google_stt_realtime_transcription.py Thu Mar 09 12:12:14 2023 -0600
@@ -0,0 +1,187 @@
1+import asyncio
2+
3+from threading import Thread
4+from time import time
5+from queue import SimpleQueue
6+from socket import (
7+ gethostname,
8+ gethostbyname
9+)
10+
11+from google.cloud import speech
12+
13+import audiosocket
14+
15+
16+STREAM_TIME_LIMIT = 30.0 # 30 seconds.
17+
18+STREAM_CODEC = speech.RecognitionConfig.AudioEncoding.MULAW
19+# `OGG_OPUS`, `WEBM_OPUS`, `SPEEX_WITH_HEADER_BYTE` and `LINEAR16` are other
20+# available codecs that Asterisk can encode calls in.
21+
22+BIND_ADDRESS = (gethostbyname(gethostname()), 55150)
23+
24+STT_CONFIG = speech.RecognitionConfig(
25+ encoding = STREAM_CODEC,
26+ sample_rate_hertz = 8000,
27+ audio_channel_count = 1,
28+ model = "phone_call",
29+ language_code = "en-US"
30+)
31+
32+STT_STREAMING_CONFIG = speech.StreamingRecognitionConfig(
33+ config = STT_CONFIG,
34+ single_utterance = False,
35+ interim_results = True
36+)
37+
38+
39+class AudioSocketStream():
40+
41+ def __init__(self):
42+ super().__init__()
43+ self._input_audio_queue = SimpleQueue()
44+ self._active_uuids = []
45+ self._hang_up = False
46+
47+
48+ def audio_generator(self):
49+ while True:
50+ audio = self._input_audio_queue.get()
51+ if len(audio) == 0:
52+ return
53+ else:
54+ yield audio
55+
56+
57+ def hang_up(self):
58+ self._hang_up = True
59+
60+
61+ def on_exception(self, uuid, peer_name, exception):
62+ print(f"Call with UUID {uuid} from {peer_name} caused exception:")
63+ print(exception)
64+
65+
66+ def on_audio(self, uuid, peer_name, audio):
67+
68+ #print(f"UUID: {uuid}\nPeer name: {peer_name}\nAudio length: {len(audio)}")
69+
70+ if len(audio) == 0:
71+ if uuid in self._active_uuids:
72+ print("Call " + uuid + " is over.")
73+ self._input_audio_queue.put(b"")
74+ self._active_uuids.remove(uuid)
75+ self._hang_up = False
76+ return
77+
78+ if self._hang_up:
79+ return audiosocket.HANGUP_CALL_MESSAGE
80+
81+ # To keep the example simple, only one call is allowed to be transcribed at
82+ # a time. All other incoming calls are simply hung up if there is
83+ # already an active transcription being performed.
84+
85+ if uuid not in self._active_uuids:
86+ if len(self._active_uuids) == 1:
87+ print("Received another call while already transcribing one, " \
88+ + "hanging up...")
89+ return audiosocket.HANGUP_CALL_MESSAGE
90+ else:
91+ print(f"Received call {uuid} from peer {peer_name}, " \
92+ + "beginning transcription....")
93+ self._active_uuids.append(uuid)
94+
95+ self._input_audio_queue.put(bytes(audio))
96+ # Google cloud speech API expects a `bytes` object
97+ # (the AudioSocket callback returns a `bytearray`).
98+
99+
100+def transcribe(as_stream):
101+ stt_client = speech.SpeechClient()
102+
103+ while True:
104+
105+ print("Waiting for call to start... (say \"stop\", or simply hang up " \
106+ + "the call to end the transcription. There is a hard time limit of " \
107+ + str(STREAM_TIME_LIMIT) + " seconds, this can be changed with the " \
108+ + "`STREAM_TIME_LIMIT` variable.)"
109+ )
110+
111+ for audio in as_stream.audio_generator():
112+ # Wait until the first packet arrives, then start sending data to
113+ # Google's API.
114+ break
115+
116+ start_time = time()
117+ print("Call started.")
118+
119+ requests = (
120+ speech.StreamingRecognizeRequest(audio_content = content)
121+ for content in as_stream.audio_generator()
122+ )
123+
124+ responses = stt_client.streaming_recognize(STT_STREAMING_CONFIG, requests)
125+
126+ print("Interim result:")
127+ for response in responses:
128+
129+ print(response)
130+ # Responses typically look like the following:
131+ #
132+ # results {
133+ # alternatives {
134+ # transcript: "hello hello 123"
135+ # confidence: 0.58563143
136+ # }
137+ # is_final: true
138+ # result_end_time {
139+ # seconds: 4
140+ # nanos: 960000000
141+ # }
142+ # language_code: "en-us"
143+ # }
144+ # total_billed_time {
145+ # seconds: 5
146+ # }
147+ # request_id: 00000000000000000
148+
149+ if time() - start_time > STREAM_TIME_LIMIT:
150+ print("Hanging up because stream duration limit of " \
151+ + str(STREAM_TIME_LIMIT) + " seconds reached."
152+ )
153+ as_stream.hang_up()
154+ break
155+
156+ elif "stop" in response.results[0].alternatives[0].transcript:
157+ print("Hanging up because the \"Stop\" keyword was detected.")
158+ as_stream.hang_up()
159+ break
160+
161+
162+async def main():
163+ as_stream = AudioSocketStream()
164+
165+ server = await audiosocket.start_server(
166+ as_stream.on_audio,
167+ as_stream.on_exception,
168+ host = BIND_ADDRESS[0],
169+ port = BIND_ADDRESS[1]
170+ )
171+
172+ transcription_thread = Thread(
173+ target = transcribe,
174+ args = (as_stream,)
175+ )
176+
177+ transcription_thread.start()
178+
179+ print(f"Audiosocket server listening at {BIND_ADDRESS}, expecting" \
180+ + " calls to be encoded in MU-LAW, this can be changed with the " \
181+ + "`STREAM_CODEC` variable. Only one call can be transcribed at a time.\n")
182+
183+ await server.serve_forever()
184+
185+
186+if __name__ == "__main__":
187+ asyncio.run(main())
diff -r 000000000000 -r cbeb3946c013 examples/muffin_telephone.wav
Binary file examples/muffin_telephone.wav has changed
diff -r 000000000000 -r cbeb3946c013 examples/requirements.txt
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/requirements.txt Thu Mar 09 12:12:14 2023 -0600
@@ -0,0 +1,1 @@
1+google-cloud-speech
diff -r 000000000000 -r cbeb3946c013 examples/wave_playback.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/wave_playback.py Thu Mar 09 12:12:14 2023 -0600
@@ -0,0 +1,69 @@
1+import asyncio
2+import wave
3+from socket import gethostname, gethostbyname
4+
5+import audiosocket
6+
7+
8+BIND_ADDRESS = (gethostbyname(gethostname()), 55150)
9+
10+SAMPLE_SONG = wave.open("muffin_telephone.wav", mode = "r")
11+SAMPLE_SONG = SAMPLE_SONG.readframes(SAMPLE_SONG.getnframes())
12+SAMPLE_SONG_LENGTH = len(SAMPLE_SONG)
13+
14+
15+class CallState:
16+
17+ def __init__(self):
18+ self.call_frame_count = 0
19+ self.playback_slice_start = 0
20+ self.playback_slice_end = 320
21+
22+
23+call_states = {}
24+
25+
26+def on_audio(uuid, peer_name, audio):
27+ if len(audio) == 0:
28+ print("Call with UUID " + uuid + " was hungup.")
29+ call_states.pop(uuid)
30+ return
31+
32+ if uuid in call_states.keys():
33+ state = call_states[uuid]
34+ else:
35+ print("Received new call with UUID of " + uuid)
36+ state = CallState()
37+ call_states.update({uuid: state})
38+
39+ slice = SAMPLE_SONG[state.playback_slice_start:state.playback_slice_end]
40+
41+ state.call_frame_count += 1
42+ state.playback_slice_start += 320
43+ state.playback_slice_end += 320
44+
45+ if state.playback_slice_start >= SAMPLE_SONG_LENGTH:
46+ return audiosocket.HANGUP_CALL_MESSAGE
47+ else:
48+ return slice
49+
50+
51+def on_exception(uuid, peer_name, exception):
52+ print(f"Call with UUID {uuid} from {peer_name} caused exception:")
53+ print(exception)
54+
55+
56+async def main():
57+ print(f"Server listening at {BIND_ADDRESS}")
58+
59+ server = await audiosocket.start_server(
60+ on_audio,
61+ on_exception,
62+ host = BIND_ADDRESS[0],
63+ port = BIND_ADDRESS[1]
64+ )
65+
66+ await server.serve_forever()
67+
68+
69+asyncio.run(main())
diff -r 000000000000 -r cbeb3946c013 readme.md
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/readme.md Thu Mar 09 12:12:14 2023 -0600
@@ -0,0 +1,120 @@
1+# Python AudioSocket Server
2+
3+A high performance Asterisk AudioSocket server written in asynchronous Python.
4+
5+## AudioSocket Introduction
6+
7+[AudioSocket](https://github.com/CyCoreSystems/audiosocket) is a relatively new addition to the [Asterisk IP-PBX project](https://www.asterisk.org/)
8+that allows external applications to easily access the raw audio (both read/write) of
9+active calls, as opposed to audio only being available via RTP.
10+
11+Such external applications act as a TCP server that Asterisk connects to
12+after encountering an appropriate line of code in the dialplan. After, Asterisk
13+will begin sending audio to the specified address/port (the encoding of this
14+audio varies, as discussed in the tips section). This module is an
15+implementation of such a TCP server, in Python.
16+
17+## Module Usage
18+
19+*Note: Formal documentation is provided below and in the module file itself, for
20+real-world examples, see the the `examples` directory in the repository.*
21+
22+This Python module provides a clean and high performance interface for dealing
23+with AudioSocket connections. To take advantage of the conventions established
24+by the Python standard library, and to offer the best performance, this module
25+is modeled around Python's concept of [Protocols](https://docs.python.org/3.11/library/asyncio-protocol.html).
26+
27+Underneath, this means that all connections are handled within a single thread,
28+and I/O notification primitives provided by the underlying OS are used to
29+signal when there is activity on any given connection, preventing the need to
30+have an full OS thread per-connection. As for users of this module, this means
31+all interaction with it occurs via two callbacks, and requires `asyncio` to
32+be imported as well.
33+
34+The entire module is implemented within a single file, `audiosocket.py`. To
35+start using it, simply place the file somewhere your project can import it
36+from. After importing it (`import audiosocket`) and `asyncio`, the server can
37+be setup with two function calls - just like `asyncio`'s own `start_server` -
38+by invoking:
39+```python
40+server = await audiosocket.start_server(<on_audio_callback>, <on_exception_callback>, <address>, <port>)`
41+await server.serve_forever()
42+```
43+
44+`start_server()` behaves just like `asyncio.start_server()`, meaning the
45+`host`, `port` and keyword arguments accepted by that function have the same
46+behavior with this one.
47+
48+The function callbacks should accept the following arguments:
49+
50+`on_audio_callback`, called any time audio data from the connected peer is
51+received. It is expected to be a function that accepts the following
52+arguments:
53+ - `uuid`: The universally unique ID that identifies this specific
54+ call's audio, provided as a hexdecimal string.
55+
56+ - `peer_name`: A tuple consisting of the IP address and port number of the
57+ remote host the audio is being sent from.
58+
59+ - `audio`: A `bytearray` instance containing the received audio data.
60+ An empty `bytearray` instance (`len(audio) == 0`)
61+ indicates the call hung up and no more audio will
62+ be received. Audio is either encoded in 8KHz, 16-bit
63+ mono PCM (when using the standalone dialplan applcation), or
64+ whatever audio codec was decided upon during call setup
65+ (when using Dial() application). If this argument is empty
66+ (has a length of 0), the call has been hung up and will not
67+ generate any more audio.
68+
69+ - Any extra keyword arguments are passed along to
70+ `asyncio.loop.create_server()`.
71+
72+To send audio back to Asterisk, a bytes-like object must be returned by
73+this callback. Audio must be sent back in chunks of 65,536 bytes or less
74+(the size must be able to fit into a 16-bit unsigned integer). This audio must
75+always be encoded as 8KHz, 16-bit mono PCM, regardless of the codec in use
76+for the call.
77+
78+Returning `audiosocket.HANGUP_CALL_MESSAGE` (or an empty `bytearray` instance)
79+will request that the call represented by the value of the `uuid` parameter
80+be hungup.
81+
82+`on_exception_callback`, called any time an exception relating to the
83+connected peer is raised. It is expected to be a function that accepts the
84+following arguments:
85+ - `uuid`: The universally unique ID that identifies the specific
86+ call which caused the exception.
87+
88+ - `peer_name`: A tuple consisting of the IP address and port number of the
89+ remote host the exception-causing call came from.
90+
91+ - `error`: An instance of the exception that occurred.
92+
93+## Tips
94+
95+- There are two ways to begin an AudioSocket connection within Asterisk, via a
96+standalone dialplan application (`AudioSocket(<uuid>,<address:port>)`) or as a
97+channel driver to `Dial()` (`Dial(AudioSocket/<address:port>/<uuid>)`). Using
98+the standalone application will cause Asterisk to send the server 8KHz, 16-bit
99+mono PCM (see Issues section). Using the `Dial()` application will cause
100+Asterisk to send audio encoded in whatever codec was agreed upon during call
101+setup (most commonly ULAW).
102+
103+- Since the programming model with this module is callback-based (which is not
104+all that common), as opposed to making state that must persist between function
105+calls global, a recommended approach is to make the callback functions methods
106+in a class. That way instance variables can be used to reduce the scope of such
107+variables, while still allowing the callback paradigm to work as intended.
108+
109+## Issues (as of my testing with Asterisk 18)
110+
111+- If the standalone dialplan application is used to initiate an AudioSocket
112+connection, delivering PCM audio to th server, the CPU core/thread the
113+connection within Asterisk is running on will reach 100% utilization (likely
114+a runaway loop, which I will try to identify and fix/report to them).
115+
116+
117+- If there is a wireless medium at any point in the network link between
118+Asterisk and the AudioSocket server, the Asterisk module encounters strange
119+read/write errors. This is a problem with the Asterisk module/Asterisk itself.
120+