How can I efficiently loop over a list of resource IDs in parallel and control the degree of parallelism?



I have an array of resourceIds that I need to loop over in parallel. For each resource I generate a URL and then put it into a map whose key is the resourceId and whose value is the URL.

The code below does the job, but I am not sure it is the right way to do it. I am using sizedwaitgroup to bound how many goroutines work through the resourceId list at once, and a mutex around the map while writing to it. I suspect this is not efficient code, since combining the lock with the sized wait group may cause performance problems.

What is the best and most efficient way to do this? Should I use channels here? I want to control how much parallelism there is rather than running one goroutine per entry in the resourceId list. If URL generation fails for any resourceId, I want to log an error for that resourceId, but not disrupt the other goroutines that are generating URLs for the remaining resourceIds.

For example: if there are 10 resources and 2 fail, log errors for those 2, and the map should have entries for the remaining 8.

// run at most 20 goroutines in parallel
swg := sizedwaitgroup.New(20)
var mutex = &sync.Mutex{}
start := time.Now()
m := make(map[string]*customerPbV1.CustomerResponse)
for _, resources := range resourcesList {
  swg.Add()
  go func(resources string) {
    defer swg.Done()
    customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
    if err != nil {
      errs.NewWithCausef(err, "Could not generate the url for %s", resources)
    }
    mutex.Lock()
    m[resources] = customerUrl
    mutex.Unlock()
  }(resources)
}
swg.Wait()

elapsed := time.Since(start)
fmt.Println(elapsed)

Note: the code above will be called at high throughput from multiple concurrent callers, so it needs to perform well.
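
To make the requirement concrete, here is a minimal sketch of the shape of solution I have in mind: a buffered channel used as a counting semaphore to cap the number of goroutines. generateURL below is a hypothetical stand-in for us.GenerateUrl, and the map value type is simplified to string.

package main

import (
    "fmt"
    "log"
    "sync"
)

// generateURL is a hypothetical stand-in for us.GenerateUrl(clientId, resourceId, appConfig).
func generateURL(resourceID string) (string, error) {
    return "https://example.com/customers/" + resourceID, nil
}

// buildURLMap generates URLs for all resource IDs with at most `limit` goroutines running at once.
// Failures are logged per resource ID and skipped, so the other goroutines keep working.
func buildURLMap(resourceIDs []string, limit int) map[string]string {
    var (
        wg  sync.WaitGroup
        mu  sync.Mutex
        m   = make(map[string]string, len(resourceIDs))
        sem = make(chan struct{}, limit) // buffered channel used as a counting semaphore
    )
    for _, id := range resourceIDs {
        wg.Add(1)
        sem <- struct{}{} // acquire a slot; blocks while `limit` goroutines are already running
        go func(id string) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot

            u, err := generateURL(id)
            if err != nil {
                log.Printf("could not generate the url for %s: %v", id, err)
                return // no map entry for this resource, but the rest continue
            }
            mu.Lock()
            m[id] = u
            mu.Unlock()
        }(id)
    }
    wg.Wait()
    return m
}

func main() {
    m := buildURLMap([]string{"res-1", "res-2", "res-3"}, 20)
    fmt.Println(len(m), "urls generated")
}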

3 Answers

我的影子我的梦 2025-01-17 05:11:29

I'm not sure what sizedwaitgroup is, and it isn't explained, but overall this approach doesn't look very typical of Go. "Best" is a matter of opinion, but the most typical approach in Go would be something along these lines:

func main() {
    wg := new(sync.WaitGroup)
    start := time.Now()
    numWorkers := 20
    m := make(map[string]*customerPbV1.CustomerResponse)
    work := make(chan string)
    results := make(chan result)
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done() // mark this worker finished so results can be closed after wg.Wait()
            worker(work, results)
        }()
    }
    go func() {
        for _, resources := range resourcesList {
            work <- resources
        }
        close(work)
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        m[result.resources] = result.response
    }

    elapsed := time.Since(start)
    fmt.Println(elapsed)
}

type result struct {
    resources string
    response  *customerPbV1.CustomerResponse
}

func worker(ch chan string, r chan result) {
    for w := range ch {
        customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
        if err != nil {
            errs.NewWithCausef(err, "Could not generate the url for %s", w)
            continue
        }
        r <- result{w, customerUrl}
    }
}

Though, based on the name, I would assume errs.NewWithCausef doesn't actually handle the error but returns a new one, in which case the code above is dropping those errors on the floor, and a proper solution would have an additional chan error for handling them:

func main() {
    wg := new(sync.WaitGroup)
    start := time.Now()
    numWorkers := 20
    m := make(map[string]*customerPbV1.CustomerResponse)
    work := make(chan string)
    results := make(chan result)
    errors := make(chan error)
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done() // mark this worker finished so results and errors can be closed after wg.Wait()
            worker(work, results, errors)
        }()
    }

    go func() {
        for _, resources := range resourcesList {
            work <- resources
        }
        close(work)
    }()

    go func() {
        wg.Wait()
        close(results)
        close(errors)
    }()

    go func() {
        for err := range errors {
            fmt.Println(err) // do something with err (log it, collect it, etc.)
        }
    }()

    for result := range results {
        m[result.resources] = result.response
    }

    elapsed := time.Since(start)
    fmt.Println(elapsed)
}

type result struct {
    resources string
    response  *customerPbV1.CustomerResponse
}

func worker(ch chan string, r chan result, errCh chan error) {
    for w := range ch {
        customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
        if err != nil {
            errCh <- errs.NewWithCausef(err, "Could not generate the url for %s", w)
            continue
        }
        r <- result{w, customerUrl}
    }
}
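
For comparison, the same bounded-concurrency idea could also be written with golang.org/x/sync/errgroup and its SetLimit method. This is only a rough sketch under the same assumptions as above: generateURL is a hypothetical stand-in for us.GenerateUrl, and failures are only logged so that one bad resource does not stop the rest.

package main

import (
    "fmt"
    "log"
    "sync"

    "golang.org/x/sync/errgroup"
)

// generateURL is a hypothetical stand-in for us.GenerateUrl(clientId, resourceId, appConfig).
func generateURL(resourceID string) (string, error) {
    return "https://example.com/customers/" + resourceID, nil
}

func main() {
    resourceIDs := []string{"res-1", "res-2", "res-3"}

    var (
        mu sync.Mutex
        m  = make(map[string]string, len(resourceIDs))
        g  errgroup.Group
    )
    g.SetLimit(20) // at most 20 goroutines run concurrently

    for _, id := range resourceIDs {
        id := id // capture the loop variable (needed before Go 1.22)
        g.Go(func() error {
            u, err := generateURL(id)
            if err != nil {
                // Log and return nil so a single failure does not surface from Wait
                // and the remaining resources are still processed.
                log.Printf("could not generate the url for %s: %v", id, err)
                return nil
            }
            mu.Lock()
            m[id] = u
            mu.Unlock()
            return nil
        })
    }
    _ = g.Wait() // always nil here because errors are only logged

    fmt.Println(len(m), "urls generated")
}
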
べ映画 2025-01-17 05:11:29

I have created example code with comments; please read the comments.

Note: the query function sleeps for 1 second.

package main

import (
    "errors"
    "fmt"
    "log"
    "math/rand"
    "runtime"
    "strconv"
    "sync"
    "time"
)

type Result struct {
    resource string
    val      int
    err      error
}

/*
CHANGE Result struct to this
result struct will collect all you need to create map
type Result struct {
    resources string
    customerUrl *customerPbV1.CustomerResponse
    err error
}
*/

// const numWorker = 8

func main() {
    now := time.Now()
    rand.Seed(time.Now().UnixNano())
    m := make(map[string]int)
    // m := make(map[string]*customerPbV1.CustomerResponse)                 // CHANGE TO THIS

    numWorker := runtime.NumCPU()
    fmt.Println(numWorker)
    chanResult := make(chan Result)

    go func() {
        for i := 0; i < 20; i++ {
            /*
             customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
             we assume i stands in for the resource id:
             chanResult <- Result{resource: strconv.Itoa(i)}
            */
            chanResult <- Result{ // this send blocks until one of the worker goroutines below receives it
                resource: strconv.Itoa(i),
            }
        }
        close(chanResult)
    }()

    var wg sync.WaitGroup
    cr := make(chan Result)
    wg.Add(numWorker)

    go func() {
        wg.Wait()
        close(cr) // NOTE: don't forget to close cr
    }()

    go func() {
        for i := 0; i < numWorker; i++ { // start numWorker worker goroutines
            go func(x int) {
                for job := range chanResult { // receives jobs until the producer goroutine above closes chanResult
                    log.Println("worker", x, "working on", job.resource)
                    val, err := query(job.resource) // TODO: customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
                    cr <- Result{                   // send the result; it is consumed by the main goroutine below
                        resource: job.resource,
                        val:      val,
                        err:      err,
                    }
                }
                wg.Done()
            }(i)
        }
    }()

    counterTotal := 0
    counterSuccess := 0
    for res := range cr { // collect results until cr is closed once all workers finish
        if res.err != nil {
            log.Printf("error found %s. stack trace: %s", res.resource, res.err)
        } else {
            m[res.resource] = res.val // NOTE: save to map
            counterSuccess++
        }
        counterTotal++
    }
    log.Printf("%d/%d of total job run", counterSuccess, counterTotal)
    fmt.Println("final :", m)
    fmt.Println("len m", len(m))

    fmt.Println(runtime.NumGoroutine())
    fmt.Println(time.Since(now))
}

func query(s string) (int, error) {
    time.Sleep(time.Second)
    i, err := strconv.Atoi(s)
    if err != nil {
        return 0, err
    }

    if i%3 == 0 {
        return 0, errors.New("i divided by 3")
    }
    ms := i + 500 + rand.Intn(500)
    return ms, nil
}

Playground: https://go.dev/play/p/LeyE9n1hh81

幸福不弃 2025-01-17 05:11:29

Here is a pure channel solution (playground). I think the performance really depends on GenerateUrl (generateURL in my code). One more thing I would like to point out: the correct term for this is concurrency, not parallelism.

package main

import (
    "errors"
    "log"
    "strconv"
    "strings"
)

type result struct {
    resourceID, url string
    err             error
}

func generateURL(resourceID string) (string, error) {
    if strings.HasPrefix(resourceID, "error-") {
        return "", errors.New(resourceID)
    }
    return resourceID, nil
}

func main() {
    // These are the resource IDs
    resources := make([]string, 10000)
    for i := 0; i < 10000; i++ {
        s := strconv.Itoa(i)
        if i%10 == 0 {
            resources[i] = "error-" + s
        } else {
            resources[i] = "id-" + s
        }
    }

    numOfChannel := 20
    // Results are sent through this channel and collected into resourceMap
    ch := make(chan result, 10)
    // These are the channels the worker goroutines receive resource IDs from
    channels := make([]chan string, numOfChannel)
    // After all resources are processed, this channel is closed to signal the goroutines to exit
    done := make(chan struct{})

    for i := range channels {
        c := make(chan string)
        channels[i] = c

        go func() {
            for {
                select {
                case rid := <-c:
                    u, err := generateURL(rid)
                    ch <- result{rid, u, err}
                case <-done: // closed by main once every result has been collected
                    return
                }
            }
        }()
    }

    go func() {
        for i, r := range resources {
            channels[i%numOfChannel] <- r
        }
    }()

    resourceMap := make(map[string]string)
    i := 0
    for p := range ch {
        if p.err != nil {
            log.Println(p.resourceID, p.err)
        } else {
            resourceMap[p.resourceID] = p.url
        }
        i++
        if i == len(resources) { // one result has been received for every resource
            break
        }
    }

    close(done)
}