有效的算法来分配工作?

发布于 2024-08-27 00:36:15 字数 2076 浏览 3 评论 0原文

解释起来有点复杂,但我们开始吧。基本上,问题是“如何有效地将问题分解为子问题”。这里的“高效”是指,分解的子问题尽可能大。基本上,如果我根本不需要分解问题,那就太理想了。然而,因为一个工人只能处理特定的问题块,所以我确实需要分手。但我想找到一种方法来尽可能粗略地做到这一点。

这是一些伪代码..

我们遇到了这样的问题(抱歉,这是用 Java 编写的。如果您不明白,我很乐意解释)。

class Problem {
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 };
    final Data data = //Some data
}

一个子问题是:

class SubProblem {
    final Set<Integer> targetedSectionIds;
    final Data data;

    SubProblem(Set<Integer> targetedSectionsIds, Data data){
        this.targetedSectionIds = targetedSectionIds;
        this.data = data;
    }
}

那么工作将是这样的。

class Work implements Runnable {
    final Set<Section> subSections;
    final Data data;
    final Result result;

    Work(Set<Section> subSections, Data data) {
        this.sections = SubSections;
        this.data = data;
    }

    @Override
    public void run(){
        for(Section section : subSections){
            result.addUp(compute(data, section));
        }
    }
}

现在我们有了“Worker”的实例,它们有自己的状态部分

class Worker implements ExecutorService {
    final Map<Integer,Section> sectionsIHave;
    {
        sectionsIHave = {1:section1, 5:section5, 8:section8 };
    }

    final ExecutorService executor = //some executor.

    @Override
    public void execute(SubProblem problem){
        Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds);
        super.execute(new Work(sectionsNeeded, problem.data);
    }

}

唷。

因此,我们有很多问题,而工作人员不断要求更多的子问题。我的任务是将问题分解为子问题并将其交给他们。然而,困难在于,我必须稍后收集子问题的所有结果,并将它们合并(减少)为整个问题结果

然而,这是昂贵的,所以我想给工作人员尽可能大的“块”(有尽可能多的 targetedSections )。

它不必是完美的(数学上尽可能高效或其他)。我的意思是,我想不可能有一个完美的解决方案,因为你无法预测每次计算需要多长时间等等。但是对此有一个好的启发式解决方案吗?或者也许我可以在开始设计之前阅读一些资源?

任何建议都将受到高度赞赏!

编辑: 我们也可以控制部分分配,因此控制它是另一种选择。基本上,唯一的限制是一个工人只能拥有固定数量的部分。

It's a bit complicated to explain but here we go. Basically, the issue is "How to break up problems into subproblems in an efficient way". "Efficient" here means, the broken up subproblems are as big as possible. Basically, it would be ideal if I don't have to break up the problems at all. However, because an worker can only work on specific blocks of the problems, I do need to break up. But I want to find a way to do this as coarse as possible.

Here is some pseudo code..

We have problems like this (Sorry it's in Java. If you don't understand, I'd be glad to explain).

class Problem {
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 };
    final Data data = //Some data
}

And a subproblem is:

class SubProblem {
    final Set<Integer> targetedSectionIds;
    final Data data;

    SubProblem(Set<Integer> targetedSectionsIds, Data data){
        this.targetedSectionIds = targetedSectionIds;
        this.data = data;
    }
}

Work will look like this, then.

class Work implements Runnable {
    final Set<Section> subSections;
    final Data data;
    final Result result;

    Work(Set<Section> subSections, Data data) {
        this.sections = SubSections;
        this.data = data;
    }

    @Override
    public void run(){
        for(Section section : subSections){
            result.addUp(compute(data, section));
        }
    }
}

Now we have instances of 'Worker', that have their own state sections I have.

class Worker implements ExecutorService {
    final Map<Integer,Section> sectionsIHave;
    {
        sectionsIHave = {1:section1, 5:section5, 8:section8 };
    }

    final ExecutorService executor = //some executor.

    @Override
    public void execute(SubProblem problem){
        Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds);
        super.execute(new Work(sectionsNeeded, problem.data);
    }

}

phew.

So, we have a lot of Problems and Workers are constantly asking for more SubProblems. My task is to break up Problems into SubProblem and give it to them. The difficulty is however, that I have to later collect all the results for the SubProblems and merge (reduce) them into a Result for the whole Problem.

This is however, costly, so I want to give the workers "chunks" that are as big as possible (has as many targetedSections as possible).

It doesn't have to be perfect (mathematically as efficient as possible or something). I mean, I guess that it is impossible to have a perfect solution, because you can't predict how long each computation will take, etc.. But is there a good heuristic solution for this? Or maybe some resources I can read up before I go into designing?

