SimPy event synchronization using Containers or Stores
I have code that implements scheduling using simpy. The simulation runs processors in parallel over a series of intervals, and for each interval there is supposed to be a synchronization barrier that waits until all processors have finished their tasks before moving on to the next interval.
The code below is adapted from https://wso2.com/blog/research/modeling-closed-system-performance-of-a-server-with-discrete-event-simulation/
The code consists of clients that send requests to an output queue, which is examined by the servers (processors). Each server then examines its own queue and executes the jobs in it. The problem with this code is that there is no synchronization: the processors don't wait for each other. I need a unified message to be sent to all processors so that they wait for each other. I was thinking of using a Container or a Store but can't seem to wrap my head around them.
For example, suppose I run 4 processors, each executing jobs of different execution times (P1: 4s, P2: 3s, P3: 2s, P4: 1s), so processor 1 (P1) is executing a job that takes 4 seconds. How can I add a synchronization barrier that blocks processors P2 to P4 until those 4 seconds have passed?
import random
import simpy

SEED = 42
average_processing_time = 0.025
response_times = []
queue_lengths = []
waiting_times = []
concurrency = 4
num_cores = 4

def client(env, out_pipe, in_pipe, i):
    global response_times
    while True:
        processing_time = random.expovariate(1 / average_processing_time)
        arrival_time = env.now
        d = {1: processing_time, 2: i, 3: arrival_time}
        out_pipe[i].put(d)
        # print('client is processing the request %d' % i)
        response = yield in_pipe[i].get(filter=lambda x: x[2] == i)
        response_time = env.now - arrival_time
        response_times.append(response_time)

def server(env, in_pipe, out_pipe, i, channel):
    global queue_lengths
    global waiting_times
    times = []
    while True:
        request = yield in_pipe[i].get()
        processing_time = request[1]
        arrival_time = request[3]
        waiting_time = env.now - arrival_time
        waiting_times.append(waiting_time)
        # My attempt at collecting all pending requests so the servers
        # can agree on a common interval length -- this blocks instead:
        # for j in range(num_cores):
        #     request_all = yield in_pipe[j].get()
        #     times.append(request_all[1])
        queue_length = len(in_pipe[i].items)
        queue_lengths.append(queue_length)
        print('server %d is processing the request at time %f' % (i, env.now))
        # if max(times) > processing_time:
        #     new_t = max(times)
        # else:
        #     new_t = processing_time
        yield env.timeout(processing_time)
        channel.put(1)
        out_pipe[i].put(request)

random.seed(SEED)
in_pipe = []
out_pipe = []
p = []
environment = simpy.Environment()
channel = simpy.Store(environment)
for i in range(num_cores):
    in_pipe.append(simpy.Store(environment))
    out_pipe.append(simpy.FilterStore(environment))
for i in range(concurrency):
    environment.process(client(environment, in_pipe, out_pipe, i))
for i in range(num_cores):
    t = environment.process(server(environment, in_pipe, out_pipe, i, channel))
    p.append(t)
environment.run(until=environment.all_of(p))
response_times = [x * 100 for x in response_times]
waiting_times = [x * 100 for x in waiting_times]
# print(waiting_times)
Here is a quick example of how to sync several workers, where each worker waits for all the workers to finish before starting the next task.