Java同步块使用方法调用来获取同步对象

发布于 2024-10-01 08:21:23 字数 861 浏览 1 评论 0原文

我们正在编写一些锁定代码,并遇到了一个特殊的问题。我们使用 ConcurrentHashMap 来获取我们锁定的对象实例。所以我们的同步块看起来像这样

synchronized(locks.get(key)) { ... }

我们重写了 ConcurrentHashMap 的 get 方法,使其始终返回一个新对象(如果它不包含键)。

@Override
public Object get(Object key) {
   Object o = super.get(key);
   if (null == o) {
      Object no = new Object();
      o = putIfAbsent((K) key, no);
      if (null == o) {
         o = no;
      }
   }
   return o;
}

但是有没有一种状态,get方法已经返回了对象,但是线程还没有进入synchronized块。允许其他线程获取相同的对象并锁定它。

我们有一个潜在的竞争条件,

  • 线程 1:获取带有键 A 的对象,但不进入同步块
  • 线程 2:获取带有键 A 的对象,进入同步块 线程
  • 2:从映射中删除对象,退出同步阻塞
  • 线程 1:进入同步块,对象不再在映射中
  • 线程 3:获取键 A 的新对象(与线程 1 获取的对象不同)
  • 线程 3:进入同步块,而线程 1 也进入同步块都在其同步块中使用键 A

如果 java 在 get 调用返回后直接进入同步块,则这种情况是不可能的。如果没有,是否有人对我们如何删除密钥而不必担心这种竞争条件有任何意见?

We are writing some locking code and have run into a peculiar question. We use a ConcurrentHashMap for fetching instances of Object that we lock on. So our synchronized blocks look like this

synchronized(locks.get(key)) { ... }

We have overridden the get method of ConcurrentHashMap to make it always return a new object if it did not contain one for the key.

@Override
public Object get(Object key) {
   Object o = super.get(key);
   if (null == o) {
      Object no = new Object();
      o = putIfAbsent((K) key, no);
      if (null == o) {
         o = no;
      }
   }
   return o;
}

But is there a state in which the get-method has returned the object, but the thread has not yet entered the synchronized block. Allowing other threads to get the same object and lock on it.

We have a potential race condition were

  • thread 1: gets the object with key A, but does not enter the synchronized block
  • thread 2: gets the object with key A, enters a synchronized block
  • thread 2: removes the object from the map, exits synchronized block
  • thread 1: enters the synchronized block with the object that is no longer in the map
  • thread 3: gets a new object for key A (not the same object as thread 1 got)
  • thread 3: enters a synchronized block, while thread 1 also is in its synchronized block both using key A

This situation would not be possible if java entered the synchronized block directly after the call to get has returned. If not, does anyone have any input on how we could remove keys without having to worry about this race condition?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

仅冇旳回忆 2024-10-08 08:21:23

在我看来,问题源于您锁定地图值,而实际上您需要锁定(或其某种派生)。如果我理解正确的话,您希望避免 2 个线程使用相同的键运行关键部分。

您可以锁定钥匙吗?你能保证你总是使用同一个密钥实例吗?

一个不错的选择:

根本不删除锁。使用具有弱值的 ReferenceMap。这样,只有当当前没有任何线程使用映射条目时,才会删除该条目。

注意:

1)现在您必须同步此地图(使用 Collections.synchronizedMap(..))。

2)您还需要同步为给定键生成/返回值的代码。

As I see it, the problem originates from the fact that you lock on map values, while in fact you need to lock on the key (or some derivation of it). If I understand correctly, you want to avoid 2 threads from running the critical section using the same key.

Is it possible for you to lock on the keys? can you guarantee that you always use the same instance of the key?

A nice alternative:

Don't delete the locks at all. Use a ReferenceMap with weak values. This way, a map entry is removed only if it is not currently in use by any thread.

Note:

1) Now you will have to synchronize this map (using Collections.synchronizedMap(..)).

2) You also need to synchronize the code that generates/returns a value for a given key.

笑忘罢 2024-10-08 08:21:23

您有 2 个选择

:您可以在同步块内检查地图。

Object o = map.get(k);
synchronized(o) {
  if(map.get(k) != o) {
    // object removed, handle...
  }
}

b.您可以扩展您的值以包含指示其状态的标志。当从映射中删除某个值时,您可以设置一个标志来指示该值已被删除(在同步块内)。

CacheValue v = map.get(k);
sychronized(v) {
  if(v.isRemoved()) {
    // object removed, handle...
  }
}

you have 2 options:

a. you could check the map once inside the synchronized block.

