Trying to consume Kafka messages with kafkajs, but unable to read the messages
I am trying to consume Kafka messages with kafkajs, but I am not able to read them. Can you please let me know what the issue is? Any idea how to resolve this?
const { Kafka, logLevel } = require('kafkajs')
const express = require('express')

const app = express()
const port = 3000 // assumed value; the original snippet does not show where port is defined

const kafka = new Kafka({
  logLevel: logLevel.INFO,
  ssl: true,
  brokers: [
    'b-1.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094',
    'b-2.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094',
    'b-3.dev-datafabric.nmz564.c20.kafka.us-east-1.amazonaws.com:9094'
  ],
  clientId: 'local-client'
})

const topic = 'aws.identity.users.0'
const consumer = kafka.consumer({
  groupId: 'test-group'
})
const consumedMessages = []

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({
    topic,
    fromBeginning: true
  })
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      // not able to get inside this
      consumedMessages.push(message)
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
      console.log(`- ${prefix} ${message.key}#${message.value}`)
    },
  })
}

// how to make this code work
app.get('/', (req, res) => {
  res.send('Hello World!')
  // the consumer is started from inside the route handler
  run().catch(e => console.error(`[example/consumer] ${e.message}`, e))
})

app.listen(port, () => {
  console.log(`Example app listening on port ${port}!`)
})
{"level":"ERROR","timestamp":"2022-06-29T04:54:57.414Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"aws.identity.users.0","partition":7,"offset":"52","stack":"ReferenceError: consumedMessages is not defined\n at Runner.eachMessage (C:\biport\testkafka\index.js:24:3)\n at Runner.processEachMessage (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:191:20)\n at onBatch (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:393:20)\n at C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:409:15\n at retry (C:\biport\testkafka\node_modules\kafkajs\src\retry\index.js:43:5)\n at C:\biport\testkafka\node_modules\kafkajs\src\retry\index.js:75:5\n at new Promise ()\n at Runner.retrier (C:\biport\testkafka\node_modules\kafkajs\src\retry\index.js:72:10)\n at Runner.handleBatch (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:407:17)\n at handler (C:\biport\testkafka\node_modules\kafkajs\src\consumer\runner.js:58:30)","error":{}}
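The log reports `ReferenceError: consumedMessages is not defined` inside `eachMessage`. The snippet as posted does declare the array, so the file that actually ran may have differed. One way to rule this out is to build the handler from a function that receives the array explicitly. The sketch below is only an illustration under that assumption: `makeEachMessage` and `formatMessage` are hypothetical helper names, not part of kafkajs, and the wiring to `consumer.run` is shown only as a comment so the block runs without a broker.

```javascript
// Hypothetical helper (not a kafkajs API): returns an eachMessage handler that
// closes over `store`, so the array is always in scope when the handler runs.
function makeEachMessage(store, log = console.log) {
  return async ({ topic, partition, message }) => {
    store.push(message)
    log(formatMessage({ topic, partition, message }))
  }
}

// Hypothetical helper: formats one message the same way as the question's code.
// key and value arrive as Buffers (or null), so convert them explicitly.
function formatMessage({ topic, partition, message }) {
  const key = message.key ? message.key.toString() : null
  const value = message.value ? message.value.toString() : null
  const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
  return `- ${prefix} ${key}#${value}`
}

const consumedMessages = []
const eachMessage = makeEachMessage(consumedMessages)

// Wiring, assuming the `consumer` object from the question:
//   await consumer.run({ autoCommit: false, eachMessage })
// Calling run() once at startup, rather than inside the '/' route handler,
// avoids starting the consumer again on every request.
```

Because the handler takes its dependencies as arguments, it can also be exercised with a fake message object before connecting to the cluster.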