迭代并发集合时的线程安全

发布于 2024-09-27 17:49:52 字数 511 浏览 0 评论 0原文

我正在编写一些客户端-服务器应用程序,我必须处理多个线程。我有一些服务器,每隔几秒钟发送一次活动数据包。这些服务器在 ConcurrentHashMap 中维护,其中包含它们的端点以及最后一个活动包到达相应服务器的时间。

现在我有一个线程,它必须“整理”所有在特定时间内未发送活动数据包的服务器。

我想我不能就那样做,可以吗?

for( IPEndPoint server : this.fileservers.keySet() )
{
    Long time = this.fileservers.get( server );

    //If server's time is updated here, I got a problem

    if( time > fileserverTimeout )
        this.fileservers.remove( server );
}

有没有一种方法可以解决这个问题,而无需获取整个循环的锁(然后我也必须在其他线程中遵守该锁)?

I'm writing some client-server-application where I have to deal with multiple threads. I've got some servers, that send alive-packets every few seconds. Those servers are maintained in a ConcurrentHashMap, that contains their EndPoints paired with the time the last alive-package arrived of the respective server.

Now I've got a thread, that has to "sort out" all the servers that haven't sent alive-packets for a specific amount of time.

I guess I can't just do it like that, can I?

for( IPEndPoint server : this.fileservers.keySet() )
{
    Long time = this.fileservers.get( server );

    //If server's time is updated here, I got a problem

    if( time > fileserverTimeout )
        this.fileservers.remove( server );
}

Is there a way I can get around that without aquiring a lock for the whole loop (that I then have to respect in the other threads as well)?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

美煞众生 2024-10-04 17:49:52

这里可能没有问题,具体取决于您在地图中存储的内容。您的代码对我来说看起来有点奇怪,因为您似乎保存了“服务器未处于活动状态的持续时间”。

我记录该数据的第一个想法是存储“服务器处于活动状态的最新时间戳”。那么您的代码将如下所示:

package so3950354;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ServerManager {

  private final ConcurrentMap<Server, Long> lastActive = new ConcurrentHashMap<Server, Long>();

  /** May be overridden by a special method for testing. */
  protected long now() {
    return System.currentTimeMillis();
  }

  public void markActive(Server server) {
    lastActive.put(server, Long.valueOf(now()));
  }

  public void removeInactive(long timeoutMillis) {
    final long now = now();

    Iterator<Map.Entry<Server, Long>> it = lastActive.entrySet().iterator();
    while (it.hasNext()) {
      final Map.Entry<Server, Long> entry = it.next();
      final long backThen = entry.getValue().longValue();
      /*
       * Even if some other code updates the timestamp of this server now,
       * the server had timed out at some point in time, so it may be
       * removed. It's bad luck, but impossible to avoid.
       */
      if (now - backThen >= timeoutMillis) {
        it.remove();
      }
    }
  }

  static class Server {

  }
}

如果您确实想避免在调用 removeInactive 期间没有任何代码调用 markActive,则没有办法绕过显式锁定。您可能想要的是:

  • 允许并发调用 markActive
  • markActive 期间,不允许调用 removeInactive
  • removeInactive 期间,不允许调用 markActive

这看起来像是 ReadWriteLock 的典型场景,其中 markActive 是“读取”操作,removeInactive 是“写入”操作。

There is probably no problem here, depending on what exactly you store in the map. Your code looks a little weird to me, since you seem to save "the duration for which the server hasn't been active".

My first idea for recording that data was to store "the latest timestamp at which the server has been active". Then your code would look like this:

package so3950354;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ServerManager {

  private final ConcurrentMap<Server, Long> lastActive = new ConcurrentHashMap<Server, Long>();

  /** May be overridden by a special method for testing. */
  protected long now() {
    return System.currentTimeMillis();
  }

  public void markActive(Server server) {
    lastActive.put(server, Long.valueOf(now()));
  }

  public void removeInactive(long timeoutMillis) {
    final long now = now();

    Iterator<Map.Entry<Server, Long>> it = lastActive.entrySet().iterator();
    while (it.hasNext()) {
      final Map.Entry<Server, Long> entry = it.next();
      final long backThen = entry.getValue().longValue();
      /*
       * Even if some other code updates the timestamp of this server now,
       * the server had timed out at some point in time, so it may be
       * removed. It's bad luck, but impossible to avoid.
       */
      if (now - backThen >= timeoutMillis) {
        it.remove();
      }
    }
  }

  static class Server {

  }
}

If you really want to avoid that no code ever calls markActive during a call to removeInactive, there is no way around explicit locking. What you probably want is:

  • concurrent calls to markActive are allowed.
  • during markActive no calls to removeInactive are allowed.
  • during removeInactive no calls to markActive are allowed.

This looks like a typical scenario for a ReadWriteLock, where markActive is the "reading" operation and removeInactive is the "writing" Operation.

迷鸟归林 2024-10-04 17:49:52

我不明白另一个线程如何在代码中更新服务器的时间。使用 this.fileservers.get( server ) 从地图中检索服务器的时间后,另一个线程无法更改其值,因为 Long 对象是不可变的。是的,另一个线程可以将该服务器的新 Long 对象放入映射中,但这不会影响该线程,因为它已经检索了服务器的时间。

