如何最好地组装大量消息处理程序

发布于 2024-11-08 10:15:38 字数 758 浏览 2 评论 0 原文

我正在开始开发消息处理系统,其中各种消息将从外部源到达并需要“处理”。消息对象基本上是 DTO。

我正在考虑让消息类存在于它们自己的程序集中,可供消息处理系统和外部系统使用,并让处理程序从通用抽象类继承,大致如下:

Messages.dll

public abstract class Message {}

public class FooMessage : Message
{
  ...
}

public class BarMessage : Message
{
  ...
}

MessageHandlers.dll

public abstract class MessageHandler<TMessage> where TMessage : Message
{
  public void Handle(TMessage message);
}

public class FooHandler : MessageHandler<FooMessage>
{
  public void Handle(FooMessage message)
  {
    ...
  }
}

现在的问题: 将所有这些粘合在一起的最佳方法是什么?我当然可以做一些反射并编写一个辅助函数来根据传入消息名称实例化适当的处理程序,但这不正是 IoC 容器应该擅长的事情吗?有没有更适合这项工作的人?意见和指示表示赞赏。

谢谢!

I am starting work on a message processing system, where a variety of messages will arrive from an external source and need to be "handled". The message objects are basically DTO's.

I am thinking of having the message classes live in their own assembly, available both to the message processing system and to the external system, and having the handlers inherit from a generic abstract class, along the following rough lines:

Messages.dll

public abstract class Message {}

public class FooMessage : Message
{
  ...
}

public class BarMessage : Message
{
  ...
}

MessageHandlers.dll

public abstract class MessageHandler<TMessage> where TMessage : Message
{
  public void Handle(TMessage message);
}

public class FooHandler : MessageHandler<FooMessage>
{
  public void Handle(FooMessage message)
  {
    ...
  }
}

And now the question:
What is the best way to glue all this together? I can certainly do a bit of reflection and write a helper function to instantiate the appropriate handler based on the incoming message name, but isn't this exactly the sort of thing IoC containers are supposed to be good at? Is there a particular one better suited for the job? Opinions and pointers appreciated.

Thanks!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

瞄了个咪的 2024-11-15 10:26:54

由于您需要使用请求/响应消息传递模式,因此可以使用 Agatha (agatha-rrsl) 项目。
它是该模式的实现,它是成熟的,在重负载下进行了测试,在进程内工作,当然也可以使用 WCF 在进程边界之外工作。

它内置了您提到的所有内容,并与许多流行的 IoC 容器(Windsor、StructureMap、Unity、Spring.NET、Ninject)集成,因此您不必重新发明轮子并开始专注于您的业务逻辑。

您可以在 Davy Brion 的博客此处找到非常有用的文章、使用技巧和大量信息。

Since you need to use a Request/Response messaging pattern you can use the Agatha (agatha-rrsl) project.
It is an implementation of that pattern, it is mature, tested under heavy load and works in-process and of course outside the process boundaries using WCF.

It has built-in everything you mention and integrates with many popular IoC containers (Windsor, StructureMap, Unity, Spring.NET, Ninject) so you will not have to re-invent the wheel and start focusing on your business logic.

You may find very useful articles, usage tips, and a lot of information on Davy Brion's blog here.

稳稳的幸福 2024-11-15 10:25:09

假设容器中的所有组件都已正确配置,这就是处理消息的方式(本例中的消息是通过队列传输的序列化消息):

var formatter = new BinaryFormatter();
using (var s = new MemoryStream(message.AsBytes))
{
    dynamic consumable = formatter.Deserialize(s);

    var consumerType = typeof(IMessageConsumer<>).MakeGenericType(consumable.GetType());
    dynamic consumer = this.container.Resolve(consumerType);
    consumer.Consume(consumable);
}

完整的示例代码可在此处获取:https://github.com/ploeh/CQRSonAzureDemo/blob/master/BookingWorkerRole/AzureQueueMessageProcessor.cs

Assuming that all components are correctly configured in the container, this is how you can handle a message (where the message in this case is a serialized message being transported over a queue):

var formatter = new BinaryFormatter();
using (var s = new MemoryStream(message.AsBytes))
{
    dynamic consumable = formatter.Deserialize(s);

    var consumerType = typeof(IMessageConsumer<>).MakeGenericType(consumable.GetType());
    dynamic consumer = this.container.Resolve(consumerType);
    consumer.Consume(consumable);
}

Full sample code is available here: https://github.com/ploeh/CQRSonAzureDemo/blob/master/BookingWorkerRole/AzureQueueMessageProcessor.cs

π浅易 2024-11-15 10:23:12

不久前我根据温莎做了类似的事情。您可以阅读有关此这里

I did something similar based on Windsor a while back. You can read about this here.

缺⑴份安定 2024-11-15 10:21:17

有多种方法可以注册您的处理程序。我总是从手动配置一切开始。在您的示例中,它看起来像这样:

<!-- language: c# -->
container.Register<IMessageHandler<FooMessage>, FooHandler>();
container.Register<IMessageHandler<BarMessage>, BarHandler>();
// etc

抽象基类,而是定义一个 IMessageHandler 接口。

提示:不要使用 应用程序正在增长,这可能会变得很麻烦,因此就您而言,批量注册应该是正确的选择。所有 DI 容器对此都有不同的机制。大多数容器开箱即用地支持此功能,而其他容器则不支持。以下是如何使用 简单注入器 执行此操作:

<!-- language: c# -->
container.RegisterManyOpenGeneric(typeof(MessageHandler<>),
    typeof(MessageHandler<>).Assembly);

这将注册实现 MessageHandler 并与 MessageHandler 位于同一程序集中。同样,这取决于您选择的容器如何编写。

