Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 32 additions & 39 deletions speech/grpc/transcribe_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
from __future__ import division

import contextlib
import functools
import re
import signal
import sys
import threading

from google.cloud import credentials
from google.cloud.speech.v1beta1 import cloud_speech_pb2 as cloud_speech
Expand Down Expand Up @@ -76,8 +76,7 @@ def _audio_data_generator(buff):
stop = False
while not stop:
# Use a blocking get() to ensure there's at least one chunk of data.
chunk = buff.get()
data = [chunk]
data = [buff.get()]

# Now consume whatever other data's still buffered.
while True:
Expand All @@ -86,54 +85,47 @@ def _audio_data_generator(buff):
except queue.Empty:
break

# If `_fill_buffer` adds `None` to the buffer, the audio stream is
# closed. Yield the final bit of the buffer and exit the loop.
# `None` in the buffer signals that the audio stream is closed. Yield
# the final bit of the buffer and exit the loop.
if None in data:
stop = True
data.remove(None)

yield b''.join(data)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: blank line above yield.



def _fill_buffer(audio_stream, buff, chunk, stoprequest):
def _fill_buffer(buff, in_data, frame_count, time_info, status_flags):
"""Continuously collect data from the audio stream, into the buffer."""
try:
while not stoprequest.is_set():
buff.put(audio_stream.read(chunk))
except IOError:
pass
finally:
# Add `None` to the buff, indicating that a stop request is made.
# This will signal `_audio_data_generator` to exit.
buff.put(None)
buff.put(in_data)
return None, pyaudio.paContinue


# [START audio_stream]
@contextlib.contextmanager
def record_audio(rate, chunk, stoprequest):
def record_audio(rate, chunk):
"""Opens a recording stream in a context manager."""
# Create a thread-safe buffer of audio data
buff = queue.Queue()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/buff/buffer? (throughout)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I avoided this since buffer is a keyword

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL.

(It's a built-in function which means it can be shadowed, like file, but I'm fine if you leave it as buff).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, me too. I just avoided it because vim bolded it after I spelled it out, which made me wary :-)


audio_interface = pyaudio.PyAudio()
audio_stream = audio_interface.open(
format=pyaudio.paInt16,
# The API currently only supports 1-channel (mono) audio
# https://goo.gl/z757pE
channels=1, rate=rate,
input=True, frames_per_buffer=chunk,
# Run the audio stream asynchronously to fill the buffer object.
# This is necessary so that the input device's buffer doesn't overflow
# while the calling thread makes network requests, etc.
stream_callback=functools.partial(_fill_buffer, buff),
)

# Create a thread-safe buffer of audio data
buff = queue.Queue()

# Spin up a separate thread to buffer audio data from the microphone
# This is necessary so that the input device's buffer doesn't overflow
# while the calling thread makes network requests, etc.
fill_buffer_thread = threading.Thread(
target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest))
fill_buffer_thread.start()

yield _audio_data_generator(buff)

fill_buffer_thread.join()
audio_stream.stop_stream()
audio_stream.close()
# Signal the _audio_data_generator to finish
buff.put(None)
audio_interface.terminate()
# [END audio_stream]

Expand Down Expand Up @@ -172,7 +164,17 @@ def request_stream(data_stream, rate, interim_results=True):
yield cloud_speech.StreamingRecognizeRequest(audio_content=data)


def listen_print_loop(recognize_stream, stoprequest):
def listen_print_loop(recognize_stream):
"""Iterates through server responses and prints them.

The recognize_stream passed is a generator that will block until a response
is provided by the server. When the transcription response comes, print it.

In this case, responses are provided for interim results as well. If the
response is an interim one, print a line feed at the end of it, to allow
the next result to overwrite it, until the response is a final one. For the
final one, print a newline to preserve the finalized transcription.
"""
num_chars_printed = 0
for resp in recognize_stream:
if resp.error.code != code_pb2.OK:
Expand Down Expand Up @@ -204,7 +206,6 @@ def listen_print_loop(recognize_stream, stoprequest):
# one of our keywords.
if re.search(r'\b(exit|quit)\b', transcript, re.I):
print('Exiting..')
stoprequest.set()
break

