我用eggjs 尝试连接kafka?
我用eggjs 尝试连接kafka,做到topic对应app/kafka下的文件名,最终执行文件名下的所有的方法。,想在app目录下独立一个目录去处理业务,最开始我设想用loadController,kafka在触发onmessage的时候找不到对应的controller方法,最后用的比较粗暴的方式解决。不知道有没有问题,有没有可以指点一下的。代码如下:
load kafka:
//kafka-load
import * as path from 'path'
export default app => {
let dirs = app.loader
.getLoadUnits()
.map(unit => path.join(unit.path, 'app', 'kafka'))
app.kafka = app.kafka || {}
new app.loader.FileLoader({
directory: dirs,
target: app.kafka,
initializer: (kafka, opts) => {
const fileName = path.basename(opts.path, path.extname(opts.path))
Object.keys(kafka).map(action => {
if (!app.kafka[fileName]) {
app.kafka[fileName] = {}
}
app.kafka[fileName][action] = kafka[action]
})
return null
}
}).load()
}
kafka connect
//kafka connect
import * as Kafka from 'kafka-node'
import { KafkaConfig } from '../config/config.d'
import load from './kafkaLoad'
export default app => {
load(app)
const config: KafkaConfig = app.config.kafka
const zookeepers = config.host.join(',')
const client = new Kafka.Client(zookeepers, config.clientId)
const consumer = new Kafka.Consumer(client, config.topics, config.options)
const topics = config.topics.map(item => item.topic)
consumer.on('message', message => {
const topicConsumers = app.kafka[message.topic]
if (topicConsumers) {
Object.keys(topicConsumers).map(name =>
topicConsumers[name].call(app, message.value)
)
}
app.logger.info(
`[egg-kafka] Receive producer message`,
JSON.stringify(message)
)
})
consumer.on('error', error => {
app.coreLogger.error(`[egg-kafka] init instance error`, error)
})
app.beforeStart(() => {
app.coreLogger.info(
`[egg-kafka] init instance success ,host@${zookeepers} -----> topic@${topics}`
)
})
}
kafka controller
export interface TopicNodejsMethods {
test1(message: { [key: string]: any }): Promise<any>
test2(message: { [key: string]: any }): Promise<any>
}
export type TopicNodejs = Application & TopicNodejsMethods
export default {
async test1(message) {
this.io.of('/').emit('passAlarm',message)
},
async test2(message) {
console.log(message)
}
} as TopicNodejs
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论