go中如何等待低延迟的线程?

发布于 2024-12-06 13:23:29 字数 1047 浏览 1 评论 0原文

我一直在尝试在 Go 中创建一个简单的事件循环包装器。但我被难住了,我应该如何跟踪当前线程中的操作? 我希望 CurrentTick 运行一个函数,即使调用函数退出,也不会启动下一个刻度,直到 CurrentTick 运行的所有函数退出。我想我可以使用互斥体来监视线程数量,但我意识到如果我不断地检查它会限制 CPU 的性能。如果我使用 time.Sleep 它将是潜伏的。你会如何解决这个问题?

package eventloop

import (
    "reflect"
)

type eventLoop *struct{
    functions []reflect.Value
    addFunc chan<-/*3*/ reflect.Value
    mutex chan/*1*/ bool
    threads int
}

func NewEventLoop() eventLoop {
    var funcs chan reflect.Value
    loop := eventLoop{
        []Reflect.Value{},
        funcs = make(chan reflect.Value, 3),
        make(chan bool, 1),
        0,
    }
    go func(){
        for {
            this.mutex <- 1
            if threads == 0 {
            }
        }
    }
}

func (this eventLoop) NextTick(f func()) {
    this.addFunc <- reflect.ValueOf(f)
}

func (this eventLoop) CurrentTick(f func()) {
    this.mutex <- 1
    threads += 1
    <-this.mutex
    go func() {
        f()
        this.mutex <- 1
        threads -= 1
        <-this.mutex
    }()
}

I've been trying to create a simple event loop wrapper in Go. But I got stumped, how was I supposed to keep track of operations in the current thread?
I wanted CurrentTick to run a function, and even if the calling function quits, not start the next tick until all functions run by CurrentTick quit. I thought I might use a mutex to monitor the number of threads, but I realized if I kept checking that over and over it would throttle the CPU. If I used time.Sleep it would be latent. How would you solve the problem?

package eventloop

import (
    "reflect"
)

type eventLoop *struct{
    functions []reflect.Value
    addFunc chan<-/*3*/ reflect.Value
    mutex chan/*1*/ bool
    threads int
}

func NewEventLoop() eventLoop {
    var funcs chan reflect.Value
    loop := eventLoop{
        []Reflect.Value{},
        funcs = make(chan reflect.Value, 3),
        make(chan bool, 1),
        0,
    }
    go func(){
        for {
            this.mutex <- 1
            if threads == 0 {
            }
        }
    }
}

func (this eventLoop) NextTick(f func()) {
    this.addFunc <- reflect.ValueOf(f)
}

