Worker/Timeslot 排列/约束过滤算法

发布于 2024-08-07 16:43:37 字数 15476 浏览 9 评论 0原文

希望你能帮我解决这些问题。这对工作没有帮助——它是为了一个由非常努力工作的志愿者组成的慈善机构,他们真的可以使用比他们目前拥有的更不混乱/烦人的时间表系统。

如果有人知道一个好的第三方应用程序(当然)可以自动执行此操作,那几乎同样好。只是......请不要建议随机的时间表,例如预订教室的时间表,因为我认为他们不能做到这一点。

预先感谢您的阅读;我知道这是一个很大的帖子。不过,我正在尽力清楚地记录这一点,并表明我自己已经做出了努力。

问题

我需要一个工人/时隙调度算法,为工人生成班次,满足以下条件:

输入数据

import datetime.datetime as dt

class DateRange:
    def __init__(self, start, end):
        self.start = start
        self.end   = end

class Shift:
    def __init__(self, range, min, max):
        self.range = range
        self.min_workers = min
        self.max_workers = max

tue_9th_10pm = dt(2009, 1, 9,   22, 0)
wed_10th_4am = dt(2009, 1, 10,   4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)

shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)

shift_1 = Shift(shift_1_times, 2,3)  # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2)  # allows 2
shift_3 = Shift(shift_3_times, 2,3)  # allows 3, requires 2, 3 available

shifts = ( shift_1, shift_2, shift_3 )

joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]

joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)

workers = ( joe, bob, sam, ned, max, amy, jim )

处理

从上面,班次工人 是要处理的两个主要输入变量

每个班次都有所需的最小和最大工人数量。满足轮班的最低要求对于成功至关重要,但如果所有其他方法都失败了,手动填补空白的轮班表比“错误”要好:) 主要的算法问题是,当足够的时候,不应该有不必要的空白工人有空。

理想情况下,一个轮班的最大工人数量将被填补,但相对于其他限制,这是最低优先级的,所以如果有什么必须给予的话,就应该是这个。

灵活的约束

这些有点灵活,如果找不到“完美”的解决方案,它们的界限可以稍微扩大。不过,这种灵活性应该是最后的手段,而不是随意利用。理想情况下,灵活性可以通过“fudge_factor”变量或类似变量进行配置。

  • 有一个最短时间段 两个班次之间。那么,一个工人 不应该安排两班倒 例如,在同一天。
  • 班次有最大数量 工人在给定时间段内可以做的事情 (例如,一个月)
  • 某些特定的最大数量 一个月内可以完成的轮班 (例如,夜班)

很高兴拥有,但不是必需的

如果您能想出一种算法来执行上述操作并包括任何/所有这些,我将印象深刻并感激不已。即使是单独执行这些操作的附加脚本也很棒。

  • 重叠的班次。例如, 最好能够指定 “前台”班次和“后台”班次 两者同时发生的转变。 这可以通过单独的调用来完成 具有不同班次数据的程序, 除了关于调度的限制 在给定时间内轮班的人员 期间将会被错过。

  • 可指定工人的最短重新安排时间段 以每个工人(而不是全局)为基础。例如, 如果乔感到工作过度或正在处理个人问题, 或者是初学者,我们可能想为他安排时间 比其他工作人员少。

  • 一些自动/随机/公平的方式选择员工来填补最低限度 当没有可用的工人适合时轮班人数。

  • 处理突然取消并填补空白的一些方法 无需重新安排其他班次。

输出测试

算法可能应该生成尽可能多的匹配解决方案,其中每个解决方案如下所示:

class Solution:
    def __init__(self, shifts_workers):
        """shifts_workers -- a dictionary of shift objects as keys, and a
        a lists of workers filling the shift as values."""

        assert isinstance(dict, shifts_workers)
        self.shifts_workers = shifts_workers

给定上述数据,这是单个解决方案的测试函数。我认为这是正确的,但我也希望得到一些同行评审。

