更新:请注意。
我提出的问题得到了解答。对我来说不幸的是,这个问题比标题中的问题要大得多。除了向地图添加新条目之外,我还必须同时处理更新和删除。我想到的场景似乎无法在没有其中之一的情况下实现:
一个。僵局
b.复杂&耗时的检查和锁定
检查问题底部的最终想法。
原始帖子:
嗨,
我有一颗带有地图的春豆。
这就是我想要使用它的目的:
很少有并发 JMS 侦听器会接收带有操作的消息。每个动作由两个用户组成:长用户A和长用户B。消息将有它自己的字符串replyTo队列,该队列将用于识别操作。
因为当其中一个用户参与另一个已执行的操作时,我不能允许执行一个操作,所以我将使用此映射作为正在发生的事情的注册表,以便控制操作的执行。
假设我收到三个操作:
1. 用户A,用户B
2. 用户B、用户C
3. userC,userA
当收到第一个操作时,地图是空的,因此我将在其中记录有关该操作的信息并开始执行该操作。
当收到第二个操作时,我可以看到 userB 正在“忙”第一个操作,因此我只需记录有关该操作的信息。
第三个动作也是如此。
地图将如下所示:
[用户A:[操作1,操作3],
用户B:[操作1,操作2],
userC:[action2, action3]]
一旦第一个操作完成,我将从注册表中删除有关它的信息,并获取有关 userA 和 userB [action3, action2] 的下一步操作的信息。然后我会尝试重新启动它们。
我想现在你已经明白我想用这张地图做什么了。
因为要同时从多个线程访问地图,所以我必须以某种方式处理同步。
我将提供向地图添加新信息以及在操作完成后从地图中删除信息的方法。删除方法将为刚刚完成操作的两个用户返回下一个操作[如果有的话]。
因为可能同时执行数百个操作,并且繁忙用户的操作百分比应该很低,所以我不想阻止每个添加/删除操作对地图的访问。
我考虑过仅对映射中的每个列表进行同步访问,以允许同时并发访问多个用户条目。但是...因为当用户没有任何操作时,我想从地图中删除该用户的条目。另外......当用户在地图中没有条目时,我将不得不创建一个。我有点担心那里可能会发生冲突。
处理这种情况的最佳方法是什么?
使这两种方法 - 添加和删除 - 同步(我认为是最坏的情况)是唯一正确的[安全]方法吗?
另外,我将有另一个映射,其中包含操作 ID 作为键,用户 ID 作为值,以便更容易识别/删除用户对。我相信我可以跳过此同步,因为不存在一个操作同时执行两次的情况。
虽然代码是用 Groovy 编写的,但我相信 Java 程序员不会觉得它难以阅读。其背后是Java。
请考虑将以下内容视为伪代码,因为我只是进行原型设计。
class UserRegistry {
// ['actionA':[userA, userB]]
// ['actionB':[userC, userA]]
// ['actionC':[userB, userC]]
private Map<String, List<Long>> messages = [:]
/**
* ['userA':['actionA', 'actionB'],
* ['userB':['actionA', 'actionC'],
* ['userC':['actionB', 'actionC']
*/
private Map<long, List<String>> users = [:].asSynchronized()
/**
* Function will add entries for users and action to the registry.
* @param userA
* @param userB
* @param action
* @return true if a new entry was added, false if entries for at least one user already existed
*/
public boolean add(long userA, long userB, String action) {
boolean userABusy = users.containsKey(userA)
boolean userBBusy = users.containsKey(userB)
boolean retValue
if (userABusy || userBBusy) {
if (userABusy) {
users.get(userA).add(action)
} else {
users.put(userA, [action].asSynchronized())
}
if (userBBusy) {
users.get(userB).add(action)
} else {
users.put(userB, [action].asSynchronized())
}
messages.put(action, [userA, userB])
retValue = false
} else {
users.put(userA, [action].asSynchronized())
users.put(userB, [action].asSynchronized())
messages.put(action, [userA, userB])
retValue = true
}
return retValue
}
public List remove(String action) {
if(!messages.containsKey(action)) throw new Exception("we're screwed, I'll figure this out later")
List nextActions = []
long userA = messages.get(action).get(0)
long userB = messages.get(action).get(1)
if (users.get(userA).size() > 1) {
users.get(userA).remove(0)
nextActions.add(users.get(userA).get(0))
} else {
users.remove(userA)
}
if (users.get(userB).size() > 1) {
users.get(userB).remove(0)
nextActions.add(users.get(userB).get(0))
} else {
users.remove(userB)
}
messages.remove(action)
return nextActions
}
}
编辑
我昨晚考虑了这个解决方案,似乎消息映射可能会消失,而用户映射将是:
Map users<String, List<UserRegistryEntry>>
在哪里
用户注册表条目:
字符串操作Id
布尔等待
现在让我们假设我得到这些操作:
action1:userA,userC
操作2:用户A,用户D
动作3:用户B,用户C
action4: userB, userD
表示action1和action4可以同时执行,action2和action3被阻塞。地图看起来像这样:
[
[userAId: [actionId: action1, waiting: false],[actionId: action2, waiting: true]],
[userBId: [actionId: action3, waiting: true], [actionId: action4, waiting: false]],
[userCId: [actionId: action1, waiting: false],[actionId: action3, waiting: true]],
[userDId: [actionId: action2, waiting: true], [actionId: action4, waiting: false]]
]
这样,当操作执行完成时,我使用以下方法从地图中删除条目:
userAId、userBId、actionId
并获取有关 userA 和 userB 上第一个非阻塞等待操作的详细信息 [如果有的话] 并将其传递给执行。
现在我需要两个方法,将数据写入映射并将其从映射中删除。
public boolean add(long userA, long userB, String action) {
boolean userAEntryExists = users.containsKey(userA)
boolean userBEntryExists = users.containsKey(userB)
boolean actionWaiting = true
UserRegistryEntry userAEntry = new UserRegistryEntry(actionId: action, waiting: false)
UserRegistryEntry userBEntry = new UserRegistryEntry(actionId: action, waiting: false)
if (userAEntryExists || userBEntryExists) {
if (userAEntryExists) {
for (entry in users.get(userA)) {
if (!entry.waiting) {
userAEntry.waiting = true
userBEntry.waiting = true
actionWaiting = true
break;
}
}
}
if (!actionWaiting && userBEntryExists) {
for (entry in users.get(userB)) {
if (!entry.waiting) {
userAEntry.waiting = true
userBEntry.waiting = true
actionWaiting = true
break;
}
}
}
}
if (userBEntryExists) {
users.get(userA).add(userAEntry)
} else {
users.put(userA, [userAEntry])
}
if (userAEntryExists) {
users.get(userB).add(userBEntry)
} else {
users.put(userB, [userBEntry])
}
return actionWaiting
}
对于删除:
public List remove(long userA, long userB, String action) {
List<String> nextActions = []
finishActionAndReturnNew(userA, action, nextActions)
finishActionAndReturnNew(userB, action, nextActions)
return nextActions;
}
private def finishActionAndReturnNew(long userA, String action, List<String> nextActions) {
boolean userRemoved = false
boolean actionFound = false
Iterator itA = users.get(userA).iterator()
while (itA.hasNext()) {
UserRegistryEntry entry = itA.next()
if (!userRemoved && entry.actionId == action) {
itA.remove()
} else {
if (!actionFound && isUserFree(entry.otherUser)) {
nextActions.add(entry.actionId)
}
}
if (userRemoved && actionFound) break
}
}
public boolean isUserFree(long userId) {
boolean userFree = true
if (!users.containsKey(userId)) return true
for (entry in users.get(userId)) {
if (!entry.waiting) userFree = false
}
return userFree
}
最终想法:
这种情况是致命的:
[操作ID,用户A,用户B]
[a, 1,2]
[b,1,3]
[c,3,4]
[d, 3,1]
动作a和c同时执行,b和d正在等待。
当 a 和 c 完成时,用户 1,2,3,4 的条目将必须被删除,因此一个线程将锁定 1 和 2,另一个线程将锁定 3 和 4。当这些用户被锁定时,必须检查每个用户的下一步操作。当代码确定用户 1 的下一个操作是用户 3,而用户 3 的下一个操作是用户 1 时,whey 将尝试锁定它们。这就是僵局发生的时候。我知道我可以围绕这个问题编写代码,但执行起来似乎会花费很多时间,并且会阻塞两个工作人员。
现在,我将提出关于 SO 的另一个问题,更多关于我的问题的主题,并同时尝试使用 JMS 构建解决方案的原型。
UPDATE: Please note.
The question I have asked was answered. Unfortunately for me, the issue is quite bigger than question in the Title. Apart from adding new entries to the map I had to handle updates and removals at the same time. The scenario I have in mind seems not possible to implement without one or the other:
a. deadlocks
b. complex & time consuming checks and locks
Check the bottom of the Question for final thoughts.
ORIGINAL POST:
Hi,
I've got a spring bean with a Map.
Here's what I want to use it for:
few concurrent JMS Listeners will receive messages with actions. Each action consist of two users: long userA and long userB. Message will have it's own String replyTo queue which will be used to identify the action.
Because I cannot allow to execute an action when one of the users participates in another action which is executed I am going to use this map as a registry of what is going on and in order to control execution of actions.
So let's say I receive three actions:
1. userA, userB
2. userB, userC
3. userC, userA
When first action is received the map is empty so I am going to record info about the action in it and start executing the action.
When second action is received I can see that userB is 'busy' with first action so I simply record information about the action.
Same thing for third action.
Map is going to look like this:
[userA:[action1, action3],
userB:[action1, action2],
userC:[action2, action3]]
Once first action is complete I will remove information about it from the registry and get info about next actions for userA and userB [action3, action2]. Then I will try to restart them.
I think by now you get what I want to do with this map.
Because map is going to be accessed from several threads at the same time I have to handle synchronization somehow.
I will have methods to add new information to the map and to remove info from the map when action is done. The remove method will return next actions [if there are any] for the two users for whom the action just finished.
Because there could be hundreds of actions executed at the same time and the percentage of actions with busy users is supposed to be low I don't want to block access to the map for every add/remove operation.
I thought about making synchronized access only to each of the Lists within the Map to allow concurrent access to several user entries at the same time. However... because when there are no actions left for the user I want to remove entry for this user from the map. Also... when user has no entry in the map I will have to create one. I am a little bit afraid there could be clashes in there somewhere.
What would be the best way to handle this scenario?
Is making both methods - add and remove - synchronized (which I consider the worst case scenario) the only proper [safe] way to do it?
Additionally I will have another map which will contain action id as keys and user ids as values so it's easier to identify/remove user pairs. I believe I can skip synchronization on this one since there's no scenario where one action would be executed twice at the same time.
Although code is in Groovy I believe no Java programmer will find it difficult to read. It is Java behind it.
Please consider following as pseudo code as I am just prototyping.
class UserRegistry {
// ['actionA':[userA, userB]]
// ['actionB':[userC, userA]]
// ['actionC':[userB, userC]]
private Map<String, List<Long>> messages = [:]
/**
* ['userA':['actionA', 'actionB'],
* ['userB':['actionA', 'actionC'],
* ['userC':['actionB', 'actionC']
*/
private Map<long, List<String>> users = [:].asSynchronized()
/**
* Function will add entries for users and action to the registry.
* @param userA
* @param userB
* @param action
* @return true if a new entry was added, false if entries for at least one user already existed
*/
public boolean add(long userA, long userB, String action) {
boolean userABusy = users.containsKey(userA)
boolean userBBusy = users.containsKey(userB)
boolean retValue
if (userABusy || userBBusy) {
if (userABusy) {
users.get(userA).add(action)
} else {
users.put(userA, [action].asSynchronized())
}
if (userBBusy) {
users.get(userB).add(action)
} else {
users.put(userB, [action].asSynchronized())
}
messages.put(action, [userA, userB])
retValue = false
} else {
users.put(userA, [action].asSynchronized())
users.put(userB, [action].asSynchronized())
messages.put(action, [userA, userB])
retValue = true
}
return retValue
}
public List remove(String action) {
if(!messages.containsKey(action)) throw new Exception("we're screwed, I'll figure this out later")
List nextActions = []
long userA = messages.get(action).get(0)
long userB = messages.get(action).get(1)
if (users.get(userA).size() > 1) {
users.get(userA).remove(0)
nextActions.add(users.get(userA).get(0))
} else {
users.remove(userA)
}
if (users.get(userB).size() > 1) {
users.get(userB).remove(0)
nextActions.add(users.get(userB).get(0))
} else {
users.remove(userB)
}
messages.remove(action)
return nextActions
}
}
EDIT
I thought about this solution last night and it seems that messages map could go away and users Map would be:
Map users<String, List<UserRegistryEntry>>
where
UserRegistryEntry:
String actionId
boolean waiting
now let's assume I get these actions:
action1: userA, userC
action2: userA, userD
action3: userB, userC
action4: userB, userD
This means that action1 and action4 can be executed simultaneously and action2 and action3 are blocked. Map would look like this:
[
[userAId: [actionId: action1, waiting: false],[actionId: action2, waiting: true]],
[userBId: [actionId: action3, waiting: true], [actionId: action4, waiting: false]],
[userCId: [actionId: action1, waiting: false],[actionId: action3, waiting: true]],
[userDId: [actionId: action2, waiting: true], [actionId: action4, waiting: false]]
]
This way, when action execution is finished I remove entry from the map using:
userAId, userBId, actionId
And take details about first non blocked waiting action on userA and userB [if there are any] and pass them for execution.
So now the two methods I will need, which are going to write data to the Map and remove it from the map.
public boolean add(long userA, long userB, String action) {
boolean userAEntryExists = users.containsKey(userA)
boolean userBEntryExists = users.containsKey(userB)
boolean actionWaiting = true
UserRegistryEntry userAEntry = new UserRegistryEntry(actionId: action, waiting: false)
UserRegistryEntry userBEntry = new UserRegistryEntry(actionId: action, waiting: false)
if (userAEntryExists || userBEntryExists) {
if (userAEntryExists) {
for (entry in users.get(userA)) {
if (!entry.waiting) {
userAEntry.waiting = true
userBEntry.waiting = true
actionWaiting = true
break;
}
}
}
if (!actionWaiting && userBEntryExists) {
for (entry in users.get(userB)) {
if (!entry.waiting) {
userAEntry.waiting = true
userBEntry.waiting = true
actionWaiting = true
break;
}
}
}
}
if (userBEntryExists) {
users.get(userA).add(userAEntry)
} else {
users.put(userA, [userAEntry])
}
if (userAEntryExists) {
users.get(userB).add(userBEntry)
} else {
users.put(userB, [userBEntry])
}
return actionWaiting
}
And for removes:
public List remove(long userA, long userB, String action) {
List<String> nextActions = []
finishActionAndReturnNew(userA, action, nextActions)
finishActionAndReturnNew(userB, action, nextActions)
return nextActions;
}
private def finishActionAndReturnNew(long userA, String action, List<String> nextActions) {
boolean userRemoved = false
boolean actionFound = false
Iterator itA = users.get(userA).iterator()
while (itA.hasNext()) {
UserRegistryEntry entry = itA.next()
if (!userRemoved && entry.actionId == action) {
itA.remove()
} else {
if (!actionFound && isUserFree(entry.otherUser)) {
nextActions.add(entry.actionId)
}
}
if (userRemoved && actionFound) break
}
}
public boolean isUserFree(long userId) {
boolean userFree = true
if (!users.containsKey(userId)) return true
for (entry in users.get(userId)) {
if (!entry.waiting) userFree = false
}
return userFree
}
FINAL THOUGHT:
This scenario is a killer:
[ActionID, userA,userB]
[a, 1,2]
[b, 1,3]
[c, 3,4]
[d, 3,1]
Action a and c are executed simultaneously, b and d are waiting.
When a and c are done, entries for users 1,2,3,4 will have to be removed, thus one thread will have 1 and 2 locked, the other thread will have 3 and 4 locked. When these users are locked a check for next action for each of them has to be performed. When code determines that for user 1 next action is with user 3 and for user 3 next action is with user 1, whey will try to lock them. This is when the deadlock happens. I know I could code around that, but it seems it will take a lot of time to execute and it will block two workers.
For now I will ask another question on SO, more on the subject of my issue and try to prototype the solution using JMS in the meantime.
发布评论
评论(3)
您可能需要再次回顾同步(集合)如何工作:
这(作为一个非排他性示例)不是线程安全的:
请记住,只有单个“同步”方法在没有更大锁的情况下保证是原子的范围。
快乐编码。
编辑 - 每用户同步锁(更新评论):
只需使用标准数据结构,您就可以使用 ConcurrentHashMap - 特别是通过使用 'putIfAbsent' 方法。 (这与仅使用“同步 HashMap”的 get/put 相比显着不同,请参见上文。)
下面是一些伪代码和注释:
我不知道这在您的使用场景与实际情况下如何公平。单个公共锁对象。通常建议选择“更简单”,除非这样做有明显的优势。
HTH 和快乐编码。
You may need to review how synchronized (collections) work again:
This (as a non-exclusive example) is not thread-safe:
Remember that only individual "synchronized" methods are guaranteed atomic without a larger lock scope.
Happy coding.
Edit - per-user synchronization locks (updated for comment):
Just by using the standard data-structures you can achieve per-key locks by using ConcurrentHashMap -- in particular by using the 'putIfAbsent' method. (This is significantly different than just using get/put of a 'synchronized HashMap', see above.)
Below is some pseudo-code and notes:
I have no how this fairs under your usage scenario vs. a single common lock object. It is often advisable to go with "simpler" unless there is a clear advantage to do otherwise.
HTH and Happy coding.
您可能需要查看 Collections.synchronizedMap() 或 Collections.synchronizedList()
You might want to check out Collections.synchronizedMap() or Collections.synchronizedList()
类中有两个全局状态持有者,并且修改它们的两个方法中的每一个都有复合操作。因此,即使我们将 Map 更改为 ConcurrentHashMap,并将 List 更改为 CopyOnWriteArrayList 之类的内容,它仍然无法保证一致的状态。
我发现您会经常写入列表,因此,CopyOnWriteArrayList 无论如何可能太昂贵了。 ConcurrentHashMap 只有 16 路条带化。如果您有更好的硬件,替代方案是 Cliff Click 的 highscalelib(在方法中适当锁定之后)。
回到一致性问题,如何使用 ReentrantLock 而不是同步,看看是否可以从 lock()-to-unlock() 序列中排除一些语句。如果您使用 ConcurrentMap,则 add() 中执行 containsKey() 操作的前两条语句可以是乐观的,您可以将它们从锁定块中排除。
您真的需要消息映射吗?它有点像用户的倒排索引。另一种选择是使用另一种 watch() 方法,该方法在用户更改后根据来自 add() 的信号定期更新消息映射。刷新也可以是完全异步的。这样做时,您可以在更新消息时将 ReadWriteLock 与用户的 readLock() 结合使用。在这种情况下,add() 可以安全地获取用户的 writeLock()。只需要做更多的工作即可获得合理的正确性。
You have two global state-holders in the class and compound-actions in each of the two methods that modify both of them. So, even if we changed the Map's to be ConcurrentHashMap's and the List to something like CopyOnWriteArrayList, it would still not guarantee a consistent state.
I see that you will be writing to the List often, so, CopyOnWriteArrayList might be too expensive anyway. ConcurrentHashMap is only 16-way striped. If you have better hardware, an alternative would be Cliff Click's highscalelib (after appropriate locking in the methods).
Back to the consistency question, how about use a ReentrantLock instead of synchronizing and see if you can exclude some statements out of the lock()-to-unlock() sequence. If you went with a ConcurrentMap, the first two statements in the add() that do containsKey() can be optimistic and you may be able to exclude them from the lock block.
Do you really need the messages map? It is kind of like an inverse index of users. One other option would be to have another watch() method that periodically updates the messages map based on a signal from add() after a change to users. The refresh could alternatively be completely async. In doing that, you might be able to use a ReadWriteLock with the readLock() on users while you update messages. In this situation, add() can safely acquire a writeLock() on users. It is just some more work to get this reasonably correct.