Amazon Transcribe Python API: event handler processes audio only after the stream ends

Posted on 2025-02-13 02:58:41

(I didn't get an answer on AWS re:Post, so I'm trying here.)

I am streaming audio data from a web browser as blobs over a WebSocket. On the backend, I use Django Channels' AsyncWebsocketConsumer to receive the audio, forward it to Amazon Transcribe, and try to reply to the browser with the transcript in near real time.

I am able to send the transcription for a single blob, but I can't get it to work in a streaming manner. The event handler seems to fire only when the stream ends, which makes me think I am using either asyncio or the Transcribe API incorrectly.

What I have tried so far, apart from the code below:

  1. Created a separate stream for each audio chunk. This raises
    amazon_transcribe.exceptions.InternalFailureException: An internal
    error occurred.
  2. Used asyncio.gather to run stream.input_stream.send_audio_event
    and handler.handle_events together. The handler is not invoked.
  3. Used asyncio.create_task(handler.handle_events) to create a
    non-blocking task for the handler. The handler is not invoked, and
    the task did not wait for 15 seconds either; it completed
    immediately.

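import json

from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer

# One shared streaming client for all WebSocket connections.
stream_client = TranscribeStreamingClient(region="us-west-2")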

class AWSTranscriptHandler(TranscriptResultStreamHandler):
    def __init__(self, transcript_result_stream):
        self.channel_layer = get_channel_layer()
        self.channel_name = "test"
        super().__init__(transcript_result_stream)

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        results = transcript_event.transcript.results
        for result in results:
            if not (result.is_partial):
                for alt in result.alternatives:
                    await self.channel_layer.send(
                        self.channel_name,
                        {"type": "send_transcript", "message": alt.transcript},
                    )


class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()
        await self.send(
            text_data=json.dumps(
                {"type": "connection_established", "channel_name": self.channel_name}
            )
        )
        self.stream = await stream_client.start_stream_transcription(
            language_code="en-US",
            media_sample_rate_hz=48000,
            media_encoding="ogg-opus",
        )
        self.handler = AWSTranscriptHandler(self.stream.output_stream)

    async def disconnect(self, close_code):
        await self.stream.input_stream.end_stream()

    async def receive(self, text_data=None, bytes_data=None):
        if bytes_data:
            await self.stream.input_stream.send_audio_event(audio_chunk=bytes_data)
            await self.handler.handle_events()

    async def send_transcript(self, event):
        await self.send(
            text_data=json.dumps({"type": "transcript", "message": event["message"]})
        )
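
A possible reading of this behavior, offered as a sketch rather than a confirmed fix: in the amazon-transcribe SDK, handle_events() iterates over the output stream until that stream closes, so awaiting it inside receive() would hold up the consumer after the first chunk. A common asyncio pattern is to start such a reader once as a background task and let receive() only push audio. The example below illustrates that pattern with the standard library alone. FakeResultStream, SketchConsumer, and demo are hypothetical names standing in for the Transcribe output stream and ChatConsumer; no AWS or Channels calls are made.

```python
import asyncio


class FakeResultStream:
    """Hypothetical stand-in for the transcript output stream: an async
    iterator that yields items until end() is called."""

    _END = object()

    def __init__(self):
        self._queue = asyncio.Queue()

    def push(self, item):
        self._queue.put_nowait(item)

    def end(self):
        self._queue.put_nowait(self._END)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is self._END:
            raise StopAsyncIteration
        return item


class SketchConsumer:
    """Mirrors the connect/receive/disconnect shape of ChatConsumer."""

    def __init__(self):
        self.delivered = []  # what would be sent back to the browser

    async def _handle_events(self):
        # Like a handler's event loop: runs until the stream ends.
        async for event in self.stream:
            self.delivered.append(event)

    async def connect(self):
        self.stream = FakeResultStream()
        # Start the reader once; it runs alongside later receive() calls.
        self.handler_task = asyncio.create_task(self._handle_events())

    async def receive(self, chunk):
        # Only push data here; the reader loop is never awaited per chunk.
        self.stream.push(f"transcript for {chunk!r}")
        await asyncio.sleep(0)  # yield so the reader task can run

    async def disconnect(self):
        self.stream.end()
        await self.handler_task  # let the reader drain and finish


async def demo():
    consumer = SketchConsumer()
    await consumer.connect()
    await consumer.receive(b"chunk-1")
    seen_mid_stream = list(consumer.delivered)  # stream is still open here
    await consumer.receive(b"chunk-2")
    await consumer.disconnect()
    return seen_mid_stream, consumer.delivered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Applied to the consumer above, the assumption would be that self.handler.handle_events() is wrapped in asyncio.create_task(...) once in connect() and that the task is awaited in disconnect() after end_stream(). This has not been tested against the real service.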
