在 CLR 中使用托管线程和纤程
好的,下面的链接带有一条警告,指出该讨论使用了不受支持且未记录的 API。不过,我还是想尝试使用其中的代码示例。它大部分都能工作。关于下面这个与异常相关的具体问题,有什么想法吗?
http://msdn.microsoft.com/en-us/magazine/cc164086.aspx
仅供参考,我对原始示例进行了改进。原示例维护着一个指向“前一个纤程”(previousfiber)的指针。而下面更新后的示例使用“主纤程”(mainfiber)指针,该指针会传递给每个纤程类。这样,它们总是让出给主纤程。这使得主纤程可以负责所有其他纤程的调度。其他纤程总是“让出”(yield)回到主纤程。
发布这个问题的原因与在纤程内抛出异常有关。根据该文章,通过使用 CorBindToRunTime API 和 CreateLogicalThreadState()、SwitchOutLogicalThreadState() 等,框架将为每个纤程创建一个托管线程并正确处理异常。
然而,在包含的代码示例中,有一个 NUnit 测试,该测试尝试在纤程内抛出托管异常,并在同一纤程内捕获它。这基本上可行。但是在通过记录消息处理该异常之后,堆栈似乎处于不良状态,因为如果纤程再调用任何其他方法,哪怕是空方法,整个应用程序都会崩溃。
对我来说,这意味着 SwitchOutLogicalThreadState() 和 SwitchInLogicalThreadState() 可能没有正确使用,或者它们可能没有完成自己的工作。
注意:问题的一个线索是托管代码注销了 Thread.CurrentThread.ManagedThreadId,并且它对于每个纤程都是相同的。这表明 CreateLogicalThreadState() 方法并没有真正像宣传的那样创建新的托管线程。
为了更好地分析这一点,我制作了一个伪代码列表,列出了处理纤程时调用的低级 API 的顺序。请记住,所有纤程都在同一个线程上运行,因此不会同时发生任何事情,这是一个线性逻辑。必要的技巧当然是保存和恢复堆栈。这就是它似乎遇到麻烦的地方。
它一开始只是一个线程,然后转换为 Fiber:
- ConvertThreadToFiber(objptr);
- CreateFiber() // 创建多个 Win32 纤程。
现在第一次调用 Fiber,它的启动方法执行以下操作:
- corhost->SwitchOutLogicalThreadState(&cookie); // 主纤程的 cookie 保存在栈上。
- SwitchToFiber(); // 第一次调用 Fiber 启动方法
- corhost->CreateLogicalThreadState();
- 运行主要的 Fiber 抽象方法。
最终纤程需要屈服回主纤程:
- corhost->SwitchOutLogicalThreadState(&cookie);
- SwitchToFiber(fiber);
- corhost->SwitchInLogicalThreadState(&cookie); // 主纤程的 cookie,对吗?
此外,主纤程将恢复预先存在的纤程:
- corhost->SwitchOutLogicalThreadState(&cookie);
- SwitchToFiber(fiber);
- corhost->SwitchInLogicalThreadState(&cookie); // 主纤程的 cookie,对吗?
以下是 Fibers.cpp,它包装了托管代码的 Fiber API。
#define _WIN32_WINNT 0x400
#using <mscorlib.dll>
#include <windows.h>
#include <mscoree.h>
#include <iostream>
using namespace std;
#if defined(Yield)
#undef Yield
#endif
#define CORHOST
namespace Fibers {
typedef System::Runtime::InteropServices::GCHandle GCHandle;
VOID CALLBACK unmanaged_fiberproc(PVOID pvoid);
__gc private struct StopFiber {};
// Lifecycle states stored in Fiber::state.
//   FiberCreated     - set by both Fiber constructors.
//   FiberRunning     - set on entry to Fiber::main().
//   FiberStopPending - set by Fiber::Dispose() when it is called on the fiber itself.
//   FiberStopped     - set by Fiber::Dispose() just before the fiber is deleted.
enum FiberStateEnum {
FiberCreated, FiberRunning, FiberStopPending, FiberStopped
};
#pragma unmanaged
#if defined(CORHOST)
// Namespace-scope pointer to the CLR hosting interface; assigned by initialize_corhost().
ICorRuntimeHost *corhost;
// Calls CorBindToCurrentRuntime to obtain an ICorRuntimeHost interface pointer and
// stores it in `corhost`.
// NOTE(review): the HRESULT returned by CorBindToCurrentRuntime is discarded, so a
// failed bind would go unnoticed here — confirm whether it should be checked.
void initialize_corhost() {
CorBindToCurrentRuntime(0, CLSID_CorRuntimeHost,
IID_ICorRuntimeHost, (void**) &corhost);
}
#endif
// Switches execution to `fiber`. When CORHOST is defined, the CLR logical thread state
// is switched out before the Win32 fiber switch and switched back in after it.
// NOTE(review): the return values of both ICorRuntimeHost calls are ignored — confirm
// whether failures should be detected.
void CorSwitchToFiber(void *fiber) {
#if defined(CORHOST)
// `cookie` is a local variable, so every suspended call keeps its own cookie in its
// own stack frame.
DWORD *cookie;
corhost->SwitchOutLogicalThreadState(&cookie);
#endif
SwitchToFiber(fiber);
// Execution continues past this point only once another fiber switches back to the
// fiber that made this call.
#if defined(CORHOST)
// Hand back the cookie produced by the SwitchOutLogicalThreadState call above.
corhost->SwitchInLogicalThreadState(cookie);
#endif
}
#pragma managed
// Abstract managed wrapper around a Win32 fiber. Subclasses implement Run(); a fiber
// gives up control with Yield(), which always switches to the fiber stored in
// `mainfiber`, and is continued from outside with Resume().
__gc __abstract public class Fiber : public System::IDisposable {
public:
#if defined(CORHOST)
// Static constructor: calls initialize_corhost() to set up the `corhost` pointer.
static Fiber() { initialize_corhost(); }
#endif
// Main-fiber constructor: converts the calling thread into a fiber and records that
// fiber as both `fiber` and `mainfiber`.
// NOTE(review): the GCHandle allocated here is passed as the fiber data, but no
// matching Free() for it is visible in this file (fibermain() only frees handles of
// fibers started through unmanaged_fiberproc) — confirm whether it leaks.
Fiber() : state(FiberCreated) {
void *objptr = (void*) GCHandle::op_Explicit(GCHandle::Alloc(this));
fiber = ConvertThreadToFiber(objptr);
mainfiber = fiber;
//System::Console::WriteLine( S"Created main fiber.");
}
// Worker-fiber constructor: creates a new Win32 fiber (stack-size argument 0) whose
// start routine is unmanaged_fiberproc and whose parameter is a GCHandle to this
// object. Yield() and Dispose() on this object switch to _mainfiber's fiber.
Fiber(Fiber *_mainfiber) : state(FiberCreated) {
void *objptr = (void*) GCHandle::op_Explicit(GCHandle::Alloc(this));
fiber = CreateFiber(0, unmanaged_fiberproc, objptr);
mainfiber = _mainfiber->fiber;
//System::Console::WriteLine(S"Created worker fiber");
}
// True for every state except FiberStopped, so it is also true for a fiber that has
// been created but not yet resumed.
__property bool get_IsRunning() {
return state != FiberStopped;
}
// Returns the fiber pointer cast to int.
// NOTE(review): on a 64-bit build this cast would drop the upper bits — confirm target.
int GetHashCode() {
return (int) fiber;
}
// Switches to this fiber and returns true once control comes back to the caller.
// Returns false without switching when:
//   - the fiber has already been deleted or is FiberStopped;
//   - the fiber is FiberStopPending (the pending stop is completed via Dispose());
//   - the caller is already running on this fiber.
bool Resume() {
if(!fiber || state == FiberStopped) {
return false;
}
if( state == FiberStopPending) {
Dispose();
return false;
}
void *current = GetCurrentFiber();
if(fiber == current) {
return false;
}
CorSwitchToFiber(fiber);
return true;
}
// Stops and deletes the underlying Win32 fiber; does nothing if it is already gone.
void Dispose() {
if(fiber) {
void *current = GetCurrentFiber();
if(fiber == current) {
// Called on the fiber itself: mark the stop as pending and switch to the main
// fiber. Resume() does not switch back into a FiberStopPending fiber; it calls
// Dispose() from the caller's fiber instead, which skips this branch and reaches
// the deletion below.
state = FiberStopPending;
CorSwitchToFiber(mainfiber);
}
state = FiberStopped;
System::Console::WriteLine( S"\nDeleting Fiber.");
DeleteFiber(fiber);
// Null the pointer so later Resume()/Dispose() calls see the fiber as gone.
fiber = 0;
}
}
protected:
// Fiber body supplied by the subclass; invoked from main().
virtual void Run() = 0;
// Switches to the main fiber. After control returns to this fiber, throws StopFiber
// if the state is FiberStopPending.
void Yield() {
CorSwitchToFiber(mainfiber);
if(state == FiberStopPending)
throw new StopFiber;
}
private:
// `fiber` is this object's Win32 fiber; `mainfiber` is the fiber that Yield() and
// Dispose() switch to.
void *fiber, *mainfiber;
FiberStateEnum state;
// NOTE(review): `private public` is presumably the Managed Extensions two-keyword
// access specifier (public inside the assembly, private outside) — confirm.
private public:
// Called by fibermain() on the worker fiber: marks the fiber running, executes Run(),
// writes anything thrown out of Run() to the error stream, then calls Dispose().
void main() {
state = FiberRunning;
try {
Run();
} catch(System::Object *x) {
System::Console::Error->WriteLine(
S"\nFIBERS.DLL: main Caught {0}", x);
}
Dispose();
}
};
// Managed entry helper for a worker fiber. `objptr` is the GCHandle value that the
// worker constructor passed to CreateFiber: this recovers the Fiber object from it,
// frees the handle, and runs the fiber's main().
void fibermain(void* objptr) {
//System::Console::WriteLine( S"\nfibermain()");
System::IntPtr ptr = (System::IntPtr) objptr;
GCHandle g = GCHandle::op_Explicit(ptr);
Fiber *fiber = static_cast<Fiber*>(g.Target);
// The handle is released before main() runs; from here on the object is referenced
// only through the local `fiber` pointer.
g.Free();
fiber->main();
System::Console::WriteLine( S"\nfibermain returning");
}
#pragma unmanaged
// Win32 fiber start routine (the callback handed to CreateFiber by the worker
// constructor). When CORHOST is defined, the managed fibermain() call is bracketed by
// CreateLogicalThreadState() / DeleteLogicalThreadState() on the CLR host.
// NOTE(review): the return values of both host calls are ignored — confirm whether
// failures should be detected.
VOID CALLBACK unmanaged_fiberproc(PVOID objptr) {
#if defined(CORHOST)
corhost->CreateLogicalThreadState();
#endif
fibermain(objptr);
#if defined(CORHOST)
corhost->DeleteLogicalThreadState();
#endif
}
}
上面的 Fibers.cpp 类文件是 Visual C++ 项目中唯一的类。它使用 /CLR:oldstyle 开关构建为具有 CLR 支持的 DLL。
using System;
using System.Threading;
using Fibers;
using NUnit.Framework;
namespace TickZoom.Utilities
{
/// <summary>
/// Test fiber whose body loops forever, calling Work() twice per iteration: once inside
/// a try/catch and once outside it.
/// </summary>
public class FiberTask : Fiber
{
// Uses the parameterless Fiber constructor, i.e. the one that converts the calling
// thread into the main fiber.
public FiberTask()
{
}
// Creates a worker fiber that yields back to mainTask's fiber.
public FiberTask(FiberTask mainTask)
: base(mainTask)
{
}
// Fiber body. There is no exit condition in the loop itself; it ends only when
// something is thrown out of it.
protected override void Run()
{
while (true)
{
Console.WriteLine("Top of worker loop.");
try
{
Work();
}
catch (Exception ex)
{
Console.WriteLine("Exception: " + ex.Message);
}
// Printed on every iteration, whether or not an exception was actually caught.
Console.WriteLine("After the exception.");
// This second call is outside the try block, so anything it throws leaves Run().
Work();
}
}
// Logs the fiber and managed thread id, bumps the shared counter, throws when the
// counter reaches 2, and otherwise yields to the main fiber.
private void Work()
{
Console.WriteLine("Doing work on fiber: " + GetHashCode() + ", thread id: " + Thread.CurrentThread.ManagedThreadId);
++counter;
Console.WriteLine("Incremented counter " + counter);
if (counter == 2)
{
Console.WriteLine("Throwing an exception.");
// Thrown before Yield(), so this call to Work() never yields.
throw new InvalidCastException("Just a test exception.");
}
Yield();
}
// Static, so it is shared by every FiberTask instance rather than counted per fiber.
public static int counter;
}
/// <summary>
/// NUnit fixture that creates one main fiber and five worker fibers, resumes each
/// worker once, and then disposes the workers that remain.
/// </summary>
[TestFixture]
public class TestingFibers
{
[Test]
public void TestIdeas()
{
var fiberTasks = new System.Collections.Generic.List<FiberTask>();
// The parameterless constructor turns the test's own thread into the main fiber.
var mainFiber = new FiberTask();
for( var i=0; i< 5; i++)
{
fiberTasks.Add(new FiberTask(mainFiber));
}
// One pass over the workers: each is resumed once and runs until it switches back
// to the main fiber.
for (var i = 0; i < fiberTasks.Count; i++)
{
Console.WriteLine("Resuming " + i);
var fiberTask = fiberTasks[i];
if( !fiberTask.Resume())
{
Console.WriteLine("Fiber " + i + " was disposed.");
// Drop the finished fiber and step the index back so the next element is not skipped.
fiberTasks.RemoveAt(i);
i--;
}
}
// Clean up whatever is still in the list.
for (var i = 0; i < fiberTasks.Count; i++)
{
Console.WriteLine("Disposing " + i);
fiberTasks[i].Dispose();
}
}
}
}
上面的单元测试给出以下输出,然后严重崩溃:
Resuming 0
Top of worker loop.
Doing work on fiber: 476184704, thread id: 7
Incremented counter 1
Resuming 1
Top of worker loop.
Doing work on fiber: 453842656, thread id: 7
Incremented counter 2
Throwing an exception.
Exception: Just a test exception.
After the exception.
Okay, the following link has a warning that the discussion uses unsupported and undocumented apis. Well I'm trying to use the code sample any way. It mostly works. Any ideas about the specific issue below relating to exceptions?
http://msdn.microsoft.com/en-us/magazine/cc164086.aspx
FYI, I made an improvement over the original sample. It was maintaining a pointer to the "previousfiber". Instead, the updated sample below uses a "mainfiber" pointer which gets passed to every fiber class. In that way, they always yield back to the main fiber. That allows the main fiber to handle scheduling for all other fibers. The other fibers always "yield" back to the main fiber.
The reason for posting this question has to do with throwing exceptions inside a fiber. According to the article, by using the CorBindToRunTime API with CreateLogicalThreadState(), SwitchOutLogicalThreadState(), etc, the framework will create a managed thread for each fiber and properly handle exceptions.
However, the included code example has an NUnit test which experiments with throwing a managed exception within a fiber and also catching it within the same fiber. That sort of works. But after handling it by logging a message, it seems the stack is in a bad state, because if the fiber then calls any other method, even an empty one, the whole application crashes.
This implies to me that SwitchOutLogicalThreadState() and SwitchInLogicalThreadState() maybe aren't being used properly or else maybe they're not doing their job.
NOTE: One clue to the problem is that the managed code logs out the Thread.CurrentThread.ManagedThreadId and it is the same for every fiber. This suggests that the CreateLogicalThreadState() method didn't really create a new managed thread as advertised.
To analyze this better, I have made a pseudocode listing of the order of low level APIs called to handle the fibers. Remember, that fibers all run on the same thread so there's nothing concurrently happening, it's a linear logic. The necessary trick of course is to save and restore the stack. That's where it seems to be having trouble.
It starts out as simply a thread so then it converts to a fiber:
- ConvertThreadToFiber(objptr);
- CreateFiber() // create several win32 fibers.
Now invoke a fiber the first time; its startup method does this:
- corhost->SwitchOutLogicalThreadState(&cookie); // The main cookie is held on the stack.
- SwitchToFiber(); // first time calls the fiber startup method
- corhost->CreateLogicalThreadState();
- run the main fiber abstract method.
Eventually the fiber needs to yield back to the main fiber:
- corhost->SwitchOutLogicalThreadState(&cookie);
- SwitchToFiber(fiber);
- corhost->SwitchInLogicalThreadState(&cookie); // the main fiber cookie, right?
Also the main fiber will resume a preexisting fiber:
- corhost->SwitchOutLogicalThreadState(&cookie);
- SwitchToFiber(fiber);
- corhost->SwitchInLogicalThreadState(&cookie); // the main fiber cookie, right?
The following is fibers.cpp which wraps the fiber api for managed code.
#define _WIN32_WINNT 0x400
#using <mscorlib.dll>
#include <windows.h>
#include <mscoree.h>
#include <iostream>
using namespace std;
#if defined(Yield)
#undef Yield
#endif
#define CORHOST
namespace Fibers {
typedef System::Runtime::InteropServices::GCHandle GCHandle;
VOID CALLBACK unmanaged_fiberproc(PVOID pvoid);
__gc private struct StopFiber {};
// Lifecycle states stored in Fiber::state.
//   FiberCreated     - set by both Fiber constructors.
//   FiberRunning     - set on entry to Fiber::main().
//   FiberStopPending - set by Fiber::Dispose() when it is called on the fiber itself.
//   FiberStopped     - set by Fiber::Dispose() just before the fiber is deleted.
enum FiberStateEnum {
FiberCreated, FiberRunning, FiberStopPending, FiberStopped
};
#pragma unmanaged
#if defined(CORHOST)
// Namespace-scope pointer to the CLR hosting interface; assigned by initialize_corhost().
ICorRuntimeHost *corhost;
// Calls CorBindToCurrentRuntime to obtain an ICorRuntimeHost interface pointer and
// stores it in `corhost`.
// NOTE(review): the HRESULT returned by CorBindToCurrentRuntime is discarded, so a
// failed bind would go unnoticed here — confirm whether it should be checked.
void initialize_corhost() {
CorBindToCurrentRuntime(0, CLSID_CorRuntimeHost,
IID_ICorRuntimeHost, (void**) &corhost);
}
#endif
// Switches execution to `fiber`. When CORHOST is defined, the CLR logical thread state
// is switched out before the Win32 fiber switch and switched back in after it.
// NOTE(review): the return values of both ICorRuntimeHost calls are ignored — confirm
// whether failures should be detected.
void CorSwitchToFiber(void *fiber) {
#if defined(CORHOST)
// `cookie` is a local variable, so every suspended call keeps its own cookie in its
// own stack frame.
DWORD *cookie;
corhost->SwitchOutLogicalThreadState(&cookie);
#endif
SwitchToFiber(fiber);
// Execution continues past this point only once another fiber switches back to the
// fiber that made this call.
#if defined(CORHOST)
// Hand back the cookie produced by the SwitchOutLogicalThreadState call above.
corhost->SwitchInLogicalThreadState(cookie);
#endif
}
#pragma managed
// Abstract managed wrapper around a Win32 fiber. Subclasses implement Run(); a fiber
// gives up control with Yield(), which always switches to the fiber stored in
// `mainfiber`, and is continued from outside with Resume().
__gc __abstract public class Fiber : public System::IDisposable {
public:
#if defined(CORHOST)
// Static constructor: calls initialize_corhost() to set up the `corhost` pointer.
static Fiber() { initialize_corhost(); }
#endif
// Main-fiber constructor: converts the calling thread into a fiber and records that
// fiber as both `fiber` and `mainfiber`.
// NOTE(review): the GCHandle allocated here is passed as the fiber data, but no
// matching Free() for it is visible in this file (fibermain() only frees handles of
// fibers started through unmanaged_fiberproc) — confirm whether it leaks.
Fiber() : state(FiberCreated) {
void *objptr = (void*) GCHandle::op_Explicit(GCHandle::Alloc(this));
fiber = ConvertThreadToFiber(objptr);
mainfiber = fiber;
//System::Console::WriteLine( S"Created main fiber.");
}
// Worker-fiber constructor: creates a new Win32 fiber (stack-size argument 0) whose
// start routine is unmanaged_fiberproc and whose parameter is a GCHandle to this
// object. Yield() and Dispose() on this object switch to _mainfiber's fiber.
Fiber(Fiber *_mainfiber) : state(FiberCreated) {
void *objptr = (void*) GCHandle::op_Explicit(GCHandle::Alloc(this));
fiber = CreateFiber(0, unmanaged_fiberproc, objptr);
mainfiber = _mainfiber->fiber;
//System::Console::WriteLine(S"Created worker fiber");
}
// True for every state except FiberStopped, so it is also true for a fiber that has
// been created but not yet resumed.
__property bool get_IsRunning() {
return state != FiberStopped;
}
// Returns the fiber pointer cast to int.
// NOTE(review): on a 64-bit build this cast would drop the upper bits — confirm target.
int GetHashCode() {
return (int) fiber;
}
// Switches to this fiber and returns true once control comes back to the caller.
// Returns false without switching when:
//   - the fiber has already been deleted or is FiberStopped;
//   - the fiber is FiberStopPending (the pending stop is completed via Dispose());
//   - the caller is already running on this fiber.
bool Resume() {
if(!fiber || state == FiberStopped) {
return false;
}
if( state == FiberStopPending) {
Dispose();
return false;
}
void *current = GetCurrentFiber();
if(fiber == current) {
return false;
}
CorSwitchToFiber(fiber);
return true;
}
// Stops and deletes the underlying Win32 fiber; does nothing if it is already gone.
void Dispose() {
if(fiber) {
void *current = GetCurrentFiber();
if(fiber == current) {
// Called on the fiber itself: mark the stop as pending and switch to the main
// fiber. Resume() does not switch back into a FiberStopPending fiber; it calls
// Dispose() from the caller's fiber instead, which skips this branch and reaches
// the deletion below.
state = FiberStopPending;
CorSwitchToFiber(mainfiber);
}
state = FiberStopped;
System::Console::WriteLine( S"\nDeleting Fiber.");
DeleteFiber(fiber);
// Null the pointer so later Resume()/Dispose() calls see the fiber as gone.
fiber = 0;
}
}
protected:
// Fiber body supplied by the subclass; invoked from main().
virtual void Run() = 0;
// Switches to the main fiber. After control returns to this fiber, throws StopFiber
// if the state is FiberStopPending.
void Yield() {
CorSwitchToFiber(mainfiber);
if(state == FiberStopPending)
throw new StopFiber;
}
private:
// `fiber` is this object's Win32 fiber; `mainfiber` is the fiber that Yield() and
// Dispose() switch to.
void *fiber, *mainfiber;
FiberStateEnum state;
// NOTE(review): `private public` is presumably the Managed Extensions two-keyword
// access specifier (public inside the assembly, private outside) — confirm.
private public:
// Called by fibermain() on the worker fiber: marks the fiber running, executes Run(),
// writes anything thrown out of Run() to the error stream, then calls Dispose().
void main() {
state = FiberRunning;
try {
Run();
} catch(System::Object *x) {
System::Console::Error->WriteLine(
S"\nFIBERS.DLL: main Caught {0}", x);
}
Dispose();
}
};
// Managed entry helper for a worker fiber. `objptr` is the GCHandle value that the
// worker constructor passed to CreateFiber: this recovers the Fiber object from it,
// frees the handle, and runs the fiber's main().
void fibermain(void* objptr) {
//System::Console::WriteLine( S"\nfibermain()");
System::IntPtr ptr = (System::IntPtr) objptr;
GCHandle g = GCHandle::op_Explicit(ptr);
Fiber *fiber = static_cast<Fiber*>(g.Target);
// The handle is released before main() runs; from here on the object is referenced
// only through the local `fiber` pointer.
g.Free();
fiber->main();
System::Console::WriteLine( S"\nfibermain returning");
}
#pragma unmanaged
// Win32 fiber start routine (the callback handed to CreateFiber by the worker
// constructor). When CORHOST is defined, the managed fibermain() call is bracketed by
// CreateLogicalThreadState() / DeleteLogicalThreadState() on the CLR host.
// NOTE(review): the return values of both host calls are ignored — confirm whether
// failures should be detected.
VOID CALLBACK unmanaged_fiberproc(PVOID objptr) {
#if defined(CORHOST)
corhost->CreateLogicalThreadState();
#endif
fibermain(objptr);
#if defined(CORHOST)
corhost->DeleteLogicalThreadState();
#endif
}
}
The above fibers.cpp class file is the only class in the Visual C++ project. It's built as a DLL with CLR support using the /CLR:oldstyle switch.
using System;
using System.Threading;
using Fibers;
using NUnit.Framework;
namespace TickZoom.Utilities
{
/// <summary>
/// Test fiber whose body loops forever, calling Work() twice per iteration: once inside
/// a try/catch and once outside it.
/// </summary>
public class FiberTask : Fiber
{
// Uses the parameterless Fiber constructor, i.e. the one that converts the calling
// thread into the main fiber.
public FiberTask()
{
}
// Creates a worker fiber that yields back to mainTask's fiber.
public FiberTask(FiberTask mainTask)
: base(mainTask)
{
}
// Fiber body. There is no exit condition in the loop itself; it ends only when
// something is thrown out of it.
protected override void Run()
{
while (true)
{
Console.WriteLine("Top of worker loop.");
try
{
Work();
}
catch (Exception ex)
{
Console.WriteLine("Exception: " + ex.Message);
}
// Printed on every iteration, whether or not an exception was actually caught.
Console.WriteLine("After the exception.");
// This second call is outside the try block, so anything it throws leaves Run().
Work();
}
}
// Logs the fiber and managed thread id, bumps the shared counter, throws when the
// counter reaches 2, and otherwise yields to the main fiber.
private void Work()
{
Console.WriteLine("Doing work on fiber: " + GetHashCode() + ", thread id: " + Thread.CurrentThread.ManagedThreadId);
++counter;
Console.WriteLine("Incremented counter " + counter);
if (counter == 2)
{
Console.WriteLine("Throwing an exception.");
// Thrown before Yield(), so this call to Work() never yields.
throw new InvalidCastException("Just a test exception.");
}
Yield();
}
// Static, so it is shared by every FiberTask instance rather than counted per fiber.
public static int counter;
}
/// <summary>
/// NUnit fixture that creates one main fiber and five worker fibers, resumes each
/// worker once, and then disposes the workers that remain.
/// </summary>
[TestFixture]
public class TestingFibers
{
[Test]
public void TestIdeas()
{
var fiberTasks = new System.Collections.Generic.List<FiberTask>();
// The parameterless constructor turns the test's own thread into the main fiber.
var mainFiber = new FiberTask();
for( var i=0; i< 5; i++)
{
fiberTasks.Add(new FiberTask(mainFiber));
}
// One pass over the workers: each is resumed once and runs until it switches back
// to the main fiber.
for (var i = 0; i < fiberTasks.Count; i++)
{
Console.WriteLine("Resuming " + i);
var fiberTask = fiberTasks[i];
if( !fiberTask.Resume())
{
Console.WriteLine("Fiber " + i + " was disposed.");
// Drop the finished fiber and step the index back so the next element is not skipped.
fiberTasks.RemoveAt(i);
i--;
}
}
// Clean up whatever is still in the list.
for (var i = 0; i < fiberTasks.Count; i++)
{
Console.WriteLine("Disposing " + i);
fiberTasks[i].Dispose();
}
}
}
}
The above unit test gives the following output and then crashes badly:
Resuming 0
Top of worker loop.
Doing work on fiber: 476184704, thread id: 7
Incremented counter 1
Resuming 1
Top of worker loop.
Doing work on fiber: 453842656, thread id: 7
Incremented counter 2
Throwing an exception.
Exception: Just a test exception.
After the exception.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
不久前,我遇到了同样的问题 - 我尝试使用 .NET 3.5(后来的 4.0)中的代码片段,但它崩溃了。这说服了我放弃“hacky”解决方案。事实上,.NET 缺少通用的协同例程概念。有些人通过枚举器和
yield
关键字来模拟协同例程(请参阅 http://fxcritic.blogspot.com/2008/05/lightweight-fibercoroutines.html)。然而,这对我来说有明显的缺点:它不像老式的 Win32 纤程那样直观,并且需要您使用 IEnumerable
作为每个协同例程的返回类型。也许这篇文章: http://msdn.microsoft.com/en-us/vstudio/gg316360 对你来说很有趣。 Microsoft 即将推出新的
async
关键字。社区技术预览版 (CTP) 可供下载。我想应该可以在这些异步扩展之上开发一个干净的协同例程实现。
A while ago, I experienced the same problem - I tried to use the code snippet in .NET 3.5 (later on 4.0) and it crashed. This convinced me to turn away from the "hacky" solution. The truth is that .NET is missing a generic co-routine concept. Some people simulate co-routines with enumerators and the
yield
keyword (see http://fxcritic.blogspot.com/2008/05/lightweight-fibercoroutines.html). However, this has clear disadvantages to me: it is not as intuitive to use as good old Win32 fibers, and it requires you to use IEnumerable
as the return type for each co-routine. Maybe this article: http://msdn.microsoft.com/en-us/vstudio/gg316360 is interesting for you. Microsoft is about to introduce a new
async
keyword. A community technology preview (CTP) is offered for download. I guess it should be possible to develop a clean co-routine implementation on top of those async extensions.
使用纤程时,您必须在切换到主纤程之前,将异常管理堆栈状态保存在局部变量(位于栈上)中。
切换
(当执行返回时)
之后的第一个操作是从局部变量中的备份恢复异常堆栈。看一下这篇博客文章,了解如何在 Delphi 中使用纤程而不破坏异常处理:http://jsbattig.blogspot.com/2015/03/how-to-properly-support-windows-fibers.html
重点是,如果你想使用纤程,并且既要编写异常处理程序,又要在 try-finally 或 try-catch 块内部切换纤程,你就必须弄清楚如何在 CLR 中做到这一点。
我正在 C# 中使用 Fibers,但还没有找到方法。如果有办法做到这一点,我想最终这将是一个黑客。
When using fibers you must store the exception management stack state on a local variable (on the stack) before you switch to your main fiber.
The first operation right after the switch
(when execution comes back)
is restoring the exception stack from your backup in a local variable. Take a look at this blog entry on how to use fibers with Delphi without breaking exception handling: http://jsbattig.blogspot.com/2015/03/how-to-properly-support-windows-fibers.html
The point is, if you want to use fibers AND write exception handlers AND switch fibers inside a try-finally or try-catch block, you will have to figure out how to do this with the CLR.
I'm playing around with Fibers in C# and I could not find the way yet. If there were a way to do it, I imagine it will be a hack at the end of the day.
您可以使用Delphi协程框架https://github.com/Purik/AIO
它已经完成了 Fibers 的实施。
例如,您可以将匿名过程包装到 Fiber - 过程将在 Fiber 上下文中运行,您可以访问和检测 Fiber 中引发的任何异常
You can use the Delphi coroutines framework https://github.com/Purik/AIO
It has a complete Fibers implementation.
For example, you can wrap an anonymous procedure in a Fiber - the procedure will be run in the Fiber context, and you can access and detect any exception raised in the Fiber.