Worker pools and multi-tenant queues with RabbitMQ
I work on a web application that is a multi-tenant cloud based application (lots of clients, each with their own separate "environment", but all on shared sets of hardware) and we're introducing the ability for a user to batch up work for later processing. The type of batched work really isn't important; it's just of sufficient quantity that doing it without a work queue isn't really practical. We've selected RabbitMQ as our underlying queue framework.
Because we're a multi-tenant app, we don't necessarily want clients to be able to cause lengthy queue process times for another client, so one idea that we've floated up is creating a queue on a per client basis and having a shared worker pool pointed across ALL our client queues. The problem is that, to the best that I can figure, workers are directly bound to a specific queue, not an exchange. In our ideal world, our client queues will still be processed, without one client blocking another, from a shared worker pool that we can grow or shrink as necessary by launching more workers or closing down idle ones. Having workers tied to a specific queue prevents us from this in a practical sense, as we'd frequently have lots of workers just idling on a queue with no activity.
Is there a relatively straightforward way to accomplish this? I'm fairly new to RabbitMQ and haven't really been able to accomplish what we're after. We also don't want to have to write a very complex multithreaded consumer application either; that's a time sink in dev and test time that we likely can't afford. Our stack is Windows/.Net/C# based if that's germane, but I don't think that should have a major bearing on the question at hand.
4 Answers
You could look at the priority queue implementation (which wasn't implemented when this question was originally asked): https://www.rabbitmq.com/priority.html
If that doesn't work for you, you could try some other hacks to achieve what you want (which should work with older versions of RabbitMQ):
You could have 100 queues bound to a topic exchange and set the routing key to a hash of the user ID % 100, i.e. each task will have a key between 1 and 100 and tasks for the same user will have the same key. Each queue is bound with a unique pattern between 1 and 100. Now you have a fleet of workers which start with a random queue number and then increment that queue number after each job, again % 100 to cycle back to queue 1 after queue 100.
Now your worker fleet can process up to 100 unique users in parallel, or all the workers can focus on a single user if there is no other work to do. If the workers need to cycle through all 100 queues between each job, in the scenario that only a single user has a lot of jobs on a single queue, you're naturally going to have some overhead between each job. A smaller number of queues is one way to deal with this. You could also have each worker hold a connection to each of the queues and consume up to one un-acknowledged message from each. The worker can then cycle through the pending messages in memory much faster, provided the un-acknowledged message timeout is set sufficiently high.
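The bucketing and cycling arithmetic above can be sketched in a few lines. This is an illustrative sketch only; the function names and the choice of 100 buckets are assumptions from the answer, not a RabbitMQ API:

```python
# Sketch of the routing-key bucketing scheme: one queue per bucket,
# all tasks for the same user land in the same bucket, and each worker
# cycles through queue numbers after every job.

NUM_QUEUES = 100

def routing_key_for(user_id: int) -> str:
    """Map a user ID to a bucket key between 1 and 100."""
    return str(user_id % NUM_QUEUES + 1)

def next_queue(current: int) -> int:
    """Cycle 1 -> 2 -> ... -> 100 -> back to 1 between jobs."""
    return current % NUM_QUEUES + 1
```

Each queue would then be bound to the topic exchange with its bucket number as the binding pattern, so publishing with `routing_key_for(user_id)` routes the task to that user's queue.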
Alternatively you could create two exchanges, each with a bound queue. All work goes to the first exchange and queue, which a pool of workers consume. If a unit of work takes too long the worker can cancel it and push it to the second queue. Workers only process the second queue when there's nothing on the first queue. You might also want a couple of workers with the opposite queue prioritization to make sure long running tasks are still processed when there's a never ending stream of short tasks arriving, so that a user's batch will always be processed eventually. This won't truly distribute your worker fleet across all tasks, but it will stop long running tasks from one user holding up your workers from executing short running tasks for that same user or another. It also assumes you can cancel a job and re-run it later without any problems. It also means there will be wasted resources from tasks that timeout and need to be re-run as low priority, unless you can identify fast and slow tasks in advance.
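The demote-on-timeout loop described above can be sketched with plain in-memory deques. Everything here is an assumption for illustration (the queue objects, the `execute` helper, the budget value); a real worker would consume from the two RabbitMQ queues instead:

```python
from collections import deque

TIMEOUT = 1.0  # assumed per-job budget in seconds before demotion

def run_worker(fast, slow, execute, budget=TIMEOUT):
    """One worker's loop over the two-queue scheme: drain the fast queue,
    demoting any job that exceeds its budget to the slow queue; only touch
    the slow queue when the fast queue is empty. `execute(job, budget)` is
    an assumed helper returning True if the job finished within the budget.
    """
    while fast or slow:
        if fast:
            job = fast.popleft()
            if not execute(job, budget):
                # Job ran too long: cancel it and requeue at low priority.
                slow.append(job)
        else:
            job = slow.popleft()
            execute(job, None)  # no budget on the second, low-priority pass
```

A worker with "the opposite queue prioritization", as the answer suggests, would simply check `slow` before `fast`.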
The first suggestion with the 100 queues could also have a problem if there are 100 slow tasks for a single user, then another user posts a batch of tasks. Those tasks won't get looked at until one of the slow tasks is finished. If this turns out to be a legitimate problem you could potentially combine the two solutions.
You can just have your pool of workers all consume the same unique queue. Work will then be distributed across them and you'll be able to grow/shrink your pool in order to increase/decrease your work processing capacity.
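This is the standard competing-consumers pattern: RabbitMQ round-robins deliveries across the consumers on a queue. A rough in-memory simulation (the function and thread names are mine, not any RabbitMQ.Client API):

```python
import queue
import threading

def start_pool(work_queue, handler, size):
    """Start `size` competing consumers on one shared queue. Growing the
    pool is just starting more of these workers; shrinking it is sending
    a poison pill (None) per worker to retire."""
    def worker():
        while True:
            job = work_queue.get()
            if job is None:          # poison pill: shut this worker down
                work_queue.task_done()
                return
            handler(job)
            work_queue.task_done()

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(size)]
    for t in threads:
        t.start()
    return threads
```

With a real broker you would additionally set a prefetch count of 1 (`basic.qos`) so an idle worker, not a busy one, gets the next message.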
I don't understand why you don't use RabbitMQ's vhosts and have your app login to RabbitMQ and authenticate on a separate connection for each user.
This doesn't mean that you can't have a worker supervisor that assigns workers to one user or another. But it does mean that all messages for each user are processed by entirely separate exchanges and queues.
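Provisioning a vhost per tenant is a few `rabbitmqctl` calls on the broker host. A sketch, with hypothetical tenant and user names:

```shell
# Create an isolated vhost for one tenant (names here are made up).
rabbitmqctl add_vhost tenant_a

# Give that tenant's app its own credentials, scoped to its vhost only.
rabbitmqctl add_user tenant_a_app 's3cret'
rabbitmqctl set_permissions -p tenant_a tenant_a_app '.*' '.*' '.*'
```

Each tenant's exchanges and queues then live entirely inside its own vhost, so one tenant's credentials can never touch another tenant's messages.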
Workers are assigned 0+ queues, not exchanges.
The logic for which tasks will be taken from which queues for each worker is implemented in the class indicated via CELERYD_CONSUMER, which is by default celery.worker.consumer.Consumer. You can create a custom consumer class to implement whatever logic you like. The hard part will be deciding the details of the "fairness" algorithm you want to use; but once you've decided that, you can implement it by creating a custom consumer class and assigning it to the appropriate workers.
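The core of such a "fairness" algorithm can be sketched independently of Celery. Here is one toy policy, least-recently-served: among the tenant queues that currently have work, take from the one served longest ago. The names and the policy are illustrative only, not Celery's actual consumer behaviour:

```python
def pick_queue(queues, last_served):
    """Pick the next tenant queue to serve.

    queues      -- dict mapping queue name to its pending jobs (a list)
    last_served -- dict mapping queue name to the tick it was last served;
                   a queue never served before defaults to 0 (oldest)
    Returns the name of the non-empty queue served longest ago, or None.
    """
    candidates = [name for name, jobs in queues.items() if jobs]
    if not candidates:
        return None
    return min(candidates, key=lambda name: last_served.get(name, 0))
```

A custom consumer class would run a loop of `pick_queue`, pull one job from the chosen queue, record the tick in `last_served`, and repeat, so no busy tenant can starve the others.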