num_chars_printed = 0
Expand All @@ -213,17 +214,9 @@ def listen_print_loop(recognize_stream, stoprequest):
def main():
with cloud_speech.beta_create_Speech_stub(
make_channel('speech.googleapis.com', 443)) as service:

# stoprequest is an event object which is set in `listen_print_loop`
# to indicate that the transcription should be stopped.
#
# The `_fill_buffer` thread checks this object, and closes
# the `audio_stream` once it's set.
stoprequest = threading.Event()

# For streaming audio from the microphone, there are three threads.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these comments about the number of threads still accurate?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup! It's just that before we were spinning up the buffer-the-audio thread manually, and now pyaudio is doing it for us.

# First, a thread that collects audio data as it comes in
with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data:
with record_audio(RATE, CHUNK) as buffered_audio_data:
# Second, a thread that sends requests with that data
requests = request_stream(buffered_audio_data, RATE)
# Third, a thread that listens for transcription responses
Expand All @@ -235,7 +228,7 @@ def main():

# Now, put the transcription responses to use.
try:
listen_print_loop(recognize_stream, stoprequest)
listen_print_loop(recognize_stream)

recognize_stream.cancel()
except face.CancellationError:
Expand Down
37 changes: 19 additions & 18 deletions speech/grpc/transcribe_streaming_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

import re
import threading
import time

import transcribe_streaming
Expand All @@ -24,34 +25,34 @@ def __init__(self, audio_filename):
def __call__(self, *args):
return self

def open(self, *args, **kwargs):
self.audio_file = open(self.audio_filename, 'rb')
def open(self, stream_callback, *args, **kwargs):
self.closed = threading.Event()
self.stream_thread = threading.Thread(
target=self.stream_audio, args=(
self.audio_filename, stream_callback, self.closed))
self.stream_thread.start()
return self

def close(self):
self.audio_file.close()
self.closed.set()

def stop_stream(self):
pass

def terminate(self):
pass

def read(self, num_frames):
if self.audio_file.closed:
raise IOError()
# Approximate realtime by sleeping for the appropriate time for the
# requested number of frames
time.sleep(num_frames / float(transcribe_streaming.RATE))
# audio is 16-bit samples, whereas python byte is 8-bit
num_bytes = 2 * num_frames
try:
chunk = self.audio_file.read(num_bytes)
except ValueError:
raise IOError()
if not chunk:
raise IOError()
return chunk
@staticmethod
def stream_audio(audio_filename, callback, closed, num_frames=512):
    """Feed the contents of an audio file to `callback` at roughly realtime.

    Opens `audio_filename` in binary mode and, until `closed` is set,
    repeatedly hands `callback` a chunk of up to `num_frames` frames. Once
    a read comes back empty (end of file), a chunk of zero bytes of the
    full size is sent instead, so the callback keeps firing until `closed`
    is set.

    Args:
        audio_filename: Path of the audio file to read.
        callback: Invoked as `callback(chunk, None, None, None)` for every
            chunk.
        closed: Event-like object; the loop ends once `closed.is_set()`
            returns true.
        num_frames: Number of frames delivered per callback invocation.
    """
    with open(audio_filename, 'rb') as source:
        while True:
            if closed.is_set():
                break

            # Approximate realtime by sleeping for as long as the requested
            # number of frames would take to play back.
            time.sleep(num_frames / float(transcribe_streaming.RATE))

            # Audio is 16-bit samples, whereas a Python byte is 8-bit, so
            # each frame occupies two bytes.
            chunk_size = 2 * num_frames
            payload = source.read(chunk_size)
            if not payload:
                # Nothing left in the file: substitute zero bytes.
                payload = b'\0' * chunk_size

            callback(payload, None, None, None)


def test_main(resource, monkeypatch, capsys):
Expand Down