Object o = map.get(k);
synchronized(o) {
  if(map.get(k) != o) {
    // object removed, handle...
  }
}

b. you could extend your values to contain a flag indicating their status. when a value is removed from the map, you set a flag indicating that it was removed (within the sync block).

CacheValue v = map.get(k);
sychronized(v) {
  if(v.isRemoved()) {
    // object removed, handle...
  }
}
戏剧牡丹亭 2024-10-08 08:21:23

代码按原样是线程安全的。也就是说,如果您从 CHM 中删除,那么在同步从集合返回的对象时所做的任何类型的假设都将丢失。

但是是否有一种状态
get-方法已返回对象,
但线程还没有进入
同步块。允许其他
线程获取相同的对象并且
锁定它。

是的,但是每当您在对象上同步时都会发生这种情况。保证的是,在另一个线程存在之前,另一个线程不会进入同步块。

如果没有,有没有人有任何意见
我们如何才能在不发生任何情况下删除钥匙
不得不担心这场比赛
条件?

确保这种原子性的唯一真正方法是在 CHM 或另一个对象(由所有线程共享)上进行同步。最好的方法是不要从 CHM 中删除。

The code as is, is thread safe. That being said, if you are removing from the CHM then any type of assumptions that are made when synchronizing on an object returned from the collection will be lost.

But is there a state in which the
get-method has returned the object,
but the thread has not yet entered the
synchronized block. Allowing other
threads to get the same object and
lock on it.

Yes, but that happens any time you synchronize on an Object. What is garunteed is that the other thread will not enter the synchronized block until the other exists.

If not, does anyone have any input on
how we could remove keys without
having to worry about this race
condition?

The only real way of ensuring this atomicity is to either synchronize on the CHM or another object (shared by all threads). The best way is to not remove from the CHM.

黑白记忆 2024-10-08 08:21:23

感谢所有很好的建议和想法,非常感谢!最终,这次讨论使我想出了一个不使用对象进行锁定的解决方案。

只是对我们实际正在做的事情的简要描述。

我们有一个缓存,可以连续从我们的环境接收数据。缓存对于每个键都有多个“存储桶”,并在事件进入时将事件聚合到存储桶中。传入的事件具有一个确定要使用的缓存条目的键,以及一个确定应该使用的缓存条目中的存储桶的时间戳。增加。

缓存还有一个定期运行的内部刷新任务。它将迭代所有缓存条目并将除当前存储桶之外的所有存储桶刷新到数据库。

现在,传入数据的时间戳可以是过去的任何时间,但其中大多数是最近的时间戳。因此,当前存储桶将比之前时间间隔的存储桶获得更多命中。

知道了这一点,我可以证明我们的竞争条件。所有这些代码都是针对单个缓存条目的,因为问题被隔离到单个缓存元素的并发写入和刷新。

// buckets :: ConcurrentMap<Long, AtomicLong>

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);
   AtomicLong bucket = buckets.get(key);
   if (null == bucket) {
      AtomicLong newBucket = new AtomicLong(0);
      bucket = buckets.putIfAbsent(key, newBucket);
      if (null == bucket) {
          bucket = newBucket;
      }
   }

   bucket.addAndGet(value);
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.remove(key);
         if (null != bucket) {
            long databaseKey = databaseKey(key);
            long n = bucket.get()
            if (!flushedValues.containsKey(databaseKey)) {
               flushedValues.put(databaseKey, n);
            } else {
               long sum = flushedValues.get(databaseKey) + n;
               flushedValues.put(databaseKey, sum);
            }
         }
      }
   }

   return flushedValues;
}

可能发生的情况是:(fl = 刷新线程,it = 增量线程)

  • it:进入incrementBucket,执行直到调用addAndGet(value)
  • fl:进入flush并迭代存储桶
  • fl:到达正在递增的存储桶
  • fl: 删除它并调用bucket.get() 并将值存储到刷新的值
  • it: 递增存储桶(现在将丢失,因为存储桶已被刷新并删除)

解决方案:

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);

   boolean done = false;
   while (!done) {
      AtomicLong bucket = buckets.get(key);
      if (null == bucket) {
         AtomicLong newBucket = new AtomicLong(0);
         bucket = buckets.putIfAbsent(key, newBucket);
         if (null == bucket) {
             bucket = newBucket;
         }
      }

      synchronized (bucket) {
         // double check if the bucket still is the same
         if (buckets.get(key) != bucket) {
            continue;
         }
         done = true;

         bucket.addAndGet(value);
      }
   }
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.get(key);
         if (null != value) {
            synchronized(bucket) {
               buckets.remove(key);
               long databaseKey = databaseKey(key);
               long n = bucket.get()
               if (!flushedValues.containsKey(databaseKey)) {
                  flushedValues.put(databaseKey, n);
               } else {
                  long sum = flushedValues.get(databaseKey) + n;
                  flushedValues.put(databaseKey, sum);
               }
            }
         }
      }
   }

   return flushedValues;
}