def check_solution(solution):
  assert isinstance(Solution, solution)

  def shift_check(shift, workers, workers_allowed):
      assert isinstance(Shift, shift):
      assert isinstance(list, workers):
      assert isinstance(list, workers_allowed)

      num_workers = len(workers)
      assert num_workers >= shift.min_workers
      assert num_workers <= shift.max_workers

      for w in workers_allowed:
          assert w in workers

  shifts_workers = solution.shifts_workers

  # all shifts should be covered
  assert len(shifts_workers.keys()) == 3
  assert shift1 in shifts_workers.keys()
  assert shift2 in shifts_workers.keys()
  assert shift3 in shifts_workers.keys()

  # shift_1 should be covered by 2 people - joe, and bob
  shift_check(shift_1, shifts_workers[shift_1], (joe, bob))

  # shift_2 should be covered by 2 people - sam and amy
  shift_check(shift_2, shifts_workers[shift_2], (sam, amy))

  # shift_3 should be covered by 3 people - ned, max, and jim
  shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))

我尝试

用遗传算法来实现这一点,但似乎无法将其调整得非常正确,因此,尽管基本原理似乎适用于单班制,但它甚至无法解决仅需要几个班次和一个简单的情况。工人很少。

我最新的尝试是生成所有可能的排列作为解决方案,然后减少不满足约束的排列。这似乎工作得更快,并且让我更进一步,但我正在使用 python 2.6 的 itertools.product() 来帮助生成排列,但我不能完全正确。如果有很多错误,我不会感到惊讶,因为说实话,这个问题不太适合我的头脑:)

目前我的代码位于两个文件中:models.py 和 rota.py。 models.py 看起来像:

# -*- coding: utf-8 -*-
class Shift:
    def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
        self.start = start_datetime
        self.end = end_datetime
        self.duration = self.end - self.start
        self.min_coverage = min_coverage
        self.max_coverage = max_coverage

    def __repr__(self):
        return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)

class Duty:
    def __init__(self, worker, shift, slot):
        self.worker = worker
        self.shift = shift
        self.slot = slot

    def __repr__(self):
        return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
        self.worker.dump(indent=indent, depth=depth+1)
        print ind + ">"

class Avail:
    def __init__(self, start_time, end_time):
        self.start = start_time
        self.end = end_time

    def __repr__(self):
        return "<%s to %s>" % (self.start, self.end)

class Worker:
    def __init__(self, name, availabilities):
        self.name = name
        self.availabilities = availabilities

    def __repr__(self):
        return "<Worker %s Avail=%r>" % (self.name, self.availabilities)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Worker %s" % self.name
        for avail in self.availabilities:
            print ind + " " * indent + repr(avail)
        print ind + ">"

    def available_for_shift(self,  shift):
        for a in self.availabilities:
            if shift.start >= a.start and shift.end <= a.end:
                return True
        print "Worker %s not available for %r (Availability: %r)" % (self.name,  shift, self.availabilities)
        return False

