Erlang: job scheduling over a dynamic set of nodes

Posted 2024-10-17 15:41:30

I need some advice writing a Job scheduler in Erlang which is able to distribute jobs ( external os processes) over a set of worker nodes. A job can last from a few milliseconds to a few hours. The "scheduler" should be a global registry where jobs come in, get sorted and then get assigned and executed on connected "worker nodes". Worker nodes should be able to register on the scheduler by telling how many jobs they are able to process in parallel (slots). Worker nodes should be able to join and leave at any time.

An Example:

  • Scheduler has 10 jobs waiting
  • Worker Node A connects and is able to process 3 jobs in parallel
  • Worker Node B connects and is able to process 1 job in parallel
  • Some time later, another worker node joins which is able to process 2 jobs in parallel

Questions:

I seriously spent some time thinking about the problem but I am still not sure which way to go. My current solution is to have a globally registered gen_server for the scheduler which holds the jobs in its state. Every worker node spawns N worker processes and registers them on the scheduler. The worker processes then pull a job from the scheduler (an infinitely blocking call, answered with {noreply, ...} if no jobs are currently available).
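The pull model described here can be sketched roughly like this (module and message names are hypothetical, not from any existing library): workers call a blocking `fetch_job/0`, and the scheduler parks the caller with `{noreply, ...}` until a job arrives, then answers via `gen_server:reply/2`.

```erlang
%% Sketch only: a globally registered scheduler with a blocking pull API.
-module(job_scheduler).
-behaviour(gen_server).
-export([start_link/0, submit/1, fetch_job/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

submit(Job) ->
    gen_server:cast({global, ?MODULE}, {submit, Job}).

%% Blocks (timeout infinity) until the scheduler has a job for us.
fetch_job() ->
    gen_server:call({global, ?MODULE}, fetch, infinity).

%% State: {queue of pending jobs, queue of parked worker callers}.
init([]) ->
    {ok, {queue:new(), queue:new()}}.

handle_call(fetch, From, {Jobs, Waiting}) ->
    case queue:out(Jobs) of
        {{value, Job}, Jobs1} ->
            {reply, Job, {Jobs1, Waiting}};
        {empty, _} ->
            %% No job available: park the caller, reply later.
            {noreply, {Jobs, queue:in(From, Waiting)}}
    end.

handle_cast({submit, Job}, {Jobs, Waiting}) ->
    case queue:out(Waiting) of
        {{value, From}, Waiting1} ->
            gen_server:reply(From, Job),      % wake a parked worker
            {noreply, {Jobs, Waiting1}};
        {empty, _} ->
            {noreply, {queue:in(Job, Jobs), Waiting}}
    end.
```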

Here are some questions:

  • Is it a good idea to assign every new job to an existing worker, knowing that I will have to re-assign the job to another worker at the time new workers connect? (I think this is how the Erlang SMP scheduler does things, but reassigning jobs seems like a big headache to me)
  • Should I start a process for every worker processing slot and where should this process live: on the scheduler node or on the worker node? Should the scheduler make rpc calls to the worker node or would it be better for the worker nodes to pull new jobs and then execute them on their own?
  • And finally: Is this problem already solved and where to find the code for it? :-)
    I already tried RabbitMQ for job scheduling but custom job sorting and deployment adds a lot of complexity.

Any advice is highly welcome!


Comments (2)

等你爱我 2024-10-24 15:41:30


Having read your answer in the comments I'd still recommend using pool(3):

  • Spawning 100k processes is not a big deal for Erlang because spawning a process is much cheaper than in other systems.

  • One process per job is a very good pattern in Erlang: start a new process, run the job in that process keeping all the state there, and terminate the process once the job is done.

  • Don't bother with worker processes that process a job and wait for a new one. This is the way to go if you are using OS-processes or threads because spawning is expensive but in Erlang this only adds unnecessary complexity.
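The one-process-per-job pattern from the list above can be sketched in a few lines; since the question's jobs are external OS processes, this assumes `open_port/2` is used to run them (the function name is hypothetical):

```erlang
%% Sketch: one Erlang process per job. The process owns all job state
%% and simply dies when the external command finishes.
run_job(Cmd) when is_list(Cmd) ->
    spawn(fun() ->
        Port = open_port({spawn, Cmd}, [exit_status]),
        receive
            {Port, {exit_status, Status}} ->
                io:format("~s finished with status ~p~n", [Cmd, Status])
        end
        %% Process terminates here; no pooling, no reuse.
    end).
```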

The pool facility is useful as a low-level building block; the only thing it's missing for your use case is the ability to start additional nodes automatically. What I would do is start with pool and a fixed set of nodes to get the basic functionality.

Then add some extra logic that watches the load on the nodes, e.g. like pool does with statistics(run_queue). If you find that all nodes are over a certain load threshold, just slave:start/2,3 a new node on an extra machine and use pool:attach/1 to add it to your pool.
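That watcher could look roughly like this (the threshold, host name, and node name are assumptions for illustration; only `pool`, `slave`, and `rpc` calls from OTP are used):

```erlang
%% Sketch: attach a fresh node when every pool node looks overloaded.
-module(pool_watcher).
-export([check/0]).

-define(THRESHOLD, 8).  % run-queue length; tune from real measurements

check() ->
    %% Ask each pool node for its scheduler run-queue length.
    Loads = [rpc:call(N, erlang, statistics, [run_queue])
             || N <- pool:get_nodes()],
    AllBusy = Loads =/= [] andalso
              lists:all(fun(L) -> is_integer(L) andalso L > ?THRESHOLD end,
                        Loads),
    case AllBusy of
        true ->
            %% Host and node name are placeholders.
            {ok, Node} = slave:start(spare_host, extra1),
            pool:attach(Node);
        false ->
            ok
    end.
```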

This won't rebalance old running jobs, but new jobs will automatically be directed to the newly started node since it's still idle.

With this you can have a fast pool controlled distribution of incoming jobs and a slower totally separate way of adding and removing nodes.

If you got all this working and still find out -- after some real-world benchmarking please -- that you need rebalancing of jobs, you can always build something into the jobs' main loops: after a rebalance message, a job can respawn itself using the pool master, passing its current state as an argument.
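That rebalance hook might look like this (the loop and `handle_item/2` are hypothetical; `pool:pspawn/3` is the real OTP call that spawns on the pool's least-loaded node):

```erlang
%% Sketch of a job main loop that supports rebalancing: on `rebalance`
%% the job respawns itself via the pool master, handing over its
%% current state, and this copy then terminates.
job_loop(State) ->
    receive
        rebalance ->
            pool:pspawn(?MODULE, job_loop, [State]);
        {work, Item} ->
            job_loop(handle_item(Item, State))  % handle_item/2 is job-specific
    end.
```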

Most importantly: just go ahead, build something simple that works, and optimize it later.

2024-10-24 15:41:30


My solution to the problem:

"distributor" - gen_server,
"worker" - gen_server.

"distributor" starts "workers" using slave:start_link, each "worker" is started with max_processes parameter,

"distributor" behavior:

handle_call(submit,...)
  * put job to the queue,
  * cast itself check_queue

handle_cast(check_queue,...)
  * gen_call all workers for load (current_processes / max_processes),
  * find the least busy,
  * if chosen worker load is < 1 gen_call(submit,...) worker 
      with next job if any, remove job from the queue,

"worker" behavior (trap_exit = true):

handle_call(report_load, ...)
  * return current_process / max_process,

handle_call(submit, ...)
  * spawn_link job,

handle_info({'EXIT', Pid, Reason}, ...)
  * gen_cast distributor with check_queue
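The check_queue step above could be fleshed out roughly like this (the `#state{}` record, its fields, and the message shapes are assumptions based on the outline):

```erlang
%% Sketch of the distributor's check_queue handler: poll every worker
%% for its load ratio, pick the least busy, and hand it the next job
%% if it still has a free slot.
-record(state, {queue = queue:new(), workers = []}).

handle_cast(check_queue, State = #state{queue = Q, workers = Ws}) ->
    Loads = [{W, gen_server:call(W, report_load)} || W <- Ws],
    case lists:keysort(2, Loads) of
        [{Worker, Load} | _] when Load < 1 ->   % least busy has a free slot
            case queue:out(Q) of
                {{value, Job}, Q1} ->
                    ok = gen_server:call(Worker, {submit, Job}),
                    {noreply, State#state{queue = Q1}};
                {empty, _} ->
                    {noreply, State}
            end;
        _ ->
            {noreply, State}                    % no workers, or all full
    end.
```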

In fact it is more complex than that, as I need to track running jobs and kill them if necessary, but that is easy to implement in such an architecture.

This is not a dynamic set of nodes though, but you can start a new node from the distributor whenever you need to.

P.S. It looks similar to pool, but in my case I am submitting port processes, so I need to limit them and have better control over what goes where.
