Simple synchronization of events using a Container or Store

I have some code that implements scheduling using SimPy. The simulation runs processors in parallel in intervals, and for each interval there should be a synchronization barrier that waits until all processors have executed their tasks before moving on to the next interval.
The code below is adapted from https://wso2.com/blog/research/modeling-closed-system-performance-of-a-server-with-discrete-event-simulation/
It consists of clients that send requests to output queues, which are examined by servers (processors); each server takes the job from its own queue and executes it. The problem with this code is that there is no synchronization: processors don't wait for each other. I need a unified message to be sent to all processors so that they wait for each other. I was thinking of using a Container or Store, but I can't seem to wrap my head around them.

For example, suppose I run 4 processors, each executing jobs with different execution times (P1: 4s, P2: 3s, P3: 2s, P4: 1s), so processor P1 is executing a job that is 4 seconds long. How can I add a synchronization barrier that holds processors P2-P4 until those 4 seconds have passed?
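To pin down the behaviour being asked for, here is a minimal, self-contained sketch of such a barrier for exactly this P1-P4 case. It is not part of the code in question; the Barrier class and processor function are illustrative names introduced here. The barrier simply counts arrivals and releases every processor once the last one has arrived, so P2-P4 sit idle until P1's 4-second job is done:

import simpy

JOB_TIMES = [4, 3, 2, 1]  # P1..P4 job lengths in seconds


class Barrier:
    """Reusable counting barrier: the n-th arrival releases everybody."""

    def __init__(self, env, n):
        self.env = env
        self.n = n
        self.count = 0
        self.release = env.event()

    def wait(self):
        self.count += 1
        release = self.release            # capture this interval's release event
        if self.count == self.n:          # last arrival opens the barrier
            self.count = 0
            self.release = self.env.event()
            release.succeed()
        return release


def processor(env, i, job_time, barrier):
    while True:
        print(f'{env.now:4.1f}s  P{i + 1} starts its {job_time}s job')
        yield env.timeout(job_time)
        print(f'{env.now:4.1f}s  P{i + 1} finished, waiting at the barrier')
        yield barrier.wait()              # blocked until every processor has arrived


env = simpy.Environment()
barrier = Barrier(env, len(JOB_TIMES))
for i, t in enumerate(JOB_TIMES):
    env.process(processor(env, i, t, barrier))
env.run(until=10)

With this, each interval ends when the slowest job of that interval finishes, which is the synchronization the question describes.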

import random
import simpy

SEED = 42
average_processing_time = 0.025
response_times = []
queue_lengths = []
waiting_times = []

concurrency = 4
num_cores = 4


def client(env, out_pipe, in_pipe, i):
    """Generates requests and waits for the matching response."""
    global response_times
    while True:
        processing_time = random.expovariate(1 / average_processing_time)
        arrival_time = env.now
        # request: {1: processing time, 2: client id, 3: arrival time}
        d = {1: processing_time, 2: i, 3: arrival_time}
        out_pipe[i].put(d)
        # print('client is processing the request %d' % i)
        response = yield in_pipe[i].get(filter=lambda x: x[2] == i)
        response_time = env.now - arrival_time
        response_times.append(response_time)


def server(env, in_pipe, out_pipe, i, channel):
    """Takes requests from its own queue and processes them one at a time."""
    global queue_lengths
    global waiting_times
    times = []

    while True:
        request = yield in_pipe[i].get()
        processing_time = request[1]
        arrival_time = request[3]
        waiting_time = env.now - arrival_time
        waiting_times.append(waiting_time)

        # earlier attempt at synchronization (commented out): read one request
        # from every server's queue and keep the longest processing time
        # for j in range(num_cores):
        #     request_all = yield in_pipe[j].get()
        #     times.append(request_all[1])

        queue_length = len(in_pipe[i].items)
        queue_lengths.append(queue_length)
        print('server %d is processing the request at time %f' % (i, env.now))
        # if max(times) > processing_time:
        #     new_t = max(times)
        # else:
        #     new_t = processing_time
        yield env.timeout(processing_time)
        channel.put(1)
        out_pipe[i].put(request)


random.seed(SEED)
in_pipe = []
out_pipe = []
p = []
environment = simpy.Environment()
channel = simpy.Store(environment)
for i in range(num_cores):
    in_pipe.append(simpy.Store(environment))
    out_pipe.append(simpy.FilterStore(environment))
for i in range(concurrency):
    environment.process(client(environment, in_pipe, out_pipe, i))

for i in range(num_cores):
    t = environment.process(server(environment, in_pipe, out_pipe, i, channel))
    p.append(t)
environment.run(until=environment.all_of(p))

response_times = [x * 100 for x in response_times]
waiting_times = [x * 100 for x in waiting_times]
# print(waiting_times)

Comments (1)

ゝ杯具 2025-01-31 10:49:21

Here is a quick example of how to sync several workers, where each worker waits for all the workers to finish before starting the next task:

"""
    Simple demo where workers wait for all workers to finish before starting next task

    Programer: Michael R. Gibbs
"""

import simpy
import random

class Worker():
    """
        simple worker
        waits for a start event
        does some work
        repeat
    """

    def __init__(self, env, id, controler):
        self.env = env
        self.id = id
        self.controler = controler
        self.next_done_event = simpy.Event(self.env)

        self.env.process(self.do_work())

    def do_work(self):
        """
            wait for controler's start event
            do some work
            trigger done event
            repeat
        """

        while True:

            yield self.controler.get_start_event()

            print(f'{self.env.now:.2f} worker {self.id} has started')

            yield self.env.timeout(random.uniform(1,4))

            print(f'{self.env.now:.2f} worker {self.id} has finish')

            self.next_done_event.succeed()

            self.next_done_event = simpy.Event(self.env)

    def get_next_done_event(self):
        """
            workers next done event
        """

        return self.next_done_event

class Controler():
    """
        tracks when all the workers have finished
        and broadcasts a done event
    """

    def __init__(self, env):
        
        self.env = env
        self.start_event = simpy.Event(self.env)

        self.workers = []

        self.env.process(self.monitor_workers())

    def add_worker(self, worker):
        """
            workers need to be addeded before sim is started
        """

        self.workers.append(worker)

    def get_start_event(self):
        """
            returns the controler next start event

            Note: that all the workers are gettng the same event
        """

        return self.start_event

    def monitor_workers(self):
        """
            trigger start event
            wait for all workers to finish
            repeat
        """

        # need this to insure all workers are set up
        yield self.env.timeout(1)

        while True:

            self.start_event.succeed()
            self.start_event = simpy.Event(self.env)

            done_events = [worker.get_next_done_event() for worker in self.workers]

            yield self.env.all_of(done_events)

            print(f'{self.env.now:.2f} all workers have finished')

# boot it up
env = simpy.Environment()

controler = Controler(env)

for i in range(3):
    worker = Worker(env, i+1, controler)
    controler.add_worker(worker)

env.run(100)
    
print('done')
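Since the question specifically mentions sending a "unified message" through a Container or Store, the same barrier can also be built with plain simpy.Store objects instead of events. Below is a rough sketch of that variant (the names coordinator, go_stores and done_store are illustrative, not taken from either code listing above): a coordinator broadcasts one 'go' token per processor at the start of each interval, then collects one 'done' token from every processor before opening the next interval.

import simpy

JOB_TIMES = [4, 3, 2, 1]  # P1..P4, as in the question's example


def processor(env, i, job_time, go_store, done_store):
    while True:
        yield go_store.get()                 # wait for this interval's go message
        print(f'{env.now:4.1f}s  P{i + 1} starts a {job_time}s job')
        yield env.timeout(job_time)
        print(f'{env.now:4.1f}s  P{i + 1} done')
        done_store.put(i)                    # report completion to the coordinator


def coordinator(env, go_stores, done_store):
    while True:
        for store in go_stores:              # broadcast the unified "go" message
            store.put('go')
        for _ in go_stores:                  # barrier: collect one 'done' per processor
            yield done_store.get()
        print(f'{env.now:4.1f}s  interval complete, all processors are synchronized')


env = simpy.Environment()
done_store = simpy.Store(env)
go_stores = [simpy.Store(env) for _ in JOB_TIMES]
for i, t in enumerate(JOB_TIMES):
    env.process(processor(env, i, t, go_stores[i], done_store))
env.process(coordinator(env, go_stores, done_store))
env.run(until=10)

Functionally this does the same thing as the event-based Controller pattern above; recreating the start event each cycle and broadcasting tokens through per-processor Stores are just two ways of waking every waiting process at once.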