如何同步操作?
我有这个两阶段锁定的实现。问题是它在大多数情况下都能完美工作,但并非在所有情况下都可以。我发现问题是由于我使用 Synchronized 造成的,而 Synchronized 的使用方式是错误的!我需要的操作顺序如下:
- 解锁任何锁的线程应该在任何其他线程开始锁定同一锁之前完全完成此阶段(ul 方法)。
- 唤醒时等待锁的线程应该在开始执行其操作之前获取该锁的所有等待者。
- 任何 addEdge 或 removeEdge 都需要同步,因为我对所有线程使用相同的图形对象。
这是代码:
class LockTable{
/** Lock entries of this table, keyed by data object id (oid). */
private HashMap<Integer,MyLock> locks;

/** Creates a lock table that holds no lock entries yet. */
public LockTable(){
    this.locks = new HashMap<Integer,MyLock>();
}
/*******************************************************************************
* Acquire a shared/read lock on data object oid.
* <p>
* One of three paths is taken, depending on what the table holds for oid at
* lookup time:
* <ul>
* <li>no entry: a new MyLock is created, read-locked and stored (getIt);</li>
* <li>entry is write-locked: the caller registers as an 's' waiter, adds a
*     wait-for edge to the graph Main.g, and then blocks on the read lock (wait);</li>
* <li>entry exists and is not write-locked: the read lock is taken directly.</li>
* </ul>
* After the read lock is obtained on the last two paths, an edge is added from
* every remaining 'w' waiter to tid, and a waiter is restarted whenever the
* graph reports a cycle.
* @param tid the transaction id
* @param oid the data object id
*/
void rl (int tid, int oid) throws InterruptedException
{
MyLock lock=null;
boolean wait = false; // set when the entry was write-locked at lookup time
boolean getIt = false; // set when this call created the entry and already holds its read lock
// Step 1: look up (or create) the table entry under this LockTable's monitor.
synchronized(this) {
try {
lock = locks.get(oid); // find the lock
if((lock != null) && (lock.lock.isWriteLocked())){
wait = true;
}
if(lock == null){
// First access to oid: the new lock has no other users, so lock() cannot block here.
getIt = true;
lock = new MyLock(tid, true);
lock.lock.readLock().lock();
lock.readers.add(tid);
locks.put(oid, lock);
}
} catch(Exception e) {
// NOTE(review): println of a StackTraceElement[] prints the array reference, not the trace.
System.out.println(e.getStackTrace()); // lock not found, so oid is not locked;
} // try
}//synch
if (wait){
// Step 2a: register as a shared waiter and record "tid waits for current holder".
// NOTE(review): the monitor was released between step 1 and here, so the lock
// state may have changed since wait was computed.
synchronized(this){
//System.out.println("Transaction " + tid + " is waiting..");
lock.waiters.add(tid);
lock.waitersType.add('s');
// NOTE(review): this graph update is guarded by this LockTable's monitor, while
// other addEdge calls below are guarded by Main.g's monitor; the two do not exclude each other.
Main.g.addEdge(tid, lock.tid);
//System.out.println("Edge has been added "+tid + "==>" + lock.tid);
}//sync
// A cycle in the graph makes this transaction restart itself instead of waiting.
if(Main.g.hasCycle()){
//System.out.println("Detect Cycle in rl..Executing Restart");
restart(tid);
}
//to exclude the restarted thread
if(!Main.trans[tid].terminate){
// Blocks outside the monitor until the read lock can be granted.
lock.lock.readLock().lock();
// NOTE(review): readers is modified here without holding any monitor.
lock.readers.add(tid);
// No longer waiting: drop this transaction's entry from both parallel waiter lists.
// waitersType is removed first so that indexOf(tid) is still valid for it.
synchronized(this){
lock.waitersType.remove(lock.waiters.indexOf(tid));
lock.waiters.remove(lock.waiters.indexOf(tid));
}//sync
// Every remaining exclusive ('w') waiter now waits for this reader.
// NOTE(review): waiters/waitersType are iterated without holding the monitor.
for(int i =0 ; i < lock.waiters.size();i++){
if(lock.waitersType.get(i) == 'w'){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}//sync
// If the new edge closed a cycle, the waiter (not this reader) is restarted.
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//if lock.waitersType
}//for
}//if terminate
else
return;
}
else
if(!getIt){
// Step 2b: the entry existed and was not write-locked at lookup time.
//System.out.println("Getting Shared Lock without Waiting");
lock.lock.readLock().lock();
// NOTE(review): readers is modified here without holding any monitor.
lock.readers.add(tid);
// Same bookkeeping as above: exclusive waiters now wait for this reader.
for(int i =0 ; i < lock.waiters.size();i++){
if(lock.waitersType.get(i) == 'w'){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//if lock.waitersType
}
}
} // rl
/*******************************************************************************
* Acquire an exclusive/write lock on data object oid.
* <p>
* One of three paths is taken, depending on what the table holds for oid at
* lookup time:
* <ul>
* <li>no entry: a new MyLock is created, write-locked and stored (getIt);</li>
* <li>entry is write-locked or has readers: wait-for edges are added to the
*     graph Main.g, the caller registers as a 'w' waiter and blocks on the
*     write lock (wait);</li>
* <li>entry exists but is neither write-locked nor read: the write lock is
*     taken directly.</li>
* </ul>
* After the write lock is obtained on the last two paths, lock.tid is set to
* tid, an edge is added from every remaining waiter to tid, and a waiter is
* restarted whenever the graph reports a cycle.
* @param tid the transaction id
* @param oid the data object id
*/
void wl (int tid, int oid) throws InterruptedException
{
//type to determine the last lock type in order
//to be able to remove the edges from waitfor graph
// NOTE(review): type is assigned below but never read inside this method.
int type = 0;
MyLock lock = null;
boolean wait = false; // set when the entry was write-locked or had readers at lookup time
boolean getIt = false; // set when this call created the entry and already holds its write lock
// Step 1: look up (or create) the table entry under this LockTable's monitor.
synchronized(this) {
try {
lock = locks.get(oid); // find the lock
if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
{
wait = true;
}
if(lock == null){
// First access to oid: the new lock has no other users, so lock() cannot block here.
getIt = true;
lock = new MyLock(tid);
lock.lock.writeLock().lock();
locks.put(oid,lock);
}
} catch(Exception e) {
// NOTE(review): println of a StackTraceElement[] prints the array reference, not the trace.
System.out.println(e.getStackTrace()); // lock not found, so oid is not locked;
} // try
}
if (wait){
//System.out.println("Transaction " + tid + " is waiting..");
// Step 2a: record whom this transaction waits for, then check for a cycle.
// NOTE(review): the monitor was released between step 1 and here, so the lock
// state is re-read and may differ from what set wait.
synchronized(this) {
if(lock.lock.isWriteLocked()){
// Held exclusively: wait for the transaction recorded in lock.tid.
Main.g.addEdge(tid, lock.tid);
//System.out.println("Edge has been added "+tid + "==>" + lock.tid);
}
else{
// Held in shared mode: wait for every current reader.
type = 1;
for(int reader : lock.readers){
Main.g.addEdge(tid, reader);
//System.out.println("Edge has been added "+tid + "==>" + reader);
}
}//else
// restart() synchronizes on this again; the monitor is reentrant, so this nests safely.
if(Main.g.hasCycle()){
//System.out.println("Detect Cycle");
restart(tid);
}//if
//System.out.println("Graph: "+Main.g.toString());
}//sync
// Skip waiting if this transaction was just marked for termination.
if(!Main.trans[tid].terminate){
// Register as an exclusive waiter (unlike rl, this happens after the cycle check).
synchronized(this){
lock.waiters.add(tid);
lock.waitersType.add('w');
}
//System.out.println("I'm waiting here in wl");
// Blocks outside the monitor until the write lock can be granted.
lock.lock.writeLock().lock();
// NOTE(review): lock.tid is written here without holding any monitor.
lock.tid = tid;
//System.out.println("Wakeup..");
// No longer waiting: drop this transaction's entry from both parallel waiter lists.
// waitersType is removed first so that indexOf(tid) is still valid for it.
synchronized(this){
lock.waitersType.remove(lock.waiters.indexOf(tid));
lock.waiters.remove(lock.waiters.indexOf(tid));
//System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
}//sync
// Every remaining waiter (shared or exclusive) now waits for this writer.
// NOTE(review): waiters is iterated without holding the monitor.
for(int i =0 ; i < lock.waiters.size();i++){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
// System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
//System.out.println("Graph: "+Main.g.toString());
}//sync
// If the new edge closed a cycle, the waiter (not this writer) is restarted.
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//for
}
else
return;
}// if(wait) ==> for the lock to be released
else
if(!getIt){
// Step 2b: the entry existed and was neither write-locked nor read at lookup time.
lock.lock.writeLock().lock();
// NOTE(review): lock.tid is written here without holding any monitor.
lock.tid = tid;
// Same bookkeeping as above: all current waiters now wait for this writer.
for(int i =0 ; i < lock.waiters.size();i++){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//for
}
} // wl
/*******************************************************************************
 * Restart (roll back) transaction tid.
 * <p>
 * Runs entirely under this LockTable's monitor. Increments Main.rollBack, then
 * for every lock in the table: removes tid from the waiter lists, releases the
 * write lock if the calling thread holds it, and otherwise releases the read
 * lock if tid is listed as a reader. Wait-for edges pointing at the released
 * holder are removed from Main.g. Finally all edges of tid are removed and
 * Main.trans[tid].terminate is set.
 * <p>
 * Graph mutations are additionally guarded by Main.g's monitor, because the
 * other methods of this class guard addEdge/removeEdge that way; holding only
 * this LockTable's monitor does not exclude them.
 * <p>
 * NOTE(review): write-lock ownership is tested against the calling thread, not
 * against tid. rl/wl also call restart(waiterId) from the thread that has just
 * acquired the lock, so this may release the caller's lock rather than one held
 * by tid, and readLock().unlock() may throw IllegalMonitorStateException when
 * the calling thread is not the reader. Confirm the intended thread/transaction
 * mapping before relying on this.
 * @param tid the id of the transaction to restart
 */
void restart(int tid){
    synchronized(this) {
        Main.rollBack++;
        for (MyLock lock : locks.values()) {
            // Remove the restarted transaction from the parallel waiter lists.
            // One lookup replaces the original contains() plus two indexOf() scans.
            int waiterIdx = lock.waiters.indexOf(tid);
            if (waiterIdx >= 0) {
                lock.waitersType.remove(waiterIdx);
                lock.waiters.remove(waiterIdx);
            }

            if (lock.lock.isWriteLockedByCurrentThread()) {
                // Remaining waiters no longer wait for the recorded write holder.
                synchronized (Main.g) {
                    for (int j = 0; j < lock.waiters.size(); j++) {
                        Main.g.removeEdge(lock.waiters.get(j), lock.tid);
                    }
                }
                lock.lock.writeLock().unlock();
            }

            // Evaluated after the possible write unlock above, as in the original flow.
            if (!lock.lock.isWriteLocked()
                    && lock.readers.contains(tid)
                    && lock.lock.getReadLockCount() > 0) {
                // Remaining waiters no longer wait for this reader.
                synchronized (Main.g) {
                    for (int j = 0; j < lock.waiters.size(); j++) {
                        Main.g.removeEdge(lock.waiters.get(j), tid);
                    }
                }
                lock.readers.remove(lock.readers.indexOf(tid));
                lock.lock.readLock().unlock();
            }
        }

        // Drop every remaining edge that involves the restarted transaction.
        synchronized (Main.g) {
            Main.g.removeEdges(tid);
        }
        Main.trans[tid].terminate = true;
    }
}
/*******************************************************************************
 * Unlock/release the lock on data object oid.
 * <p>
 * The whole release (lookup, wait-for graph cleanup, reader bookkeeping and the
 * actual unlock) runs as one critical section under this LockTable's monitor,
 * so no other thread can observe or modify the waiter/reader lists halfway
 * through an unlock. Nothing inside the critical section blocks on a MyLock,
 * and Main.g's monitor is only ever taken while already holding this one.
 * <p>
 * If the calling thread holds the write lock, the edges from all waiters to
 * lock.tid are removed and the write lock is released. Otherwise, if the lock
 * has readers, tid is removed from the reader list (when present), the edges
 * from all waiters to tid are removed and the read lock is released. If neither
 * applies, an error line is printed.
 * <p>
 * NOTE(review): as before, the read branch unlocks whenever any reader is
 * listed; readLock().unlock() may throw IllegalMonitorStateException if the
 * calling thread does not actually hold the read lock.
 * @param tid the transaction id
 * @param oid the data object id
 */
void ul (int tid, int oid)
{
    boolean error = false;
    synchronized(this) {
        MyLock lock = locks.get(oid);             // find the lock
        if (lock == null) {
            System.out.println("println: lock not found");
        } else if (lock.lock.isWriteLockedByCurrentThread()) {
            // Waiters no longer wait for the transaction recorded as write holder.
            synchronized (Main.g) {
                for (int i = 0; i < lock.waiters.size(); i++) {
                    Main.g.removeEdge(lock.waiters.get(i), lock.tid);
                }
            }
            lock.lock.writeLock().unlock();
        } else if (lock.readers.size() > 0) {
            int readerIdx = lock.readers.indexOf(tid);
            if (readerIdx >= 0) {
                lock.readers.remove(readerIdx);
            }
            // Waiters no longer wait for this reader.
            synchronized (Main.g) {
                for (int i = 0; i < lock.waiters.size(); i++) {
                    Main.g.removeEdge(lock.waiters.get(i), tid);
                }
            }
            lock.lock.readLock().unlock();
        } else {
            // The entry exists but the caller holds it in neither mode.
            error = true;
        }
    }
    if (error)
        System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul
I have this implementation of two-phase locking. It works perfectly in most scenarios, but not in all of them. I figured out that the problem comes from my use of `synchronized`, which is somehow wrong. I need the operations to be ordered like this:
- A thread that unlocks a lock should finish that phase (the ul method) completely before any other thread starts locking the same lock.
- A thread that was waiting for a lock should, when it wakes up, collect all the waiters of that lock before it starts executing its operations.
- Every addEdge or removeEdge call needs to be synchronized, because all threads share the same graph object.
Here is the code:
class LockTable{
/** Lock entries of this table, keyed by data object id (oid). */
private HashMap<Integer,MyLock> locks;

/** Creates a lock table that holds no lock entries yet. */
public LockTable(){
    this.locks = new HashMap<Integer,MyLock>();
}
/*******************************************************************************
* Acquire a shared/read lock on data object oid.
* <p>
* One of three paths is taken, depending on what the table holds for oid at
* lookup time:
* <ul>
* <li>no entry: a new MyLock is created, read-locked and stored (getIt);</li>
* <li>entry is write-locked: the caller registers as an 's' waiter, adds a
*     wait-for edge to the graph Main.g, and then blocks on the read lock (wait);</li>
* <li>entry exists and is not write-locked: the read lock is taken directly.</li>
* </ul>
* After the read lock is obtained on the last two paths, an edge is added from
* every remaining 'w' waiter to tid, and a waiter is restarted whenever the
* graph reports a cycle.
* @param tid the transaction id
* @param oid the data object id
*/
void rl (int tid, int oid) throws InterruptedException
{
MyLock lock=null;
boolean wait = false; // set when the entry was write-locked at lookup time
boolean getIt = false; // set when this call created the entry and already holds its read lock
// Step 1: look up (or create) the table entry under this LockTable's monitor.
synchronized(this) {
try {
lock = locks.get(oid); // find the lock
if((lock != null) && (lock.lock.isWriteLocked())){
wait = true;
}
if(lock == null){
// First access to oid: the new lock has no other users, so lock() cannot block here.
getIt = true;
lock = new MyLock(tid, true);
lock.lock.readLock().lock();
lock.readers.add(tid);
locks.put(oid, lock);
}
} catch(Exception e) {
// NOTE(review): println of a StackTraceElement[] prints the array reference, not the trace.
System.out.println(e.getStackTrace()); // lock not found, so oid is not locked;
} // try
}//synch
if (wait){
// Step 2a: register as a shared waiter and record "tid waits for current holder".
// NOTE(review): the monitor was released between step 1 and here, so the lock
// state may have changed since wait was computed.
synchronized(this){
//System.out.println("Transaction " + tid + " is waiting..");
lock.waiters.add(tid);
lock.waitersType.add('s');
// NOTE(review): this graph update is guarded by this LockTable's monitor, while
// other addEdge calls below are guarded by Main.g's monitor; the two do not exclude each other.
Main.g.addEdge(tid, lock.tid);
//System.out.println("Edge has been added "+tid + "==>" + lock.tid);
}//sync
// A cycle in the graph makes this transaction restart itself instead of waiting.
if(Main.g.hasCycle()){
//System.out.println("Detect Cycle in rl..Executing Restart");
restart(tid);
}
//to exclude the restarted thread
if(!Main.trans[tid].terminate){
// Blocks outside the monitor until the read lock can be granted.
lock.lock.readLock().lock();
// NOTE(review): readers is modified here without holding any monitor.
lock.readers.add(tid);
// No longer waiting: drop this transaction's entry from both parallel waiter lists.
// waitersType is removed first so that indexOf(tid) is still valid for it.
synchronized(this){
lock.waitersType.remove(lock.waiters.indexOf(tid));
lock.waiters.remove(lock.waiters.indexOf(tid));
}//sync
// Every remaining exclusive ('w') waiter now waits for this reader.
// NOTE(review): waiters/waitersType are iterated without holding the monitor.
for(int i =0 ; i < lock.waiters.size();i++){
if(lock.waitersType.get(i) == 'w'){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}//sync
// If the new edge closed a cycle, the waiter (not this reader) is restarted.
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//if lock.waitersType
}//for
}//if terminate
else
return;
}
else
if(!getIt){
// Step 2b: the entry existed and was not write-locked at lookup time.
//System.out.println("Getting Shared Lock without Waiting");
lock.lock.readLock().lock();
// NOTE(review): readers is modified here without holding any monitor.
lock.readers.add(tid);
// Same bookkeeping as above: exclusive waiters now wait for this reader.
for(int i =0 ; i < lock.waiters.size();i++){
if(lock.waitersType.get(i) == 'w'){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//if lock.waitersType
}
}
} // rl
/*******************************************************************************
* Acquire an exclusive/write lock on data object oid.
* <p>
* One of three paths is taken, depending on what the table holds for oid at
* lookup time:
* <ul>
* <li>no entry: a new MyLock is created, write-locked and stored (getIt);</li>
* <li>entry is write-locked or has readers: wait-for edges are added to the
*     graph Main.g, the caller registers as a 'w' waiter and blocks on the
*     write lock (wait);</li>
* <li>entry exists but is neither write-locked nor read: the write lock is
*     taken directly.</li>
* </ul>
* After the write lock is obtained on the last two paths, lock.tid is set to
* tid, an edge is added from every remaining waiter to tid, and a waiter is
* restarted whenever the graph reports a cycle.
* @param tid the transaction id
* @param oid the data object id
*/
void wl (int tid, int oid) throws InterruptedException
{
//type to determine the last lock type in order
//to be able to remove the edges from waitfor graph
// NOTE(review): type is assigned below but never read inside this method.
int type = 0;
MyLock lock = null;
boolean wait = false; // set when the entry was write-locked or had readers at lookup time
boolean getIt = false; // set when this call created the entry and already holds its write lock
// Step 1: look up (or create) the table entry under this LockTable's monitor.
synchronized(this) {
try {
lock = locks.get(oid); // find the lock
if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
{
wait = true;
}
if(lock == null){
// First access to oid: the new lock has no other users, so lock() cannot block here.
getIt = true;
lock = new MyLock(tid);
lock.lock.writeLock().lock();
locks.put(oid,lock);
}
} catch(Exception e) {
// NOTE(review): println of a StackTraceElement[] prints the array reference, not the trace.
System.out.println(e.getStackTrace()); // lock not found, so oid is not locked;
} // try
}
if (wait){
//System.out.println("Transaction " + tid + " is waiting..");
// Step 2a: record whom this transaction waits for, then check for a cycle.
// NOTE(review): the monitor was released between step 1 and here, so the lock
// state is re-read and may differ from what set wait.
synchronized(this) {
if(lock.lock.isWriteLocked()){
// Held exclusively: wait for the transaction recorded in lock.tid.
Main.g.addEdge(tid, lock.tid);
//System.out.println("Edge has been added "+tid + "==>" + lock.tid);
}
else{
// Held in shared mode: wait for every current reader.
type = 1;
for(int reader : lock.readers){
Main.g.addEdge(tid, reader);
//System.out.println("Edge has been added "+tid + "==>" + reader);
}
}//else
// restart() synchronizes on this again; the monitor is reentrant, so this nests safely.
if(Main.g.hasCycle()){
//System.out.println("Detect Cycle");
restart(tid);
}//if
//System.out.println("Graph: "+Main.g.toString());
}//sync
// Skip waiting if this transaction was just marked for termination.
if(!Main.trans[tid].terminate){
// Register as an exclusive waiter (unlike rl, this happens after the cycle check).
synchronized(this){
lock.waiters.add(tid);
lock.waitersType.add('w');
}
//System.out.println("I'm waiting here in wl");
// Blocks outside the monitor until the write lock can be granted.
lock.lock.writeLock().lock();
// NOTE(review): lock.tid is written here without holding any monitor.
lock.tid = tid;
//System.out.println("Wakeup..");
// No longer waiting: drop this transaction's entry from both parallel waiter lists.
// waitersType is removed first so that indexOf(tid) is still valid for it.
synchronized(this){
lock.waitersType.remove(lock.waiters.indexOf(tid));
lock.waiters.remove(lock.waiters.indexOf(tid));
//System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
}//sync
// Every remaining waiter (shared or exclusive) now waits for this writer.
// NOTE(review): waiters is iterated without holding the monitor.
for(int i =0 ; i < lock.waiters.size();i++){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
// System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
//System.out.println("Graph: "+Main.g.toString());
}//sync
// If the new edge closed a cycle, the waiter (not this writer) is restarted.
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//for
}
else
return;
}// if(wait) ==> for the lock to be released
else
if(!getIt){
// Step 2b: the entry existed and was neither write-locked nor read at lookup time.
lock.lock.writeLock().lock();
// NOTE(review): lock.tid is written here without holding any monitor.
lock.tid = tid;
// Same bookkeeping as above: all current waiters now wait for this writer.
for(int i =0 ; i < lock.waiters.size();i++){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//for
}
} // wl
/*******************************************************************************
 * Restart (roll back) transaction tid.
 * <p>
 * Runs entirely under this LockTable's monitor. Increments Main.rollBack, then
 * for every lock in the table: removes tid from the waiter lists, releases the
 * write lock if the calling thread holds it, and otherwise releases the read
 * lock if tid is listed as a reader. Wait-for edges pointing at the released
 * holder are removed from Main.g. Finally all edges of tid are removed and
 * Main.trans[tid].terminate is set.
 * <p>
 * Graph mutations are additionally guarded by Main.g's monitor, because the
 * other methods of this class guard addEdge/removeEdge that way; holding only
 * this LockTable's monitor does not exclude them.
 * <p>
 * NOTE(review): write-lock ownership is tested against the calling thread, not
 * against tid. rl/wl also call restart(waiterId) from the thread that has just
 * acquired the lock, so this may release the caller's lock rather than one held
 * by tid, and readLock().unlock() may throw IllegalMonitorStateException when
 * the calling thread is not the reader. Confirm the intended thread/transaction
 * mapping before relying on this.
 * @param tid the id of the transaction to restart
 */
void restart(int tid){
    synchronized(this) {
        Main.rollBack++;
        for (MyLock lock : locks.values()) {
            // Remove the restarted transaction from the parallel waiter lists.
            // One lookup replaces the original contains() plus two indexOf() scans.
            int waiterIdx = lock.waiters.indexOf(tid);
            if (waiterIdx >= 0) {
                lock.waitersType.remove(waiterIdx);
                lock.waiters.remove(waiterIdx);
            }

            if (lock.lock.isWriteLockedByCurrentThread()) {
                // Remaining waiters no longer wait for the recorded write holder.
                synchronized (Main.g) {
                    for (int j = 0; j < lock.waiters.size(); j++) {
                        Main.g.removeEdge(lock.waiters.get(j), lock.tid);
                    }
                }
                lock.lock.writeLock().unlock();
            }

            // Evaluated after the possible write unlock above, as in the original flow.
            if (!lock.lock.isWriteLocked()
                    && lock.readers.contains(tid)
                    && lock.lock.getReadLockCount() > 0) {
                // Remaining waiters no longer wait for this reader.
                synchronized (Main.g) {
                    for (int j = 0; j < lock.waiters.size(); j++) {
                        Main.g.removeEdge(lock.waiters.get(j), tid);
                    }
                }
                lock.readers.remove(lock.readers.indexOf(tid));
                lock.lock.readLock().unlock();
            }
        }

        // Drop every remaining edge that involves the restarted transaction.
        synchronized (Main.g) {
            Main.g.removeEdges(tid);
        }
        Main.trans[tid].terminate = true;
    }
}
/*******************************************************************************
 * Unlock/release the lock on data object oid.
 * <p>
 * The whole release (lookup, wait-for graph cleanup, reader bookkeeping and the
 * actual unlock) runs as one critical section under this LockTable's monitor,
 * so no other thread can observe or modify the waiter/reader lists halfway
 * through an unlock. Nothing inside the critical section blocks on a MyLock,
 * and Main.g's monitor is only ever taken while already holding this one.
 * <p>
 * If the calling thread holds the write lock, the edges from all waiters to
 * lock.tid are removed and the write lock is released. Otherwise, if the lock
 * has readers, tid is removed from the reader list (when present), the edges
 * from all waiters to tid are removed and the read lock is released. If neither
 * applies, an error line is printed.
 * <p>
 * NOTE(review): as before, the read branch unlocks whenever any reader is
 * listed; readLock().unlock() may throw IllegalMonitorStateException if the
 * calling thread does not actually hold the read lock.
 * @param tid the transaction id
 * @param oid the data object id
 */
void ul (int tid, int oid)
{
    boolean error = false;
    synchronized(this) {
        MyLock lock = locks.get(oid);             // find the lock
        if (lock == null) {
            System.out.println("println: lock not found");
        } else if (lock.lock.isWriteLockedByCurrentThread()) {
            // Waiters no longer wait for the transaction recorded as write holder.
            synchronized (Main.g) {
                for (int i = 0; i < lock.waiters.size(); i++) {
                    Main.g.removeEdge(lock.waiters.get(i), lock.tid);
                }
            }
            lock.lock.writeLock().unlock();
        } else if (lock.readers.size() > 0) {
            int readerIdx = lock.readers.indexOf(tid);
            if (readerIdx >= 0) {
                lock.readers.remove(readerIdx);
            }
            // Waiters no longer wait for this reader.
            synchronized (Main.g) {
                for (int i = 0; i < lock.waiters.size(); i++) {
                    Main.g.removeEdge(lock.waiters.get(i), tid);
                }
            }
            lock.lock.readLock().unlock();
        } else {
            // The entry exists but the caller holds it in neither mode.
            error = true;
        }
    }
    if (error)
        System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
你对 synchronized 的使用存在几个问题。
除了极少数情况外,对共享对象的所有读/写(尤其是写)访问都必须在同一个对象上同步。你在某些地方写入 Main.g 时对它做了同步,但在另一些地方却没有同步(例如 wl 方法的 if(wait) 内)。
据我对你代码目标的理解,lock.{waiters|readers} 的状态必须与图(Main.g)的状态保持同步。这意味着每当你修改了(比如)waiters,然后相应地更新 Main.g 时,这两个操作必须以原子方式执行。为此,你需要把它们包在同一个不间断的 synchronized 语句中。我看得出你理解这个概念,因为你在某些地方是这样做的,但在其他地方似乎遗漏了。例如:在 rl 方法的 if(!getIt){ 内,你先更新 lock.readers,然后更新 Main.g,却没有任何同步。
总的来说,我无法给出更具体的建议,因为代码相当复杂,很难看出它的意图。但我认为,通过锁定更大的代码段,你可以解决一部分锁定问题。例如,你可以直接在方法声明上加 synchronized 关键字,让 rl、wl 和 ul 这几个方法整体在 this 上同步,并去掉方法内部所有细粒度的锁定。细粒度锁定可以带来更好的性能,但代价是很高的复杂性。我建议你先从较简单的锁定方案开始,然后再逐步改进(如果你真的有必要的话!)。
There are a couple of problems with your usage of `synchronized`.
Except in very rare cases, all read/write (especially write) access to a shared object must be synchronized on the same object. You synchronize on `Main.g` when you write to it in some places, but there are other places where the write is not synchronized on it (e.g. inside `if(wait)` of the `wl` method).
As far as I understand the objective of your code, the state of `lock.{waiters|readers}` has to be kept in sync with the graph's state (`Main.g`). This means that whenever you make a change to, say, `waiters` and then update `Main.g` accordingly, those two operations must be performed atomically. To do this you need to wrap them in one unbroken `synchronized` statement. I can see that you understand this concept, because you do this in some places, but you seem to have missed it in others. For example: in the `rl` method, inside `if(!getIt){`, you update `lock.readers` and then `Main.g` without any synchronization.
Overall, I cannot give you anything more specific, since the code is quite complicated and it is hard to tell what its intention is. But I think you can resolve some of the locking problems by locking larger sections of your code. For example, you could simply synchronize the whole methods `rl`, `wl` and `ul` on `this` by adding the `synchronized` keyword to the method declarations, and get rid of all the fine-grained locking inside the methods. Fine-grained locking can give you better performance, but at the cost of high complexity. I would recommend that you start with simpler locking schemes first, and then gradually improve them (if you really, really need to!).