class Solution:
    def __init__(self, shifts):
        self._shifts = list(shifts)

    def __repr__(self):
        return "<Solution: shifts=%r>" % self._shifts

    def duties(self):
        d = []
        for s in self._shifts:
            for x in s:
                yield x

    def shifts(self):
        return list(set([ d.shift for d in self.duties() ]))

    def dump_shift(self, s, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<ShiftList"
        for duty in s:
            duty.dump(indent=indent, depth=depth+1)
        print ind + ">"

    def dump(self, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<Solution"
        for s in self._shifts:
            self.dump_shift(s, indent=indent, depth=depth+1)
        print ind + ">"

class Env:
    def __init__(self, shifts, workers):
        self.shifts = shifts
        self.workers = workers
        self.fittest = None
        self.generation = 0

class DisplayContext:
    def __init__(self,  env):
        self.env = env

    def status(self, msg, *args):
        raise NotImplementedError()

    def cleanup(self):
        pass

    def update(self):
        pass

rota.py 看起来像:

#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-

from datetime import datetime as dt
am2 = dt(2009,  10,  1,  2, 0)
am8 = dt(2009,  10,  1,  8, 0)
pm12 = dt(2009,  10,  1,  12, 0)

def duties_for_all_workers(shifts, workers):
    from models import Duty

    duties = []

    # for all shifts
    for shift in shifts:
        # for all slots
        for cov in range(shift.min_coverage, shift.max_coverage):
            for slot in range(cov):
                # for all workers
                for worker in workers:
                    # generate a duty
                    duty = Duty(worker, shift, slot+1)
                    duties.append(duty)

    return duties

def filter_duties_for_shift(duties,  shift):
    matching_duties = [ d for d in duties if d.shift == shift ]
    for m in matching_duties:
        yield m

def duty_permutations(shifts,  duties):
    from itertools import product

    # build a list of shifts
    shift_perms = []
    for shift in shifts:
        shift_duty_perms = []
        for slot in range(shift.max_coverage):
            slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
            shift_duty_perms.append(slot_duties)
        shift_perms.append(shift_duty_perms)

    all_perms = ( shift_perms,  shift_duty_perms )

    # generate all possible duties for all shifts
    perms = list(product(*shift_perms))
    return perms

def solutions_for_duty_permutations(permutations):
    from models import Solution
    res = []
    for duties in permutations:
        sol = Solution(duties)
        res.append(sol)
    return res

def find_clashing_duties(duty, duties):
    """Find duties for the same worker that are too close together"""
    from datetime import timedelta
    one_day = timedelta(days=1)
    one_day_before = duty.shift.start - one_day
    one_day_after = duty.shift.end + one_day
    for d in [ ds for ds in duties if ds.worker == duty.worker ]:
        # skip the duty we're considering, as it can't clash with itself
        if duty == d:
            continue

        clashes = False

        # check if dates are too close to another shift
        if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
            clashes = True

        # check if slots collide with another shift
        if d.slot == duty.slot:
            clashes = True

        if clashes:
            yield d

def filter_unwanted_shifts(solutions):
    from models import Solution

    print "possibly unwanted:",  solutions
    new_solutions = []
    new_duties = []

    for sol in solutions:
        for duty in sol.duties():
            duty_ok = True

            if not duty.worker.available_for_shift(duty.shift):
                duty_ok = False

            if duty_ok:
                print "duty OK:"
                duty.dump(depth=1)
                new_duties.append(duty)
            else:
                print "duty **NOT** OK:"
                duty.dump(depth=1)

        shifts = set([ d.shift for d in new_duties ])
        shift_lists = []
        for s in shifts:
            shift_duties = [ d for d in new_duties if d.shift == s ]
            shift_lists.append(shift_duties)

        new_solutions.append(Solution(shift_lists))

    return new_solutions

def filter_clashing_duties(solutions):
    new_solutions = []

    for sol in solutions:
        solution_ok = True

        for duty in sol.duties():
            num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))

            # check if many duties collide with this one (and thus we should delete this one
            if num_clashing_duties > 0:
                solution_ok = False
                break

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_incomplete_shifts(solutions):
    new_solutions = []

    shift_duty_count = {}

    for sol in solutions:
        solution_ok = True

        for shift in set([ duty.shift for duty in sol.duties() ]):
            shift_duties = [ d for d in sol.duties() if d.shift == shift ]
            num_workers = len(set([ d.worker for d in shift_duties ]))

            if num_workers < shift.min_coverage:
                solution_ok = False

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_solutions(solutions,  workers):
    # filter permutations ############################
    # for each solution
    solutions = filter_unwanted_shifts(solutions)
    solutions = filter_clashing_duties(solutions)
    solutions = filter_incomplete_shifts(solutions)
    return solutions

def prioritise_solutions(solutions):
    # TODO: not implemented!
    return solutions

    # prioritise solutions ############################
    # for all solutions
        # score according to number of staff on a duty
        # score according to male/female staff
        # score according to skill/background diversity
        # score according to when staff last on shift

    # sort all solutions by score

def solve_duties(shifts, duties,  workers):
    # ramify all possible duties #########################
    perms = duty_permutations(shifts,  duties)
    solutions = solutions_for_duty_permutations(perms)
    solutions = filter_solutions(solutions,  workers)
    solutions = prioritise_solutions(solutions)
    return solutions

def load_shifts():
    from models import Shift
    shifts = [
        Shift(am2, am8, 2, 3),
        Shift(am8, pm12, 2, 3),
    ]
    return shifts

def load_workers():
    from models import Avail, Worker
    joe_avail = ( Avail(am2,  am8), )
    sam_avail = ( Avail(am2,  am8), )
    ned_avail = ( Avail(am2,  am8), )
    bob_avail = ( Avail(am8,  pm12), )
    max_avail = ( Avail(am8,  pm12), )
    joe = Worker("joe", joe_avail)
    sam = Worker("sam", sam_avail)
    ned = Worker("ned", sam_avail)
    bob = Worker("bob", bob_avail)
    max = Worker("max", max_avail)
    return (joe, sam, ned, bob, max)

def main():
    import sys

    shifts = load_shifts()
    workers = load_workers()
    duties = duties_for_all_workers(shifts, workers)
    solutions = solve_duties(shifts, duties, workers)

    if len(solutions) == 0:
        print "Sorry, can't solve this.  Perhaps you need more staff available, or"
        print "simpler duty constraints?"
        sys.exit(20)
    else:
        print "Solved.  Solutions found:"
        for sol in solutions:
            sol.dump()