I希望这对可能遇到同样问题的其他人有用。

Thanks for all the great suggestions and ideas, really appreciate it! Eventually this discussion made me come up with a solution that does not use objects for locking.

Just a brief description of what we're actually doing.

We have a cache that receives data continuously from our environment. The cache has several 'buckets' for each key and aggregated events into the buckets as they come in. The events coming in have a key that determines the cache entry to be used, and a timestamp determining the bucket in the cache entry that should be incremented.

The cache also has an internal flush task that runs periodically. It will iterate all cache entries and flushes all buckets but the current one to database.

Now the timestamps of the incoming data can be for any time in the past, but the majority of them are for very recent timestamps. So the current bucket will get more hits than buckets for previous time intervals.

Knowing this, I can demonstrate the race condition we had. All this code is for one single cache entry, since the issue was isolated to concurrent writing and flushing of single cache elements.

// buckets :: ConcurrentMap<Long, AtomicLong>

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);
   AtomicLong bucket = buckets.get(key);
   if (null == bucket) {
      AtomicLong newBucket = new AtomicLong(0);
      bucket = buckets.putIfAbsent(key, newBucket);
      if (null == bucket) {
          bucket = newBucket;
      }
   }

   bucket.addAndGet(value);
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.remove(key);
         if (null != bucket) {
            long databaseKey = databaseKey(key);
            long n = bucket.get()
            if (!flushedValues.containsKey(databaseKey)) {
               flushedValues.put(databaseKey, n);
            } else {
               long sum = flushedValues.get(databaseKey) + n;
               flushedValues.put(databaseKey, sum);
            }
         }
      }
   }

   return flushedValues;
}

What could happen was: (fl = flush thread, it = increment thread)

  • it: enters incrementBucket, executes until just before the call to addAndGet(value)
  • fl: enters flush and iterates the buckets
  • fl: reaches the bucket that is being incremented
  • fl: removes it and calls bucket.get() and stores the value to the flushed values
  • it: increments the bucket (which will be lost now, because the bucket has been flushed and removed)

The solution:

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);

   boolean done = false;
   while (!done) {
      AtomicLong bucket = buckets.get(key);
      if (null == bucket) {
         AtomicLong newBucket = new AtomicLong(0);
         bucket = buckets.putIfAbsent(key, newBucket);
         if (null == bucket) {
             bucket = newBucket;
         }
      }

      synchronized (bucket) {
         // double check if the bucket still is the same
         if (buckets.get(key) != bucket) {
            continue;
         }
         done = true;

         bucket.addAndGet(value);
      }
   }
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.get(key);
         if (null != value) {
            synchronized(bucket) {
               buckets.remove(key);
               long databaseKey = databaseKey(key);
               long n = bucket.get()
               if (!flushedValues.containsKey(databaseKey)) {
                  flushedValues.put(databaseKey, n);
               } else {
                  long sum = flushedValues.get(databaseKey) + n;
                  flushedValues.put(databaseKey, sum);
               }
            }
         }
      }
   }

   return flushedValues;
}

I hope this will be useful for others that might run in to the same problem.

女中豪杰 2024-10-08 08:21:23

您提供的两个代码片段很好,因为它们本身就是这样。您所做的类似于 Guava 的延迟实例化 MapMaker.makeComputingMap() 可能有效,但我认为延迟创建键的方式没有问题。

顺便说一句,线程完全有可能在锁对象的 get() 查找之后被抢占,但在 之前 > 进入同步。

我的问题是你的竞争条件描述中的第三个要点。你说:

线程2:从映射中删除对象,退出同步块

哪个对象,哪个映射?一般来说,我假设您正在查找要锁定的键,然后在同步块内对其他数据结构执行一些其他操作。如果您正在谈论从开头提到的 ConcurrentHashMap 中删除锁定对象,那么这是一个巨大差异。

真正的问题是这是否有必要。在通用环境中,我认为仅记住所有已查找过的键的所有锁对象(即使这些键不再代表活动对象)不会出现任何内存问题。想出某种方法来安全地处理可能存储在其他线程的局部变量中的对象是非常困难的,如果你确实想走这条路,我感觉性能会降低到围绕键查找的单个粗锁的性能。

