From 7b4bfc72efb1960323e37b1731dcc4eab65362fc Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Fri, 4 Nov 2016 15:24:12 -0700 Subject: [PATCH 1/9] added streaming fix when exit --- speech/grpc/transcribe_streaming.py | 33 +++++++++++++++++------------ 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 77a99fc779b..a3fb5467a86 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -64,7 +64,7 @@ def make_channel(host, port): return implementations.secure_channel(host, port, composite_channel) -def _audio_data_generator(buff): +def _audio_data_generator(buff, stoprequest): """A generator that yields all available data in the given buffer. Args: @@ -73,8 +73,8 @@ def _audio_data_generator(buff): A chunk of data that is the aggregate of all chunks of data in `buff`. The function will block until at least one data chunk is available. """ - while True: - # Use a blocking get() to ensure there's at least one chunk of data + while not stoprequest.isSet(): + # Use a blocking get() to ensure there's at least one chunk of data chunk = buff.get() if not chunk: # A falsey value indicates the stream is closed. @@ -90,19 +90,22 @@ def _audio_data_generator(buff): yield b''.join(data) -def _fill_buffer(audio_stream, buff, chunk): +def _fill_buffer(audio_stream, buff, chunk, stoprequest): """Continuously collect data from the audio stream, into the buffer.""" try: - while True: + while not stoprequest.isSet(): buff.put(audio_stream.read(chunk)) except IOError: # This happens when the stream is closed. Signal that we're done. buff.put(None) + audio_stream.stop_stream() + audio_stream.close() + # [START audio_stream] @contextlib.contextmanager -def record_audio(rate, chunk): +def record_audio(rate, chunk, stoprequest): """Opens a recording stream in a context manager.""" audio_interface = pyaudio.PyAudio() audio_stream = audio_interface.open( @@ -120,15 +123,12 @@ def record_audio(rate, chunk): # This is necessary so that the input device's buffer doesn't overflow # while the calling thread makes network requests, etc. fill_buffer_thread = threading.Thread( - target=_fill_buffer, args=(audio_stream, buff, chunk)) + target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest)) fill_buffer_thread.start() - yield _audio_data_generator(buff) + yield _audio_data_generator(buff, stoprequest) - audio_stream.stop_stream() - audio_stream.close() fill_buffer_thread.join() - audio_interface.terminate() # [END audio_stream] @@ -166,7 +166,7 @@ def request_stream(data_stream, rate, interim_results=True): yield cloud_speech.StreamingRecognizeRequest(audio_content=data) -def listen_print_loop(recognize_stream): +def listen_print_loop(recognize_stream, stoprequest): num_chars_printed = 0 for resp in recognize_stream: if resp.error.code != code_pb2.OK: @@ -198,6 +198,7 @@ def listen_print_loop(recognize_stream): # one of our keywords. if re.search(r'\b(exit|quit)\b', transcript, re.I): print('Exiting..') + stoprequest.set() break num_chars_printed = 0 @@ -208,7 +209,11 @@ def main(): make_channel('speech.googleapis.com', 443)) as service: # For streaming audio from the microphone, there are three threads. # First, a thread that collects audio data as it comes in - with record_audio(RATE, CHUNK) as buffered_audio_data: + + # stop request + stoprequest = threading.Event() + + with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data: # Second, a thread that sends requests with that data requests = request_stream(buffered_audio_data, RATE) # Third, a thread that listens for transcription responses @@ -220,7 +225,7 @@ def main(): # Now, put the transcription responses to use. try: - listen_print_loop(recognize_stream) + listen_print_loop(recognize_stream, stoprequest) recognize_stream.cancel() except face.CancellationError: From 6e3a073fe1f54f7ae6a2869e46d4111a718342d5 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Fri, 4 Nov 2016 15:49:23 -0700 Subject: [PATCH 2/9] added audio_interface.terminate() --- speech/grpc/transcribe_streaming.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index a3fb5467a86..a0c45f098b9 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -73,7 +73,7 @@ def _audio_data_generator(buff, stoprequest): A chunk of data that is the aggregate of all chunks of data in `buff`. The function will block until at least one data chunk is available. """ - while not stoprequest.isSet(): + while not stoprequest.is_set(): # Use a blocking get() to ensure there's at least one chunk of data chunk = buff.get() if not chunk: @@ -93,7 +93,7 @@ def _audio_data_generator(buff, stoprequest): def _fill_buffer(audio_stream, buff, chunk, stoprequest): """Continuously collect data from the audio stream, into the buffer.""" try: - while not stoprequest.isSet(): + while not stoprequest.is_set(): buff.put(audio_stream.read(chunk)) except IOError: # This happens when the stream is closed. Signal that we're done. @@ -129,6 +129,7 @@ def record_audio(rate, chunk, stoprequest): yield _audio_data_generator(buff, stoprequest) fill_buffer_thread.join() + audio_interface.terminate() # [END audio_stream] From 1559d9576efbbb850dd1d2526be1e1d98424a85c Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Fri, 4 Nov 2016 17:11:43 -0700 Subject: [PATCH 3/9] removed audio_stream stop --- speech/grpc/transcribe_streaming.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index a0c45f098b9..984480c75fd 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -64,7 +64,7 @@ def make_channel(host, port): return implementations.secure_channel(host, port, composite_channel) -def _audio_data_generator(buff, stoprequest): +def _audio_data_generator(buff): """A generator that yields all available data in the given buffer. Args: @@ -73,12 +73,9 @@ def _audio_data_generator(buff, stoprequest): A chunk of data that is the aggregate of all chunks of data in `buff`. The function will block until at least one data chunk is available. """ - while not stoprequest.is_set(): + while True: # Use a blocking get() to ensure there's at least one chunk of data chunk = buff.get() - if not chunk: - # A falsey value indicates the stream is closed. - break data = [chunk] # Now consume whatever other data's still buffered. @@ -96,11 +93,7 @@ def _fill_buffer(audio_stream, buff, chunk, stoprequest): while not stoprequest.is_set(): buff.put(audio_stream.read(chunk)) except IOError: - # This happens when the stream is closed. Signal that we're done. - buff.put(None) - - audio_stream.stop_stream() - audio_stream.close() + pass # [START audio_stream] @@ -126,9 +119,10 @@ def record_audio(rate, chunk, stoprequest): target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest)) fill_buffer_thread.start() - yield _audio_data_generator(buff, stoprequest) + yield _audio_data_generator(buff) fill_buffer_thread.join() + audio_stream.close() audio_interface.terminate() # [END audio_stream] From d3be2f8ad9059e0dd1fe57feefb034ba2b724439 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Fri, 4 Nov 2016 19:35:10 -0700 Subject: [PATCH 4/9] removed trailing whitespace --- speech/grpc/transcribe_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 984480c75fd..24526284b3c 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -74,7 +74,7 @@ def _audio_data_generator(buff): The function will block until at least one data chunk is available. """ while True: - # Use a blocking get() to ensure there's at least one chunk of data + # Use a blocking get() to ensure there's at least one chunk of data. chunk = buff.get() data = [chunk] From a5d26c9e1e86f647af6be3cae928969dda7fcdbd Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Fri, 4 Nov 2016 22:49:19 -0700 Subject: [PATCH 5/9] added None and checked for None in _audio_data_generator --- speech/grpc/transcribe_streaming.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 24526284b3c..85287486d6f 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -73,7 +73,8 @@ def _audio_data_generator(buff): A chunk of data that is the aggregate of all chunks of data in `buff`. The function will block until at least one data chunk is available. """ - while True: + stop = False + while not stop: # Use a blocking get() to ensure there's at least one chunk of data. chunk = buff.get() data = [chunk] @@ -84,6 +85,11 @@ def _audio_data_generator(buff): data.append(buff.get(block=False)) except queue.Empty: break + + # If the data contains None then set stop = True. + if None in data: + stop = True + data.remove(None) yield b''.join(data) @@ -94,6 +100,8 @@ def _fill_buffer(audio_stream, buff, chunk, stoprequest): buff.put(audio_stream.read(chunk)) except IOError: pass + finally: + buff.put(None) # [START audio_stream] From c0f8e5afb8b3bd82a5d0e9d773b69e40ed6b310b Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Mon, 7 Nov 2016 10:27:59 -0800 Subject: [PATCH 6/9] added elaborate comments in the code --- speech/grpc/transcribe_streaming.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 85287486d6f..8fe92bce53c 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -86,7 +86,8 @@ def _audio_data_generator(buff): except queue.Empty: break - # If the data contains None then set stop = True. + # If `_fill_buffer` adds `None` to the buffer, the audio stream is closed. + # Yield the final bit of the buffer and exit the loop. if None in data: stop = True data.remove(None) @@ -101,6 +102,8 @@ def _fill_buffer(audio_stream, buff, chunk, stoprequest): except IOError: pass finally: + # Add `None` to the buff, indicating that a stop request is made. + # This will signal `_audio_data_generator` to exit. buff.put(None) @@ -208,12 +211,14 @@ def listen_print_loop(recognize_stream, stoprequest): def main(): + # For streaming audio from the microphone, there are three threads. + # First, a thread that collects audio data as it comes in with cloud_speech.beta_create_Speech_stub( make_channel('speech.googleapis.com', 443)) as service: - # For streaming audio from the microphone, there are three threads. - # First, a thread that collects audio data as it comes in - # stop request + # stoprequest is event object which is set in `listen_print_loop`. + # To indicate that the trancsription should be stopped. + # `_fill_buffer` checks and stops collecting data from audio_stream. stoprequest = threading.Event() with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data: From 96480662ee3b4a54559f7354a38f9a5f3f4b1319 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Mon, 7 Nov 2016 11:43:49 -0800 Subject: [PATCH 7/9] fixed comments in the code --- speech/grpc/transcribe_streaming.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 8fe92bce53c..0910dd60277 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -211,16 +211,18 @@ def listen_print_loop(recognize_stream, stoprequest): def main(): - # For streaming audio from the microphone, there are three threads. - # First, a thread that collects audio data as it comes in with cloud_speech.beta_create_Speech_stub( make_channel('speech.googleapis.com', 443)) as service: - # stoprequest is event object which is set in `listen_print_loop`. - # To indicate that the trancsription should be stopped. - # `_fill_buffer` checks and stops collecting data from audio_stream. + # stoprequest is event object which is set in `listen_print_loop` + # to indicate that the trancsription should be stopped. + # + # The `_fill_buffer` thread checks this object, and closes + # the `audio_stream` once it's set. stoprequest = threading.Event() + # For streaming audio from the microphone, there are three threads. + # First, a thread that collects audio data as it comes in with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data: # Second, a thread that sends requests with that data requests = request_stream(buffered_audio_data, RATE) From a89154496252e89bf1676b8bac9eff6ca7148053 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Mon, 7 Nov 2016 14:40:27 -0800 Subject: [PATCH 8/9] fixed comments --- speech/grpc/transcribe_streaming.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 0910dd60277..17a4d73ad51 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -75,7 +75,7 @@ def _audio_data_generator(buff): """ stop = False while not stop: - # Use a blocking get() to ensure there's at least one chunk of data. + # Use a blocking get() to ensure there's at least one chunk of data. chunk = buff.get() data = [chunk] @@ -86,7 +86,7 @@ def _audio_data_generator(buff): except queue.Empty: break - # If `_fill_buffer` adds `None` to the buffer, the audio stream is closed. + # If `_fill_buffer` adds `None` to the buffer, audio stream is closed. # Yield the final bit of the buffer and exit the loop. if None in data: stop = True From decf5751756313b9068541fad52333035f218108 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Mon, 7 Nov 2016 15:06:47 -0800 Subject: [PATCH 9/9] added 'the' back --- speech/grpc/transcribe_streaming.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 17a4d73ad51..8c6b84bb8fb 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -86,8 +86,8 @@ def _audio_data_generator(buff): except queue.Empty: break - # If `_fill_buffer` adds `None` to the buffer, audio stream is closed. - # Yield the final bit of the buffer and exit the loop. + # If `_fill_buffer` adds `None` to the buffer, the audio stream is + # closed. Yield the final bit of the buffer and exit the loop. if None in data: stop = True data.remove(None) @@ -213,7 +213,7 @@ def listen_print_loop(recognize_stream, stoprequest): def main(): with cloud_speech.beta_create_Speech_stub( make_channel('speech.googleapis.com', 443)) as service: - + # stoprequest is event object which is set in `listen_print_loop` # to indicate that the trancsription should be stopped. #