Is this Scala parallel array code thread-safe?

Posted on 2024-11-05 15:03:59

I want to use parallel arrays for a task, and before I start coding, I'd like to know whether this small snippet is thread-safe:

import collection.mutable._

var listBuffer = ListBuffer[String]("one","two","three","four","five","six","seven","eight","nine")
var jSyncList  = java.util.Collections.synchronizedList(new java.util.ArrayList[String]())
listBuffer.par.foreach { e =>
    println("processed :"+e)
    // using sleep here to simulate a random delay
    Thread.sleep((scala.math.random * 1000).toLong)
    jSyncList.add(e)
}
jSyncList.toArray.foreach(println)

Are there better ways of processing something with parallel collections and accumulating the results elsewhere?

Comments (5)

君勿笑 2024-11-12 15:03:59

The code you posted is perfectly safe; I'm not sure about the premise though: why do you need to accumulate the results of a parallel collection in a non-parallel one? One of the whole points of the parallel collections is that they look like other collections.

Parallel collections also provide a seq method to switch back to sequential ones, so you should probably use that!
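As a minimal sketch of that suggestion (not the poster's own code), the snippet below maps over a parallel view and then calls seq to get an ordinary sequential collection back. The names input, processed and results are illustrative, and it assumes Scala 2.12 or earlier, where .par is part of the standard library.

```scala
// Assumes Scala 2.12 or earlier, where .par ships with the standard library.
// On Scala 2.13+ add the scala-parallel-collections module and uncomment:
// import scala.collection.parallel.CollectionConverters._

val input = Vector("one", "two", "three", "four", "five")

// map runs in parallel, but the result keeps the element order of the input
val processed = input.par.map(e => "processed: " + e)

// seq converts the parallel result back into a sequential collection
val results = processed.seq

results.foreach(println)
```

Because the accumulation happens inside map, no synchronized list is needed, and the sequential result can be handed to code that knows nothing about parallel collections.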

等待圉鍢 2024-11-12 15:03:59

For this pattern to be safe:

listBuffer.par.foreach { e => f(e) }

f has to be able to run concurrently in a safe way. I think the same rules that you need for safe multi-threading apply: access to shared state needs to be thread-safe, the order of the f calls for different e won't be deterministic, and you may run into deadlocks once you start synchronizing statements in f.

Additionally, I'm not clear on what guarantees the parallel collections give you about the underlying collection being modified while it is being processed, so a mutable list buffer, which can have elements added or removed, is possibly a poor choice. You never know when the next coder will call something like foo(listBuffer) before your foreach and pass that reference to another thread, which may mutate the list while it's being processed.

Other than that, I think for any f that will take a long time, can be called concurrently and where e can be processed out of order, this is a fine pattern.

immutCol.par.foreach { e => threadSafeOutOfOrderProcessingOf(e) }

Disclaimer: I have not tried parallel collections myself, but I'm looking forward to SO questions and answers showing us what works well.
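The requirement on f can be sketched roughly as follows. This is an illustration only: the totalLength counter and the body of f are hypothetical, chosen so that the only shared state is an atomic value, and the input is an immutable Vector as recommended above. It assumes Scala 2.12 or earlier for .par.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Assumes Scala 2.12 or earlier, where .par ships with the standard library.
// On Scala 2.13+ add the scala-parallel-collections module and uncomment:
// import scala.collection.parallel.CollectionConverters._

// Immutable input: nobody can mutate it while it is being processed
val items = Vector("one", "two", "three", "four", "five", "six")

// Shared state touched by every call of f, so it must be thread-safe
val totalLength = new AtomicInteger(0)

// Hypothetical f: its only side effect is an atomic update, so it is safe
// to call concurrently and the call order does not affect the final value
def f(e: String): Unit = {
  totalLength.addAndGet(e.length)
  ()
}

items.par.foreach(f)

println(totalLength.get) // prints 22
```

Replacing the AtomicInteger with a plain var and += would make f unsafe, since two threads could read the same old value and lose an update.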

楠木可依 2024-11-12 15:03:59

The synchronizedList should be safe, though the println may give unexpected results: you have no guarantee about the order in which items will be printed, or even that your printlns won't be interleaved mid-character.

A synchronized list is also unlikely to be the fastest way to do this. A safer solution is to map over an immutable collection (Vector is probably your best bet here), then print all the lines (in order) afterwards:

val input = Vector("one","two","three","four","five","six","seven","eight","nine")
val output  = input.par.map { e =>
  val msg = "processed :" + e
  // using sleep here to simulate a random delay
  Thread.sleep((math.random * 1000).toLong)
  msg
}
println(output mkString "\n")

You'll also note that this code has about as much practical usefulness as your example :)

篱下浅笙歌 2024-11-12 15:03:59

This code is plain weird: why add stuff in parallel to something that needs to be synchronized? You'll add contention and gain absolutely nothing in return.

The principle of the thing, accumulating results from parallel processing, is better achieved with operations like fold, reduce, or aggregate.
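A rough sketch of what that could look like, using reduce and aggregate on a parallel Vector. The names words, totalChars and longest are illustrative, and the snippet assumes Scala 2.12 or earlier for .par. The combining operators must be associative, since the collection is split into chunks that are combined in an unspecified grouping.

```scala
// Assumes Scala 2.12 or earlier, where .par ships with the standard library.
// On Scala 2.13+ add the scala-parallel-collections module and uncomment:
// import scala.collection.parallel.CollectionConverters._

val words = Vector("one", "two", "three", "four", "five",
                   "six", "seven", "eight", "nine")

// map + reduce: per-element work runs in parallel, then the partial
// results are combined with an associative operator (+)
val totalChars = words.par.map(_.length).reduce(_ + _)

// aggregate: the first function folds elements into a chunk-local
// accumulator, the second merges the accumulators of different chunks
val longest = words.par.aggregate(0)(
  (acc, w) => math.max(acc, w.length),
  (a, b) => math.max(a, b)
)

println(totalChars) // prints 36
println(longest)    // prints 5
```

No shared mutable state is involved here, so there is nothing to synchronize and no contention between worker threads.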

怂人 2024-11-12 15:03:59

The code you've posted is safe - there will be no errors due to inconsistent state of your array list, because access to it is synchronized.

However, parallel collections process items concurrently (at the same time) AND out of order. Out of order means that the 54th element may be processed before the 2nd element, so your synchronized array list will contain items in a non-predefined order.

In general it's better to use map, filter and other functional combinators to transform a collection into another collection - these will ensure that the ordering guarantees are preserved if a collection has some (like Seqs do). For example:

ParArray(1, 2, 3, 4).map(_ + 1)

always returns ParArray(2, 3, 4, 5).

However, if you need a specific thread-safe collection type such as a ConcurrentSkipListMap or a synchronized collection to be passed to some method in some API, modifying it from a parallel foreach is safe.

Finally, a note: parallel collections provide parallel bulk operations on data. Mutable parallel collections are not thread-safe in the sense that you can add elements to them from different threads. Mutating operations like inserting into a map or appending to a buffer still have to be synchronized.
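For the case where some API really does need a specific thread-safe collection, a small sketch (with illustrative names words and lengths, and assuming Scala 2.12 or earlier for .par) might populate a ConcurrentSkipListMap from a parallel foreach like this:

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Assumes Scala 2.12 or earlier, where .par ships with the standard library.
// On Scala 2.13+ add the scala-parallel-collections module and uncomment:
// import scala.collection.parallel.CollectionConverters._

val words = Vector("one", "two", "three", "four")

// The map handles concurrent writers itself, so no extra locking is needed
val lengths = new ConcurrentSkipListMap[String, Integer]()

// Insertions happen in an unspecified order across threads
words.par.foreach { w =>
  lengths.put(w, Int.box(w.length))
}

// The skip list keeps its keys sorted regardless of insertion order
println(lengths.firstKey) // prints four
println(lengths.size)     // prints 4
```

The same loop writing into a plain scala.collection.mutable.Map would not be safe, because its insertions are not synchronized.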