if __name__ == "__main__":
    main()

在结果之前截取调试输出,当前给出:

Solved.  Solutions found:
    <Solution
        <ShiftList
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker joe
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker sam
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker ned
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
        >
        <ShiftList
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker bob
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker max
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
        >
    >

Hope you can help me out with this guys. It's not help with work -- it's for a charity of very hard working volunteers, who could really use a less confusing/annoying timetable system than what they currently have.

If anyone knows of a good third-party app which (certainly) automate this, that would almost as good. Just... please don't suggest random timetabling stuff such as the ones for booking classrooms, as I don't think they can do this.

Thanks in advance for reading; I know it's a big post. I'm trying to do my best to document this clearly though, and to show that I've made efforts on my own.

Problem

I need a worker/timeslot scheduling algorithm which generates shifts for workers, which meets the following criteria:

Input Data

import datetime.datetime as dt

class DateRange:
    def __init__(self, start, end):
        self.start = start
        self.end   = end

class Shift:
    def __init__(self, range, min, max):
        self.range = range
        self.min_workers = min
        self.max_workers = max

tue_9th_10pm = dt(2009, 1, 9,   22, 0)
wed_10th_4am = dt(2009, 1, 10,   4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)

shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)

shift_1 = Shift(shift_1_times, 2,3)  # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2)  # allows 2
shift_3 = Shift(shift_3_times, 2,3)  # allows 3, requires 2, 3 available

shifts = ( shift_1, shift_2, shift_3 )

joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]

joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)

workers = ( joe, bob, sam, ned, max, amy, jim )

Processing

From above, shifts and workers are the two main input variables to process

Each shift has a minimum and maximum number of workers needed. Filling the minimum requirements for a shift is crucial to success, but if all else fails, a rota with gaps to be filled manually is better than "error" :) The main algorithmic issue is that there shouldn't be unnecessary gaps, when enough workers are available.

Ideally, the maximum number of workers for a shift would be filled, but this is the lowest priority relative to other constraints, so if anything has to give, it should be this.

Flexible constraints

These are a little flexible, and their boundaries can be pushed a little if a "perfect" solution can't be found. This flexibility should be a last resort though, rather than being exploited randomly. Ideally, the flexibility would be configurable with a "fudge_factor" variable, or similar.

  • There is a minimum time period
    between two shifts. So, a worker
    shouldn't be scheduled for two shifts
    in the same day, for instance.
  • There are a maximum number of shifts a
    worker can do in a given time period
    (say, a month)
  • There are a maximum number of certain
    shifts that can be done in a month
    (say, overnight shifts)

Nice to have, but not necessary

If you can come up with an algorithm which does the above and includes any/all of these, I'll be seriously impressed and grateful. Even an add-on script to do these bits separately would be great too.

  • Overlapping shifts. For instance,
    it would be good to be able to specify
    a "front desk" shift and a "back office"
    shift that both occur at the same time.
    This could be done with separate invocations
    of the program with different shift data,
    except that the constraints about scheduling
    people for multiple shifts in a given time
    period would be missed.

  • Minimum reschedule time period for workers specifiable
    on a per-worker (rather than global) basis. For instance,
    if Joe is feeling overworked or is dealing with personal issues,
    or is a beginner learning the ropes, we might want to schedule him
    less often than other workers.

  • Some automated/random/fair way of selecting staff to fill minimum
    shift numbers when no available workers fit.

  • Some way of handling sudden cancellations, and just filling the gaps
    without rearranging other shifts.

Output Test

Probably, the algorithm should generate as many matching Solutions as possible, where each Solution looks like this:

class Solution:
    def __init__(self, shifts_workers):
        """shifts_workers -- a dictionary of shift objects as keys, and a
        a lists of workers filling the shift as values."""

        assert isinstance(dict, shifts_workers)
        self.shifts_workers = shifts_workers

Here's a test function for an individual solution, given the above data. I think this is right, but I'd appreciate some peer review on it too.