func (this eventLoop) CurrentTick(f func()) {
    this.mutex <- 1
    threads += 1
    <-this.mutex
    go func() {
        f()
        this.mutex <- 1
        threads -= 1
        <-this.mutex
    }()
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

早乙女 2024-12-13 13:23:29

如果我理解你的意图,我认为你把事情过于复杂化了。我会这样做:

package eventloop

type EventLoop struct {
    nextFunc chan func()
    curFunc chan func()
}

func NewEventLoop() *EventLoop {
    el := &EventLoop{
        // Adjust the capacities to taste
        make(chan func(), 3),
        make(chan func(), 3),
    }
    go eventLoop(el)
    return el
}

func (el *EventLoop) NextTick(f func()) {
    el.nextFunc <- f
}

func (el *EventLoop) CurrentTick(f func()) {
    el.curFunc <- f
}

func (el *EventLoop) Quit() {
    close(el.nextFunc)
}

func eventLoop(el *EventLoop) {
    for {
        f, ok := <-el.nextFunc
        if !ok {
            return
        }
        f()

        drain: for {
            select {
            case f := <-el.curFunc:
                f()
            default:
                break drain
            }
        }
    }
}

根据您的使用情况,您可能需要添加一些同步,以确保循环中的所有任务在程序退出之前完成。

If I understand your intent, I think you're overcomplicating things. I'd do it like this:

package eventloop

type EventLoop struct {
    nextFunc chan func()
    curFunc chan func()
}

func NewEventLoop() *EventLoop {
    el := &EventLoop{
        // Adjust the capacities to taste
        make(chan func(), 3),
        make(chan func(), 3),
    }
    go eventLoop(el)
    return el
}

func (el *EventLoop) NextTick(f func()) {
    el.nextFunc <- f
}

func (el *EventLoop) CurrentTick(f func()) {
    el.curFunc <- f
}

func (el *EventLoop) Quit() {
    close(el.nextFunc)
}

func eventLoop(el *EventLoop) {
    for {
        f, ok := <-el.nextFunc
        if !ok {
            return
        }
        f()

        drain: for {
            select {
            case f := <-el.curFunc:
                f()
            default:
                break drain
            }
        }
    }
}

Depending on your use, you may need to add some synchronization to make sure all tasks in the loop finish before your program exits.

吝吻 2024-12-13 13:23:29

在遇到很多问题和随机问题(包括使用 15 作为长度而不是容量)之后,我自己解决了这个问题...似乎您只是在递减计数器后有一个线程发送消息。 (loop.tick 部分可以内联,但我不担心这一点)

package eventloop

type eventLoop struct{
    functions []func()
    addFunc chan/*3*/ func()
    mutex chan/*1*/ bool
    threads int
    waitChannel chan bool
    pauseState chan bool
}
func (this *eventLoop) NextTick (f func()) {
    this.addFunc <- f
}

func (this *eventLoop) tick () {
    this.mutex <- true
    for this.threads != 0 {
        <-this.mutex
        <-this.waitChannel
        this.mutex <- true
    }
    <-this.mutex
    L1: for {
        select {
            case f := <-this.addFunc:
                this.functions = append(this.functions,f)
            default: break L1
        }
    }
    if len(this.functions) != 0 {
        this.functions[0]()
        if len(this.functions) >= 2 {
            this.functions = this.functions[1:]
        } else {
            this.functions = []func(){}
        }
    }   else {
        (<-this.addFunc)()
    }
}
func (this *eventLoop) CurrentTick (f func()) {
    this.mutex <- true
    this.threads += 1
    <-this.mutex
    go func() {
        f()
        this.mutex <- true
        this.threads -= 1
        <-this.mutex
        this.waitChannel <- true
    }()
}
func NewEventLoop () *eventLoop {
    funcs := make(chan func(),3)
    loop := &eventLoop{
        make([]func(),0,15), /*functions*/
        funcs, /*addFunc*/
        make(chan bool, 1), /*mutex for threads*/
        0, /*Number of threads*/
        make(chan bool,0), /*The "wait" channel*/
        make(chan bool,1), 
    }
    go func(){
        for { loop.tick() }
    }()
    return loop
}

注意:这仍然有很多其他问题。

I figured it out myself, after a lot of problems and random issues including using 15 as length instead of capacity... Seems you just have a thread send a message after you decrement the counter. (the loop.tick part could be inlined, but I'm not worried about that)

package eventloop

type eventLoop struct{
    functions []func()
    addFunc chan/*3*/ func()
    mutex chan/*1*/ bool
    threads int
    waitChannel chan bool
    pauseState chan bool
}
func (this *eventLoop) NextTick (f func()) {
    this.addFunc <- f
}

func (this *eventLoop) tick () {
    this.mutex <- true
    for this.threads != 0 {
        <-this.mutex
        <-this.waitChannel
        this.mutex <- true
    }
    <-this.mutex
    L1: for {
        select {
            case f := <-this.addFunc:
                this.functions = append(this.functions,f)
            default: break L1
        }
    }
    if len(this.functions) != 0 {
        this.functions[0]()
        if len(this.functions) >= 2 {
            this.functions = this.functions[1:]
        } else {
            this.functions = []func(){}
        }
    }   else {
        (<-this.addFunc)()
    }
}
func (this *eventLoop) CurrentTick (f func()) {
    this.mutex <- true
    this.threads += 1
    <-this.mutex
    go func() {
        f()
        this.mutex <- true
        this.threads -= 1
        <-this.mutex
        this.waitChannel <- true
    }()
}
func NewEventLoop () *eventLoop {
    funcs := make(chan func(),3)
    loop := &eventLoop{
        make([]func(),0,15), /*functions*/
        funcs, /*addFunc*/
        make(chan bool, 1), /*mutex for threads*/
        0, /*Number of threads*/
        make(chan bool,0), /*The "wait" channel*/
        make(chan bool,1), 
    }
    go func(){
        for { loop.tick() }
    }()
    return loop
}

Note: this still has lots of other problems.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文