我应该在此处使用Java同步处理并发吗?

发布于 2025-01-22 13:46:56 字数 6768 浏览 2 评论 0 原文

我是处理并发的新手。我有一个发送和接收音频数据的Web插座应用程序。同时,我有一个来自Azure的EventListeners寻找语音来转录文本和其他一些线程,寻找文本以合成语音。但是,当我讲话时,响应将无限重复(?)。我不确定我是否需要在这里使用同步,在这种情况下我会通过什么作为参数?还是需要做的其他事情?以下是一些代码摘录。

Twiliomediastreamshandler类:

private final Map<WebSocketSession, AzureSpeechToTextService> sessions = new ConcurrentHashMap<>();
static final Map<WebSocketSession, Integer> messageCounts = new ConcurrentHashMap<>();
static final Map<WebSocketSession, Boolean> hasSessionSeenMedia = new ConcurrentHashMap<>();
static final Integer repeatThreshold = 10;
ArrayList<String> mediaMessages = new ArrayList<String>();

private final ObjectMapper jsonMapper = new ObjectMapper();
private final Base64.Decoder base64Decoder = Base64.getDecoder();

private static final Logger LOGGER = LoggerFactory.getLogger(TwilioMediaStreamsHandler.class);

@Override
public void afterConnectionEstablished(WebSocketSession session) {
    LOGGER.info("Connection Established");
    sessions.put(session, new AzureSpeechToTextService());
}

@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
    JsonNode request = jsonMapper.readTree(message.getPayload());
    Boolean hasSeenMedia = hasSessionSeenMedia.getOrDefault(session, false);

    if (request.path("media").path("track").asText().equals("inbound")) {

        if (!hasSeenMedia) {
            LOGGER.info("Media WS: Media message received: {}", message);
            LOGGER.warn("Media WS: Additional messages from WebSocket are now being suppressed");
            hasSessionSeenMedia.put(session, true);
        }

        mediaMessages.add(message.getPayload());
        if (mediaMessages.size() >= repeatThreshold) {
            repeat(session);
        }

        messageCounts.merge(session, 1, Integer::sum);
    }
}

public void repeat(WebSocketSession session) throws IOException {
    ArrayList<String> playMessages = (ArrayList<String>) mediaMessages.clone();
    mediaMessages.clear();

    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    String streamSid = "";
    byte[] decoded = null;
    for (String playMessage : playMessages) {
        JsonNode request = jsonMapper.readTree(playMessage);
        streamSid = request.path("streamSid").asText();
        String base64EncodedAudio = request.path("media").path("payload").asText();

        decoded = base64Decoder.decode(base64EncodedAudio);
        sessions.get(session).pushData(decoded);
        decoded = sessions.get(session).getBytes();
    }

    String resultText = sessions.get(session).getResultText();

    try {
        decoded = new AzureTextToSpeechService().textToBytes(resultText);
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    if (decoded != null)
        outputStream.write(decoded);

    ObjectMapper objectMapper = new ObjectMapper();
    byte[] encodedBytes = Base64.getEncoder().encode(outputStream.toByteArray());
    String response = objectMapper.writeValueAsString(new OutBoundMessage("media", new Media(new String(encodedBytes)), streamSid));

    session.sendMessage(new TextMessage(response));
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
    LOGGER.info("Connection Closed");
    }
}

AzuresPeechTotextService类:

private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");

private static final Logger LOGGER = LoggerFactory.getLogger(AzureSpeechToTextService.class);

private final PushAudioInputStream azurePusher;
private String resultText;

public AzureSpeechToTextService() {
    azurePusher = AudioInputStream.createPushStream(AudioStreamFormat.getWaveFormatPCM(8000L, (short) 16, (short) 1));

    SourceLanguageConfig sourceLanguageConfig = SourceLanguageConfig.fromLanguage("sv-SE");

    SpeechRecognizer speechRecognizer = new SpeechRecognizer(
            SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION),
            sourceLanguageConfig,
            AudioConfig.fromStreamInput(azurePusher));

    speechRecognizer.recognized.addEventListener((o, speechRecognitionEventArgs) -> {
        SpeechRecognitionResult speechRecognitionResult = speechRecognitionEventArgs.getResult();
        resultText = speechRecognitionResult.getText();
        LOGGER.info("Recognized text from speech: {}", resultText);
    });

    speechRecognizer.startContinuousRecognitionAsync();
}

    public String getResultText() {
        return this.resultText;
    }

    public void pushData(byte[] mulawData) {
        azurePusher.write(MulawToPcm.transcode(mulawData));
    }
}

