我用eggjs 尝试连接kafka?

发布于 2022-09-07 12:39:45 字数 2635 浏览 14 评论 0

我用eggjs 尝试连接kafka,做到topic对应app/kafka下的文件名,最终执行文件名下的所有的方法。,想在app目录下独立一个目录去处理业务,最开始我设想用loadController,kafka在触发onmessage的时候找不到对应的controller方法,最后用的比较粗暴的方式解决。不知道有没有问题,有没有可以指点一下的。代码如下:

load kafka:

//kafka-load
import * as path from 'path'
export default app => {
    let dirs = app.loader
        .getLoadUnits()
        .map(unit => path.join(unit.path, 'app', 'kafka'))

    app.kafka = app.kafka || {}
    new app.loader.FileLoader({
        directory: dirs,
        target: app.kafka,
        initializer: (kafka, opts) => {
            const fileName = path.basename(opts.path, path.extname(opts.path))
            Object.keys(kafka).map(action => {
                if (!app.kafka[fileName]) {
                    app.kafka[fileName] = {}
                }
                app.kafka[fileName][action] = kafka[action]
            })
            return null
        }
    }).load()
}


kafka connect

//kafka connect
import * as Kafka from 'kafka-node'
import { KafkaConfig } from '../config/config.d'
import load from './kafkaLoad'

export default app => {
    load(app)
    const config: KafkaConfig = app.config.kafka
    const zookeepers = config.host.join(',')
    const client = new Kafka.Client(zookeepers, config.clientId)
    const consumer = new Kafka.Consumer(client, config.topics, config.options)
    const topics = config.topics.map(item => item.topic)

    consumer.on('message', message => {
        const topicConsumers = app.kafka[message.topic]
        if (topicConsumers) {
            Object.keys(topicConsumers).map(name =>
                topicConsumers[name].call(app, message.value)
            )
        }
        app.logger.info(
            `[egg-kafka] Receive producer message`,
            JSON.stringify(message)
        )
    })

    consumer.on('error', error => {
        app.coreLogger.error(`[egg-kafka] init instance error`, error)
    })

    app.beforeStart(() => {
        app.coreLogger.info(
            `[egg-kafka] init instance success ,host@${zookeepers} -----> topic@${topics}`
        )
    })
}

kafka controller

export interface TopicNodejsMethods {
    test1(message: { [key: string]: any }): Promise<any>
    test2(message: { [key: string]: any }): Promise<any>
}

export type TopicNodejs = Application & TopicNodejsMethods

export default {
    async test1(message) {

        this.io.of('/').emit('passAlarm',message)
    },
    async test2(message) {
        console.log(message)
    }
} as TopicNodejs

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文