simpy:如何实现一次处理多个事件的资源
我正在模拟人们的活动和电梯的使用。电梯在移动到另一层楼之前可以承载多人。默认进程有一个容量参数,但是,这些参数表示进程数,而不是同时使用电梯的人数。
我尝试使用多种可用资源,例如 Container
、Store
和 Base
。应请求电梯,但这些对象不具有所请求的功能。因此,唯一合适的解决方案是从 base.Resource
类中继承。我尝试创建一个子类 Elevator,从 base.Resource 实现并调整函数 _do_get
以从队列中获取多个元素。我非常有信心这不是实现它的正确方法,并且它也会给出错误: RuntimeError:
。我不知道要调整哪些文件才能让 Simpy 满意。有人能指出我正确的方向吗?
@dataclass
class Elevator(simpy.Resource):
current_floor: int = 0
available_floors: List[int] = field(default_factory=lambda: [0, 1])
capacity: int = 3
# load_carriers: List[LoadCarrier] = field(default_factory=list)
move_time: int = 5
def __init__(self, env: Environment, capacity: int = 1, elevator_capacity: int = 1):
self.elevator_capacity = elevator_capacity
if capacity <= 0:
raise ValueError('"capacity" must be > 0.')
super().__init__(env, capacity)
self.users: List[Request] = []
"""List of :class:`Request` events for the processes that are currently
using the resource."""
self.queue = self.put_queue
"""Queue of pending :class:`Request` events. Alias of
:attr:`~simpy.resources.base.BaseResource.put_queue`.
"""
@property
def count(self) -> int:
"""Number of users currently using the resource."""
return len(self.users)
if TYPE_CHECKING:
def request(self) -> Request:
"""Request a usage slot."""
return Request(self)
def release(self, request: Request) -> Release:
"""Release a usage slot."""
return Release(self, request)
else:
request = BoundClass(Request)
release = BoundClass(Release)
def _do_put(self, event: Request) -> None:
if len(self.users) < self.capacity:
self.users.append(event)
event.usage_since = self._env.now
event.succeed()
def _do_get(self, event: Release) -> None:
for i in range(min(self.elevator_capacity, len(self.users))):
try:
event = self.users.pop(0)
event.succeed()
# self.users.remove(event.request) # type: ignore
except ValueError:
pass
# event.succeed()
I am simulating people movements and their elevator usage. An elevator can take up multiple persons before moving to another floor. The default process has a capacity parameter, however, these indicate the number of processes and not the number of people using the elevator at the same time.
I have tried to use multiple of the resources available, such as Container
, Store
, and Base
. The elevator should be requested and these objects do not have the functionality to be requested. Hence, the only suitable solution is to inherent from the base.Resource
class. I have tried to create a subclass Elevator, implementing from base.Resource and adjusting the function _do_get
to take multiple elements from the queue. I am pretty confident that this is not the proper way to implement it and it gives an error as well: RuntimeError: <Request() object at 0x1ffb4474be0> has already been triggered
. I have no clue which files to adjust to make Simpy happy. Could someone point me in the right direction?
@dataclass
class Elevator(simpy.Resource):
current_floor: int = 0
available_floors: List[int] = field(default_factory=lambda: [0, 1])
capacity: int = 3
# load_carriers: List[LoadCarrier] = field(default_factory=list)
move_time: int = 5
def __init__(self, env: Environment, capacity: int = 1, elevator_capacity: int = 1):
self.elevator_capacity = elevator_capacity
if capacity <= 0:
raise ValueError('"capacity" must be > 0.')
super().__init__(env, capacity)
self.users: List[Request] = []
"""List of :class:`Request` events for the processes that are currently
using the resource."""
self.queue = self.put_queue
"""Queue of pending :class:`Request` events. Alias of
:attr:`~simpy.resources.base.BaseResource.put_queue`.
"""
@property
def count(self) -> int:
"""Number of users currently using the resource."""
return len(self.users)
if TYPE_CHECKING:
def request(self) -> Request:
"""Request a usage slot."""
return Request(self)
def release(self, request: Request) -> Release:
"""Release a usage slot."""
return Release(self, request)
else:
request = BoundClass(Request)
release = BoundClass(Release)
def _do_put(self, event: Request) -> None:
if len(self.users) < self.capacity:
self.users.append(event)
event.usage_since = self._env.now
event.succeed()
def _do_get(self, event: Release) -> None:
for i in range(min(self.elevator_capacity, len(self.users))):
try:
event = self.users.pop(0)
event.succeed()
# self.users.remove(event.request) # type: ignore
except ValueError:
pass
# event.succeed()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
所以这是我想出的解决方案。棘手的一点是我将两个事件链接在一起。当您排队等候电梯时,您会收到一个在电梯到达时触发的事件。此事件还会返回当您到达目标楼层时触发的第二个事件。该第二事件是电梯上并前往同一楼层的所有乘客共享的公共事件。触发此事件会通知一群乘客。这种订阅广播模式可以大大减少模型需要处理的事件数量,从而提高性能。我使用链式事件是因为如果您在队列中,而您前面的人上车而您没有上车,那么那个人也会在您之前下车,从而需要不同的目的地到达事件。换句话说,在你上电梯之前我不知道你什么时候下车,所以我需要推迟这部分直到你真正上电梯。
So here is the solution I came up with. The tricky bit is I chained two events together. When you queue up for the elevator you get a event that fires when the elevator arrives. This event also returns a second event that fires when you get to your destination floor. This second event is a common event shared by all the passengers that are on the elevator and going to the same floor. Firing this one event notifies a bunch of passengers. This subscribe broadcast pattern can greatly reduce the number of events the model needs to process which in turn improves performance. I use the chained events because if you are in a queue, and the guy in front of you gets on and you do not, then that guy is also going to get off before you, requiring a different destination arrive event. Put another way, I do not know when you will get off until you get on, so I need to defer that part till you actually get onto the elevator.