Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions speech/grpc/transcribe_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,10 @@ def _audio_data_generator(buff):
A chunk of data that is the aggregate of all chunks of data in `buff`.
The function will block until at least one data chunk is available.
"""
while True:
# Use a blocking get() to ensure there's at least one chunk of data
stop = False
while not stop:
# Use a blocking get() to ensure there's at least one chunk of data.
chunk = buff.get()
if not chunk:
# A falsy value indicates the stream is closed.
break
data = [chunk]

# Now consume whatever other data's still buffered.
Expand All @@ -87,22 +85,31 @@ def _audio_data_generator(buff):
data.append(buff.get(block=False))
except queue.Empty:
break

# If `_fill_buffer` adds `None` to the buffer, the audio stream is
# closed. Yield the final bit of the buffer and exit the loop.
if None in data:
stop = True
data.remove(None)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - nice catch.

yield b''.join(data)


def _fill_buffer(audio_stream, buff, chunk):
def _fill_buffer(audio_stream, buff, chunk, stoprequest):
"""Continuously collect data from the audio stream, into the buffer."""
try:
while True:
while not stoprequest.is_set():
buff.put(audio_stream.read(chunk))
except IOError:
# This happens when the stream is closed. Signal that we're done.
pass
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, you'll have a race condition; i.e., if _audio_data_generator's buff.get() is waiting for data in the buffer when stoprequest is triggered, it could potentially wait forever and never yield a value.

finally:
# Add `None` to the buff, indicating that a stop request is made.
# This will signal `_audio_data_generator` to exit.
buff.put(None)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment here describing what putting None means would be helpful.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done



# [START audio_stream]
@contextlib.contextmanager
def record_audio(rate, chunk):
def record_audio(rate, chunk, stoprequest):
"""Opens a recording stream in a context manager."""
audio_interface = pyaudio.PyAudio()
audio_stream = audio_interface.open(
Expand All @@ -120,14 +127,13 @@ def record_audio(rate, chunk):
# This is necessary so that the input device's buffer doesn't overflow
# while the calling thread makes network requests, etc.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason not to stop the stream before closing? While I'm not certain it's necessary, all the examples stop the stream before closing it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the stream is stopped, one may not call write or read, and the _fill_buffer thread gets stuck.

fill_buffer_thread = threading.Thread(
target=_fill_buffer, args=(audio_stream, buff, chunk))
target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest))
fill_buffer_thread.start()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason to remove this line? It seems like good hygiene.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might have been a mistake. I will restore it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


yield _audio_data_generator(buff)

audio_stream.stop_stream()
audio_stream.close()
fill_buffer_thread.join()
audio_stream.close()
audio_interface.terminate()
# [END audio_stream]

Expand Down Expand Up @@ -166,7 +172,7 @@ def request_stream(data_stream, rate, interim_results=True):
yield cloud_speech.StreamingRecognizeRequest(audio_content=data)


def listen_print_loop(recognize_stream):
def listen_print_loop(recognize_stream, stoprequest):
num_chars_printed = 0
for resp in recognize_stream:
if resp.error.code != code_pb2.OK:
Expand Down Expand Up @@ -198,6 +204,7 @@ def listen_print_loop(recognize_stream):
# one of our keywords.
if re.search(r'\b(exit|quit)\b', transcript, re.I):
print('Exiting..')
stoprequest.set()
break

num_chars_printed = 0
Expand All @@ -206,9 +213,17 @@ def listen_print_loop(recognize_stream):
def main():
with cloud_speech.beta_create_Speech_stub(
make_channel('speech.googleapis.com', 443)) as service:

# stoprequest is an event object which is set in `listen_print_loop`
# to indicate that the transcription should be stopped.
#
# The `_fill_buffer` thread checks this object, and closes
# the `audio_stream` once it's set.
stoprequest = threading.Event()

# For streaming audio from the microphone, there are three threads.
# First, a thread that collects audio data as it comes in
with record_audio(RATE, CHUNK) as buffered_audio_data:
with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data:
# Second, a thread that sends requests with that data
requests = request_stream(buffered_audio_data, RATE)
# Third, a thread that listens for transcription responses
Expand All @@ -220,7 +235,7 @@ def main():

# Now, put the transcription responses to use.
try:
listen_print_loop(recognize_stream)
listen_print_loop(recognize_stream, stoprequest)

recognize_stream.cancel()
except face.CancellationError:
Expand Down