azuretextTtospeechService类:

private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");

private static final Logger LOGGER = LoggerFactory.getLogger(AzureTextToSpeechService.class);

public byte[] textToBytes(String text) throws ExecutionException, InterruptedException {
    SpeechConfig speechConfig = SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION);
    speechConfig.setSpeechSynthesisOutputFormat(SpeechSynthesisOutputFormat.Raw8Khz8BitMonoMULaw);

    AutoDetectSourceLanguageConfig autoDetectSourceLanguageConfig = AutoDetectSourceLanguageConfig.fromOpenRange();
    SpeechSynthesizer synthesizer = new SpeechSynthesizer(speechConfig, autoDetectSourceLanguageConfig, null);

    SpeechSynthesisResult result = synthesizer.SpeakTextAsync(text).get();

    byte[] audioData = null;
    if (result.getReason() == ResultReason.SynthesizingAudioCompleted) {
        LOGGER.info("Speech synthesized for: {}", text);
        audioData = result.getAudioData();
        LOGGER.info("{} bytes recieved", audioData.length);
    } else if (result.getReason() == ResultReason.Canceled) {
        SpeechSynthesisCancellationDetails cancellation = SpeechSynthesisCancellationDetails.fromResult(result);
        System.out.println("CANCELED: Reason=" + cancellation.getReason());

        if (cancellation.getReason() == CancellationReason.Error) {
            System.out.println("CANCELED: ErrorCode=" + cancellation.getErrorCode());
            System.out.println("CANCELED: ErrorDetails=" + cancellation.getErrorDetails());
            System.out.println("CANCELED: Did you update the subscription info?");
        }
    }

        result.close();
        synthesizer.close();
        return audioData;
    }
}

I am new to handling concurrency. I have a web socket application that sends and receives audio data. Meanwhile I have one eventlisteners from Azure looking for speech to transcribe to text and some other thread looking for text to synthesize to speech. However when I speak the response gets repeated infinitely(?). I am not sure if I need to use synchronized here and what do I pass as argument in that case? Or is it something else that needs to be done? Below is some code excerpts.

TwilioMediaStreamsHandler class:

private final Map<WebSocketSession, AzureSpeechToTextService> sessions = new ConcurrentHashMap<>();
static final Map<WebSocketSession, Integer> messageCounts = new ConcurrentHashMap<>();
static final Map<WebSocketSession, Boolean> hasSessionSeenMedia = new ConcurrentHashMap<>();
static final Integer repeatThreshold = 10;
ArrayList<String> mediaMessages = new ArrayList<String>();

private final ObjectMapper jsonMapper = new ObjectMapper();
private final Base64.Decoder base64Decoder = Base64.getDecoder();

private static final Logger LOGGER = LoggerFactory.getLogger(TwilioMediaStreamsHandler.class);

@Override
public void afterConnectionEstablished(WebSocketSession session) {
    LOGGER.info("Connection Established");
    sessions.put(session, new AzureSpeechToTextService());
}

@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
    JsonNode request = jsonMapper.readTree(message.getPayload());
    Boolean hasSeenMedia = hasSessionSeenMedia.getOrDefault(session, false);

    if (request.path("media").path("track").asText().equals("inbound")) {

        if (!hasSeenMedia) {
            LOGGER.info("Media WS: Media message received: {}", message);
            LOGGER.warn("Media WS: Additional messages from WebSocket are now being suppressed");
            hasSessionSeenMedia.put(session, true);
        }

        mediaMessages.add(message.getPayload());
        if (mediaMessages.size() >= repeatThreshold) {
            repeat(session);
        }

        messageCounts.merge(session, 1, Integer::sum);
    }
}