为了处理传入的任意消息,您将需要一些反射,但您不应该在应用程序本身中执行此操作。只需在应用程序中定义一个良好的抽象,并将反射的使用移至应用程序中具有 DI 配置的部分即可。

例如,您可以定义一个 IMessageProcessor 接口,该接口允许处理任何类型的 Message

<!-- language: c# -->
public interface IMessageProcessor
{
    void Process(Message message);
}

在您的 DI 配置附近,您可以定义一个 IMessageProcessor 的实现,它可能看起来像这样:

<!-- language: c# -->
private class DIMessageProcessor : IMessageProcessor
{
    private readonly Container container;

    public DIMessageProcessor(Container container)
    {
        this.container = container;
    }

    public void Process(Message message)
    {
        if (message == null) throw new ArgumentNullException("message");

        Type messageType = message.GetType();

        Type handlerType =
            typeof(IMessageHandler<>).MakeGenericType(messageType);

        var handler = this.container.GetInstance(handlerType);

        handlerType.GetMethod("Handle").Invoke(handler, message);
    }
}

将其排除在应用程序之外很重要。您不仅会混合职责,而且会更难添加新行为。例如,考虑防止消息重播(当一条消息被执行多次时)。作为 IMessageProcessor 的装饰器来实现此行为会很容易。除此之外,DIMessageProcessor 很容易进行单元测试,或者替换为性能更高的新实现(并再次进行单元测试)。

有多种方法可以注册DIMessageProcessor,具体取决于您选择的容器。例如,使用简单注入器:

<!-- language: c# -->
container.RegisterSingle<IMessageProcessor>(new DIMessageProcessor(container));

有了这个,您就可以将 IMessageProcessor 注入到需要它的类型中。这是 WCF 的示例:

<!-- language: c# -->
[ServiceKnownType("GetKnownMessageTypes")]
public class WCFMessageService
{
    private readonly IMessageProcessor processor;

    public WCFMessageService()
    {
        this.processor = 
            Global.Container.GetInstance<IMessageProcessor>();
    }

    [OperationContract]
    public void Process(Message message)
    {
        this.processor.Process(message);
    }

    public static IEnumerable<Type> GetKnownMessageTypes(
        ICustomAttributeProvider provider)
    {
        var knownMessageTypes =
            from type in typeof(Message).Assembly.GetTypes()
            where typeof(Message).IsAssignableFrom(type)
            select type;

        return knownMessageTypes.ToArray();
    }
}

我希望这会有所帮助。

There are several ways to register your handlers. I always start by configuring everything by hand. In your example, it would look like this:

<!-- language: c# -->
container.Register<IMessageHandler<FooMessage>, FooHandler>();
container.Register<IMessageHandler<BarMessage>, BarHandler>();
// etc

Tip: Rather than using a abstract base class, define an IMessageHandler<T> interface.

Once the application is growing, this can get cumbersome, so in your case, batch registration should be the way to go. All DI containers have different mechanisms for this. Most containers support this out of the box, while others don't. Here is how to do it using Simple Injector:

<!-- language: c# -->
container.RegisterManyOpenGeneric(typeof(MessageHandler<>),
    typeof(MessageHandler<>).Assembly);

This will register all concrete types that implement MessageHandler<T> and live in the same assembly as the MessageHandler<T>. Again, it depends on your container of choice how to write this.

For processing incoming arbitrary messages, you will need a bit of reflection, but you shouldn't do this in the application itself. Just define a good abstraction in the application and move the use of reflection inside the part of the application where you have your DI configuration.

What you can do for instance, is define an IMessageProcessor interface, that allows processing any type of Message:

<!-- language: c# -->
public interface IMessageProcessor
{
    void Process(Message message);
}

Near your DI configuration, you can define an implementation of IMessageProcessor, and it could look like this:

<!-- language: c# -->
private class DIMessageProcessor : IMessageProcessor
{
    private readonly Container container;

    public DIMessageProcessor(Container container)
    {
        this.container = container;
    }

    public void Process(Message message)
    {
        if (message == null) throw new ArgumentNullException("message");

        Type messageType = message.GetType();

        Type handlerType =
            typeof(IMessageHandler<>).MakeGenericType(messageType);

        var handler = this.container.GetInstance(handlerType);

        handlerType.GetMethod("Handle").Invoke(handler, message);
    }
}

Leaving this out of the application is important. Not only would you be mixing responsibilities, but it would be harder to add new behavior. Think for instance about preventing message replays (when a message is executed multiple times). It would be easy to implement this behavior as a decorator of IMessageProcessor. Besides this, the DIMessageProcessor would be easy to unit test, or replaced with a new more performant implementation (and unit tested again).

There are several ways to register the DIMessageProcessor, depending on your container of choice. Using Simple Injector for instance:

<!-- language: c# -->
container.RegisterSingle<IMessageProcessor>(new DIMessageProcessor(container));

With this in place you can inject the IMessageProcessor into the types that need this. Here is an example for WCF:

<!-- language: c# -->
[ServiceKnownType("GetKnownMessageTypes")]
public class WCFMessageService
{
    private readonly IMessageProcessor processor;

    public WCFMessageService()
    {
        this.processor = 
            Global.Container.GetInstance<IMessageProcessor>();
    }

    [OperationContract]
    public void Process(Message message)
    {
        this.processor.Process(message);
    }

    public static IEnumerable<Type> GetKnownMessageTypes(
        ICustomAttributeProvider provider)
    {
        var knownMessageTypes =
            from type in typeof(Message).Assembly.GetTypes()
            where typeof(Message).IsAssignableFrom(type)
            select type;

        return knownMessageTypes.ToArray();
    }
}

I hope this helps.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文