如何同步操作?

发布于 2024-10-31 04:22:53 字数 12128 浏览 5 评论 0原文

我有这个两阶段锁定的实现。问题是它在大多数情况下都能完美工作,但并非在所有情况下都可以。我发现问题是由于我使用 Synchronized 造成的,而 Synchronized 的使用方式是错误的!我需要的操作顺序如下:

  1. 解锁任何锁的线程应该在任何其他线程开始锁定同一锁之前完全完成此阶段(ul 方法)。
  2. 唤醒时等待锁的线程应该在开始执行其操作之前获取该锁的所有等待者。
  3. 任何 addEdge 或 removeEdge 都需要同步,因为我对所有线程使用相同的图形对象。

这是代码:

class LockTable{
     private HashMap<Integer,MyLock> locks;
     public LockTable(){
         locks= new HashMap<Integer,MyLock>();   
    }

/*******************************************************************************
 * Acquire a shared/read lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void rl (int tid, int oid) throws InterruptedException
{

    MyLock lock=null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock

            if((lock != null) && (lock.lock.isWriteLocked())){

               wait = true;
            }
            if(lock == null){
             getIt = true;
             lock = new MyLock(tid, true);
             lock.lock.readLock().lock();
             lock.readers.add(tid);
             locks.put(oid, lock);
          }

        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
    }//synch
    if (wait){
        synchronized(this){
            //System.out.println("Transaction " + tid + " is waiting..");
            lock.waiters.add(tid);
            lock.waitersType.add('s');
            Main.g.addEdge(tid, lock.tid);
            //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
        }//sync
         if(Main.g.hasCycle()){
            //System.out.println("Detect Cycle in rl..Executing Restart");
            restart(tid);
        }
        //to exclude the restarted thread
        if(!Main.trans[tid].terminate){
            lock.lock.readLock().lock();
            lock.readers.add(tid);
          synchronized(this){
            lock.waitersType.remove(lock.waiters.indexOf(tid));
            lock.waiters.remove(lock.waiters.indexOf(tid));
            }//sync
            for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
            }//for
        }//if terminate
        else
            return;
    }
   else
    if(!getIt){
              //System.out.println("Getting Shared Lock without Waiting");
              lock.lock.readLock().lock();
              lock.readers.add(tid);
              for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
              } 
    }

} // rl

/*******************************************************************************
 * Acquire an exclusive/write lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void wl (int tid, int oid) throws InterruptedException
{
     //type to determine the last lock type in order
     //to be able to remove the edges from waitfor graph
    int type = 0;
    MyLock lock = null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock
            if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
            {
                wait = true;
            }
            if(lock == null){
                getIt = true;
                lock = new MyLock(tid);
                lock.lock.writeLock().lock();
                locks.put(oid,lock);
            }
        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
  }
     if (wait){
            //System.out.println("Transaction " + tid + " is waiting..");
           synchronized(this) {
            if(lock.lock.isWriteLocked()){
                Main.g.addEdge(tid, lock.tid);
                //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
               }
            else{
                type = 1;
                for(int reader : lock.readers){
                     Main.g.addEdge(tid, reader);
                     //System.out.println("Edge has been added "+tid + "==>" + reader);
                }
                }//else
            if(Main.g.hasCycle()){
               //System.out.println("Detect Cycle");
               restart(tid);
             }//if
            //System.out.println("Graph: "+Main.g.toString());
         }//sync



       if(!Main.trans[tid].terminate){
            synchronized(this){
            lock.waiters.add(tid);
            lock.waitersType.add('w');
           }
            //System.out.println("I'm waiting here in wl");
            lock.lock.writeLock().lock();
            lock.tid = tid;
            //System.out.println("Wakeup..");
            synchronized(this){
                lock.waitersType.remove(lock.waiters.indexOf(tid));
                lock.waiters.remove(lock.waiters.indexOf(tid));
                //System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
           }//sync
                for(int i =0 ; i < lock.waiters.size();i++){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                   // System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    //System.out.println("Graph: "+Main.g.toString());
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//for
         }
        else
            return; 
    }// if(wait) ==> for the lock to be released
    else
        if(!getIt){
            lock.lock.writeLock().lock();
            lock.tid = tid;
            for(int i =0 ; i < lock.waiters.size();i++){
                synchronized(Main.g){
                Main.g.addEdge(lock.waiters.get(i), tid);
                //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                }
                if(Main.g.hasCycle())
                    restart(lock.waiters.get(i));
            }//for

     }

} // wl

void restart(int tid){
 synchronized(this) {
    Main.rollBack++;
    MyLock lock;
    List<Integer> toRemove = new ArrayList();
    for(int i : locks.keySet()){
       lock = locks.get(i);
       //for all the locks in the lock table delete the restarted thread from the waiters list
       if(lock.waiters.contains(tid)){
           lock.waitersType.remove(lock.waiters.indexOf(tid));
           lock.waiters.remove(lock.waiters.indexOf(tid));
        }

           //lock.sem.release();
           if(lock.lock.isWriteLockedByCurrentThread()){


               //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), lock.tid);
               //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
               lock.lock.writeLock().unlock();
                //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());
               toRemove.add(i);

           }
       if(!lock.lock.isWriteLocked())
           if(lock.readers.contains(tid) && lock.lock.getReadLockCount()>0){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), tid);
                   // lock.numberOfReaders --;
                  //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
                  lock.readers.remove(lock.readers.indexOf(tid));
                  lock.lock.readLock().unlock();
                  //System.out.println("number of write holders: " + lock.lock.getWriteHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
                  toRemove.add(i);  

              }//if
    }//for
    Main.g.removeEdges(tid);
    Main.trans[tid].terminate = true;
    //System.out.println("Transaction" + tid + " restarted");

    }//sync
}

/*******************************************************************************
 * Unlock/release the lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void ul (int tid, int oid)
{
   MyLock lock = null;
    boolean error = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);                    // find the lock
            if( lock == null)
                System.out.println("println: lock not found");
        } catch(Exception e) {
            System.out.println("lock not found");   // lock not found
        } // try
    }
        if((lock != null) && (lock.lock.isWriteLockedByCurrentThread())){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), lock.tid);
                      }//sync
                   //System.out.println("tid: " + tid + " unlock object: " + oid);
                   lock.lock.writeLock().unlock();
                  //print out 
                  //System.out.println("done with unlock");
                  //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
      }// if lock != null
        else
          if((lock != null) && (lock.readers.size()>0)){
              synchronized(this){
              if(lock.readers.contains(tid)){
                lock.readers.remove(lock.readers.indexOf(tid));
                }
                //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), tid);
                      }
                lock.lock.readLock().unlock();
                //System.out.println("Transaction"+tid+" unlocked shared lock on object "+oid);
                //System.out.println("number of write holders: " + lock.lock.readLock().);
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());

              }//if lock.readers
          }//if
    if (error) 
        System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul

I have this implementation for two-phase locking. The problem is it works perfect in most of the scenarios but not all of them. I figure out the problem come because of my usage of Synchronized which is used in wrong way some how! I need the order of operations to be like this:

  1. The thread who unlock any lock should finish this phase (ul method) completely before any other thread start locking the same lock.
  2. The thread who was waiting for a lock when it wakes up should get all the waiters of that lock before it starts executing it's operations.
  3. any addEdge or removeEdge need to be synchronized because I'm using the same graph object with all threads.

Here is the code:

class LockTable{
     private HashMap<Integer,MyLock> locks;
     public LockTable(){
         locks= new HashMap<Integer,MyLock>();   
    }

/*******************************************************************************
 * Acquire a shared/read lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void rl (int tid, int oid) throws InterruptedException
{

    MyLock lock=null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock

            if((lock != null) && (lock.lock.isWriteLocked())){

               wait = true;
            }
            if(lock == null){
             getIt = true;
             lock = new MyLock(tid, true);
             lock.lock.readLock().lock();
             lock.readers.add(tid);
             locks.put(oid, lock);
          }

        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
    }//synch
    if (wait){
        synchronized(this){
            //System.out.println("Transaction " + tid + " is waiting..");
            lock.waiters.add(tid);
            lock.waitersType.add('s');
            Main.g.addEdge(tid, lock.tid);
            //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
        }//sync
         if(Main.g.hasCycle()){
            //System.out.println("Detect Cycle in rl..Executing Restart");
            restart(tid);
        }
        //to exclude the restarted thread
        if(!Main.trans[tid].terminate){
            lock.lock.readLock().lock();
            lock.readers.add(tid);
          synchronized(this){
            lock.waitersType.remove(lock.waiters.indexOf(tid));
            lock.waiters.remove(lock.waiters.indexOf(tid));
            }//sync
            for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
            }//for
        }//if terminate
        else
            return;
    }
   else
    if(!getIt){
              //System.out.println("Getting Shared Lock without Waiting");
              lock.lock.readLock().lock();
              lock.readers.add(tid);
              for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
              } 
    }

} // rl

/*******************************************************************************
 * Acquire an exclusive/write lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void wl (int tid, int oid) throws InterruptedException
{
     //type to determine the last lock type in order
     //to be able to remove the edges from waitfor graph
    int type = 0;
    MyLock lock = null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock
            if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
            {
                wait = true;
            }
            if(lock == null){
                getIt = true;
                lock = new MyLock(tid);
                lock.lock.writeLock().lock();
                locks.put(oid,lock);
            }
        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
  }
     if (wait){
            //System.out.println("Transaction " + tid + " is waiting..");
           synchronized(this) {
            if(lock.lock.isWriteLocked()){
                Main.g.addEdge(tid, lock.tid);
                //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
               }
            else{
                type = 1;
                for(int reader : lock.readers){
                     Main.g.addEdge(tid, reader);
                     //System.out.println("Edge has been added "+tid + "==>" + reader);
                }
                }//else
            if(Main.g.hasCycle()){
               //System.out.println("Detect Cycle");
               restart(tid);
             }//if
            //System.out.println("Graph: "+Main.g.toString());
         }//sync



       if(!Main.trans[tid].terminate){
            synchronized(this){
            lock.waiters.add(tid);
            lock.waitersType.add('w');
           }
            //System.out.println("I'm waiting here in wl");
            lock.lock.writeLock().lock();
            lock.tid = tid;
            //System.out.println("Wakeup..");
            synchronized(this){
                lock.waitersType.remove(lock.waiters.indexOf(tid));
                lock.waiters.remove(lock.waiters.indexOf(tid));
                //System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
           }//sync
                for(int i =0 ; i < lock.waiters.size();i++){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                   // System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    //System.out.println("Graph: "+Main.g.toString());
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//for
         }
        else
            return; 
    }// if(wait) ==> for the lock to be released
    else
        if(!getIt){
            lock.lock.writeLock().lock();
            lock.tid = tid;
            for(int i =0 ; i < lock.waiters.size();i++){
                synchronized(Main.g){
                Main.g.addEdge(lock.waiters.get(i), tid);
                //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                }
                if(Main.g.hasCycle())
                    restart(lock.waiters.get(i));
            }//for

     }

} // wl

void restart(int tid){
 synchronized(this) {
    Main.rollBack++;
    MyLock lock;
    List<Integer> toRemove = new ArrayList();
    for(int i : locks.keySet()){
       lock = locks.get(i);
       //for all the locks in the lock table delete the restarted thread from the waiters list
       if(lock.waiters.contains(tid)){
           lock.waitersType.remove(lock.waiters.indexOf(tid));
           lock.waiters.remove(lock.waiters.indexOf(tid));
        }

           //lock.sem.release();
           if(lock.lock.isWriteLockedByCurrentThread()){


               //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), lock.tid);
               //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
               lock.lock.writeLock().unlock();
                //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());
               toRemove.add(i);

           }
       if(!lock.lock.isWriteLocked())
           if(lock.readers.contains(tid) && lock.lock.getReadLockCount()>0){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), tid);
                   // lock.numberOfReaders --;
                  //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
                  lock.readers.remove(lock.readers.indexOf(tid));
                  lock.lock.readLock().unlock();
                  //System.out.println("number of write holders: " + lock.lock.getWriteHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
                  toRemove.add(i);  

              }//if
    }//for
    Main.g.removeEdges(tid);
    Main.trans[tid].terminate = true;
    //System.out.println("Transaction" + tid + " restarted");

    }//sync
}

/*******************************************************************************
 * Unlock/release the lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void ul (int tid, int oid)
{
   MyLock lock = null;
    boolean error = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);                    // find the lock
            if( lock == null)
                System.out.println("println: lock not found");
        } catch(Exception e) {
            System.out.println("lock not found");   // lock not found
        } // try
    }
        if((lock != null) && (lock.lock.isWriteLockedByCurrentThread())){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), lock.tid);
                      }//sync
                   //System.out.println("tid: " + tid + " unlock object: " + oid);
                   lock.lock.writeLock().unlock();
                  //print out 
                  //System.out.println("done with unlock");
                  //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
      }// if lock != null
        else
          if((lock != null) && (lock.readers.size()>0)){
              synchronized(this){
              if(lock.readers.contains(tid)){
                lock.readers.remove(lock.readers.indexOf(tid));
                }
                //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), tid);
                      }
                lock.lock.readLock().unlock();
                //System.out.println("Transaction"+tid+" unlocked shared lock on object "+oid);
                //System.out.println("number of write holders: " + lock.lock.readLock().);
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());

              }//if lock.readers
          }//if
    if (error) 
        System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

别再吹冷风 2024-11-07 04:22:53

使用 synchronized 时存在一些问题。

  1. 除了极少数情况外,对共享对象的所有读/写(尤其是写)访问都必须在同一对象上同步。当您在某些地方写入时,您正在同步 Main.g ,但也有一些地方不同步(例如在 if(wait) 内) wl 方法)。

  2. 据我了解您的代码的目标,lock.{waiters|readers} 的状态必须与图表的状态同步Main.g)。这意味着每当您对 waiters 进行更改,然后相应地更新 Main.g 时,这两个操作都必须以原子方式执行。为此,您需要将它们包装在一个完整的 synchronized 语句中。我可以看到你理解这个概念,因为你在某些地方这样做了,但你似乎在其他地方错过了它。例如:在 rl 方法中,在 if(!getIt){ 内更新 lock.readers,然后更新 Main.g< /code> 没有任何同步。

总的来说,我不能给你任何具体的信息,因为代码非常复杂,所以很难说出它的意图是什么。但我认为您可以通过锁定代码的较大部分来解决一些锁定问题。例如,您可以通过添加 synchronized 来同步 this 上的整个方法 rlwlul 关键字添加到方法声明中,并摆脱方法内的所有细粒度锁定。细粒度锁定可以为您提供更好的性能,但代价是高复杂性。我建议您首先从更简单的锁定方案开始,然后逐渐改进它们(如果您确实需要的话!)。

There are couple of problems with your usage of synchronized.

  1. Except very rare cases, all read/write (especially write) access to a shared object must be synchronized on the same object. You are synchronizing on Main.g when you write to it in some places, but then there are places where it's not synchronized (e.g inside if(wait) of wl method).

  2. As far as I understood the objective of your code, the state of lock.{waiters|readers} has to be in sync with the graph's state (Main.g). This means that whenever you make a change to, say, waiters and then update Main.g accordingly, those two operations must be performed atomically. To do this you need to wrap them in one un-broken synchronized statement. I can see that you understand this concept, because you are doing this in some places, but then you seem to missing it out in others. For example: in rl method, inside the if(!getIt){ you update lock.readers and then Main.g without any synchronization.

Overall, I cannot give you anything specific, since the code is quite complicated so it's hard to tell what is its intention. But I think you can resolve some locking problems by locking larger sections of your code. For example you could just synchronize the whole methods rl, wl and ul on this by adding synchronized keyword to the method declaration, and get rid of all fine-grained locking inside the methods. Fine-grained locking can give you better performance, but at a cost of high complexity. I would recommend you start with simpler locking schemes first, and then gradually improve them (if you really really need to!).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文