public void repeat(WebSocketSession session) throws IOException {
    ArrayList<String> playMessages = (ArrayList<String>) mediaMessages.clone();
    mediaMessages.clear();

    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    String streamSid = "";
    byte[] decoded = null;
    for (String playMessage : playMessages) {
        JsonNode request = jsonMapper.readTree(playMessage);
        streamSid = request.path("streamSid").asText();
        String base64EncodedAudio = request.path("media").path("payload").asText();

        decoded = base64Decoder.decode(base64EncodedAudio);
        sessions.get(session).pushData(decoded);
        decoded = sessions.get(session).getBytes();
    }

    String resultText = sessions.get(session).getResultText();

    try {
        decoded = new AzureTextToSpeechService().textToBytes(resultText);
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    if (decoded != null)
        outputStream.write(decoded);

    ObjectMapper objectMapper = new ObjectMapper();
    byte[] encodedBytes = Base64.getEncoder().encode(outputStream.toByteArray());
    String response = objectMapper.writeValueAsString(new OutBoundMessage("media", new Media(new String(encodedBytes)), streamSid));

    session.sendMessage(new TextMessage(response));
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
    LOGGER.info("Connection Closed");
    }
}

AzureSpeechToTextService class:

private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");

private static final Logger LOGGER = LoggerFactory.getLogger(AzureSpeechToTextService.class);

private final PushAudioInputStream azurePusher;
private String resultText;

public AzureSpeechToTextService() {
    azurePusher = AudioInputStream.createPushStream(AudioStreamFormat.getWaveFormatPCM(8000L, (short) 16, (short) 1));

    SourceLanguageConfig sourceLanguageConfig = SourceLanguageConfig.fromLanguage("sv-SE");

    SpeechRecognizer speechRecognizer = new SpeechRecognizer(
            SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION),
            sourceLanguageConfig,
            AudioConfig.fromStreamInput(azurePusher));

    speechRecognizer.recognized.addEventListener((o, speechRecognitionEventArgs) -> {
        SpeechRecognitionResult speechRecognitionResult = speechRecognitionEventArgs.getResult();
        resultText = speechRecognitionResult.getText();
        LOGGER.info("Recognized text from speech: {}", resultText);
    });

    speechRecognizer.startContinuousRecognitionAsync();
}

    public String getResultText() {
        return this.resultText;
    }

    public void pushData(byte[] mulawData) {
        azurePusher.write(MulawToPcm.transcode(mulawData));
    }
}

AzureTextToSpeechService class:

private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");

private static final Logger LOGGER = LoggerFactory.getLogger(AzureTextToSpeechService.class);

public byte[] textToBytes(String text) throws ExecutionException, InterruptedException {
    SpeechConfig speechConfig = SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION);
    speechConfig.setSpeechSynthesisOutputFormat(SpeechSynthesisOutputFormat.Raw8Khz8BitMonoMULaw);

    AutoDetectSourceLanguageConfig autoDetectSourceLanguageConfig = AutoDetectSourceLanguageConfig.fromOpenRange();
    SpeechSynthesizer synthesizer = new SpeechSynthesizer(speechConfig, autoDetectSourceLanguageConfig, null);

    SpeechSynthesisResult result = synthesizer.SpeakTextAsync(text).get();

    byte[] audioData = null;
    if (result.getReason() == ResultReason.SynthesizingAudioCompleted) {
        LOGGER.info("Speech synthesized for: {}", text);
        audioData = result.getAudioData();
        LOGGER.info("{} bytes recieved", audioData.length);
    } else if (result.getReason() == ResultReason.Canceled) {
        SpeechSynthesisCancellationDetails cancellation = SpeechSynthesisCancellationDetails.fromResult(result);
        System.out.println("CANCELED: Reason=" + cancellation.getReason());

        if (cancellation.getReason() == CancellationReason.Error) {
            System.out.println("CANCELED: ErrorCode=" + cancellation.getErrorCode());
            System.out.println("CANCELED: ErrorDetails=" + cancellation.getErrorDetails());
            System.out.println("CANCELED: Did you update the subscription info?");
        }
    }

        result.close();
        synthesizer.close();
        return audioData;
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

墨离汐 2025-01-29 13:46:56

在这种情况下,暗示的操作是两个:

  1. 悲观的并发
  2. 乐观并发

https://azure.microsoft.com/en-in-in/blog/managing-concurrency-incurrency-in-microsoft-azure-storage-2/

有助于了解一致的实施。

The suggestable operation is two in this scenario:

  1. Pessimistic concurrency
  2. Optimistic concurrency

https://azure.microsoft.com/en-in/blog/managing-concurrency-in-microsoft-azure-storage-2/

helps to understand the implementation of concurrency.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文