From cd1d4fd8ce074945b7b6e494194c198c095cf59c Mon Sep 17 00:00:00 2001 From: ruogu Date: Tue, 21 Mar 2023 17:10:09 -0700 Subject: [PATCH 01/14] Add sample code for streaming_transcription. 1. Support long form model (single_utteracen=false) 2. Add audio buffering. --- dialogflow/participant_management.py | 49 +++++- dialogflow/requirements.txt | 2 + dialogflow/streaming_transcription.py | 226 ++++++++++++++++++++++++++ 3 files changed, 276 insertions(+), 1 deletion(-) create mode 100644 dialogflow/streaming_transcription.py diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index e2bf6169f43..4a32286159e 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -114,7 +114,7 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): def analyze_content_audio_stream( project_id, conversation_id, participant_id, audio_file_path ): - """Analyze audio content for END_USER + """Analyze audio content for END_USER with audio files. Args: project_id: The GCP project linked with the conversation profile. @@ -175,3 +175,50 @@ def request_generator(audio_config, audio_file_path): # [END dialogflow_analyze_content_audio_stream] + + +# [START streaming_analyze_content_audio] +def streaming_analyze_content_audio(participant_name, + sample_rate_herz, + stream, + timeout, + language_code, + single_utterance=False): + """Stream audio to Dialogflow and receive transcripts and suggestions. + + Args: + participant_name: resource name of the participant. + sample_rate_herz: herz rate of the sample. + audio_generator: a sequence of audio data. + """ + from google.cloud import dialogflow_v2beta1 as dialogflow_beta + client = dialogflow_beta.ParticipantsClient() + + audio_config = dialogflow_beta.types.audio_config.InputAudioConfig( + audio_encoding=dialogflow_beta.types.audio_config.AudioEncoding. + AUDIO_ENCODING_LINEAR_16, + sample_rate_hertz=sample_rate_herz, + language_code=language_code, + single_utterance=single_utterance) + + def gen_requests(participant_name, audio_config, stream): + """Generates requests for streaming. + """ + audio_generator = stream.generator() + while not stream.closed: + print("Yield config to streaming analyze content.") + yield dialogflow_beta.types.participant.StreamingAnalyzeContentRequest( + participant=participant_name, + audio_config=audio_config) + print("Yield audios to streaming analyze content.") + for content in audio_generator: + # print('Yield audio to streaming analyze content') + yield dialogflow_beta.types.participant.StreamingAnalyzeContentRequest( + input_audio=content) + + return client.streaming_analyze_content(gen_requests( + participant_name, audio_config, stream), + timeout=timeout) + + +# [END streaming_analyze_content_audio] diff --git a/dialogflow/requirements.txt b/dialogflow/requirements.txt index d2a43254ce5..2bfad2c849e 100644 --- a/dialogflow/requirements.txt +++ b/dialogflow/requirements.txt @@ -1,2 +1,4 @@ google-cloud-dialogflow==2.19.1 Flask==2.2.2 +pyaudio +termcolor \ No newline at end of file diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py new file mode 100644 index 00000000000..74f995c38d9 --- /dev/null +++ b/dialogflow/streaming_transcription.py @@ -0,0 +1,226 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google Cloud Dialogflow API sample code using the StreamingAnalyzeContent +API. + +Also please contact Google to get credentials of this project and set up the +credential file json locations by running: +export GOOGLE_APPLICATION_CREDENTIALS= + +Example usage: + export GOOGLE_CLOUD_PROJECT='cloud-contact-center-ext-demo' + export CONVERSATION_PROFILE='FnuBYO8eTBWM8ep1i-eOng' + export GOOGLE_APPLICATION_CREDENTIALS='/Users/ruogu/Desktop/keys/cloud-contact-center-ext-demo-78798f9f9254.json' + python streaming_transcription.py + +Then started to talk in English, you should see transcription shows up as you speak. + +Say "Quit" or "Exit" to stop. +""" + +import time +import re +import sys +import os + +import conversation_management +import participant_management + +import pyaudio +from six.moves import queue +from google.api_core import client_options +from google.api_core.exceptions import DeadlineExceeded +from google.cloud import dialogflow_v2beta1 as dialogflow + +PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT') +CONVERSATION_PROFILE_ID = os.getenv('CONVERSATION_PROFILE') + +# Audio recording parameters +SAMPLE_RATE = 16000 +CHUNK_SIZE = int(SAMPLE_RATE / 10) # 100ms +RESTART_TIMEOUT = 160 # seconds +MAX_LOOKBACK = 3 # seconds + +YELLOW = '\033[0;33m' + + +class ResumableMicrophoneStream: + """Opens a recording stream as a generator yielding the audio chunks.""" + + def __init__(self, rate, chunk_size): + self._rate = rate + self.chunk_size = chunk_size + self._num_channels = 1 + self._buff = queue.Queue() + self.closed = True + # Count the number of times the stream analyze content restarts. + self.restart_counter = 0 + self.last_start_time = 0 + # Time end of the last is_final in millisec since last_start_time. + self.is_final_offset = 0 + # Save the audio chunks generated from the start of the audio stream for + # replay after restart. + self.audio_input_chunks = [] + self.new_stream = True + self._audio_interface = pyaudio.PyAudio() + self._audio_stream = self._audio_interface.open( + format=pyaudio.paInt16, + channels=self._num_channels, + rate=self._rate, + input=True, + frames_per_buffer=self.chunk_size, + # Run the audio stream asynchronously to fill the buffer object. + # This is necessary so that the input device's buffer doesn't + # overflow while the calling thread makes network requests, etc. + stream_callback=self._fill_buffer, + ) + + def __enter__(self): + + self.closed = False + return self + + def __exit__(self, type, value, traceback): + + self._audio_stream.stop_stream() + self._audio_stream.close() + self.closed = True + # Signal the generator to terminate so that the client's + # streaming_recognize method will not block the process termination. + self._buff.put(None) + self._audio_interface.terminate() + + def _fill_buffer(self, in_data, *args, **kwargs): + """Continuously collect data from the audio stream, into the buffer in + chunksize.""" + + self._buff.put(in_data) + return None, pyaudio.paContinue + + def generator(self): + """Stream Audio from microphone to API and to local buffer""" + # Handle restart. + if not self.closed: + total_processed_time = self.last_start_time + self.is_final_offset + processed_bytes_length = int( + total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 + self.last_start_time = total_processed_time + # Send out bytes stored in self.audio_input_chunks that is after the + # processed_bytes_length. + if (processed_bytes_length != 0): + audio_bytes = b''.join(self.audio_input_chunks) + # Lookback for unprocessed audio data. + need_to_process_length = min( + int(len(audio_bytes) - processed_bytes_length), + int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8)) + # Note that you need to explicitly use `int` type for substring. + need_to_process_bytes = audio_bytes[(-1) + * need_to_process_length:] + yield need_to_process_bytes + + while not self.closed: + data = [] + # Use a blocking get() to ensure there's at least one chunk of + # data, and stop iteration if the chunk is None, indicating the + # end of the audio stream. + chunk = self._buff.get() + + if chunk is None: + return + data.append(chunk) + # Now try to the rest of chunks if there are any left in the _buff. + while True: + try: + chunk = self._buff.get(block=False) + + if chunk is None: + return + data.append(chunk) + + except queue.Empty: + break + self.audio_input_chunks.extend(data) + if data: + yield b''.join(data) + + +def main(): + """start bidirectional streaming from microphone input to Dialogflow API""" + # Create conversation. + conversation = conversation_management.create_conversation( + project_id=PROJECT_ID, conversation_profile_id=CONVERSATION_PROFILE_ID) + + conversation_id = conversation.name.split('conversations/')[1].rstrip() + + # Create end user participant. + end_user = participant_management.create_participant( + project_id=PROJECT_ID, conversation_id=conversation_id, role='END_USER') + + mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE) + print(mic_manager.chunk_size) + sys.stdout.write(YELLOW) + sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n') + sys.stdout.write('End (ms) Transcript Results/Status\n') + sys.stdout.write('=====================================================\n') + + with mic_manager as stream: + + while not stream.closed: + terminate = False + while not terminate: + try: + print("New Streaming Analyze Request: {}".format( + stream.restart_counter)) + stream.restart_counter += 1 + # Send request to streaming and get response. + responses = participant_management.streaming_analyze_content_audio( + participant_name=end_user.name, + sample_rate_herz=SAMPLE_RATE, + stream=stream, + timeout=RESTART_TIMEOUT, + language_code='en-US', + single_utterance=False) + + # Now, put the transcription responses to user. + for response in responses: + # if response.human_agent_suggestion_results: + # print(response) + if response.recognition_result.is_final: + print(response.recognition_result) + # offset return from recognition_result is relative + # to the beginning of audio stream. + offset = response.recognition_result.speech_end_offset + stream.is_final_offset = int( + offset.seconds * 1000 + offset.microseconds * 1000000) + transcript = response.recognition_result.transcript + # Exit recognition if any of the transcribed phrases could be + # one of our keywords. + if re.search(r'\b(exit|quit)\b', transcript, re.I): + sys.stdout.write(YELLOW) + sys.stdout.write('Exiting...\n') + terminate = True + break + except DeadlineExceeded: + print('Deadline Exceeded, restarting.') + + if terminate: + conversation_management.complete_conversation( + project_id=PROJECT_ID, conversation_id=conversation_id) + break + + +if __name__ == '__main__': + + main() From 45f0be4486fa7a73cdbd281ef3815f6fefdff193 Mon Sep 17 00:00:00 2001 From: ruogu Date: Tue, 21 Mar 2023 17:33:39 -0700 Subject: [PATCH 02/14] Add function parameter comments and product prefix on tags. --- dialogflow/participant_management.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 4a32286159e..08ebf57d84f 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -177,7 +177,7 @@ def request_generator(audio_config, audio_file_path): # [END dialogflow_analyze_content_audio_stream] -# [START streaming_analyze_content_audio] +# [START ccai_streaming_analyze_content_audio] def streaming_analyze_content_audio(participant_name, sample_rate_herz, stream, @@ -189,7 +189,10 @@ def streaming_analyze_content_audio(participant_name, Args: participant_name: resource name of the participant. sample_rate_herz: herz rate of the sample. - audio_generator: a sequence of audio data. + stream: the stream to process. + timeout: the timeout of one stream. + language_code: the language code of the audio + single_utterance: whether to use single_utterance. """ from google.cloud import dialogflow_v2beta1 as dialogflow_beta client = dialogflow_beta.ParticipantsClient() @@ -221,4 +224,4 @@ def gen_requests(participant_name, audio_config, stream): timeout=timeout) -# [END streaming_analyze_content_audio] +# [END ccai_streaming_analyze_content_audio] From 68010e90ec5371b23f9bf7666a44dfec5c537b70 Mon Sep 17 00:00:00 2001 From: ruogu Date: Tue, 21 Mar 2023 17:46:57 -0700 Subject: [PATCH 03/14] Fix the nox lint issue. --- dialogflow/streaming_transcription.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py index 74f995c38d9..42f6c4d1790 100644 --- a/dialogflow/streaming_transcription.py +++ b/dialogflow/streaming_transcription.py @@ -30,19 +30,18 @@ Say "Quit" or "Exit" to stop. """ -import time +import os import re import sys -import os -import conversation_management -import participant_management +from google.api_core.exceptions import DeadlineExceeded import pyaudio + from six.moves import queue -from google.api_core import client_options -from google.api_core.exceptions import DeadlineExceeded -from google.cloud import dialogflow_v2beta1 as dialogflow + +import conversation_management +import participant_management PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT') CONVERSATION_PROFILE_ID = os.getenv('CONVERSATION_PROFILE') From 5affcd5eafdefa9835689e4cf34775e443e2e546 Mon Sep 17 00:00:00 2001 From: ruogu Date: Wed, 22 Mar 2023 10:13:43 -0700 Subject: [PATCH 04/14] Fix the region tag. --- dialogflow/participant_management.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 08ebf57d84f..f6a62c94d3e 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -177,7 +177,7 @@ def request_generator(audio_config, audio_file_path): # [END dialogflow_analyze_content_audio_stream] -# [START ccai_streaming_analyze_content_audio] +# [START dialogflow_streaming_analyze_content_audio] def streaming_analyze_content_audio(participant_name, sample_rate_herz, stream, @@ -224,4 +224,4 @@ def gen_requests(participant_name, audio_config, stream): timeout=timeout) -# [END ccai_streaming_analyze_content_audio] +# [END dialogflow_streaming_analyze_content_audio] From c29ae56aae331f2317e68e83d94e6fb079cfcccd Mon Sep 17 00:00:00 2001 From: ruogu Date: Wed, 26 Apr 2023 16:35:49 -0700 Subject: [PATCH 05/14] Add half-close on is_final feature. --- dialogflow/streaming_transcription.py | 108 ++++++++++++++------------ 1 file changed, 58 insertions(+), 50 deletions(-) diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py index 42f6c4d1790..64a9fd35236 100644 --- a/dialogflow/streaming_transcription.py +++ b/dialogflow/streaming_transcription.py @@ -64,6 +64,7 @@ def __init__(self, rate, chunk_size): self._num_channels = 1 self._buff = queue.Queue() self.closed = True + self.is_final = False # Count the number of times the stream analyze content restarts. self.restart_counter = 0 self.last_start_time = 0 @@ -110,49 +111,53 @@ def _fill_buffer(self, in_data, *args, **kwargs): def generator(self): """Stream Audio from microphone to API and to local buffer""" - # Handle restart. - if not self.closed: - total_processed_time = self.last_start_time + self.is_final_offset - processed_bytes_length = int( - total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 - self.last_start_time = total_processed_time - # Send out bytes stored in self.audio_input_chunks that is after the - # processed_bytes_length. - if (processed_bytes_length != 0): - audio_bytes = b''.join(self.audio_input_chunks) - # Lookback for unprocessed audio data. - need_to_process_length = min( - int(len(audio_bytes) - processed_bytes_length), - int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8)) - # Note that you need to explicitly use `int` type for substring. - need_to_process_bytes = audio_bytes[(-1) - * need_to_process_length:] - yield need_to_process_bytes - - while not self.closed: - data = [] - # Use a blocking get() to ensure there's at least one chunk of - # data, and stop iteration if the chunk is None, indicating the - # end of the audio stream. - chunk = self._buff.get() - - if chunk is None: - return - data.append(chunk) - # Now try to the rest of chunks if there are any left in the _buff. - while True: - try: - chunk = self._buff.get(block=False) - - if chunk is None: - return - data.append(chunk) - - except queue.Empty: - break - self.audio_input_chunks.extend(data) - if data: - yield b''.join(data) + while True and not self.closed: + if self.is_final: + # Handle restart. + print("restart generator") + # Flip the bit of is_final so it can continue stream. + self.is_final = False + total_processed_time = self.last_start_time + self.is_final_offset + processed_bytes_length = int( + total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 + self.last_start_time = total_processed_time + # Send out bytes stored in self.audio_input_chunks that is after the + # processed_bytes_length. + if (processed_bytes_length != 0): + audio_bytes = b''.join(self.audio_input_chunks) + # Lookback for unprocessed audio data. + need_to_process_length = min( + int(len(audio_bytes) - processed_bytes_length), + int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8)) + # Note that you need to explicitly use `int` type for substring. + need_to_process_bytes = audio_bytes[(-1) + * need_to_process_length:] + yield need_to_process_bytes + + while not self.is_final: + data = [] + # Use a blocking get() to ensure there's at least one chunk of + # data, and stop iteration if the chunk is None, indicating the + # end of the audio stream. + chunk = self._buff.get() + + if chunk is None: + return + data.append(chunk) + # Now try to the rest of chunks if there are any left in the _buff. + while True: + try: + chunk = self._buff.get(block=False) + + if chunk is None: + return + data.append(chunk) + + except queue.Empty: + break + self.audio_input_chunks.extend(data) + if data: + yield b''.join(data) def main(): @@ -192,25 +197,28 @@ def main(): language_code='en-US', single_utterance=False) - # Now, put the transcription responses to user. + # Now, print the final transcription responses to user. for response in responses: - # if response.human_agent_suggestion_results: - # print(response) if response.recognition_result.is_final: - print(response.recognition_result) + print(response) # offset return from recognition_result is relative # to the beginning of audio stream. offset = response.recognition_result.speech_end_offset stream.is_final_offset = int( - offset.seconds * 1000 + offset.microseconds * 1000000) + offset.seconds * 1000 + offset.microseconds / 1000) transcript = response.recognition_result.transcript - # Exit recognition if any of the transcribed phrases could be + stream.is_final = True + # Half-close the stream with gRPC. + print("Halfclose stream as is_final is received.") + responses.cancel() + # Exit recognition if any of the transcribed phrase could be # one of our keywords. if re.search(r'\b(exit|quit)\b', transcript, re.I): sys.stdout.write(YELLOW) sys.stdout.write('Exiting...\n') terminate = True - break + stream.closed + break except DeadlineExceeded: print('Deadline Exceeded, restarting.') From 8028f8d97e926a3049a9798af244f9a376e75027 Mon Sep 17 00:00:00 2001 From: ruogu Date: Wed, 26 Apr 2023 21:00:27 -0700 Subject: [PATCH 06/14] Halfclose the client send stream instead of the receiving stream. Final response from Dialogflow and Agent Assist are triggered only when the send stream is half-closed. So need to wait for the response to finish. --- dialogflow/participant_management.py | 17 ++++---- dialogflow/streaming_transcription.py | 62 +++++++++++++-------------- 2 files changed, 39 insertions(+), 40 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 54e5c794d8d..934ae6888d4 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -209,16 +209,15 @@ def gen_requests(participant_name, audio_config, stream): """Generates requests for streaming. """ audio_generator = stream.generator() - while not stream.closed: - print("Yield config to streaming analyze content.") + print("Yield config to streaming analyze content.") + yield dialogflow_beta.types.participant.StreamingAnalyzeContentRequest( + participant=participant_name, + audio_config=audio_config) + print("Yield audios to streaming analyze content.") + for content in audio_generator: + # print('Yield audio to streaming analyze content') yield dialogflow_beta.types.participant.StreamingAnalyzeContentRequest( - participant=participant_name, - audio_config=audio_config) - print("Yield audios to streaming analyze content.") - for content in audio_generator: - # print('Yield audio to streaming analyze content') - yield dialogflow_beta.types.participant.StreamingAnalyzeContentRequest( - input_audio=content) + input_audio=content) return client.streaming_analyze_content(gen_requests( participant_name, audio_config, stream), diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py index 64a9fd35236..a4730d5d836 100644 --- a/dialogflow/streaming_transcription.py +++ b/dialogflow/streaming_transcription.py @@ -63,8 +63,8 @@ def __init__(self, rate, chunk_size): self.chunk_size = chunk_size self._num_channels = 1 self._buff = queue.Queue() - self.closed = True self.is_final = False + self.closed = True # Count the number of times the stream analyze content restarts. self.restart_counter = 0 self.last_start_time = 0 @@ -111,30 +111,29 @@ def _fill_buffer(self, in_data, *args, **kwargs): def generator(self): """Stream Audio from microphone to API and to local buffer""" - while True and not self.closed: - if self.is_final: - # Handle restart. - print("restart generator") - # Flip the bit of is_final so it can continue stream. - self.is_final = False - total_processed_time = self.last_start_time + self.is_final_offset - processed_bytes_length = int( - total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 - self.last_start_time = total_processed_time - # Send out bytes stored in self.audio_input_chunks that is after the - # processed_bytes_length. - if (processed_bytes_length != 0): - audio_bytes = b''.join(self.audio_input_chunks) - # Lookback for unprocessed audio data. - need_to_process_length = min( - int(len(audio_bytes) - processed_bytes_length), - int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8)) - # Note that you need to explicitly use `int` type for substring. - need_to_process_bytes = audio_bytes[(-1) - * need_to_process_length:] - yield need_to_process_bytes - - while not self.is_final: + try: + # Handle restart. + print("restart generator") + # Flip the bit of is_final so it can continue stream. + self.is_final = False + total_processed_time = self.last_start_time + self.is_final_offset + processed_bytes_length = int( + total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 + self.last_start_time = total_processed_time + # Send out bytes stored in self.audio_input_chunks that is after the + # processed_bytes_length. + if (processed_bytes_length != 0): + audio_bytes = b''.join(self.audio_input_chunks) + # Lookback for unprocessed audio data. + need_to_process_length = min( + int(len(audio_bytes) - processed_bytes_length), + int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8)) + # Note that you need to explicitly use `int` type for substring. + need_to_process_bytes = audio_bytes[(-1) + * need_to_process_length:] + yield need_to_process_bytes + + while not self.closed and not self.is_final: data = [] # Use a blocking get() to ensure there's at least one chunk of # data, and stop iteration if the chunk is None, indicating the @@ -158,6 +157,8 @@ def generator(self): self.audio_input_chunks.extend(data) if data: yield b''.join(data) + finally: + print("Stop generator") def main(): @@ -180,7 +181,6 @@ def main(): sys.stdout.write('=====================================================\n') with mic_manager as stream: - while not stream.closed: terminate = False while not terminate: @@ -199,6 +199,8 @@ def main(): # Now, print the final transcription responses to user. for response in responses: + if response.message: + print(response) if response.recognition_result.is_final: print(response) # offset return from recognition_result is relative @@ -207,18 +209,16 @@ def main(): stream.is_final_offset = int( offset.seconds * 1000 + offset.microseconds / 1000) transcript = response.recognition_result.transcript + # Half-close the stream with gRPC (in Python just stop yielding requests) stream.is_final = True - # Half-close the stream with gRPC. - print("Halfclose stream as is_final is received.") - responses.cancel() # Exit recognition if any of the transcribed phrase could be # one of our keywords. if re.search(r'\b(exit|quit)\b', transcript, re.I): sys.stdout.write(YELLOW) sys.stdout.write('Exiting...\n') terminate = True - stream.closed - break + stream.closed = True + break except DeadlineExceeded: print('Deadline Exceeded, restarting.') From 945ad4a70103c3dff8aab63e55427c7614a650c9 Mon Sep 17 00:00:00 2001 From: ruogu Date: Fri, 12 May 2023 14:57:36 -0700 Subject: [PATCH 07/14] Unify the function name in participant sample code. --- dialogflow/analyze_content_stream_test.py | 2 +- dialogflow/participant_management.py | 14 +++++++------- dialogflow/streaming_transcription.py | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dialogflow/analyze_content_stream_test.py b/dialogflow/analyze_content_stream_test.py index 67a99e4bb44..b53f07e3885 100644 --- a/dialogflow/analyze_content_stream_test.py +++ b/dialogflow/analyze_content_stream_test.py @@ -76,7 +76,7 @@ def participant_id(conversation_id): # Test live transcription with streaming_analyze_content. def test_analyze_content_audio_stream(capsys, conversation_id, participant_id): # Call StreamingAnalyzeContent to transcribe the audio. - participant_management.analyze_content_audio_stream( + participant_management.analyze_content_audio( conversation_id=conversation_id, participant_id=participant_id, audio_file_path=AUDIO_FILE_PATH, diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 934ae6888d4..af637dfddca 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -111,8 +111,8 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): # [END dialogflow_analyze_content_text] -# [START dialogflow_analyze_content_audio_stream] -def analyze_content_audio_stream( +# [START dialogflow_analyze_content_audio] +def analyze_content_audio( conversation_id, participant_id, audio_file_path ): """Analyze audio content for END_USER with audio files. @@ -175,17 +175,17 @@ def request_generator(audio_config, audio_file_path): print("=" * 20) -# [END dialogflow_analyze_content_audio_stream] +# [END dialogflow_analyze_content_audio] -# [START dialogflow_streaming_analyze_content_audio] -def streaming_analyze_content_audio(participant_name, +# [START dialogflow_analyze_content_audio] +def analyze_content_audio_stream(participant_name, sample_rate_herz, stream, timeout, language_code, single_utterance=False): - """Stream audio to Dialogflow and receive transcripts and suggestions. + """Stream audio streams to Dialogflow and receive transcripts and suggestions. Args: participant_name: resource name of the participant. @@ -224,4 +224,4 @@ def gen_requests(participant_name, audio_config, stream): timeout=timeout) -# [END dialogflow_streaming_analyze_content_audio] +# [END dialogflow_analyze_content_audio] diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py index a4730d5d836..956170d03ee 100644 --- a/dialogflow/streaming_transcription.py +++ b/dialogflow/streaming_transcription.py @@ -189,7 +189,7 @@ def main(): stream.restart_counter)) stream.restart_counter += 1 # Send request to streaming and get response. - responses = participant_management.streaming_analyze_content_audio( + responses = participant_management.analyze_content_audio_stream( participant_name=end_user.name, sample_rate_herz=SAMPLE_RATE, stream=stream, From 3c06630d7b33e2bda8a19ce629d10b943952f1bd Mon Sep 17 00:00:00 2001 From: ruogu Date: Fri, 12 May 2023 15:34:32 -0700 Subject: [PATCH 08/14] Add test for analyze_content_audio_stream sample code. --- dialogflow/analyze_content_stream_test.py | 33 ++++++++++++++++-- dialogflow/participant_management.py | 42 +++++++++++++---------- dialogflow/streaming_transcription.py | 4 ++- 3 files changed, 57 insertions(+), 22 deletions(-) diff --git a/dialogflow/analyze_content_stream_test.py b/dialogflow/analyze_content_stream_test.py index b53f07e3885..c7e47e5d4c7 100644 --- a/dialogflow/analyze_content_stream_test.py +++ b/dialogflow/analyze_content_stream_test.py @@ -73,8 +73,8 @@ def participant_id(conversation_id): yield participant_id -# Test live transcription with streaming_analyze_content. -def test_analyze_content_audio_stream(capsys, conversation_id, participant_id): +# Test live transcription of an audio file with streaming_analyze_content. +def test_analyze_content_audio(capsys, conversation_id, participant_id): # Call StreamingAnalyzeContent to transcribe the audio. participant_management.analyze_content_audio( conversation_id=conversation_id, @@ -83,3 +83,32 @@ def test_analyze_content_audio_stream(capsys, conversation_id, participant_id): ) out, _ = capsys.readouterr() assert "book a room" in out.lower() + + +# Test live transcription of an audio stream with streaming_analyze_content. +def test_analyze_content_audio_stream(capsys, conversation_id, participant_id): + class stream_generator(): + def __init__(self, audio_file_path): + self.audio_file_path = audio_file_path + + def generator(self): + with open(self.audio_file_path, "rb") as audio_file: + while True: + chunk = audio_file.read(4096) + if not chunk: + break + # The later requests contains audio data. + yield chunk + # Call StreamingAnalyzeContent to transcribe the audio. + responses = participant_management.analyze_content_audio_stream( + conversation_id=conversation_id, + participant_id=participant_id, + sample_rate_herz=16000, + stream=stream_generator(AUDIO_FILE_PATH), + language_code="en-US", + timeout=300 + ) + for response in responses: + print(response) + out, _ = capsys.readouterr() + assert "book a room" in out.lower() diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index af637dfddca..5669fbfd64a 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -178,28 +178,35 @@ def request_generator(audio_config, audio_file_path): # [END dialogflow_analyze_content_audio] -# [START dialogflow_analyze_content_audio] -def analyze_content_audio_stream(participant_name, - sample_rate_herz, - stream, - timeout, - language_code, - single_utterance=False): - """Stream audio streams to Dialogflow and receive transcripts and suggestions. +# [START dialogflow_analyze_content_audio_stream] +def analyze_content_audio_stream(conversation_id, + participant_id, + sample_rate_herz, + stream, + timeout, + language_code, + single_utterance=False): + """Stream audio streams to Dialogflow and receive transcripts and + suggestions. Args: - participant_name: resource name of the participant. + conversation_id: Id of the conversation. + participant_id: Id of the participant. sample_rate_herz: herz rate of the sample. stream: the stream to process. timeout: the timeout of one stream. language_code: the language code of the audio single_utterance: whether to use single_utterance. """ - from google.cloud import dialogflow_v2beta1 as dialogflow_beta - client = dialogflow_beta.ParticipantsClient() + credentials, project_id = google.auth.default() + client = dialogflow.ParticipantsClient(credentials=credentials) - audio_config = dialogflow_beta.types.audio_config.InputAudioConfig( - audio_encoding=dialogflow_beta.types.audio_config.AudioEncoding. + participant_name = client.participant_path( + project_id, conversation_id, participant_id + ) + + audio_config = dialogflow.types.audio_config.InputAudioConfig( + audio_encoding=dialogflow.types.audio_config.AudioEncoding. AUDIO_ENCODING_LINEAR_16, sample_rate_hertz=sample_rate_herz, language_code=language_code, @@ -209,14 +216,11 @@ def gen_requests(participant_name, audio_config, stream): """Generates requests for streaming. """ audio_generator = stream.generator() - print("Yield config to streaming analyze content.") - yield dialogflow_beta.types.participant.StreamingAnalyzeContentRequest( + yield dialogflow.types.participant.StreamingAnalyzeContentRequest( participant=participant_name, audio_config=audio_config) - print("Yield audios to streaming analyze content.") for content in audio_generator: - # print('Yield audio to streaming analyze content') - yield dialogflow_beta.types.participant.StreamingAnalyzeContentRequest( + yield dialogflow.types.participant.StreamingAnalyzeContentRequest( input_audio=content) return client.streaming_analyze_content(gen_requests( @@ -224,4 +228,4 @@ def gen_requests(participant_name, audio_config, stream): timeout=timeout) -# [END dialogflow_analyze_content_audio] +# [END dialogflow_analyze_content_audio_stream] diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py index 956170d03ee..0f703090fbd 100644 --- a/dialogflow/streaming_transcription.py +++ b/dialogflow/streaming_transcription.py @@ -172,6 +172,7 @@ def main(): # Create end user participant. end_user = participant_management.create_participant( project_id=PROJECT_ID, conversation_id=conversation_id, role='END_USER') + participant_id = end_user.name.split("participants/")[1].rstrip() mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE) print(mic_manager.chunk_size) @@ -190,7 +191,8 @@ def main(): stream.restart_counter += 1 # Send request to streaming and get response. responses = participant_management.analyze_content_audio_stream( - participant_name=end_user.name, + conversation_id=conversation_id, + participant_id=participant_id, sample_rate_herz=SAMPLE_RATE, stream=stream, timeout=RESTART_TIMEOUT, From aff78aef5bd53221924881030c69d2375186aa49 Mon Sep 17 00:00:00 2001 From: rogers140 Date: Mon, 26 Jun 2023 10:39:12 -0700 Subject: [PATCH 09/14] Update participant_management.py Fix the issue introduced by merging --- dialogflow/participant_management.py | 44 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index fe06ad0f139..61965ec10f1 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2021 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -68,60 +68,59 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): participant=participant_path, text_input=text_input ) print("AnalyzeContent Response:") - print(f"Reply Text: {response.reply_text}") - print(f"Response: {response.human_agent_suggestion_results}") - # assert response.human_agent_suggestion_results=="test" + print("Reply Text: {}".format(response.reply_text)) + for suggestion_result in response.human_agent_suggestion_results: if suggestion_result.error is not None: - print(f"Error: {suggestion_result.error.message}") + print("Error: {}".format(suggestion_result.error.message)) if suggestion_result.suggest_articles_response: for answer in suggestion_result.suggest_articles_response.article_answers: - print(f"Article Suggestion Answer: {answer.title}") - print(f"Answer Record: {answer.answer_record}") + print("Article Suggestion Answer: {}".format(answer.title)) + print("Answer Record: {}".format(answer.answer_record)) if suggestion_result.suggest_faq_answers_response: for answer in suggestion_result.suggest_faq_answers_response.faq_answers: - print(f"Faq Answer: {answer.answer}") - print(f"Answer Record: {answer.answer_record}") + print("Faq Answer: {}".format(answer.answer)) + print("Answer Record: {}".format(answer.answer_record)) if suggestion_result.suggest_smart_replies_response: for ( answer ) in suggestion_result.suggest_smart_replies_response.smart_reply_answers: - print(f"Smart Reply: {answer.reply}") - print(f"Answer Record: {answer.answer_record}") + print("Smart Reply: {}".format(answer.reply)) + print("Answer Record: {}".format(answer.answer_record)) for suggestion_result in response.end_user_suggestion_results: if suggestion_result.error: print("Error: {}".format(suggestion_result.error.message)) if suggestion_result.suggest_articles_response: for answer in suggestion_result.suggest_articles_response.article_answers: - print(f"Article Suggestion Answer: {answer.title}") - print(f"Answer Record: {answer.answer_record}") + print("Article Suggestion Answer: {}".format(answer.title)) + print("Answer Record: {}".format(answer.answer_record)) if suggestion_result.suggest_faq_answers_response: for answer in suggestion_result.suggest_faq_answers_response.faq_answers: - print(f"Faq Answer: {answer.answer}") - print(f"Answer Record: {answer.answer_record}") + print("Faq Answer: {}".format(answer.answer)) + print("Answer Record: {}".format(answer.answer_record)) if suggestion_result.suggest_smart_replies_response: for ( answer ) in suggestion_result.suggest_smart_replies_response.smart_reply_answers: - print(f"Smart Reply: {answer.reply}") - print(f"Answer Record: {answer.answer_record}") + print("Smart Reply: {}".format(answer.reply)) + print("Answer Record: {}".format(answer.answer_record)) return response # [END dialogflow_analyze_content_text] - -# [START dialogflow_analyze_content_audio_stream] -def analyze_content_audio_stream(conversation_id, participant_id, audio_file_path): +# [START dialogflow_analyze_content_audio] +def analyze_content_audio( + conversation_id, participant_id, audio_file_path +): """Analyze audio content for END_USER with audio files. Args: conversation_id: Id of the conversation. participant_id: Id of the participant. - audio_file_path: audio file in wav/mp3 format contains utterances of END_USER. - """ + audio_file_path: audio file in wav/mp3 format contains utterances of END_USER.""" # Initialize client that will be used to send requests across threads. This # client only needs to be created once, and can be reused for multiple requests. @@ -141,6 +140,7 @@ def analyze_content_audio_stream(conversation_id, participant_id, audio_file_pat # Generates requests based on the audio files. Will by default use the first channel as # END_USER, and second channel as HUMAN_AGENT. def request_generator(audio_config, audio_file_path): + # The first request contains the configuration. yield dialogflow.StreamingAnalyzeContentRequest( participant=participant_path, audio_config=audio_config From e5e050a71243300995caa12216bd20fe099cc2ed Mon Sep 17 00:00:00 2001 From: "Leah E. Cole" <6719667+leahecole@users.noreply.github.com> Date: Tue, 27 Jun 2023 10:25:12 -0400 Subject: [PATCH 10/14] Apply suggestions from code review --- dialogflow/participant_management.py | 2 +- dialogflow/requirements.txt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 61965ec10f1..5669fbfd64a 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2023 Google LLC +# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/dialogflow/requirements.txt b/dialogflow/requirements.txt index 098fa20ea04..5dd7dc25972 100644 --- a/dialogflow/requirements.txt +++ b/dialogflow/requirements.txt @@ -1,5 +1,5 @@ google-cloud-dialogflow==2.22.0 Flask==2.2.2 -pyaudio -termcolor +pyaudio==0.2.13 +termcolor==2.3.0 functions-framework==3.3.0 From 2e9622c3f9d188eb590af6e15c7ffb128709b17f Mon Sep 17 00:00:00 2001 From: ruogu Date: Tue, 27 Jun 2023 09:35:24 -0700 Subject: [PATCH 11/14] Fix string formatt style --- dialogflow/participant_management.py | 81 ++++++++++++++------------- dialogflow/streaming_transcription.py | 60 ++++++++++---------- 2 files changed, 72 insertions(+), 69 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 5669fbfd64a..6f692c06295 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -40,8 +40,8 @@ def create_participant(project_id, conversation_id, role): parent=conversation_path, participant={"role": role}, timeout=600 ) print("Participant Created.") - print("Role: {}".format(response.role)) - print("Name: {}".format(response.name)) + print(f"Role: {response.role}") + print(f"Name: {response.name}") return response @@ -68,59 +68,59 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): participant=participant_path, text_input=text_input ) print("AnalyzeContent Response:") - print("Reply Text: {}".format(response.reply_text)) + print(f"Reply Text: {response.reply_text}") for suggestion_result in response.human_agent_suggestion_results: if suggestion_result.error is not None: - print("Error: {}".format(suggestion_result.error.message)) + print(f"Error: {suggestion_result.error.message}") if suggestion_result.suggest_articles_response: for answer in suggestion_result.suggest_articles_response.article_answers: - print("Article Suggestion Answer: {}".format(answer.title)) - print("Answer Record: {}".format(answer.answer_record)) + print(f"Article Suggestion Answer: {answer.title}") + print(f"Answer Record: {answer.answer_record}") if suggestion_result.suggest_faq_answers_response: for answer in suggestion_result.suggest_faq_answers_response.faq_answers: - print("Faq Answer: {}".format(answer.answer)) - print("Answer Record: {}".format(answer.answer_record)) + print(f"Faq Answer: {answer.answer}") + print(f"Answer Record: {answer.answer_record}") if suggestion_result.suggest_smart_replies_response: for ( answer ) in suggestion_result.suggest_smart_replies_response.smart_reply_answers: - print("Smart Reply: {}".format(answer.reply)) - print("Answer Record: {}".format(answer.answer_record)) + print(f"Smart Reply: {answer.reply}") + print(f"Answer Record: {answer.answer_record}") for suggestion_result in response.end_user_suggestion_results: if suggestion_result.error: - print("Error: {}".format(suggestion_result.error.message)) + print(f"Error: {suggestion_result.error.message}") if suggestion_result.suggest_articles_response: for answer in suggestion_result.suggest_articles_response.article_answers: - print("Article Suggestion Answer: {}".format(answer.title)) - print("Answer Record: {}".format(answer.answer_record)) + print(f"Article Suggestion Answer: {answer.title}") + print(f"Answer Record: {answer.answer_record}") if suggestion_result.suggest_faq_answers_response: for answer in suggestion_result.suggest_faq_answers_response.faq_answers: - print("Faq Answer: {}".format(answer.answer)) - print("Answer Record: {}".format(answer.answer_record)) + print(f"Faq Answer: {answer.answer}") + print(f"Answer Record: {answer.answer_record}") if suggestion_result.suggest_smart_replies_response: for ( answer ) in suggestion_result.suggest_smart_replies_response.smart_reply_answers: - print("Smart Reply: {}".format(answer.reply)) - print("Answer Record: {}".format(answer.answer_record)) + print(f"Smart Reply: {answer.reply}") + print(f"Answer Record: {answer.answer_record}") return response # [END dialogflow_analyze_content_text] + # [START dialogflow_analyze_content_audio] -def analyze_content_audio( - conversation_id, participant_id, audio_file_path -): +def analyze_content_audio(conversation_id, participant_id, audio_file_path): """Analyze audio content for END_USER with audio files. Args: conversation_id: Id of the conversation. participant_id: Id of the participant. - audio_file_path: audio file in wav/mp3 format contains utterances of END_USER.""" + audio_file_path: audio file in wav/mp3 format contains utterances of END_USER. + """ # Initialize client that will be used to send requests across threads. This # client only needs to be created once, and can be reused for multiple requests. @@ -140,7 +140,6 @@ def analyze_content_audio( # Generates requests based on the audio files. Will by default use the first channel as # END_USER, and second channel as HUMAN_AGENT. def request_generator(audio_config, audio_file_path): - # The first request contains the configuration. yield dialogflow.StreamingAnalyzeContentRequest( participant=participant_path, audio_config=audio_config @@ -179,13 +178,15 @@ def request_generator(audio_config, audio_file_path): # [START dialogflow_analyze_content_audio_stream] -def analyze_content_audio_stream(conversation_id, - participant_id, - sample_rate_herz, - stream, - timeout, - language_code, - single_utterance=False): +def analyze_content_audio_stream( + conversation_id, + participant_id, + sample_rate_herz, + stream, + timeout, + language_code, + single_utterance=False, +): """Stream audio streams to Dialogflow and receive transcripts and suggestions. @@ -206,26 +207,26 @@ def analyze_content_audio_stream(conversation_id, ) audio_config = dialogflow.types.audio_config.InputAudioConfig( - audio_encoding=dialogflow.types.audio_config.AudioEncoding. - AUDIO_ENCODING_LINEAR_16, + audio_encoding=dialogflow.types.audio_config.AudioEncoding.AUDIO_ENCODING_LINEAR_16, sample_rate_hertz=sample_rate_herz, language_code=language_code, - single_utterance=single_utterance) + single_utterance=single_utterance, + ) def gen_requests(participant_name, audio_config, stream): - """Generates requests for streaming. - """ + """Generates requests for streaming.""" audio_generator = stream.generator() yield dialogflow.types.participant.StreamingAnalyzeContentRequest( - participant=participant_name, - audio_config=audio_config) + participant=participant_name, audio_config=audio_config + ) for content in audio_generator: yield dialogflow.types.participant.StreamingAnalyzeContentRequest( - input_audio=content) + input_audio=content + ) - return client.streaming_analyze_content(gen_requests( - participant_name, audio_config, stream), - timeout=timeout) + return client.streaming_analyze_content( + gen_requests(participant_name, audio_config, stream), timeout=timeout + ) # [END dialogflow_analyze_content_audio_stream] diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py index 0f703090fbd..6395a30b3a8 100644 --- a/dialogflow/streaming_transcription.py +++ b/dialogflow/streaming_transcription.py @@ -43,8 +43,8 @@ import conversation_management import participant_management -PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT') -CONVERSATION_PROFILE_ID = os.getenv('CONVERSATION_PROFILE') +PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT") +CONVERSATION_PROFILE_ID = os.getenv("CONVERSATION_PROFILE") # Audio recording parameters SAMPLE_RATE = 16000 @@ -52,7 +52,7 @@ RESTART_TIMEOUT = 160 # seconds MAX_LOOKBACK = 3 # seconds -YELLOW = '\033[0;33m' +YELLOW = "\033[0;33m" class ResumableMicrophoneStream: @@ -88,12 +88,10 @@ def __init__(self, rate, chunk_size): ) def __enter__(self): - self.closed = False return self def __exit__(self, type, value, traceback): - self._audio_stream.stop_stream() self._audio_stream.close() self.closed = True @@ -117,20 +115,21 @@ def generator(self): # Flip the bit of is_final so it can continue stream. self.is_final = False total_processed_time = self.last_start_time + self.is_final_offset - processed_bytes_length = int( - total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 + processed_bytes_length = ( + int(total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 + ) self.last_start_time = total_processed_time # Send out bytes stored in self.audio_input_chunks that is after the # processed_bytes_length. - if (processed_bytes_length != 0): - audio_bytes = b''.join(self.audio_input_chunks) + if processed_bytes_length != 0: + audio_bytes = b"".join(self.audio_input_chunks) # Lookback for unprocessed audio data. need_to_process_length = min( int(len(audio_bytes) - processed_bytes_length), - int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8)) + int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8), + ) # Note that you need to explicitly use `int` type for substring. - need_to_process_bytes = audio_bytes[(-1) - * need_to_process_length:] + need_to_process_bytes = audio_bytes[(-1) * need_to_process_length :] yield need_to_process_bytes while not self.closed and not self.is_final: @@ -156,7 +155,7 @@ def generator(self): break self.audio_input_chunks.extend(data) if data: - yield b''.join(data) + yield b"".join(data) finally: print("Stop generator") @@ -165,29 +164,30 @@ def main(): """start bidirectional streaming from microphone input to Dialogflow API""" # Create conversation. conversation = conversation_management.create_conversation( - project_id=PROJECT_ID, conversation_profile_id=CONVERSATION_PROFILE_ID) + project_id=PROJECT_ID, conversation_profile_id=CONVERSATION_PROFILE_ID + ) - conversation_id = conversation.name.split('conversations/')[1].rstrip() + conversation_id = conversation.name.split("conversations/")[1].rstrip() # Create end user participant. end_user = participant_management.create_participant( - project_id=PROJECT_ID, conversation_id=conversation_id, role='END_USER') + project_id=PROJECT_ID, conversation_id=conversation_id, role="END_USER" + ) participant_id = end_user.name.split("participants/")[1].rstrip() mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE) print(mic_manager.chunk_size) sys.stdout.write(YELLOW) sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n') - sys.stdout.write('End (ms) Transcript Results/Status\n') - sys.stdout.write('=====================================================\n') + sys.stdout.write("End (ms) Transcript Results/Status\n") + sys.stdout.write("=====================================================\n") with mic_manager as stream: while not stream.closed: terminate = False while not terminate: try: - print("New Streaming Analyze Request: {}".format( - stream.restart_counter)) + print(f"New Streaming Analyze Request: {stream.restart_counter}") stream.restart_counter += 1 # Send request to streaming and get response. responses = participant_management.analyze_content_audio_stream( @@ -196,8 +196,9 @@ def main(): sample_rate_herz=SAMPLE_RATE, stream=stream, timeout=RESTART_TIMEOUT, - language_code='en-US', - single_utterance=False) + language_code="en-US", + single_utterance=False, + ) # Now, print the final transcription responses to user. for response in responses: @@ -209,27 +210,28 @@ def main(): # to the beginning of audio stream. offset = response.recognition_result.speech_end_offset stream.is_final_offset = int( - offset.seconds * 1000 + offset.microseconds / 1000) + offset.seconds * 1000 + offset.microseconds / 1000 + ) transcript = response.recognition_result.transcript # Half-close the stream with gRPC (in Python just stop yielding requests) stream.is_final = True # Exit recognition if any of the transcribed phrase could be # one of our keywords. - if re.search(r'\b(exit|quit)\b', transcript, re.I): + if re.search(r"\b(exit|quit)\b", transcript, re.I): sys.stdout.write(YELLOW) - sys.stdout.write('Exiting...\n') + sys.stdout.write("Exiting...\n") terminate = True stream.closed = True break except DeadlineExceeded: - print('Deadline Exceeded, restarting.') + print("Deadline Exceeded, restarting.") if terminate: conversation_management.complete_conversation( - project_id=PROJECT_ID, conversation_id=conversation_id) + project_id=PROJECT_ID, conversation_id=conversation_id + ) break -if __name__ == '__main__': - +if __name__ == "__main__": main() From 3353a5716d45d8b4353a4f052c3551ddd58ad28b Mon Sep 17 00:00:00 2001 From: ruogu Date: Wed, 28 Jun 2023 10:40:51 -0700 Subject: [PATCH 12/14] Check response instead of printed text --- dialogflow/analyze_content_stream_test.py | 19 +++++++++---------- dialogflow/participant_management.py | 11 +++++++---- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/dialogflow/analyze_content_stream_test.py b/dialogflow/analyze_content_stream_test.py index c7e47e5d4c7..da0f803d7d7 100644 --- a/dialogflow/analyze_content_stream_test.py +++ b/dialogflow/analyze_content_stream_test.py @@ -76,18 +76,18 @@ def participant_id(conversation_id): # Test live transcription of an audio file with streaming_analyze_content. def test_analyze_content_audio(capsys, conversation_id, participant_id): # Call StreamingAnalyzeContent to transcribe the audio. - participant_management.analyze_content_audio( + results = participant_management.analyze_content_audio( conversation_id=conversation_id, participant_id=participant_id, audio_file_path=AUDIO_FILE_PATH, ) - out, _ = capsys.readouterr() - assert "book a room" in out.lower() + out = " ".join([result.message.content for result in results]).lower() + assert "book a room" in out # Test live transcription of an audio stream with streaming_analyze_content. def test_analyze_content_audio_stream(capsys, conversation_id, participant_id): - class stream_generator(): + class stream_generator: def __init__(self, audio_file_path): self.audio_file_path = audio_file_path @@ -99,16 +99,15 @@ def generator(self): break # The later requests contains audio data. yield chunk + # Call StreamingAnalyzeContent to transcribe the audio. - responses = participant_management.analyze_content_audio_stream( + results = participant_management.analyze_content_audio_stream( conversation_id=conversation_id, participant_id=participant_id, sample_rate_herz=16000, stream=stream_generator(AUDIO_FILE_PATH), language_code="en-US", - timeout=300 + timeout=300, ) - for response in responses: - print(response) - out, _ = capsys.readouterr() - assert "book a room" in out.lower() + out = " ".join([result.message.content for result in results]).lower() + assert "book a room" in out diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 6f692c06295..6b0ec81eba3 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -167,11 +167,13 @@ def request_generator(audio_config, audio_file_path): ) requests = request_generator(audio_config, audio_file_path) responses = client.streaming_analyze_content(requests=requests) + results = [response for response in responses] print("=" * 20) - for response in responses: - print(f'Transcript: "{response.message.content}".') + for result in results: + print(f'Transcript: "{result.message.content}".') print("=" * 20) + return results # [END dialogflow_analyze_content_audio] @@ -194,9 +196,10 @@ def analyze_content_audio_stream( conversation_id: Id of the conversation. participant_id: Id of the participant. sample_rate_herz: herz rate of the sample. - stream: the stream to process. + stream: the stream to process. It should have generator() method to + yield input_audio. timeout: the timeout of one stream. - language_code: the language code of the audio + language_code: the language code of the audio. Example: en-US single_utterance: whether to use single_utterance. """ credentials, project_id = google.auth.default() From 3a976be129597e5a3c12a53f9841e2e133764c66 Mon Sep 17 00:00:00 2001 From: ruogu Date: Wed, 28 Jun 2023 10:48:18 -0700 Subject: [PATCH 13/14] add param type suggestion --- dialogflow/participant_management.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 6b0ec81eba3..8a344b9d937 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -181,12 +181,12 @@ def request_generator(audio_config, audio_file_path): # [START dialogflow_analyze_content_audio_stream] def analyze_content_audio_stream( - conversation_id, - participant_id, - sample_rate_herz, + conversation_id: str, + participant_id: str, + sample_rate_herz: int, stream, - timeout, - language_code, + timeout: int, + language_code: str, single_utterance=False, ): """Stream audio streams to Dialogflow and receive transcripts and From 89e9808b452231d0fbb696a246f9f2bc034eb005 Mon Sep 17 00:00:00 2001 From: ruogu Date: Wed, 28 Jun 2023 10:54:54 -0700 Subject: [PATCH 14/14] add param type suggestion for other methods in participant_management --- dialogflow/participant_management.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 8a344b9d937..57801b072e8 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -23,7 +23,7 @@ # [START dialogflow_create_participant] -def create_participant(project_id, conversation_id, role): +def create_participant(project_id: str, conversation_id: str, role: str): """Creates a participant in a given conversation. Args: @@ -50,7 +50,9 @@ def create_participant(project_id, conversation_id, role): # [START dialogflow_analyze_content_text] -def analyze_content_text(project_id, conversation_id, participant_id, text): +def analyze_content_text( + project_id: str, conversation_id: str, participant_id: str, text: str +): """Analyze text message content from a participant. Args: @@ -113,7 +115,9 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): # [START dialogflow_analyze_content_audio] -def analyze_content_audio(conversation_id, participant_id, audio_file_path): +def analyze_content_audio( + conversation_id: str, participant_id: str, audio_file_path: str +): """Analyze audio content for END_USER with audio files. Args: