diff --git a/dialogflow/analyze_content_stream_test.py b/dialogflow/analyze_content_stream_test.py index 67a99e4bb44..da0f803d7d7 100644 --- a/dialogflow/analyze_content_stream_test.py +++ b/dialogflow/analyze_content_stream_test.py @@ -73,13 +73,41 @@ def participant_id(conversation_id): yield participant_id -# Test live transcription with streaming_analyze_content. -def test_analyze_content_audio_stream(capsys, conversation_id, participant_id): +# Test live transcription of an audio file with streaming_analyze_content. +def test_analyze_content_audio(capsys, conversation_id, participant_id): # Call StreamingAnalyzeContent to transcribe the audio. - participant_management.analyze_content_audio_stream( + results = participant_management.analyze_content_audio( conversation_id=conversation_id, participant_id=participant_id, audio_file_path=AUDIO_FILE_PATH, ) - out, _ = capsys.readouterr() - assert "book a room" in out.lower() + out = " ".join([result.message.content for result in results]).lower() + assert "book a room" in out + + +# Test live transcription of an audio stream with streaming_analyze_content. +def test_analyze_content_audio_stream(capsys, conversation_id, participant_id): + class stream_generator: + def __init__(self, audio_file_path): + self.audio_file_path = audio_file_path + + def generator(self): + with open(self.audio_file_path, "rb") as audio_file: + while True: + chunk = audio_file.read(4096) + if not chunk: + break + # The later requests contain audio data. + yield chunk + + # Call StreamingAnalyzeContent to transcribe the audio. 
+ results = participant_management.analyze_content_audio_stream( + conversation_id=conversation_id, + participant_id=participant_id, + sample_rate_herz=16000, + stream=stream_generator(AUDIO_FILE_PATH), + language_code="en-US", + timeout=300, + ) + out = " ".join([result.message.content for result in results]).lower() + assert "book a room" in out diff --git a/dialogflow/participant_management.py b/dialogflow/participant_management.py index 097dc67d259..57801b072e8 100644 --- a/dialogflow/participant_management.py +++ b/dialogflow/participant_management.py @@ -23,7 +23,7 @@ # [START dialogflow_create_participant] -def create_participant(project_id, conversation_id, role): +def create_participant(project_id: str, conversation_id: str, role: str): """Creates a participant in a given conversation. Args: @@ -40,8 +40,8 @@ def create_participant(project_id, conversation_id, role): parent=conversation_path, participant={"role": role}, timeout=600 ) print("Participant Created.") - print("Role: {}".format(response.role)) - print("Name: {}".format(response.name)) + print(f"Role: {response.role}") + print(f"Name: {response.name}") return response @@ -50,7 +50,9 @@ def create_participant(project_id, conversation_id, role): # [START dialogflow_analyze_content_text] -def analyze_content_text(project_id, conversation_id, participant_id, text): +def analyze_content_text( + project_id: str, conversation_id: str, participant_id: str, text: str +): """Analyze text message content from a participant. 
Args: @@ -69,8 +71,7 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): ) print("AnalyzeContent Response:") print(f"Reply Text: {response.reply_text}") - print(f"Response: {response.human_agent_suggestion_results}") - # assert response.human_agent_suggestion_results=="test" + for suggestion_result in response.human_agent_suggestion_results: if suggestion_result.error is not None: print(f"Error: {suggestion_result.error.message}") @@ -91,7 +92,7 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): for suggestion_result in response.end_user_suggestion_results: if suggestion_result.error: - print("Error: {}".format(suggestion_result.error.message)) + print(f"Error: {suggestion_result.error.message}") if suggestion_result.suggest_articles_response: for answer in suggestion_result.suggest_articles_response.article_answers: print(f"Article Suggestion Answer: {answer.title}") @@ -113,9 +114,11 @@ def analyze_content_text(project_id, conversation_id, participant_id, text): # [END dialogflow_analyze_content_text] -# [START dialogflow_analyze_content_audio_stream] -def analyze_content_audio_stream(conversation_id, participant_id, audio_file_path): - """Analyze audio content for END_USER +# [START dialogflow_analyze_content_audio] +def analyze_content_audio( + conversation_id: str, participant_id: str, audio_file_path: str +): + """Analyze audio content for END_USER with audio files. Args: conversation_id: Id of the conversation. 
@@ -168,11 +171,69 @@ def request_generator(audio_config, audio_file_path): ) requests = request_generator(audio_config, audio_file_path) responses = client.streaming_analyze_content(requests=requests) + results = [response for response in responses] print("=" * 20) - for response in responses: - print(f'Transcript: "{response.message.content}".') + for result in results: + print(f'Transcript: "{result.message.content}".') print("=" * 20) + return results + + +# [END dialogflow_analyze_content_audio] + + +# [START dialogflow_analyze_content_audio_stream] +def analyze_content_audio_stream( + conversation_id: str, + participant_id: str, + sample_rate_herz: int, + stream, + timeout: int, + language_code: str, + single_utterance=False, +): + """Stream audio streams to Dialogflow and receive transcripts and + suggestions. + + Args: + conversation_id: Id of the conversation. + participant_id: Id of the participant. + sample_rate_herz: sample rate of the audio in hertz. + stream: the stream to process. It should have generator() method to + yield input_audio. + timeout: the timeout of one stream. + language_code: the language code of the audio. Example: en-US + single_utterance: whether to use single_utterance. 
+ """ + credentials, project_id = google.auth.default() + client = dialogflow.ParticipantsClient(credentials=credentials) + + participant_name = client.participant_path( + project_id, conversation_id, participant_id + ) + + audio_config = dialogflow.types.audio_config.InputAudioConfig( + audio_encoding=dialogflow.types.audio_config.AudioEncoding.AUDIO_ENCODING_LINEAR_16, + sample_rate_hertz=sample_rate_herz, + language_code=language_code, + single_utterance=single_utterance, + ) + + def gen_requests(participant_name, audio_config, stream): + """Generates requests for streaming.""" + audio_generator = stream.generator() + yield dialogflow.types.participant.StreamingAnalyzeContentRequest( + participant=participant_name, audio_config=audio_config + ) + for content in audio_generator: + yield dialogflow.types.participant.StreamingAnalyzeContentRequest( + input_audio=content + ) + + return client.streaming_analyze_content( + gen_requests(participant_name, audio_config, stream), timeout=timeout + ) # [END dialogflow_analyze_content_audio_stream] diff --git a/dialogflow/requirements.txt b/dialogflow/requirements.txt index 7df35635c1a..5dd7dc25972 100644 --- a/dialogflow/requirements.txt +++ b/dialogflow/requirements.txt @@ -1,3 +1,5 @@ google-cloud-dialogflow==2.22.0 Flask==2.2.2 -functions-framework==3.3.0 \ No newline at end of file +pyaudio==0.2.13 +termcolor==2.3.0 +functions-framework==3.3.0 diff --git a/dialogflow/streaming_transcription.py b/dialogflow/streaming_transcription.py new file mode 100644 index 00000000000..6395a30b3a8 --- /dev/null +++ b/dialogflow/streaming_transcription.py @@ -0,0 +1,237 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google Cloud Dialogflow API sample code using the StreamingAnalyzeContent +API. + +Please contact Google to get credentials for this project, and set the +location of the credential JSON file by running: +export GOOGLE_APPLICATION_CREDENTIALS= + +Example usage: + export GOOGLE_CLOUD_PROJECT='cloud-contact-center-ext-demo' + export CONVERSATION_PROFILE='FnuBYO8eTBWM8ep1i-eOng' + export GOOGLE_APPLICATION_CREDENTIALS='/Users/ruogu/Desktop/keys/cloud-contact-center-ext-demo-78798f9f9254.json' + python streaming_transcription.py + +Then start talking in English; you should see the transcription show up as you speak. + +Say "Quit" or "Exit" to stop. +""" + +import os +import re +import sys + +from google.api_core.exceptions import DeadlineExceeded + +import pyaudio + +from six.moves import queue + +import conversation_management +import participant_management + +PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT") +CONVERSATION_PROFILE_ID = os.getenv("CONVERSATION_PROFILE") + +# Audio recording parameters +SAMPLE_RATE = 16000 +CHUNK_SIZE = int(SAMPLE_RATE / 10) # 100ms +RESTART_TIMEOUT = 160 # seconds +MAX_LOOKBACK = 3 # seconds + +YELLOW = "\033[0;33m" + + +class ResumableMicrophoneStream: + """Opens a recording stream as a generator yielding the audio chunks.""" + + def __init__(self, rate, chunk_size): + self._rate = rate + self.chunk_size = chunk_size + self._num_channels = 1 + self._buff = queue.Queue() + self.is_final = False + self.closed = True + # Count the number of times the stream analyze content restarts. 
+ self.restart_counter = 0 + self.last_start_time = 0 + # Time end of the last is_final in millisec since last_start_time. + self.is_final_offset = 0 + # Save the audio chunks generated from the start of the audio stream for + # replay after restart. + self.audio_input_chunks = [] + self.new_stream = True + self._audio_interface = pyaudio.PyAudio() + self._audio_stream = self._audio_interface.open( + format=pyaudio.paInt16, + channels=self._num_channels, + rate=self._rate, + input=True, + frames_per_buffer=self.chunk_size, + # Run the audio stream asynchronously to fill the buffer object. + # This is necessary so that the input device's buffer doesn't + # overflow while the calling thread makes network requests, etc. + stream_callback=self._fill_buffer, + ) + + def __enter__(self): + self.closed = False + return self + + def __exit__(self, type, value, traceback): + self._audio_stream.stop_stream() + self._audio_stream.close() + self.closed = True + # Signal the generator to terminate so that the client's + # streaming_recognize method will not block the process termination. + self._buff.put(None) + self._audio_interface.terminate() + + def _fill_buffer(self, in_data, *args, **kwargs): + """Continuously collect data from the audio stream, into the buffer in + chunksize.""" + + self._buff.put(in_data) + return None, pyaudio.paContinue + + def generator(self): + """Stream Audio from microphone to API and to local buffer""" + try: + # Handle restart. + print("restart generator") + # Flip the bit of is_final so it can continue stream. + self.is_final = False + total_processed_time = self.last_start_time + self.is_final_offset + processed_bytes_length = ( + int(total_processed_time * SAMPLE_RATE * 16 / 8) / 1000 + ) + self.last_start_time = total_processed_time + # Send out bytes stored in self.audio_input_chunks that is after the + # processed_bytes_length. 
+ if processed_bytes_length != 0: + audio_bytes = b"".join(self.audio_input_chunks) + # Look back for unprocessed audio data. + need_to_process_length = min( + int(len(audio_bytes) - processed_bytes_length), + int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8), + ) + # Note that you need to explicitly use `int` type for substring. + need_to_process_bytes = audio_bytes[(-1) * need_to_process_length :] + yield need_to_process_bytes + + while not self.closed and not self.is_final: + data = [] + # Use a blocking get() to ensure there's at least one chunk of + # data, and stop iteration if the chunk is None, indicating the + # end of the audio stream. + chunk = self._buff.get() + + if chunk is None: + return + data.append(chunk) + # Now try to fetch the rest of the chunks if there are any left in the _buff. + while True: + try: + chunk = self._buff.get(block=False) + + if chunk is None: + return + data.append(chunk) + + except queue.Empty: + break + self.audio_input_chunks.extend(data) + if data: + yield b"".join(data) + finally: + print("Stop generator") + + +def main(): + """Start bidirectional streaming from microphone input to the Dialogflow API.""" + # Create conversation. + conversation = conversation_management.create_conversation( + project_id=PROJECT_ID, conversation_profile_id=CONVERSATION_PROFILE_ID + ) + + conversation_id = conversation.name.split("conversations/")[1].rstrip() + + # Create end user participant. 
+ end_user = participant_management.create_participant( + project_id=PROJECT_ID, conversation_id=conversation_id, role="END_USER" + ) + participant_id = end_user.name.split("participants/")[1].rstrip() + + mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE) + print(mic_manager.chunk_size) + sys.stdout.write(YELLOW) + sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n') + sys.stdout.write("End (ms) Transcript Results/Status\n") + sys.stdout.write("=====================================================\n") + + with mic_manager as stream: + while not stream.closed: + terminate = False + while not terminate: + try: + print(f"New Streaming Analyze Request: {stream.restart_counter}") + stream.restart_counter += 1 + # Send request to streaming and get response. + responses = participant_management.analyze_content_audio_stream( + conversation_id=conversation_id, + participant_id=participant_id, + sample_rate_herz=SAMPLE_RATE, + stream=stream, + timeout=RESTART_TIMEOUT, + language_code="en-US", + single_utterance=False, + ) + + # Now, print the final transcription responses to user. + for response in responses: + if response.message: + print(response) + if response.recognition_result.is_final: + print(response) + # offset return from recognition_result is relative + # to the beginning of audio stream. + offset = response.recognition_result.speech_end_offset + stream.is_final_offset = int( + offset.seconds * 1000 + offset.microseconds / 1000 + ) + transcript = response.recognition_result.transcript + # Half-close the stream with gRPC (in Python just stop yielding requests) + stream.is_final = True + # Exit recognition if any of the transcribed phrase could be + # one of our keywords. 
+ if re.search(r"\b(exit|quit)\b", transcript, re.I): + sys.stdout.write(YELLOW) + sys.stdout.write("Exiting...\n") + terminate = True + stream.closed = True + break + except DeadlineExceeded: + print("Deadline Exceeded, restarting.") + + if terminate: + conversation_management.complete_conversation( + project_id=PROJECT_ID, conversation_id=conversation_id + ) + break + + +if __name__ == "__main__": + main()