Any advice is highly appreciated!

EDIT:
We have control over the Section Allocation, too, so controlling that is another options. Basically, the only restriction on this is that a worker can only have a fixed number of sections.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

一瞬间的火花 2024-09-03 00:36:15

好的,看来您的网络服务有一个分片模型,我们做了类似的事情,我们使用“entityId”(sectionId)的反向索引到“客户端”(worker),该客户端将连接到特定的网络服务处理该特定实体。最简单的方法(见下文)是使用 id 到worker的反向映射。如果内存是一个限制,另一种可能性是使用函数(例如,sectionId % 服务数量)。

为了给服务提供尽可能多的工作,有一个简单的批处理算法,该算法将填充批处理以达到用户指定的最大值。这将允许根据远程服务消耗它们的速度来粗略地调整工作块的大小。

public class Worker implements Runnable {

    private final Map<Integer, Section> sections;
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096);
    private final int batchSize;

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) {
        this.sections = sectionsIHave;
        this.batchSize = batchSize;
    }

    public Set<Integer> getSectionIds() {
        return sections.keySet();
    }

    public void execute(final SubProblem command) throws InterruptedException {

        if (sections.containsKey(command.getSectionId())) {
            problemQ.put(command);
        } else {
            throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId());
        }

    }

    @Override
    public void run() {
        final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize);
        while (!Thread.interrupted()) {
            batch.clear();

            try {
                batch.add(problemQ.take());
                for (int i = 1; i < batchSize; i++) {
                    final SubProblem problem = problemQ.poll();
                    if (problem != null) {
                        batch.add(problem);
                    } else {
                        break;
                    }

                    process(batch);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void process(final List<SubProblem> batch) {
        // Submit to remote process.
    }

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) {
        final Map<Integer, Worker> temp = new HashMap<Integer, Worker>();
        for (final Worker worker : workers) {
            for (final Integer sectionId : worker.getSectionIds()) {
                temp.put(sectionId, worker);
            }
        }
        return Collections.unmodifiableMap(temp);
    }

    public static void main(final String[] args) throws InterruptedException {
     // Load workers, where worker is bound to single remote service
        final List<Worker> workers = getWorkers();
        final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers);
        final List<SubProblem> subProblems = getSubProblems();
        for (final SubProblem problem : subProblems) {
            final Worker w = workerReverseIndex.get(problem.getSectionId());
            w.execute(problem);
        }
    }
}

Okay, it appears that you have a sharding model for you network services, we do something similar and we use a reverse index of "entityId" (sectionId) to the "client" (worker) that will connect to the particular network service that will deal with that specific entity. Simplest method (see below) is to use a reverse map of id to worker. If memory is a constraint another possibility is to use a function (e.g. sectionId % number of services).

To give the services as much work as possible, there is a simple batching algorithm that will fill to batch to some user specified maximum. This will allow chunks of work to be sized roughly according to how fast the remote services are able to consume them.

public class Worker implements Runnable {

    private final Map<Integer, Section> sections;
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096);
    private final int batchSize;

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) {
        this.sections = sectionsIHave;
        this.batchSize = batchSize;
    }

    public Set<Integer> getSectionIds() {
        return sections.keySet();
    }

    public void execute(final SubProblem command) throws InterruptedException {

        if (sections.containsKey(command.getSectionId())) {
            problemQ.put(command);
        } else {
            throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId());
        }

    }

    @Override
    public void run() {
        final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize);
        while (!Thread.interrupted()) {
            batch.clear();

            try {
                batch.add(problemQ.take());
                for (int i = 1; i < batchSize; i++) {
                    final SubProblem problem = problemQ.poll();
                    if (problem != null) {
                        batch.add(problem);
                    } else {
                        break;
                    }

                    process(batch);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void process(final List<SubProblem> batch) {
        // Submit to remote process.
    }

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) {
        final Map<Integer, Worker> temp = new HashMap<Integer, Worker>();
        for (final Worker worker : workers) {
            for (final Integer sectionId : worker.getSectionIds()) {
                temp.put(sectionId, worker);
            }
        }
        return Collections.unmodifiableMap(temp);
    }

    public static void main(final String[] args) throws InterruptedException {
     // Load workers, where worker is bound to single remote service
        final List<Worker> workers = getWorkers();
        final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers);
        final List<SubProblem> subProblems = getSubProblems();
        for (final SubProblem problem : subProblems) {
            final Worker w = workerReverseIndex.get(problem.getSectionId());
            w.execute(problem);
        }
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文