第一个进入的线程如何向其他并发线程发出同一方法结束的信号?
第一个进入的线程如何向其他并发线程发出同一方法结束的信号?
我有名为 PollDPRAM() 的方法。它必须通过网络访问一些速度较慢的硬件并刷新对象私有数据。如果其他线程同时调用相同的方法,则它们不能执行该行程,而是等待第一个到来的线程完成作业并简单地退出,因为数据是新鲜的(比如说10-30毫秒前没有什么区别) 。 在方法中很容易检测到第二个、第三个等线程没有首先进入。我使用互锁计数器来检测并发性。
问题:我做出了一个糟糕的选择,通过观察计数器(Interlocked.Read)来检测第一个线程的退出,以在计数器减少到小于在 n>1 线程入口处检测到的值后进行观察。这个选择很糟糕,因为第一个线程在离开后几乎可以立即重新进入该方法。因此,n>1 个线程永远不会检测到计数器中的下降。
所以问题: 如何正确检测第一个进入的线程已退出该方法,即使第一个线程可以立即再次进入该方法?
你
谢谢 一段代码
private void pollMotorsData()
{
// Execute single poll with "foreground" handshaking
DateTime start = DateTime.Now;
byte retryCount = 0;
// Pick old data atomically to detect change
uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
bool changeDetected = false;
// The design goal of DPRAM is to ease the bottleneck
// Here is a sensor if bottleneck is actually that tight
long parallelThreads = Interlocked.Increment(ref this.motorsPollThreadCount);
try
{
// For first thread entering the counter will be 1
if (parallelThreads <= 1)
{
do
{
// Handshake signal to DPRAM write process on controller side that host PC is reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
try
{
bool canReadMotors = false;
byte[] canReadFrozenDataFlag = new byte[2];
do
{
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
if (canReadMotors) break;
retryCount++;
Thread.Sleep(1);
} while (retryCount < 10);
if (!canReadMotors)
{
throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
}
// The lock is meaningless in contructor as it is certainly single threaded
// but for practice sake the access to data should always be serialized
lock (motorsDataLock)
{
// Obtain fresh content of DPRAM
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
this.motorsDataBorn = DateTime.Now;
}
}
finally
{
// Handshake signal to DPRAM write process on controller side that host PC has finished reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
}
// Check live change in a separate atom
changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
} while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));
// Assert that result is live
if (!changeDetected)
{
throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM forground handshaking failed.");
}
}
else
{
// OK. Bottleneck ! The concurrent polls have collided
// Give the controller a breathe by waiting for other thread do the job
// Avoid aggressive polling of stale data, which is not able to be written, locked by reader
// Just wait for other thread do whole polling job and return with no action
// because the data is milliseconds fresh
do
{
// Amount of parallel threads must eventually decrease
// But no thread will leave and decrease the counter until job is done
if (Interlocked.Read(ref this.motorsPollThreadCount) < parallelThreads)
{
// Return is possible because decreased value of concurrentThreads means that
// this very time other thread has finished the poll 1 millisecond ago at most
return;
}
Thread.Sleep(1);
retryCount++;
} while ((DateTime.Now - start).TotalMilliseconds < 255);
throw new DeltaTauControllerException(this.controller, "Timeout 255ms waiting on concurrent thread to complete DPRAM polling");
}
}
finally
{
// Signal to other threads that work is done
Interlocked.Decrement(ref this.motorsPollThreadCount);
// Trace the timing and bottleneck situations
TimeSpan duration = DateTime.Now - start;
if (duration.TotalMilliseconds > 50 || parallelThreads > 1 || retryCount > 0)
{
Trace.WriteLine(string.Format("Controller {0}, DPRAM poll {1:0} ms, threads {2}, retries {3}",
this.controller.number,
duration.TotalMilliseconds,
parallelThreads,
retryCount));
}
}
}
How first entered thread can signal to other concurrent threads the end of same method ?
I have method named say PollDPRAM(). It must make a trip over network to some slow hardware and refresh object private data. If the same method is simultaneously called by other threads, they must not do the trip, but wait for first coming thread to complete the job and simply exit, because the data is fresh (say 10-30 ms ago does not make a difference).
Its easy to detect in method that second, 3rd etc threads are not first entered. I use Interlocked counter to detect concurrency.
Problem: I made a poor choice to detect the exit of first thread by watching the counter (Interlocked.Read) to watch after the decrease of counter to value less than it was detected at entrance of n>1 thread. The choice is bad, because the first thread can reenter the method again nearly immediately after it leaves. So the n>1 threads will never detect dip in counter.
So question:
How to correctly detect that first entered thread has exited the method, even if this first thread can immediately enter it again ?
Thank you
P.S. Piece of code
private void pollMotorsData()
{
// Execute single poll with "foreground" handshaking
DateTime start = DateTime.Now;
byte retryCount = 0;
// Pick old data atomically to detect change
uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
bool changeDetected = false;
// The design goal of DPRAM is to ease the bottleneck
// Here is a sensor if bottleneck is actually that tight
long parallelThreads = Interlocked.Increment(ref this.motorsPollThreadCount);
try
{
// For first thread entering the counter will be 1
if (parallelThreads <= 1)
{
do
{
// Handshake signal to DPRAM write process on controller side that host PC is reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
try
{
bool canReadMotors = false;
byte[] canReadFrozenDataFlag = new byte[2];
do
{
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
if (canReadMotors) break;
retryCount++;
Thread.Sleep(1);
} while (retryCount < 10);
if (!canReadMotors)
{
throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
}
// The lock is meaningless in contructor as it is certainly single threaded
// but for practice sake the access to data should always be serialized
lock (motorsDataLock)
{
// Obtain fresh content of DPRAM
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
this.motorsDataBorn = DateTime.Now;
}
}
finally
{
// Handshake signal to DPRAM write process on controller side that host PC has finished reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
}
// Check live change in a separate atom
changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
} while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));
// Assert that result is live
if (!changeDetected)
{
throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM forground handshaking failed.");
}
}
else
{
// OK. Bottleneck ! The concurrent polls have collided
// Give the controller a breathe by waiting for other thread do the job
// Avoid aggressive polling of stale data, which is not able to be written, locked by reader
// Just wait for other thread do whole polling job and return with no action
// because the data is milliseconds fresh
do
{
// Amount of parallel threads must eventually decrease
// But no thread will leave and decrease the counter until job is done
if (Interlocked.Read(ref this.motorsPollThreadCount) < parallelThreads)
{
// Return is possible because decreased value of concurrentThreads means that
// this very time other thread has finished the poll 1 millisecond ago at most
return;
}
Thread.Sleep(1);
retryCount++;
} while ((DateTime.Now - start).TotalMilliseconds < 255);
throw new DeltaTauControllerException(this.controller, "Timeout 255ms waiting on concurrent thread to complete DPRAM polling");
}
}
finally
{
// Signal to other threads that work is done
Interlocked.Decrement(ref this.motorsPollThreadCount);
// Trace the timing and bottleneck situations
TimeSpan duration = DateTime.Now - start;
if (duration.TotalMilliseconds > 50 || parallelThreads > 1 || retryCount > 0)
{
Trace.WriteLine(string.Format("Controller {0}, DPRAM poll {1:0} ms, threads {2}, retries {3}",
this.controller.number,
duration.TotalMilliseconds,
parallelThreads,
retryCount));
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
同步方法并在方法内部检查上次完成网络访问的时间记录,以确定是否需要再次进行。
Synchronize the method and inside the method check a record of the time that the network access was last done to determine if it needs to be done again.
您可以使用“lock”关键字支持的 C# 监视器类。
基本上,您的方法可以包装在 lock(lockobj) { CallMethod() } 中,
这将为您提供保护,假设所有线程都在同一进程中。
如果需要跨进程锁定,则需要使用互斥锁。
至于你的程序,我会考虑将静态时间戳和缓存值放入你的方法中。所以当方法进入时,如果timestamp在我可接受的范围内,则返回缓存的值,否则简单地执行fetch。与锁定机制相结合,这应该可以满足您的需要。
当然,这是假设 C# 监视器上占用和阻塞的时间不会影响应用程序的性能。
更新:
我已经更新了您的代码,以向您展示我关于使用缓存和时间戳的含义。我假设你的“motorsData”变量是从电机轮询返回的东西,因此我没有它的变量。但是,如果我误解了,只需添加一个变量来存储从代码返回的数据。请注意,我没有为您进行任何错误检查,因此您需要处理异常。
You can use the C# monitor classes which are supported by the "lock" keyword.
Basically your method can be wrapped in lock(lockobj) { CallMethod() }
This will give you protection, assuming all threads are in the same process.
You will need to use a Mutex if you need to lock across processes.
As for your program I would look at putting a static timestamp and cached value into your method. So when the method enters, if timestamp is within my acceptable range, return the cached value, otherwise simply perform the fetch. Combined with a locking mechanism this should do what you need it to.
Of course this assumes that the time to take and block on the C# monitor is not going to affect the performance of your app.
UPDATE:
I've updated your code to show you what I meant about using a cache and timestamp. I have assumed that your "motorsData" variable is the thing that is returned from the motor polling and as such I don't have a variable for it. However if I"ve misunderstood, simply add a variable that stores the data after it is returned from the code. Note I haven't done any error checking for you so you need to deal with your exceptions.
有很多不同的方法可以做到这一点。正如有人已经提到的那样,您可以使用关键部分,但是如果另一个线程阻塞,这不会给您“直接退出”的行为。为此你需要某种标志。您可以使用易失性布尔值并锁定该布尔值的访问,或者您可以使用具有单个计数的信号量。最后你可以使用互斥锁。使用同步对象的好处是您可以执行 WaitForSingleObject 并将超时设置为 0。然后您可以检查等待是否成功(如果成功则第一个线程已退出)或失败(在这种情况下第一个线程是仍在运行)。
Theres lots of different ways you could do this. You could use a critical section, as someone has already mentioned, but that won't give you the behavior of "just exit" if the other thread is blocking. For that you need some kind of flag. You could go with a volatile bool and lock around access of that bool, or you could use a semaphore with a single count. Finally you can use a mutex. The benefit of using the synchronization objects is you can do a WaitForSingleObject and set the timeout to 0. Then you can check if the wait was successful (if it is then the first thread has exited) or not (in which case the first thread is still running).