我正在尝试从kafkajs消费kafka消息,但无法阅读该消息,

发布于 2025-02-11 07:03:56 字数 2756 浏览 3 评论 0原文

我正在尝试从Kafkajs食用Kafka消息,但无法阅读该消息,请让我知道这个问题。知道如何解决这个问题吗?

const kafka = new Kafka({
    logLevel: logLevel.INFO,
    ssl: true,
    brokers: ['b-1.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094',
        'b-2.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094',
        'b-3.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094'
    ],
    clientId: 'local-client'
})

const topic = 'aws.identity.users.0'
const consumer = kafka.consumer({
    groupId: 'test-group'
})
const consumedMessages = []

const run = async () => {
    await consumer.connect()
    await consumer.subscribe({
        topic,
        fromBeginning: true
    })
    await consumer.run({
        autoCommit: false,
        eachMessage: async ({
            topic,
            partition,
            message
        }) => {
            `not able to get inside this`
            consumedMessages.push(message);
            const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
            console.log(`- ${prefix} ${message.key}#${message.value}`)

        },
    })
}
`how to make this code work`
app.get('/', (req, res) => {
    res.send('Hello World!')
    run().catch(e => console.error(`[example/consumer] ${e.message}`, e));
});

app.listen(port, () => {
    console.log(`Example app listening on port ${port}!`)
});

{“ latve”:“ error”,“ timestamp”:“ 2022-06-29T04:54:54:57.414Z”,“ logger”:“ kafkajs”,“ message”:“ [runner] erne everyMessage”时,“主题”:“ aws.Identity.users.0”,“ partition”:7,“ offset”:“ 52”,“ stack”:“参考文献:commentedMessages in runner.eastmessage in conseach.message inseach.each message(c:\ each each n n oppectedmessages inshoce in each in each inseact in.each ins of consectional consect)consection.each inseact ofer.eaks consection.ception \ a inseactmesh prectory. a inseactofern.dem。 Biport \ testkafka \ index.js:24:3)\ n biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:393:20)\n at C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:409:15\n at retry ( C:\ biport \ testkafka \ node_modules \ kafkajs \ src \ entry \ endry \ index.js:43:5)\ n at C:\ biport \ biport \ biport \ biport \ testkafka \ node_modules \ node_modules \ kafkajs \ kafkajs \ kafkajs \ kafkajs \ src \ retry \ indry \ indry \ n.75.75 \n。 at new Promise ()\n at Runner.retrier (C:\biport\testkafka\node_modules\kafkajs\src\retry\index.js:72:10)\n at Runner.handleBatch (C:\biport\testkafka\node_modules \ kafkajs \ src \ contumer \ runner.js:407:17)\ n在Handler(c:\ biport \ biport \ testkafka \ node_modules \ kafkajs \ kafkajs \ kafkajs \ src \ src \ contumer \ commuter \ contumer \ runner.js:js:58:30:30:30):58:58:30) }}

I am trying to consume kafka message from kafkajs, but not able to read the message, please can you let me know the issue. Any Idea how to resolve this?

const kafka = new Kafka({
    logLevel: logLevel.INFO,
    ssl: true,
    brokers: ['b-1.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094',
        'b-2.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094',
        'b-3.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094'
    ],
    clientId: 'local-client'
})

const topic = 'aws.identity.users.0'
const consumer = kafka.consumer({
    groupId: 'test-group'
})
const consumedMessages = []

const run = async () => {
    await consumer.connect()
    await consumer.subscribe({
        topic,
        fromBeginning: true
    })
    await consumer.run({
        autoCommit: false,
        eachMessage: async ({
            topic,
            partition,
            message
        }) => {
            `not able to get inside this`
            consumedMessages.push(message);
            const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
            console.log(`- ${prefix} ${message.key}#${message.value}`)

        },
    })
}
`how to make this code work`
app.get('/', (req, res) => {
    res.send('Hello World!')
    run().catch(e => console.error(`[example/consumer] ${e.message}`, e));
});

app.listen(port, () => {
    console.log(`Example app listening on port ${port}!`)
});

{"level":"ERROR","timestamp":"2022-06-29T04:54:57.414Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"aws.identity.users.0","partition":7,"offset":"52","stack":"ReferenceError: consumedMessages is not defined\n at Runner.eachMessage (C:\biport\testkafka\index.js:24:3)\n at Runner.processEachMessage (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:191:20)\n at onBatch (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:393:20)\n at C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:409:15\n at retry (C:\biport\testkafka\node_modules\kafkajs\src\retry\index.js:43:5)\n at C:\biport\testkafka\node_modules\kafkajs\src\retry\index.js:75:5\n at new Promise ()\n at Runner.retrier (C:\biport\testkafka\node_modules\kafkajs\src\retry\index.js:72:10)\n at Runner.handleBatch (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:407:17)\n at handler (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:58:30)","error":{}}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文