如何在 Nest.js 中使用 SSE

发布于 2025-01-10 22:10:27 字数 3220 浏览 0 评论 0原文

谁能帮助我理解并找到在 Nest.js 中使用服务器发送事件的正确方法?

我尝试实现简单的 HTTP 端点,它创建一个作业队列,当作业完成时,队列处理器发出一个带有作业结果的事件,然后 SSE 端点返回一个可观察的管道,该管道侦听作业队列中的事件。

代码可以工作,但在控制台中,当我添加新作业或重新加载页面时,我在事件流中收到一条有关内存泄漏的消息,所以我知道我的解决方案是错误的。

如何正确控制连接和管理 SSE 端点? 另外如何正确地将多个事件的消息广播到SSE通道?

控制器代码:

@Controller('jobs')
export class EmailController {
    constructor(private readonly emailService: EmailService) {}

    @Post('add')
    async confirm(@Session() session: Record<string, any>, @Body() batch: BatchDto) {
        session.uuid = session.uuid ? session.uuid : uuidv4()
        const jobId = await this.emailService.sendMail(batch, session.uuid)
        if (!session.jobs) session.jobs = [+jobId]
        else session.jobs.push(+jobId)
        return { jobId }
    }

    @Sse('sse')
    async sse(@Query() query: UuidDto): Promise<Observable<MessageEvent>> {
        return await this.emailService.getResults(query.uuid)
    }
}

服务代码:

@Injectable()
export class EmailService {
    constructor(@InjectQueue('email') private readonly emailQueue: Queue) {}

    async sendMail(batch: BatchDto, uuid: string) {
        const job = await this.emailQueue.add('send', { ...batch, uuid })
        return job.id
    }

    async getResults(uuid: string) {
        const obs$: Observable<any> = new Observable((observer) => {

        // Is it correct way to subscribe to events and broadcast them to SSE channel?

            this.emailQueue.on('active', (job, jobPromise) => {
                if (job.data.uuid === uuid) {
                    observer.next({
                        active: {
                            id: job.id,
                            data: job.data,
                        },
                    })
                }
            })

            this.emailQueue.on('progress', (job, progress) => {
                if (job.data.uuid === uuid) {
                    observer.next({
                        progress: {
                            id: job.id,
                            progress,
                        },
                    })
                }
            })

            this.emailQueue.on('completed', (job, result) => {
                if (job.data.uuid === uuid) {
                    observer.next({
                        completed: {
                            id: job.id,
                            subject: job.data.subject,
                            from: job.data.from,
                            to: job.data.to,
                            qty: job.progress,
                            completed: result,
                        },
                    })
                }
            })
        })

        return obs$.pipe(
            map((obs$) => ({
                data: {
                    active: obs$.active,
                    progress: obs$.progress,
                    completed: obs$.completed,
                },
            })),
        )
    }
}

错误:

(node:50677) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 completed listeners added to [Queue]. Use emitter.setMaxListeners() to increase limit

Can anyone help me please to understand and find correct way to use Server Sent Events in Nest.js?

i try to implement simple HTTP endpoint which creates a job queue, and when job is finished queue processor emits an event with job result, then SSE endpoint returns an observable pipe which listen an events from job queue.

Code is works but in console i get a message with memory leak in event stream when i adding a new jobs or reloading page, so i understand that my solution is wrong.

How can i correctly control connections and manage SSE endpoint?
Also how can correctly broadcast messages from multiple events to SSE channel?

Controller code:

@Controller('jobs')
export class EmailController {
    constructor(private readonly emailService: EmailService) {}

    @Post('add')
    async confirm(@Session() session: Record<string, any>, @Body() batch: BatchDto) {
        session.uuid = session.uuid ? session.uuid : uuidv4()
        const jobId = await this.emailService.sendMail(batch, session.uuid)
        if (!session.jobs) session.jobs = [+jobId]
        else session.jobs.push(+jobId)
        return { jobId }
    }

    @Sse('sse')
    async sse(@Query() query: UuidDto): Promise<Observable<MessageEvent>> {
        return await this.emailService.getResults(query.uuid)
    }
}

Service code:

@Injectable()
export class EmailService {
    constructor(@InjectQueue('email') private readonly emailQueue: Queue) {}

    async sendMail(batch: BatchDto, uuid: string) {
        const job = await this.emailQueue.add('send', { ...batch, uuid })
        return job.id
    }

    async getResults(uuid: string) {
        const obs$: Observable<any> = new Observable((observer) => {

        // Is it correct way to subscribe to events and broadcast them to SSE channel?

            this.emailQueue.on('active', (job, jobPromise) => {
                if (job.data.uuid === uuid) {
                    observer.next({
                        active: {
                            id: job.id,
                            data: job.data,
                        },
                    })
                }
            })

            this.emailQueue.on('progress', (job, progress) => {
                if (job.data.uuid === uuid) {
                    observer.next({
                        progress: {
                            id: job.id,
                            progress,
                        },
                    })
                }
            })

            this.emailQueue.on('completed', (job, result) => {
                if (job.data.uuid === uuid) {
                    observer.next({
                        completed: {
                            id: job.id,
                            subject: job.data.subject,
                            from: job.data.from,
                            to: job.data.to,
                            qty: job.progress,
                            completed: result,
                        },
                    })
                }
            })
        })

        return obs$.pipe(
            map((obs$) => ({
                data: {
                    active: obs$.active,
                    progress: obs$.progress,
                    completed: obs$.completed,
                },
            })),
        )
    }
}

Error:

(node:50677) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 completed listeners added to [Queue]. Use emitter.setMaxListeners() to increase limit

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文