-
Notifications
You must be signed in to change notification settings - Fork 6.7k
streaming code used to hang when it was asked to "exit" - fixed now #635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7b4bfc7
6e3a073
1559d95
d3be2f8
a5d26c9
c0f8e5a
9648066
a891544
decf575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,12 +73,10 @@ def _audio_data_generator(buff): | |
| A chunk of data that is the aggregate of all chunks of data in `buff`. | ||
| The function will block until at least one data chunk is available. | ||
| """ | ||
| while True: | ||
| # Use a blocking get() to ensure there's at least one chunk of data | ||
| stop = False | ||
| while not stop: | ||
| # Use a blocking get() to ensure there's at least one chunk of data. | ||
| chunk = buff.get() | ||
| if not chunk: | ||
| # A falsey value indicates the stream is closed. | ||
| break | ||
| data = [chunk] | ||
|
|
||
| # Now consume whatever other data's still buffered. | ||
|
|
@@ -87,22 +85,31 @@ def _audio_data_generator(buff): | |
| data.append(buff.get(block=False)) | ||
| except queue.Empty: | ||
| break | ||
|
|
||
| # If `_fill_buffer` adds `None` to the buffer, the audio stream is | ||
| # closed. Yield the final bit of the buffer and exit the loop. | ||
| if None in data: | ||
| stop = True | ||
| data.remove(None) | ||
| yield b''.join(data) | ||
|
|
||
|
|
||
| def _fill_buffer(audio_stream, buff, chunk): | ||
| def _fill_buffer(audio_stream, buff, chunk, stoprequest): | ||
| """Continuously collect data from the audio stream, into the buffer.""" | ||
| try: | ||
| while True: | ||
| while not stoprequest.is_set(): | ||
| buff.put(audio_stream.read(chunk)) | ||
| except IOError: | ||
| # This happens when the stream is closed. Signal that we're done. | ||
| pass | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without this, you'll have a race condition. ie if |
||
| finally: | ||
| # Add `None` to the buff, indicating that a stop request is made. | ||
| # This will signal `_audio_data_generator` to exit. | ||
| buff.put(None) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comment here describing what putting
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
|
||
|
|
||
| # [START audio_stream] | ||
| @contextlib.contextmanager | ||
| def record_audio(rate, chunk): | ||
| def record_audio(rate, chunk, stoprequest): | ||
| """Opens a recording stream in a context manager.""" | ||
| audio_interface = pyaudio.PyAudio() | ||
| audio_stream = audio_interface.open( | ||
|
|
@@ -120,14 +127,13 @@ def record_audio(rate, chunk): | |
| # This is necessary so that the input device's buffer doesn't overflow | ||
| # while the calling thread makes network requests, etc. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any particular reason not to stop the stream before closing? While I'm not certain it's necessary, all the examples stop the stream before closing it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once the stream is stopped, one may not call write or read. And thread _fill_buffer gets stuck. |
||
| fill_buffer_thread = threading.Thread( | ||
| target=_fill_buffer, args=(audio_stream, buff, chunk)) | ||
| target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest)) | ||
| fill_buffer_thread.start() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any particular reason to remove this line? It seems like good hygiene..
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That might have been a mistake. I will restore it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
|
||
| yield _audio_data_generator(buff) | ||
|
|
||
| audio_stream.stop_stream() | ||
| audio_stream.close() | ||
| fill_buffer_thread.join() | ||
| audio_stream.close() | ||
| audio_interface.terminate() | ||
| # [END audio_stream] | ||
|
|
||
|
|
@@ -166,7 +172,7 @@ def request_stream(data_stream, rate, interim_results=True): | |
| yield cloud_speech.StreamingRecognizeRequest(audio_content=data) | ||
|
|
||
|
|
||
| def listen_print_loop(recognize_stream): | ||
| def listen_print_loop(recognize_stream, stoprequest): | ||
| num_chars_printed = 0 | ||
| for resp in recognize_stream: | ||
| if resp.error.code != code_pb2.OK: | ||
|
|
@@ -198,6 +204,7 @@ def listen_print_loop(recognize_stream): | |
| # one of our keywords. | ||
| if re.search(r'\b(exit|quit)\b', transcript, re.I): | ||
| print('Exiting..') | ||
| stoprequest.set() | ||
| break | ||
|
|
||
| num_chars_printed = 0 | ||
|
|
@@ -206,9 +213,17 @@ def listen_print_loop(recognize_stream): | |
| def main(): | ||
| with cloud_speech.beta_create_Speech_stub( | ||
| make_channel('speech.googleapis.com', 443)) as service: | ||
|
|
||
| # stoprequest is event object which is set in `listen_print_loop` | ||
| # to indicate that the trancsription should be stopped. | ||
| # | ||
| # The `_fill_buffer` thread checks this object, and closes | ||
| # the `audio_stream` once it's set. | ||
| stoprequest = threading.Event() | ||
|
|
||
| # For streaming audio from the microphone, there are three threads. | ||
| # First, a thread that collects audio data as it comes in | ||
| with record_audio(RATE, CHUNK) as buffered_audio_data: | ||
| with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data: | ||
| # Second, a thread that sends requests with that data | ||
| requests = request_stream(buffered_audio_data, RATE) | ||
| # Third, a thread that listens for transcription responses | ||
|
|
@@ -220,7 +235,7 @@ def main(): | |
|
|
||
| # Now, put the transcription responses to use. | ||
| try: | ||
| listen_print_loop(recognize_stream) | ||
| listen_print_loop(recognize_stream, stoprequest) | ||
|
|
||
| recognize_stream.cancel() | ||
| except face.CancellationError: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah - nice catch.