def check_solution(solution):
  assert isinstance(Solution, solution)

  def shift_check(shift, workers, workers_allowed):
      assert isinstance(Shift, shift):
      assert isinstance(list, workers):
      assert isinstance(list, workers_allowed)

      num_workers = len(workers)
      assert num_workers >= shift.min_workers
      assert num_workers <= shift.max_workers

      for w in workers_allowed:
          assert w in workers

  shifts_workers = solution.shifts_workers

  # all shifts should be covered
  assert len(shifts_workers.keys()) == 3
  assert shift1 in shifts_workers.keys()
  assert shift2 in shifts_workers.keys()
  assert shift3 in shifts_workers.keys()

  # shift_1 should be covered by 2 people - joe, and bob
  shift_check(shift_1, shifts_workers[shift_1], (joe, bob))

  # shift_2 should be covered by 2 people - sam and amy
  shift_check(shift_2, shifts_workers[shift_2], (sam, amy))

  # shift_3 should be covered by 3 people - ned, max, and jim
  shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))

Attempts

I've tried implementing this with a Genetic Algorithm, but can't seem to get it tuned quite right, so although the basic principle seems to work on single shifts, it can't solve even easy cases with a few shifts and a few workers.

My latest attempt is to generate every possible permutation as a solution, then whittle down the permutations that don't meet the constraints. This seems to work much more quickly, and has gotten me further, but I'm using python 2.6's itertools.product() to help generate the permutations, and I can't quite get it right. It wouldn't surprise me if there are many bugs as, honestly, the problem doesn't fit in my head that well :)

Currently my code for this is in two files: models.py and rota.py. models.py looks like:

# -*- coding: utf-8 -*-
class Shift:
    def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
        self.start = start_datetime
        self.end = end_datetime
        self.duration = self.end - self.start
        self.min_coverage = min_coverage
        self.max_coverage = max_coverage

    def __repr__(self):
        return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)

class Duty:
    def __init__(self, worker, shift, slot):
        self.worker = worker
        self.shift = shift
        self.slot = slot

    def __repr__(self):
        return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
        self.worker.dump(indent=indent, depth=depth+1)
        print ind + ">"

class Avail:
    def __init__(self, start_time, end_time):
        self.start = start_time
        self.end = end_time

    def __repr__(self):
        return "<%s to %s>" % (self.start, self.end)

class Worker:
    def __init__(self, name, availabilities):
        self.name = name
        self.availabilities = availabilities

    def __repr__(self):
        return "<Worker %s Avail=%r>" % (self.name, self.availabilities)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Worker %s" % self.name
        for avail in self.availabilities:
            print ind + " " * indent + repr(avail)
        print ind + ">"

    def available_for_shift(self,  shift):
        for a in self.availabilities:
            if shift.start >= a.start and shift.end <= a.end:
                return True
        print "Worker %s not available for %r (Availability: %r)" % (self.name,  shift, self.availabilities)
        return False

