Aeron档案 - 扩展现有记录

发布于 2025-01-28 13:34:08 字数 2940 浏览 2 评论 0原文

我有一个Aeron档案馆,并希望扩展现有的录制,即它在服务重新启动后继续附加消息。

我找不到任何实际示例如何做到这一点,所以我根据Aeron Javadoc/ Aeron食谱

到目前为止,我的目标

我正试图从档案中获得lastrecordingid,位置等。

private void findLatestRecording() {
        final RecordingDescriptorConsumer consumer =
                (controlSessionId, correlationId, recordingId,
                        startTimestamp, stopTimestamp, startPosition,
                        stopPosition, initialTermId, segmentFileLength,
                        termBufferLength, mtuLength, sessionId,
                        streamId, strippedChannel, originalChannel,
                        sourceIdentity) ->  {
                    AeronArchiveJournal.this.lastRecordingId = recordingId;
                    AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
                    AeronArchiveJournal.this.initialTermId = initialTermId;
                    AeronArchiveJournal.this.termBufferLength = termBufferLength;
                };

        final long fromRecordingId = 0L;
        final int recordCount = 100;
        final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);

        if (foundCount == 0) {
            LOG.info("No previous recording found, will start a new one");
        }
    }

然后,我试图扩展录制,

 private void extendExistingRecording() {
        publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);

        String channelUri = new ChannelUriStringBuilder()
                .media(CommonContext.IPC_MEDIA)
                .initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
                .build();

        LOG.info("Extending existing recording");
        LOG.info("Recording id: {}", lastRecordingId);
        LOG.info("Channel URI: {}", channelUri);


        archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);

        LOG.info("Waiting for recording to start for session {}", publication.sessionId());
        final CountersReader counters = aeron.countersReader();
        int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());

        while (CountersReader.NULL_COUNTER_ID == counterId) {
            journalIdleStrategy.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
        }
        lastRecordingId = RecordingPos.getRecordingId(counters, counterId);

    }

但是countersreader循环在警告后永远不会完成和Aeron打印:

io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312

显然我缺少某些内容,但是到目前为止,我无法弄清楚什么。

I have an Aeron Archive and want to extend the existing recording, i.e. that it continuse to append messages after the service restart.

I wasn't able to find any actual example how to do that, so I came with the my own code based on Aeron javadoc/Aeron Cookbook.

What I have so far

I'm trying to get the lastRecordingId, the position etc. from the archive itself first.

private void findLatestRecording() {
        final RecordingDescriptorConsumer consumer =
                (controlSessionId, correlationId, recordingId,
                        startTimestamp, stopTimestamp, startPosition,
                        stopPosition, initialTermId, segmentFileLength,
                        termBufferLength, mtuLength, sessionId,
                        streamId, strippedChannel, originalChannel,
                        sourceIdentity) ->  {
                    AeronArchiveJournal.this.lastRecordingId = recordingId;
                    AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
                    AeronArchiveJournal.this.initialTermId = initialTermId;
                    AeronArchiveJournal.this.termBufferLength = termBufferLength;
                };

        final long fromRecordingId = 0L;
        final int recordCount = 100;
        final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);

        if (foundCount == 0) {
            LOG.info("No previous recording found, will start a new one");
        }
    }

then I'm trying to extend the recording

 private void extendExistingRecording() {
        publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);

        String channelUri = new ChannelUriStringBuilder()
                .media(CommonContext.IPC_MEDIA)
                .initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
                .build();

        LOG.info("Extending existing recording");
        LOG.info("Recording id: {}", lastRecordingId);
        LOG.info("Channel URI: {}", channelUri);


        archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);

        LOG.info("Waiting for recording to start for session {}", publication.sessionId());
        final CountersReader counters = aeron.countersReader();
        int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());

        while (CountersReader.NULL_COUNTER_ID == counterId) {
            journalIdleStrategy.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
        }
        lastRecordingId = RecordingPos.getRecordingId(counters, counterId);

    }

However the CountersReader loop never finishes and aeron prints following warning:

io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312

Clearly I'm missing something, but so far I'm not able to figure out what.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

木槿暧夏七纪年 2025-02-04 13:34:08

为了扩展现有记录,有必要设置您将使用的出版物的初始位置来扩展现有记录。需要初始化该流,因此它是真正的扩展。这测试显示了如何完成。

https://github.com/real-logic/aeron/blob/master/aeron-system-tests/src/src/test/java/io/aeron/aeron/aeron/archive/extendrecordingtest.java

To extend an existing recording it is necessary to set the initial position of the publication you will use to extend the existing recording. The stream needs to be initialised so it is a genuine extension. This tests shows how it is done.

https://github.com/real-logic/aeron/blob/master/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文