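Should I use Java synchronization here to handle concurrency?
I'm new to handling concurrency. I have a WebSocket application that sends and receives audio data. At the same time, I have event listeners from Azure that listen for speech to transcribe into text, and some other threads that look for text to synthesize into speech. However, when I speak, the response gets repeated indefinitely(?). I'm not sure whether I need to use synchronized here, and if so, what would I pass as its argument? Or is there something else that needs to be done? Below are some code excerpts.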
TwilioMediaStreamsHandler class:
    private final Map<WebSocketSession, AzureSpeechToTextService> sessions = new ConcurrentHashMap<>();
    static final Map<WebSocketSession, Integer> messageCounts = new ConcurrentHashMap<>();
    static final Map<WebSocketSession, Boolean> hasSessionSeenMedia = new ConcurrentHashMap<>();
    static final Integer repeatThreshold = 10;
    ArrayList<String> mediaMessages = new ArrayList<String>();
    private final ObjectMapper jsonMapper = new ObjectMapper();
    private final Base64.Decoder base64Decoder = Base64.getDecoder();
    private static final Logger LOGGER = LoggerFactory.getLogger(TwilioMediaStreamsHandler.class);

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        LOGGER.info("Connection Established");
        sessions.put(session, new AzureSpeechToTextService());
    }

    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
        JsonNode request = jsonMapper.readTree(message.getPayload());
        Boolean hasSeenMedia = hasSessionSeenMedia.getOrDefault(session, false);
        if (request.path("media").path("track").asText().equals("inbound")) {
            if (!hasSeenMedia) {
                LOGGER.info("Media WS: Media message received: {}", message);
                LOGGER.warn("Media WS: Additional messages from WebSocket are now being suppressed");
                hasSessionSeenMedia.put(session, true);
            }
            mediaMessages.add(message.getPayload());
            if (mediaMessages.size() >= repeatThreshold) {
                repeat(session);
            }
            messageCounts.merge(session, 1, Integer::sum);
        }
    }

    public void repeat(WebSocketSession session) throws IOException {
        ArrayList<String> playMessages = (ArrayList<String>) mediaMessages.clone();
        mediaMessages.clear();
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        String streamSid = "";
        byte[] decoded = null;
        for (String playMessage : playMessages) {
            JsonNode request = jsonMapper.readTree(playMessage);
            streamSid = request.path("streamSid").asText();
            String base64EncodedAudio = request.path("media").path("payload").asText();
            decoded = base64Decoder.decode(base64EncodedAudio);
            sessions.get(session).pushData(decoded);
            decoded = sessions.get(session).getBytes();
        }
        String resultText = sessions.get(session).getResultText();
        try {
            decoded = new AzureTextToSpeechService().textToBytes(resultText);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (decoded != null)
            outputStream.write(decoded);
        ObjectMapper objectMapper = new ObjectMapper();
        byte[] encodedBytes = Base64.getEncoder().encode(outputStream.toByteArray());
        String response = objectMapper.writeValueAsString(new OutBoundMessage("media", new Media(new String(encodedBytes)), streamSid));
        session.sendMessage(new TextMessage(response));
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        LOGGER.info("Connection Closed");
    }
}
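A note on the handler above: mediaMessages is a plain ArrayList stored in a handler field. If one handler instance serves several sessions (assumed here, as is common for Spring WebSocket handlers), every session appends to the same list, and the size check followed by clone and clear is not atomic. A minimal sketch of one way to keep a buffer per session and drain it in a single critical section follows. SessionBuffer and SessionBuffers are hypothetical names that do not appear in the original code, and a String session id stands in for WebSocketSession.getId().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not in the original code): one buffer per WebSocket session.
final class SessionBuffer {
    private final List<String> messages = new ArrayList<>();

    // Adds a message. Once the threshold is reached, copies the batch and clears
    // the buffer inside the same synchronized method, so no other thread can
    // observe a half-drained list. Returns an empty list until then.
    synchronized List<String> addAndDrainIfFull(String message, int threshold) {
        messages.add(message);
        if (messages.size() < threshold) {
            return Collections.emptyList();
        }
        List<String> batch = new ArrayList<>(messages);
        messages.clear();
        return batch;
    }
}

// Hypothetical registry keyed by session id (assumption: the id is unique per connection).
final class SessionBuffers {
    private final Map<String, SessionBuffer> buffers = new ConcurrentHashMap<>();

    // Creates the session's buffer on first use, then delegates to it.
    List<String> add(String sessionId, String message, int threshold) {
        return buffers.computeIfAbsent(sessionId, id -> new SessionBuffer())
                      .addAndDrainIfFull(message, threshold);
    }

    // Intended to be called when the connection closes so finished sessions do not leak.
    void remove(String sessionId) {
        buffers.remove(sessionId);
    }
}
```

With something like this, handleTextMessage would call add(session.getId(), message.getPayload(), repeatThreshold) and only run the repeat logic when the returned batch is non-empty, passing the batch in as a parameter instead of reading a shared field.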
AzureSpeechToTextService class:
    private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
    private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");
    private static final Logger LOGGER = LoggerFactory.getLogger(AzureSpeechToTextService.class);
    private final PushAudioInputStream azurePusher;
    private String resultText;

    public AzureSpeechToTextService() {
        azurePusher = AudioInputStream.createPushStream(AudioStreamFormat.getWaveFormatPCM(8000L, (short) 16, (short) 1));
        SourceLanguageConfig sourceLanguageConfig = SourceLanguageConfig.fromLanguage("sv-SE");
        SpeechRecognizer speechRecognizer = new SpeechRecognizer(
                SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION),
                sourceLanguageConfig,
                AudioConfig.fromStreamInput(azurePusher));
        speechRecognizer.recognized.addEventListener((o, speechRecognitionEventArgs) -> {
            SpeechRecognitionResult speechRecognitionResult = speechRecognitionEventArgs.getResult();
            resultText = speechRecognitionResult.getText();
            LOGGER.info("Recognized text from speech: {}", resultText);
        });
        speechRecognizer.startContinuousRecognitionAsync();
    }

    public String getResultText() {
        return this.resultText;
    }

    public void pushData(byte[] mulawData) {
        azurePusher.write(MulawToPcm.transcode(mulawData));
    }
}
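A note on the class above: resultText is written by the recognizer's event listener and read from the WebSocket thread without volatile or any other synchronization, and nothing ever resets it. One possible cause of the repeated response (an assumption, not something confirmed in the post) is that every batch of repeatThreshold media messages re-reads the same last sentence and synthesizes it again. A minimal sketch of a hand-off that returns each recognized text only once is shown below. RecognizedTextHolder is a hypothetical name that does not appear in the original code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder (not in the original code) for the latest recognized text.
final class RecognizedTextHolder {
    private final AtomicReference<String> latest = new AtomicReference<>();

    // Intended to be called from the recognizer's callback thread.
    // Empty results are ignored so they do not overwrite real text.
    void publish(String text) {
        if (text != null && !text.isEmpty()) {
            latest.set(text);
        }
    }

    // Intended to be called from the WebSocket thread. Atomically returns the
    // pending text and clears it, so the same sentence is never handed out twice.
    // Returns null when nothing new has been recognized.
    String takeOrNull() {
        return latest.getAndSet(null);
    }
}
```

In this arrangement the listener would call publish(speechRecognitionResult.getText()), and repeat would call takeOrNull() and skip synthesis entirely when the result is null.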
AzureTextToSpeechService class:
    private static final String SPEECH_SUBSCRIPTION_KEY = System.getenv("AZURE_SPEECH_SUBSCRIPTION_KEY");
    private static final String SERVICE_REGION = System.getenv("AZURE_SERVICE_REGION");
    private static final Logger LOGGER = LoggerFactory.getLogger(AzureTextToSpeechService.class);

    public byte[] textToBytes(String text) throws ExecutionException, InterruptedException {
        SpeechConfig speechConfig = SpeechConfig.fromSubscription(SPEECH_SUBSCRIPTION_KEY, SERVICE_REGION);
        speechConfig.setSpeechSynthesisOutputFormat(SpeechSynthesisOutputFormat.Raw8Khz8BitMonoMULaw);
        AutoDetectSourceLanguageConfig autoDetectSourceLanguageConfig = AutoDetectSourceLanguageConfig.fromOpenRange();
        SpeechSynthesizer synthesizer = new SpeechSynthesizer(speechConfig, autoDetectSourceLanguageConfig, null);
        SpeechSynthesisResult result = synthesizer.SpeakTextAsync(text).get();
        byte[] audioData = null;
        if (result.getReason() == ResultReason.SynthesizingAudioCompleted) {
            LOGGER.info("Speech synthesized for: {}", text);
            audioData = result.getAudioData();
            LOGGER.info("{} bytes received", audioData.length);
        } else if (result.getReason() == ResultReason.Canceled) {
            SpeechSynthesisCancellationDetails cancellation = SpeechSynthesisCancellationDetails.fromResult(result);
            System.out.println("CANCELED: Reason=" + cancellation.getReason());
            if (cancellation.getReason() == CancellationReason.Error) {
                System.out.println("CANCELED: ErrorCode=" + cancellation.getErrorCode());
                System.out.println("CANCELED: ErrorDetails=" + cancellation.getErrorDetails());
                System.out.println("CANCELED: Did you update the subscription info?");
            }
        }
        result.close();
        synthesizer.close();
        return audioData;
    }
}
Comments (1)
Two operations are suggested in this scenario:
https://azure.microsoft.com/en-in/blog/managing-concurrency-in-microsoft-azure-storage-2/
The article helps in understanding how concurrency is implemented.
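On the narrower question of what to pass to synchronized: the argument is the object whose monitor guards the shared state, and every thread that touches that state has to lock on the same object. A minimal sketch under that assumption, using a private lock object so that outside code cannot lock on it. GuardedCounter and GuardedCounterDemo are hypothetical names used only for illustration.

```java
// Hypothetical example: a counter whose field is only touched while holding "lock".
final class GuardedCounter {
    private final Object lock = new Object(); // the argument passed to synchronized
    private int count;

    void increment() {
        synchronized (lock) {
            count++; // read-modify-write happens as one critical section
        }
    }

    int get() {
        synchronized (lock) { // reads lock too, so they see the latest value
            return count;
        }
    }
}

final class GuardedCounterDemo {
    // Starts the given number of threads, each incrementing perThread times,
    // waits for all of them, and returns the final count.
    static int run(int threads, int perThread) {
        GuardedCounter counter = new GuardedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter.get();
    }
}
```

Locking on a dedicated private object rather than on this or on a collection is a design choice: it keeps the lock out of reach of other classes, which makes it easier to reason about who can block whom.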