如果我误解了那里发生的事情,请随时纠正我。

编辑:好的 - 在这种情况下,我坚持我的上述主张,即最简单的方法不是删除钥匙;这实际上可能并不像您想象的那么有问题,因为空间增长的速度非常小。根据我的计算(这很可能是错误的,我不是空间计算方面的专家,而且您的 JVM 可能会有所不同),地图每小时增长约 14Kb。在该映射用完 100MB 堆空间之前,您必须有一年的连续正常运行时间。

但我们假设钥匙确实需要移除。这会带来一个问题,即在您知道没有线程正在使用密钥之前,您无法删除该密钥。这会导致先有鸡还是先有蛋的问题,您需要所有线程在其他事情上同步,以便获得跨线程的原子性(检查)和可见性,这意味着您可以“除了在整个事情周围放置一个同步块之外,别无他法,完全颠覆了你的锁条带策略。

让我们重新审视一下约束条件。这里最重要的是事情最终会得到澄清。这不是正确性约束,而只是内存问题。因此,我们真正想做的是确定密钥肯定不再被使用的某个点,然后使用它作为触发器将其从地图中删除。这里有两种情况:

  1. 您可以识别这种情况,并对其进行逻辑测试。在这种情况下,您可以使用(在最坏的情况下)某种计时器线程从映射中删除键,或者希望使用某种与您的应用程序更清晰集成的逻辑。
  2. 您无法确定您知道密钥将不再被使用的任何条件。在这种情况下,根据定义,从地图中删除键是不安全的。因此,事实上,为了正确性起见,您必须将它们保留在其中。

无论如何,这实际上可以归结为手动垃圾收集。当您可以懒惰地确定不再使用这些键时,从地图中删除它们。您当前的解决方案在这里太急切了,因为(正如您所指出的)它正在这种情况发生之前进行删除。

The two code snippets you've provided are fine, as they are. What you've done is similar to how lazy instantiation with Guava's MapMaker.makeComputingMap() might work, but I see no problems with the way that the keys are lazily created.

You're right by the way that it's entirely possible for a thread to be prempted after the get() lookup of a lock object, but before entering sychronized.

My problem is with the third bullet point in your race condition description. You say:

thread 2: removes the object from the map, exits synchronized block

Which object, and which map? In general, I presumed that you were looking up a key to lock on, and then would be performing some other operations on other data structures, within the synchronized block. If you're talking about removing the lock object from the ConcurrentHashMap mentioned at the start, that's a massive difference.

And the real question is whether this is necessary at all. In a general purpose environment, I don't think there will be any memory issues with just remembering all of the lock objects for all the keys that have ever been looked up (even if those keys no longer represent live objects). It is much harder to come up with some way of safely disposing of an object that may be stored in a local variable of some other thread at any time, and if you do want to go down this route I have a feeling that performance will degrade to that of a single coarse lock around the key lookup.

If I've misunderstood what's going on there then feel free to correct me.

Edit: OK - in which case I stand by my above claim that the easiest way to do this is not remove the keys; this might not actually be as problematic as you think, since the rate at which the space grows will be very small. By my calculations (which may well be off, I'm not an expert in space calculations and your JVM may vary) the map grows by about 14Kb/hour. You'd have to have a year of continuous uptime before this map used up 100MB of heap space.

But let's assume that the keys really do need to be removed. This poses the problem that you can't remove a key until you know that no threads are using it. This leads to the chicken-and-egg problem that you'll require all threads to synchronize on something else in order to get atomicity (of checking) and visibility across threads, which then means that you can't do much else than slap a single synchronized block around the whole thing, completely subverting your lock striping strategy.

Let's revisit the constraints. The main thing here is that things get cleared up eventually. It's not a correctness constraint but just a memory issue. Hence what we really want to do is identify some point at which the key could definitely no longer be used, and then use this as the trigger to remove it from the map. There are two cases here:

  1. You can identify such a condition, and logically test for it. In which case you can remove the keys from the map with (in the worst case) some kind of timer thread, or hopefully some logic that's more cleanly integrated with your application.
  2. You cannot identify any condition by which you know that a key will no longer be used. In this case, by definition, there is no point at which it's safe to remove the keys from the map. So in fact, for correctness' sake, you must leave them in.

In any case, this effectively boils down to manual garbage collection. Remove the keys from the map when you can lazily determine that they're no longer going to be used. Your current solution is too eager here since (as you point out) it's doing the removal before this situation holds.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文