因此,就目前情况而言,我看不出您的代码有任何问题。 ConcurrentHashMap 中的迭代器是弱一致的,这意味着它们可以容忍并发修改,因此也不存在抛出 ConcurrentModificationException 的风险。

I don't see how another thread can update the server's time at that point in your code. Once you've retrieved the time of a server from the map using this.fileservers.get( server ), another thread cannot change its value as Long objects are immutable. Yes, another thread can put a new Long object for that server into the map, but that doesn't affect this thread, because it has already retrieved the time of the server.

So as it stands I can't see anything wrong with your code. The iterators in a ConcurrentHashMap are weakly consistent which means they can tolerate concurrent modification, so there is no risk of a ConcurrentModificationException being thrown either.

萝莉病 2024-10-04 17:49:52

(参见罗兰的回答,它采用了这里的想法并将它们充实到一个更完整的示例中,并提供一些很棒的附加见解。)

由于它是并发哈希映射,因此您可以执行以下操作。请注意,CHM 的迭代器都实现了可选方法,包括您想要的 remove()。请参阅 CHM API 文档 ,其中指出:

该类及其视图和迭代器
实现所有可选方法
MapIterator 接口。

这段代码应该可以工作(我不知道您的 CHM 中的 Key 的类型):

ConcurrentHashMap<K,Long> fileservers = ...;

for(Iterator<Map.Entry<K,Long>> fsIter = fileservers.entrySet().iterator(); fileservers.hasNext(); )
{
    Map.Entry<K,Long> thisEntry = fsIter.next();
    Long time = thisEntry.getValue();

    if( time > fileserverTimeout )
        fsIter.remove( server );
}

但请注意,其他地方可能存在竞争条件...您需要确保其他代码访问地图可以处理这种自发删除 - 即,可能无论您在哪里触摸 fileservers.put(),您都需要一些涉及 fileservers.putIfAbsent() 的逻辑>。与使用同步解决方案相比,该解决方案不太可能产生瓶颈,但它也需要更多的思考。

你写的“如果服务器时间在这里更新,我遇到了问题”正是 putIfAbsent() 出现的地方。如果该条目不存在,要么你以前没有见过它,要么你只是最近把它从桌子上掉下来了。如果需要协调这两个方面,那么您可能需要为条目引入一个可锁定记录,并在该级别执行同步(即,在执行remove()时同步记录) code>,而不是整个表)。然后,put() 端也可以在同一条记录上同步,从而消除潜在的竞争。

(See Roland's answer, which takes the ideas here and fleshes them out into a fuller example, with some great additional insights.)

Since it's a concurrent hash map, you can do the following. Note that CHM's iterators all implement the optional methods, including remove(), which you want. See CHM API docs, which states:

This class and its views and iterators
implement all of the optional methods
of the Map and Iterator interfaces.

This code should work (I don't know the type of the Key in your CHM):

ConcurrentHashMap<K,Long> fileservers = ...;

for(Iterator<Map.Entry<K,Long>> fsIter = fileservers.entrySet().iterator(); fileservers.hasNext(); )
{
    Map.Entry<K,Long> thisEntry = fsIter.next();
    Long time = thisEntry.getValue();

    if( time > fileserverTimeout )
        fsIter.remove( server );
}

But note that there may be race conditions elsewhere... You need to make sure that other bits of code accessing the map can cope with this kind of spontaneous removal -- i.e., probably whereever you touch fileservers.put() you'll need a bit of logic involving fileservers.putIfAbsent(). This solution is less likely to create bottlenecks than using synchronized, but it also requires a bit more thought.

Where you wrote "If server's time is updated here, I got a problem" is exactly where putIfAbsent() comes in. If the entry is absent, either you hadn't seen it before, or you just recently dropped it from the table. If the two sides of this need to be coordinated, then you may instead want to introduce a lockable record for the entry, and carry out the synchronization at that level (i.e., sync on the record while doing remove(), rather than on the whole table). Then the put() end of things can also sync on the same record, eliminating a potential race.

下壹個目標 2024-10-04 17:49:52

首先使映射同步,

this.fileservers = Collections.synchronizedMap(Map)

然后使用单例类中使用的策略

if( time > fileserverTimeout )
    {
        synchronized(this.fileservers)
        {
             if( time > fileserverTimeout )
                  this.fileservers.remove( server );
        }
    }

现在这可以确保一旦进入同步块,就不会发生更新。之所以如此,是因为一旦获取了映射上的锁,映射(同步包装器)本身将​​无法为其提供线程锁以进行更新、删除等。

检查两次时间可确保仅在存在时才使用同步删除的真实案例

Firstly make map synchronized

this.fileservers = Collections.synchronizedMap(Map)

then use the strategy which is used in Singleton classes

if( time > fileserverTimeout )
    {
        synchronized(this.fileservers)
        {
             if( time > fileserverTimeout )
                  this.fileservers.remove( server );
        }
    }

Now this makes sure that once you inside the synchronized block, no updates can occur. This is so because once the lock on the map is taken, map(synchronized wrapper) will not have itself available to provide a thread lock on it for update, remove etc.

Checking for time twice makes sure that synchronization is used only when there is a genuine case of delete

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文