class Solution:
    def __init__(self, shifts):
        self._shifts = list(shifts)

    def __repr__(self):
        return "<Solution: shifts=%r>" % self._shifts

    def duties(self):
        d = []
        for s in self._shifts:
            for x in s:
                yield x

    def shifts(self):
        return list(set([ d.shift for d in self.duties() ]))

    def dump_shift(self, s, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<ShiftList"
        for duty in s:
            duty.dump(indent=indent, depth=depth+1)
        print ind + ">"

    def dump(self, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<Solution"
        for s in self._shifts:
            self.dump_shift(s, indent=indent, depth=depth+1)
        print ind + ">"

class Env:
    def __init__(self, shifts, workers):
        self.shifts = shifts
        self.workers = workers
        self.fittest = None
        self.generation = 0

class DisplayContext:
    def __init__(self,  env):
        self.env = env

    def status(self, msg, *args):
        raise NotImplementedError()

    def cleanup(self):
        pass

    def update(self):
        pass

and rota.py looks like:

#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-

from datetime import datetime as dt
am2 = dt(2009,  10,  1,  2, 0)
am8 = dt(2009,  10,  1,  8, 0)
pm12 = dt(2009,  10,  1,  12, 0)

def duties_for_all_workers(shifts, workers):
    from models import Duty

    duties = []

    # for all shifts
    for shift in shifts:
        # for all slots
        for cov in range(shift.min_coverage, shift.max_coverage):
            for slot in range(cov):
                # for all workers
                for worker in workers:
                    # generate a duty
                    duty = Duty(worker, shift, slot+1)
                    duties.append(duty)

    return duties

def filter_duties_for_shift(duties,  shift):
    matching_duties = [ d for d in duties if d.shift == shift ]
    for m in matching_duties:
        yield m

def duty_permutations(shifts,  duties):
    from itertools import product

    # build a list of shifts
    shift_perms = []
    for shift in shifts:
        shift_duty_perms = []
        for slot in range(shift.max_coverage):
            slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
            shift_duty_perms.append(slot_duties)
        shift_perms.append(shift_duty_perms)

    all_perms = ( shift_perms,  shift_duty_perms )

    # generate all possible duties for all shifts
    perms = list(product(*shift_perms))
    return perms

def solutions_for_duty_permutations(permutations):
    from models import Solution
    res = []
    for duties in permutations:
        sol = Solution(duties)
        res.append(sol)
    return res

def find_clashing_duties(duty, duties):
    """Find duties for the same worker that are too close together"""
    from datetime import timedelta
    one_day = timedelta(days=1)
    one_day_before = duty.shift.start - one_day
    one_day_after = duty.shift.end + one_day
    for d in [ ds for ds in duties if ds.worker == duty.worker ]:
        # skip the duty we're considering, as it can't clash with itself
        if duty == d:
            continue

        clashes = False

        # check if dates are too close to another shift
        if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
            clashes = True

        # check if slots collide with another shift
        if d.slot == duty.slot:
            clashes = True

        if clashes:
            yield d

def filter_unwanted_shifts(solutions):
    from models import Solution

    print "possibly unwanted:",  solutions
    new_solutions = []
    new_duties = []

    for sol in solutions:
        for duty in sol.duties():
            duty_ok = True

            if not duty.worker.available_for_shift(duty.shift):
                duty_ok = False

            if duty_ok:
                print "duty OK:"
                duty.dump(depth=1)
                new_duties.append(duty)
            else:
                print "duty **NOT** OK:"
                duty.dump(depth=1)

        shifts = set([ d.shift for d in new_duties ])
        shift_lists = []
        for s in shifts:
            shift_duties = [ d for d in new_duties if d.shift == s ]
            shift_lists.append(shift_duties)

        new_solutions.append(Solution(shift_lists))

    return new_solutions

def filter_clashing_duties(solutions):
    new_solutions = []

    for sol in solutions:
        solution_ok = True

        for duty in sol.duties():
            num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))

            # check if many duties collide with this one (and thus we should delete this one
            if num_clashing_duties > 0:
                solution_ok = False
                break

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_incomplete_shifts(solutions):
    new_solutions = []

    shift_duty_count = {}

    for sol in solutions:
        solution_ok = True

        for shift in set([ duty.shift for duty in sol.duties() ]):
            shift_duties = [ d for d in sol.duties() if d.shift == shift ]
            num_workers = len(set([ d.worker for d in shift_duties ]))

            if num_workers < shift.min_coverage:
                solution_ok = False

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_solutions(solutions,  workers):
    # filter permutations ############################
    # for each solution
    solutions = filter_unwanted_shifts(solutions)
    solutions = filter_clashing_duties(solutions)
    solutions = filter_incomplete_shifts(solutions)
    return solutions

def prioritise_solutions(solutions):
    # TODO: not implemented!
    return solutions

    # prioritise solutions ############################
    # for all solutions
        # score according to number of staff on a duty
        # score according to male/female staff
        # score according to skill/background diversity
        # score according to when staff last on shift

    # sort all solutions by score

def solve_duties(shifts, duties,  workers):
    # ramify all possible duties #########################
    perms = duty_permutations(shifts,  duties)
    solutions = solutions_for_duty_permutations(perms)
    solutions = filter_solutions(solutions,  workers)
    solutions = prioritise_solutions(solutions)
    return solutions

def load_shifts():
    from models import Shift
    shifts = [
        Shift(am2, am8, 2, 3),
        Shift(am8, pm12, 2, 3),
    ]
    return shifts

def load_workers():
    from models import Avail, Worker
    joe_avail = ( Avail(am2,  am8), )
    sam_avail = ( Avail(am2,  am8), )
    ned_avail = ( Avail(am2,  am8), )
    bob_avail = ( Avail(am8,  pm12), )
    max_avail = ( Avail(am8,  pm12), )
    joe = Worker("joe", joe_avail)
    sam = Worker("sam", sam_avail)
    ned = Worker("ned", sam_avail)
    bob = Worker("bob", bob_avail)
    max = Worker("max", max_avail)
    return (joe, sam, ned, bob, max)

