Aeron Archive - extending an existing recording
I have an Aeron Archive and want to extend an existing recording, i.e. have it continue to append messages after the service restarts.
I wasn't able to find an actual example of how to do that, so I came up with my own code based on the Aeron Javadoc and the Aeron Cookbook.
What I have so far
First I try to get the lastRecordingId, the stop position, etc. from the archive itself.
private void findLatestRecording() {
    final RecordingDescriptorConsumer consumer =
        (controlSessionId, correlationId, recordingId,
         startTimestamp, stopTimestamp, startPosition,
         stopPosition, initialTermId, segmentFileLength,
         termBufferLength, mtuLength, sessionId,
         streamId, strippedChannel, originalChannel,
         sourceIdentity) -> {
            AeronArchiveJournal.this.lastRecordingId = recordingId;
            AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
            AeronArchiveJournal.this.initialTermId = initialTermId;
            AeronArchiveJournal.this.termBufferLength = termBufferLength;
        };
    final long fromRecordingId = 0L;
    final int recordCount = 100;
    final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);
    if (foundCount == 0) {
        LOG.info("No previous recording found, will start a new one");
    }
}
Then I try to extend the recording:
private void extendExistingRecording() {
    publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);
    String channelUri = new ChannelUriStringBuilder()
        .media(CommonContext.IPC_MEDIA)
        .initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
        .build();
    LOG.info("Extending existing recording");
    LOG.info("Recording id: {}", lastRecordingId);
    LOG.info("Channel URI: {}", channelUri);
    archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);
    LOG.info("Waiting for recording to start for session {}", publication.sessionId());
    final CountersReader counters = aeron.countersReader();
    int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
    while (CountersReader.NULL_COUNTER_ID == counterId) {
        journalIdleStrategy.idle();
        counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
    }
    lastRecordingId = RecordingPos.getRecordingId(counters, counterId);
}
However, the CountersReader loop never finishes and Aeron prints the following warning:
io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312
Clearly I'm missing something, but so far I haven't been able to figure out what.
To extend an existing recording, you need to set the initial position of the publication that you will use to extend it. The stream has to be initialised at the recording's stop position so that it is a genuine extension. This test shows how it is done:
https://github.com/real-logic/aeron/blob/master/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java
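In the question's code the publication is added on the plain IPC channel, and the URI carrying the initial position is only passed to extendRecording, which would explain why the warning reports image.joinPosition=0. As a rough illustration of what "setting the initial position" amounts to, here is a minimal, library-free sketch of the arithmetic that ChannelUriStringBuilder.initialPosition is understood to perform. The class and method names are hypothetical, the initialTermId of 7 and the term length of 65536 are made-up values, the 32-byte frame alignment is an assumption about Aeron's framing, and the parameter order in the resulting string is illustrative only; in real code, let ChannelUriStringBuilder build the URI.

```java
// Hypothetical helper, not part of Aeron: shows how a stop position maps to
// the term-id / term-offset pair that a publication must start from.
final class ExtendPositionSketch {
    // Assumed frame alignment; positions are expected to be a multiple of it.
    static final int FRAME_ALIGNMENT = 32;

    // Returns {termId, termOffset} for the given stream position.
    static int[] termParams(long position, int initialTermId, int termLength) {
        if (position < 0 || (position & (FRAME_ALIGNMENT - 1)) != 0) {
            throw new IllegalArgumentException("position must be >= 0 and frame aligned: " + position);
        }
        if (termLength <= 0 || Integer.bitCount(termLength) != 1) {
            throw new IllegalArgumentException("termLength must be a power of two: " + termLength);
        }
        // The term length is a power of two, so the term count is a shift
        // and the offset within the term is a mask.
        int bitsToShift = Integer.numberOfTrailingZeros(termLength);
        int termId = (int) (position >>> bitsToShift) + initialTermId;
        int termOffset = (int) (position & (termLength - 1));
        return new int[] {termId, termOffset};
    }

    // Builds an IPC channel string carrying the initial position parameters.
    static String ipcChannel(long position, int initialTermId, int termLength) {
        int[] p = termParams(position, initialTermId, termLength);
        return "aeron:ipc?init-term-id=" + initialTermId
            + "|term-id=" + p[0]
            + "|term-offset=" + p[1]
            + "|term-length=" + termLength;
    }

    public static void main(String[] args) {
        // 25312 is the rec.stopPosition from the warning in the question.
        System.out.println(ipcChannel(25312L, 7, 65536));
        // prints aeron:ipc?init-term-id=7|term-id=7|term-offset=25312|term-length=65536
    }
}
```

With the real API, the likely fix is to build channelUri first and create the exclusive publication on that URI, so that the publication starts at the recording's stop position instead of at 0, and then call extendRecording as before. The linked test follows this pattern.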