Removing duplicate output from a BullJS work queue
I am running a Node.js/Express API server on Heroku (my web server dyno) that queues up jobs in Redis; the jobs are managed by BullJS on a worker server (a separate worker dyno).
These jobs can take a while (up to 2 hours). However, I have noticed that sometimes a job with id x will start, and then a job with id x+1 will start 5 minutes later with the exact same inputs. These are duplicate tasks (they're not both coming in from user actions in the app), and I'd like to either prevent them from happening or clean them up later.
I have considered deleting the job when it fails (something like the approach below), but I'm worried this might be a bad idea because Redis seems to reuse jobIds over time. Also, I don't want jobs to fail silently so that the work never gets done.
workQueue.on('global:failed', async function (jobId, error) {
  console.log("Failed: " + JSON.stringify(jobId), error);
  await workQueue.removeJobs(jobId);
  await deleteDBEntryWithJobId(jobId); // A function that looks up the DB entry for this jobId and deletes it
});
Here is some more info about the codebase, in case it helps. I'd take any design tips or suggestions to make this process more robust. I'm less concerned with duplicated jobs and more concerned with duplicated data in the DB, which gets shown to a user.
// Data model for output from the job
{
  jobId: string // This is the BullJS jobId
  name: string
  data: any[]   // A lot of data that's pulled from various APIs during the job
}
// API endpoint that takes input and kicks off the job
let workQueue = new Queue('work', REDIS_URL);

app.post('/createSegment', async (req: Request, res: Response) => {
  corsHandler(req, res, async () => {
    const jobOptions: JobOptions = { removeOnComplete: true, removeOnFail: true };
    let job = await workQueue.add({ ...req.body, jobType: NEW_SEGMENT_JOB }, jobOptions);
    res.json({ jobId: job.id, data: job.data });
    return;
  })
});
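One option I've been looking at to prevent the duplicate enqueue in the first place (a sketch only, not yet in the code above) is Bull's documented jobId option: queue.add becomes a no-op when a job with the same id already exists, so deriving the id from the request payload would stop identical inputs from being queued twice while the first job is still in the queue. Note that once removeOnComplete/removeOnFail removes the job, that id becomes usable again. The hashing helper here is hypothetical:

import { createHash } from 'crypto';

// Hypothetical helper: derive a stable Bull jobId from the request body so that
// identical inputs map to the same id while the first job still exists.
function dedupJobOptions(body: unknown) {
  const inputKey = createHash('sha256').update(JSON.stringify(body)).digest('hex');
  return { removeOnComplete: true, removeOnFail: true, jobId: inputKey };
}

// Inside the /createSegment handler, something like:
//   let job = await workQueue.add({ ...req.body, jobType: NEW_SEGMENT_JOB }, dedupJobOptions(req.body));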
// The worker code
let REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";
let workers = process.env.WEB_CONCURRENCY || 4;
let maxJobsPerWorker = 50;

function start() {
  let workQueue = new Queue('work', REDIS_URL);
  workQueue.process(maxJobsPerWorker, async (job) => {
    const { jobType } = job.data;
    if (jobType === NEW_SEGMENT_JOB) {
      const segmentId = await createNewSegment(job.data, job.id); // Creates the initial record in the DB (a relatively simple / safe operation, which is why it gets duplicated)
      await analyzeSegment(segmentId);
    }
  })
}

throng({ workers, start });
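Since my real worry is duplicate rows in the DB rather than duplicate jobs, I've also sketched out making createNewSegment idempotent, keyed on the job's input instead of the jobId (the duplicates arrive with different ids but identical inputs). This is only a sketch under assumptions: it presumes a Postgres segments table with a unique index on input_hash and the pg client, neither of which appears in the code above.

import { createHash } from 'crypto';
import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Sketch: upsert the segment row keyed on a hash of the job input, so a
// duplicated job finds the existing row instead of inserting a second one.
async function createNewSegmentIdempotent(jobData: any, jobId: string): Promise<string> {
  const inputHash = createHash('sha256').update(JSON.stringify(jobData)).digest('hex');
  const { rows } = await pool.query(
    `INSERT INTO segments (input_hash, job_id, name)
     VALUES ($1, $2, $3)
     ON CONFLICT (input_hash) DO UPDATE SET job_id = EXCLUDED.job_id
     RETURNING id`,
    [inputHash, jobId, jobData.name]
  );
  return rows[0].id; // segmentId for analyzeSegment, whether or not the row already existed
}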
Comments (1)
I suspect that the job might be failing and BullJS is re-attempting it. To be on the safer side, set the attempts option explicitly to zero (I'm not sure what its default value is). As per https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueadd, removeOnFail, if true, removes the job when it fails after all attempts. So the current removeOnComplete and removeOnFail options may still leave the default attempts in effect.
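A minimal sketch of what this answer suggests, applied to the /createSegment options from the question (the placement is an assumption; note that the Bull reference describes attempts as the total number of tries, with a default of 1, so pinning it to a single attempt is the explicit "never retry" setting):

// Inside the /createSegment handler: the same options as before, plus an explicit attempts value
const jobOptions: JobOptions = {
  attempts: 1,            // exactly one try -- Bull will not re-run the job on failure
  removeOnComplete: true,
  removeOnFail: true,     // removal only happens after all attempts are used up
};
let job = await workQueue.add({ ...req.body, jobType: NEW_SEGMENT_JOB }, jobOptions);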