def main():
    import sys

    shifts = load_shifts()
    workers = load_workers()
    duties = duties_for_all_workers(shifts, workers)
    solutions = solve_duties(shifts, duties, workers)

    if len(solutions) == 0:
        print "Sorry, can't solve this.  Perhaps you need more staff available, or"
        print "simpler duty constraints?"
        sys.exit(20)
    else:
        print "Solved.  Solutions found:"
        for sol in solutions:
            sol.dump()

if __name__ == "__main__":
    main()

Snipping the debugging output before the result, this currently gives:

Solved.  Solutions found:
    <Solution
        <ShiftList
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker joe
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker sam
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker ned
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
        >
        <ShiftList
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker bob
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker max
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
        >
    >

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

逆光下的微笑 2024-08-14 16:43:37

我尝试用遗传算法来实现这个,
但似乎无法将其调整得非常正确,所以尽管
基本原理似乎适用于单班制,
即使是几个班次和几个工人的简单案件也无法解决。

简而言之,不要!除非您对遗传算法有丰富的经验,否则您将无法做到这一点。

  • 它们是近似方法,不能保证收敛到可行的解决方案。
  • 仅当您能够相当好地确定当前解决方案的质量(即未满足标准的数量)时,它们才起作用。
  • 它们的质量很大程度上取决于您用来将以前的解决方案组合/变异为新解决方案的运算符的质量。

如果你的 GA 经验几乎为零,那么在小型 Python 程序中正确使用是一件很困难的事情。如果你有一小群人,详尽的搜索并不是一个糟糕的选择。问题是,它可能适合 n 人,对于 n+1 人来说会很慢,对于 n+2 人来说会慢得难以忍受。您的 n 很可能最终会低至 10。

您正在解决一个 NP 完全问题,并且没有简单的获胜解决方案。如果您选择的花哨的时间表调度问题效果不够好,那么您的 python 脚本不太可能有更好的解决方案。

如果您坚持通过自己的代码执行此操作,则使用最小-最大或模拟退火会更容易获得一些结果。

I've tried implementing this with a Genetic Algorithm,
but can't seem to get it tuned quite right, so although
the basic principle seems to work on single shifts,
it can't solve even easy cases with a few shifts and a few workers.

In short, don't! Unless you have lots of experience with genetic algorithms, you won't get this right.

  • They are approximate methods that do not guarantee converging to a workable solution.
  • They work only if you can reasonably well establish the quality of your current solution (i.e. number of criteria not met).
  • Their quality critically depends on the quality of operators you use to combine/mutate previous solutions into new ones.

It is a tough thing to get right in small python program if you have close to zero experience with GA. If you have a small group of people exhaustive search is not that bad option. The problem is that it may work right for n people, will be slow for n+1 people and will be unbearably slow for n+2 and it may very well be that your n will end up as low as 10.

You are working on an NP-complete problem and there are no easy win solutions. If the fancy timetable scheduling problem of your choice does not work good enough, it is very unlikely you will have something better with your python script.

If you insist on doing this via your own code, it is much easier to get some results with min-max or simulated annealing.

一梦等七年七年为一梦 2024-08-14 16:43:37

好吧,我不知道特定的算法,但这是我会考虑的。

评估

无论采用哪种方法,您都需要一个函数来评估您的解决方案满足约束的程度。您可以采用“比较”方法(没有全局分数,而是比较两个解决方案的方法),但我建议进行评估。

真正好的是,如果您可以获得较短时间跨度的分数,例如每天,如果您可以从部分解决方案“预测”最终分数的范围(例如,仅前 3 个分数),这对算法非常有帮助7 天)。这样,如果它已经太低而无法满足您的期望,您可以中断基于此部分解决方案的计算。

对称性

在这 200 个人中,您很可能拥有相似的个人资料:即人们具有相同的特征(可用性、经验、意愿等)。如果两个人具有相同的个人资料,他们将可以互换:

  • 解决方案 1:(班次 1:Joe)(班次 2:Jim)...仅限其他工人...
  • 解决方案 2:(班次 1:Jim) (班次 2:乔)...仅其他工人...

从您的角度来看实际上是相同的解决方案。

好处是,通常,您的配置文件比人少,这对计算所花费的时间有很大帮助!

例如,假设您根据解决方案 1 生成了所有解决方案,那么无需根据解决方案 2 计算任何内容。

迭代

您可以考虑生成它,而不是立即生成整个计划增量(例如一次 1 周)。净收益是一周的复杂性降低了(可能性更少)。

