AtomicReference to a mutable object and visibility
Say I have an AtomicReference to a list of objects:
    AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());
Thread A adds elements to this list: batch.get().add(o);
Later, thread B takes the list and, for example, stores it in a DB: insertBatch(batch.get());
Do I have to do additional synchronization when writing (Thread A) and reading (Thread B) to ensure thread B sees the list the way A left it, or is this taken care of by the AtomicReference?
In other words: if I have an AtomicReference to a mutable object, and one thread changes that object, do other threads see this change immediately?
Edit:
Maybe some example code is in order:
    public void process(Reader in) throws IOException {
        List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
        ExecutorService exec = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; ++i) {
            tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
                @Override public AtomicReference<List<Object>> call() throws IOException {
                    final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));
                    Processor.this.parser.parse(in, new Parser.Handler() {
                        @Override public void onNewObject(Object event) {
                            batch.get().add(event);
                            if (batch.get().size() >= batchSize) {
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                        }
                    });
                    return batch;
                }
            }));
        }
        List<Object> remainingBatches = new ArrayList<Object>();
        for (Future<AtomicReference<List<Object>>> task : tasks) {
            try {
                AtomicReference<List<Object>> remainingBatch = task.get();
                remainingBatches.addAll(remainingBatch.get());
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof IOException) {
                    throw (IOException)cause;
                }
                throw (RuntimeException)cause;
            }
        }
        // these haven't been flushed yet by the worker threads
        if (!remainingBatches.isEmpty()) {
            dao.insertBatch(remainingBatches);
        }
    }
What happens here is that I create four worker threads to parse some text (this is the Reader in parameter to the process() method). Each worker saves the lines it has parsed in a batch, and flushes the batch when it is full (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));).
Since the number of lines in the text isn't a multiple of the batch size, the last objects end up in a batch that isn't flushed, since it's not full. These remaining batches are therefore inserted by the main thread.
I use AtomicReference.getAndSet() to replace the full batch with an empty one. Is this program correct with regard to threading?
Comments (4)
Um... it doesn't really work like this.
AtomicReference guarantees that the reference itself is visible across threads, i.e. if you assign it a different reference than the original one, the update will be visible. It makes no guarantees about the actual contents of the object that reference is pointing to. Therefore, read/write operations on the list contents require separate synchronization.
Edit: So, judging from your updated code and the comment you posted, setting the local reference to volatile is sufficient to ensure visibility.
I think that, forgetting all the code here, your exact question is this:
So, the exact response to that is: YES, atomics take care of visibility. And this is not my opinion but that of the JDK documentation:
The memory effects for accesses and updates of atomics generally follow the rules for volatiles, as stated in The Java Language Specification, Third Edition (17.4 Memory Model).
I hope this helps.
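One way to reconcile the two answers above is to look at what "follows the rules for volatiles" covers. The sketch below is an illustration under stated assumptions, not code from the question: the class and method names (PublicationDemo, publishAndRead) are made up. It shows the case that is covered: writes made to the list before ref.set(list) are visible to a thread whose ref.get() returns that list. Mutations made after the reference has been published get no such guarantee from the AtomicReference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; none of these names come from the question.
class PublicationDemo {

    // The writer fills a list privately and then publishes it with set().
    // The reader spins on get() until it observes the published reference.
    static int publishAndRead(final int count) {
        final AtomicReference<List<Integer>> ref = new AtomicReference<List<Integer>>();
        final int[] seen = new int[1];

        Thread reader = new Thread(new Runnable() {
            @Override public void run() {
                List<Integer> list;
                while ((list = ref.get()) == null) {
                    Thread.yield();
                }
                // get() returned the value written by set(), so every add()
                // performed before that set() is visible here.
                seen[0] = list.size();
            }
        });
        reader.start();

        List<Integer> list = new ArrayList<Integer>();
        for (int i = 0; i < count; i++) {
            list.add(i);
        }
        ref.set(list); // volatile-write semantics: publishes the adds above
        // A list.add(...) placed here, after publication, would NOT be covered
        // and would race with the reader.

        try {
            reader.join(); // join() makes the reader's write to seen[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead(1000)); // prints 1000
    }
}
```

The pattern in the question, batch.get().add(event), mutates the list after it has been published, which is the case the first answer warns about.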
Adding to Tudor's answer: you will have to make the ArrayList itself thread-safe or, depending on your requirements, synchronize even larger code blocks.
If you can get away with a thread-safe ArrayList, you can "decorate" it like this:
But keep in mind: even "simple" constructs like this are not thread-safe with this:
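A sketch of what this could look like, assuming the decoration meant is Collections.synchronizedList and picking a check-then-add as the "simple" compound construct. Both choices are assumptions, and the helper names (newBatch, addIfAbsentUnsafe, addIfAbsent, fillConcurrently) are hypothetical. Each individual call on the wrapper is synchronized, but two calls in a row are not atomic together unless the caller holds the list's lock around both.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; the helper names are illustrative only.
class SynchronizedListDemo {

    // "Decorates" a plain ArrayList so that each single call is synchronized.
    static List<Object> newBatch() {
        return Collections.synchronizedList(new ArrayList<Object>());
    }

    // NOT thread-safe: another thread can add o between contains() and add().
    static boolean addIfAbsentUnsafe(List<Object> list, Object o) {
        if (!list.contains(o)) {
            return list.add(o);
        }
        return false;
    }

    // Thread-safe: the compound action runs under the wrapper's own lock.
    // Collections.synchronizedList documents the returned list as the mutex.
    static boolean addIfAbsent(List<Object> list, Object o) {
        synchronized (list) {
            if (!list.contains(o)) {
                return list.add(o);
            }
            return false;
        }
    }

    // Several threads try to add the same values; with the locked variant
    // each value ends up in the list exactly once.
    static int fillConcurrently(int threads, final int values) {
        final List<Object> list = newBatch();
        Thread[] pool = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            pool[t] = new Thread(new Runnable() {
                @Override public void run() {
                    for (int i = 0; i < values; i++) {
                        addIfAbsent(list, Integer.valueOf(i));
                    }
                }
            });
            pool[t].start();
        }
        try {
            for (Thread th : pool) {
                th.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return list.size();
    }

    public static void main(String[] args) {
        System.out.println(fillConcurrently(4, 1000)); // prints 1000
    }
}
```

Swapping addIfAbsent for addIfAbsentUnsafe in fillConcurrently may produce duplicates under contention, which is the kind of problem the warning above refers to.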
The AtomicReference will only help you with the reference to the list; it will not do anything to the list itself. More particularly, in your scenario, you will almost certainly run into problems when the system is under load, where the consumer has taken the list while the producer is adding an item to it.
This sounds to me like you should be using a BlockingQueue. You can then limit the memory footprint if your producer is faster than your consumer, and let the queue handle all contention. Something like:
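A minimal sketch of the bounded-queue idea, under these assumptions: the producers stand in for the parser threads, the inserted list stands in for calls to dao.insertBatch, and a sentinel object (POISON) signals that a producer is finished. All names here are hypothetical. The queue capacity bounds memory, since put() blocks when the consumer falls behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; producers stand in for parsers, and the
// "inserted" list stands in for dao.insertBatch(...).
class QueueBatchingDemo {

    private static final Object POISON = new Object(); // end-of-input marker

    static List<List<Object>> process(final int producers, final int perProducer,
                                      final int batchSize) {
        // Bounded queue: put() blocks once batchSize * 2 items are waiting.
        final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(batchSize * 2);
        // Written only by the consumer thread, read by the caller after join().
        final List<List<Object>> inserted = new ArrayList<List<Object>>();

        Thread consumer = new Thread(new Runnable() {
            @Override public void run() {
                List<Object> batch = new ArrayList<Object>(batchSize);
                int finished = 0;
                try {
                    while (finished < producers) {
                        Object o = queue.take();
                        if (o == POISON) {
                            finished++;
                            continue;
                        }
                        batch.add(o);
                        if (batch.size() >= batchSize) {
                            inserted.add(batch); // stand-in for dao.insertBatch(batch)
                            batch = new ArrayList<Object>(batchSize);
                        }
                    }
                    if (!batch.isEmpty()) {
                        inserted.add(batch); // flush the partial last batch
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        Thread[] workers = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            workers[p] = new Thread(new Runnable() {
                @Override public void run() {
                    try {
                        for (int i = 0; i < perProducer; i++) {
                            queue.put(Integer.valueOf(i));
                        }
                        queue.put(POISON);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers[p].start();
        }

        try {
            for (Thread w : workers) {
                w.join();
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return inserted;
    }

    public static void main(String[] args) {
        // 4 producers x 10 items with batches of 7: five full batches plus one of 5.
        System.out.println(process(4, 10, 7).size()); // prints 6
    }
}
```

Only the consumer thread ever touches a batch list, so the lists themselves need no synchronization; the queue is the single hand-off point between threads.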
Added
Thanks to @Tudor for pointing out the architecture you are using. ... I have to admit it is rather strange. You don't really need AtomicReference at all as far as I can see. Each thread owns its own ArrayList until it is passed on to dao, at which point it is replaced, so there is no contention at all anywhere.
I am a little concerned about you creating four parsers on a single Reader. I hope you have some way of ensuring each parser does not affect the others.
I personally would use some form of producer-consumer pattern as I have described in the code above. Something like this perhaps.
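To illustrate the point that each worker owns its batch, here is a sketch of the question's structure with the AtomicReference removed. It assumes the parser invokes its handler on the same thread that runs call(), which the question does not state; a simple loop stands in for the parser, and a comment stands in for dao.insertBatch. The leftover batch reaches the main thread through Future.get(), and java.util.concurrent documents that actions taken by the task happen-before actions following a successful get().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the loop stands in for the parser callbacks.
class ConfinedBatchDemo {

    // Returns how many unflushed items the caller collected from all workers.
    static int process(int workers, final int itemsPerWorker, final int batchSize) {
        ExecutorService exec = Executors.newFixedThreadPool(workers);
        List<Future<List<Object>>> tasks = new ArrayList<Future<List<Object>>>();
        for (int w = 0; w < workers; w++) {
            tasks.add(exec.submit(new Callable<List<Object>>() {
                @Override public List<Object> call() {
                    // Plain local list: only this worker thread touches it.
                    List<Object> batch = new ArrayList<Object>(batchSize);
                    for (int i = 0; i < itemsPerWorker; i++) {
                        batch.add(Integer.valueOf(i));
                        if (batch.size() >= batchSize) {
                            // stand-in for dao.insertBatch(batch)
                            batch = new ArrayList<Object>(batchSize);
                        }
                    }
                    return batch; // handed to the caller via the Future
                }
            }));
        }

        List<Object> remaining = new ArrayList<Object>();
        try {
            for (Future<List<Object>> task : tasks) {
                // get() establishes happens-before with the worker's writes.
                remaining.addAll(task.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            exec.shutdown();
        }
        return remaining.size();
    }

    public static void main(String[] args) {
        // 4 workers x 10 items with batches of 4: each worker leaves 2 items.
        System.out.println(process(4, 10, 4)); // prints 8
    }
}
```

If the handler were called from a different thread than the one that created the batch, this confinement argument would no longer hold and the list would need its own synchronization.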