如何在 Nest.js 中使用 SSE
谁能帮助我理解并找到在 Nest.js 中使用服务器发送事件的正确方法?
我尝试实现简单的 HTTP 端点,它创建一个作业队列,当作业完成时,队列处理器发出一个带有作业结果的事件,然后 SSE 端点返回一个可观察的管道,该管道侦听作业队列中的事件。
代码可以工作,但在控制台中,当我添加新作业或重新加载页面时,我在事件流中收到一条有关内存泄漏的消息,所以我知道我的解决方案是错误的。
如何正确控制连接和管理 SSE 端点? 另外如何正确地将多个事件的消息广播到SSE通道?
控制器代码:
@Controller('jobs')
export class EmailController {
constructor(private readonly emailService: EmailService) {}
@Post('add')
async confirm(@Session() session: Record<string, any>, @Body() batch: BatchDto) {
session.uuid = session.uuid ? session.uuid : uuidv4()
const jobId = await this.emailService.sendMail(batch, session.uuid)
if (!session.jobs) session.jobs = [+jobId]
else session.jobs.push(+jobId)
return { jobId }
}
@Sse('sse')
async sse(@Query() query: UuidDto): Promise<Observable<MessageEvent>> {
return await this.emailService.getResults(query.uuid)
}
}
服务代码:
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private readonly emailQueue: Queue) {}
async sendMail(batch: BatchDto, uuid: string) {
const job = await this.emailQueue.add('send', { ...batch, uuid })
return job.id
}
async getResults(uuid: string) {
const obs$: Observable<any> = new Observable((observer) => {
// Is it correct way to subscribe to events and broadcast them to SSE channel?
this.emailQueue.on('active', (job, jobPromise) => {
if (job.data.uuid === uuid) {
observer.next({
active: {
id: job.id,
data: job.data,
},
})
}
})
this.emailQueue.on('progress', (job, progress) => {
if (job.data.uuid === uuid) {
observer.next({
progress: {
id: job.id,
progress,
},
})
}
})
this.emailQueue.on('completed', (job, result) => {
if (job.data.uuid === uuid) {
observer.next({
completed: {
id: job.id,
subject: job.data.subject,
from: job.data.from,
to: job.data.to,
qty: job.progress,
completed: result,
},
})
}
})
})
return obs$.pipe(
map((obs$) => ({
data: {
active: obs$.active,
progress: obs$.progress,
completed: obs$.completed,
},
})),
)
}
}
错误:
(node:50677) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 completed listeners added to [Queue]. Use emitter.setMaxListeners() to increase limit
Can anyone help me please to understand and find correct way to use Server Sent Events in Nest.js?
i try to implement simple HTTP endpoint which creates a job queue, and when job is finished queue processor emits an event with job result, then SSE endpoint returns an observable pipe which listen an events from job queue.
Code is works but in console i get a message with memory leak in event stream when i adding a new jobs or reloading page, so i understand that my solution is wrong.
How can i correctly control connections and manage SSE endpoint?
Also how can correctly broadcast messages from multiple events to SSE channel?
Controller code:
@Controller('jobs')
export class EmailController {
constructor(private readonly emailService: EmailService) {}
@Post('add')
async confirm(@Session() session: Record<string, any>, @Body() batch: BatchDto) {
session.uuid = session.uuid ? session.uuid : uuidv4()
const jobId = await this.emailService.sendMail(batch, session.uuid)
if (!session.jobs) session.jobs = [+jobId]
else session.jobs.push(+jobId)
return { jobId }
}
@Sse('sse')
async sse(@Query() query: UuidDto): Promise<Observable<MessageEvent>> {
return await this.emailService.getResults(query.uuid)
}
}
Service code:
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private readonly emailQueue: Queue) {}
async sendMail(batch: BatchDto, uuid: string) {
const job = await this.emailQueue.add('send', { ...batch, uuid })
return job.id
}
async getResults(uuid: string) {
const obs$: Observable<any> = new Observable((observer) => {
// Is it correct way to subscribe to events and broadcast them to SSE channel?
this.emailQueue.on('active', (job, jobPromise) => {
if (job.data.uuid === uuid) {
observer.next({
active: {
id: job.id,
data: job.data,
},
})
}
})
this.emailQueue.on('progress', (job, progress) => {
if (job.data.uuid === uuid) {
observer.next({
progress: {
id: job.id,
progress,
},
})
}
})
this.emailQueue.on('completed', (job, result) => {
if (job.data.uuid === uuid) {
observer.next({
completed: {
id: job.id,
subject: job.data.subject,
from: job.data.from,
to: job.data.to,
qty: job.progress,
completed: result,
},
})
}
})
})
return obs$.pipe(
map((obs$) => ({
data: {
active: obs$.active,
progress: obs$.progress,
completed: obs$.completed,
},
})),
)
}
}
Error:
(node:50677) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 completed listeners added to [Queue]. Use emitter.setMaxListeners() to increase limit
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论