然后,一旦你有了这周的时间,你就计算第二个,当然要小心地将第一个考虑到你的限制。

优点是您明确设计算法以考虑已使用的解决方案,这样对于下一个时间表生成,它将确保不会让一个人连续 24 小时工作!

序列化

您应该考虑解决方案对象的序列化(选择您的选择,pickle 对于 Python 来说非常好)。在生成新计划时,您将需要之前的计划,而且我敢打赌您不会为这 200 人手动输入它。

详尽

现在,毕竟,我实际上赞成进行详尽的搜索,因为使用对称性和评估的可能性可能不会那么多(尽管问题仍然是 NP 完全的,没有灵丹妙药)。

您可能愿意尝试一下回溯算法

另外,您应该查看以下处理类似问题的链接:

两者都讨论了实现过程中遇到的问题,因此检查它们应该会对您有所帮助。

Okay, I don't know about a particular algorithm, but here is what I would take into consideration.

Evaluation

Whatever the method you will need a function to evaluate how much your solution is satisfying the constraints. You may take the 'comparison' approach (no global score but a way to compare two solutions), but I would recommend evaluation.

What would be real good is if you could obtain a score for a shorter timespan, for example daily, it is really helpful with algorithms if you can 'predict' the range of the final score from a partial solution (eg, just the first 3 days out of 7). This way you can interrupt the computation based on this partial solution if it's already too low to meet your expectations.

Symmetry

It is likely that among those 200 people you have similar profiles: ie people sharing the same characteristics (availability, experience, willingness, ...). If you take two persons with the same profile, they are going to be interchangeable:

  • Solution 1: (shift 1: Joe)(shift 2: Jim) ...other workers only...
  • Solution 2: (shift 1: Jim)(shift 2: Joe) ...other workers only...

are actually the same solution from your point of view.

The good thing is that usually, you have less profiles than persons, which helps tremendously with the time spent in computation!

For example, imagine that you generated all the solutions based on Solution 1, then there is no need to compute anything based on Solution 2.

Iterative

Instead of generating the whole schedule at once, you may consider generating it incrementally (say 1 week at a time). The net gain is that the complexity for a week is reduced (there are less possibilities).

Then, once you have this week, you compute the second one, being careful of taking the first into account the first for your constraints of course.

The advantage is that you explicitly design you algorithm to take into account an already used solution, this way for the next schedule generation it will make sure not to make a person work 24hours straight!

Serialization

You should consider the serialization of your solution objects (pick up your choice, pickle is quite good for Python). You will need the previous schedule when generating a new one, and I bet you'd rather not enter it manually for the 200 people.

Exhaustive

Now, after all that, I would actually favor an exhaustive search since using symmetry and evaluation the possibilities might not be so numerous (the problem remains NP-complete though, there is no silver bullet).

You may be willing to try your hand at the Backtracking Algorithm.

Also, you should take a look at the following links which deal with similar kind of problems:

Both discuss the problems encountered during the implementation, so checking them out should help you.

稀香 2024-08-14 16:43:37

我没有算法选择,但我可以提出一些实际考虑因素。

由于该算法正在处理取消,因此只要发生调度异常,它就必须运行以重新调度每个人。

考虑到某些算法不是很线性,可能会从那时起彻底重新安排每个人。您可能想避免这种情况,人们喜欢提前了解自己的日程安排。

您可以在不重新运行算法的情况下处理一些取消,因为它可以预先安排下一个或两个可用的人。

始终生成最佳解决方案可能是不可能或不可取的,但您可以保留每个工作人员的“次优”事件的运行计数,并在必须分配另一个“工作人员”时始终选择计数最低的工作人员错误的选择”。这就是人们普遍关心的(经常/不公平地做出一些“糟糕”的调度决策)。

I don't have an algorithm choice but I can relate some practical considerations.

Since the algorithm is dealing with cancellations, it has to run whenever a scheduling exception occurs to reschedule everyone.

Consider that some algorithms are not very linear and might radically reschedule everyone from that point forward. You probably want to avoid that, people like to know their schedules well in advance.

You can deal with some cancellations without rerunning the algorithm because it can pre-schedule the next available person or two.

It might not be possible or desirable to always generate the most optimal solution, but you can keep a running count of "less-than-optimal" events per worker, and always choose the worker with the lowest count when you have to assign another "bad choice". That's what people generally care about (several "bad" scheduling decisions frequently/unfairly).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文