如何让 NHibernate 在每个请求使用会话时重试死锁事务?

发布于 2024-09-28 13:14:23 字数 129 浏览 9 评论 0原文

当您使用 Session-Per-Request 模式时,您在使用 NHibernate 的 3 层应用程序中使用什么模式/架构,该应用程序需要支持事务失败时的重试? (因为 ISession 在异常后变得无效,即使这是死锁、超时或活锁异常)。

What pattern/architecture do you use in a 3-tiered application using NHibernate that needs to support retries on transaction failures, when you are using the Session-Per-Request pattern? (as ISession becomes invalid after an exception, even if this is a deadlock or timeout or livelock exception).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

趴在窗边数星星i 2024-10-05 13:14:23

注释 2 如今,我永远不会将写入事务放入 Web 项目中 - 而是使用消息传递 + 队列,并在后台有一个工作人员处理消息,旨在完成事务工作。

然而,我仍然会使用事务来读取以获得一致的数据;与 MVCC/Snapshot 隔离一起,与 Web 项目隔离。在这种情况下,您会发现每个请求每个事务的会话非常好。

注1这篇文章的想法已经放在Castle Transactions框架中 和我的新 NHibernate 设施

好的,这是总体思路。假设您要为客户创建未最终订单。您有某种 GUI,例如浏览器/MVC 应用程序,它使用相关信息创建新的数据结构(或者您从网络获取此数据结构):

[Serializable]
class CreateOrder /*: IMessage*/
{
    // immutable
    private readonly string _CustomerName;
    private readonly decimal _Total;
    private readonly Guid _CustomerId;

    public CreateOrder(string customerName, decimal total, Guid customerId)
    {
        _CustomerName = customerName;
        _Total = total;
        _CustomerId = customerId;
    }

    // put ProtoBuf attribute
    public string CustomerName
    {
        get { return _CustomerName; }
    }

    // put ProtoBuf attribute
    public decimal Total
    {
        get { return _Total; }
    }

    // put ProtoBuf attribute
    public Guid CustomerId
    {
        get { return _CustomerId; }
    }
}

您需要一些东西来处理它。这可能是某种服务总线中的命令处理程序。 “命令处理程序”这个词只是其中之一,您也可以将其称为“服务”或“域服务”或“消息处理程序”。如果您正在进行函数式编程,那么它将是您的消息框实现,或者如果您正在进行 Erlang 或 Akka,那么它将是一个 Actor。

class CreateOrderHandler : IHandle<CreateOrder>
{
    public void Handle(CreateOrder command)
    {
        With.Policy(IoC.Resolve<ISession>, s => s.BeginTransaction(), s =>
        {
            var potentialCustomer = s.Get<PotentialCustomer>(command.CustomerId);
            potentialCustomer.CreateOrder(command.Total);
            return potentialCustomer;
        }, RetryPolicies.ExponentialBackOff.RetryOnLivelockAndDeadlock(3));
    }
}

interface IHandle<T> /* where T : IMessage */
{
    void Handle(T command);
}

上面显示了您可能为此给定问题域(应用程序状态/事务处理)选择的 API 用法。

With的实现:

static class With
{
    internal static void Policy(Func<ISession> getSession,
                                       Func<ISession, ITransaction> getTransaction,
                                       Func<ISession, EntityBase /* abstract 'entity' base class */> executeAction,
                                       IRetryPolicy policy)
    {
        //http://fabiomaulo.blogspot.com/2009/06/improving-ado-exception-management-in.html

        while (true)
        {
            using (var session = getSession())
            using (var t = getTransaction(session))
            {
                var entity = executeAction(session);
                try
                {
                    // we might not always want to update; have another level of indirection if you wish
                    session.Update(entity);
                    t.Commit();
                    break; // we're done, stop looping
                }
                catch (ADOException e)
                {
                    // need to clear 2nd level cache, or we'll get 'entity associated with another ISession'-exception

                    // but the session is now broken in all other regards will will throw exceptions
                    // if you prod it in any other way
                    session.Evict(entity);

                    if (!t.WasRolledBack) t.Rollback(); // will back our transaction

                    // this would need to be through another level of indirection if you support more databases
                    var dbException = ADOExceptionHelper.ExtractDbException(e) as SqlException;

                    if (policy.PerformRetry(dbException)) continue;
                    throw; // otherwise, we stop by throwing the exception back up the layers
                }
            }
        }
    }
}

如你所见,我们需要一个新的工作单元;每次出现问题时都会调用 ISession。这就是循环位于 using 语句/块外部的原因。拥有函数相当于拥有工厂实例,只不过我们直接调用对象实例,而不是调用它的方法。恕我直言,它可以提供更好的调用者 API。

我们希望能够相当顺利地处理重试的执行方式,因此我们有一个可以由不同处理程序实现的接口,称为 IRetryHandler。应该可以将它们链接到您想要强制执行控制流的每个方面(是的,它非常接近 AOP)。与 AOP 的工作方式类似,返回值用于控制控制流,但仅以真/假方式进行,这是我们的要求。

interface IRetryPolicy
{
    bool PerformRetry(SqlException ex);
}

AggregateRoot、PotentialCustomer 是具有生命周期的实体。这就是您将使用 *.hbm.xml 文件/FluentNHibernate 进行映射的内容。

它有一个与发送的命令 1:1 对应的方法。这使得命令处理程序完全易于阅读。

此外,使用带有鸭子类型的动态语言,它允许您将命令的类型名称映射到方法,类似于 Ruby/Smalltalk 的做法。

如果您正在进行事件溯源,事务处理将是类似的,只是事务不会与 NHibernate 的接口进行交互。推论是您将保存通过调用 CreateOrder(decimal) 创建的事件,并为您的实体提供一种从存储中重新读取已保存事件的机制。

最后要注意的一点是,我将重写我创建的三个方法。这是 NHibernate 方面的要求,因为它需要一种方法来知道一个实体何时与另一个实体相等(它们是否在集合/包中)。有关我的实现的更多信息,请参见此处。无论如何,这是示例代码,我现在不关心我的客户,所以我不会实现它们:

sealed class PotentialCustomer : EntityBase
{
    public void CreateOrder(decimal total)
    {
        // validate total
        // run business rules

        // create event, save into event sourced queue as transient event
        // update private state
    }

    public override bool IsTransient() { throw new NotImplementedException(); }
    protected override int GetTransientHashCode() { throw new NotImplementedException(); }
    protected override int GetNonTransientHashCode() { throw new NotImplementedException(); }
}

我们需要一种创建重试策略的方法。当然,我们可以通过多种方式做到这一点。在这里,我将流畅的接口与静态方法类型相同类型的同一对象的实例组合在一起。我显式地实现了该接口,以便在流畅的接口中没有其他方法可见。该接口仅使用我下面的“示例”实现。

internal class RetryPolicies : INonConfiguredPolicy
{
    private readonly IRetryPolicy _Policy;

    private RetryPolicies(IRetryPolicy policy)
    {
        if (policy == null) throw new ArgumentNullException("policy");
        _Policy = policy;
    }

    public static readonly INonConfiguredPolicy ExponentialBackOff =
        new RetryPolicies(new ExponentialBackOffPolicy(TimeSpan.FromMilliseconds(200)));

    IRetryPolicy INonConfiguredPolicy.RetryOnLivelockAndDeadlock(int retries)
    {
        return new ChainingPolicy(new[] {new SqlServerRetryPolicy(retries), _Policy});
    }
}

我们需要一个接口来部分完整地调用流畅的接口。这为我们提供了类型安全。因此,在完成策略配置之前,我们需要两个远离静态类型的解引用运算符(即“句号”--(.))。

internal interface INonConfiguredPolicy
{
    IRetryPolicy RetryOnLivelockAndDeadlock(int retries);
}

连锁政策可以得到解决。它的实现检查其所有子级是否返回 continue,并且在检查时,它还会执行其中的逻辑。

internal class ChainingPolicy : IRetryPolicy
{
    private readonly IEnumerable<IRetryPolicy> _Policies;

    public ChainingPolicy(IEnumerable<IRetryPolicy> policies)
    {
        if (policies == null) throw new ArgumentNullException("policies");
        _Policies = policies;
    }

    public bool PerformRetry(SqlException ex)
    {
        return _Policies.Aggregate(true, (val, policy) => val && policy.PerformRetry(ex));
    }
}

该策略让当前线程休眠一段时间;有时数据库会过载,并且让多个读取器/写入器不断尝试读取将是对数据库的事实上的 DOS 攻击(看看几个月前 facebook 崩溃时发生的情况,因为它们的缓存服务器都同时查询数据库)时间)。

internal class ExponentialBackOffPolicy : IRetryPolicy
{
    private readonly TimeSpan _MaxWait;
    private TimeSpan _CurrentWait = TimeSpan.Zero; // initially, don't wait

    public ExponentialBackOffPolicy(TimeSpan maxWait)
    {
        _MaxWait = maxWait;
    }

    public bool PerformRetry(SqlException ex)
    {
        Thread.Sleep(_CurrentWait);
        _CurrentWait = _CurrentWait == TimeSpan.Zero ? TimeSpan.FromMilliseconds(20) : _CurrentWait + _CurrentWait;
        return _CurrentWait <= _MaxWait;
    }
}

同样,在任何良好的基于​​ SQL 的系统中,我们都需要处理死锁。我们无法真正深入地计划这些,尤其是在使用 NHibernate 时,除了保持严格的事务策略——没有隐式事务;并小心Open-Session-In-View。如果您要获取大量数据,还需要记住笛卡尔积问题/N+1 选择问题。相反,您可能会使用多查询或 HQL 的“fetch”关键字。

internal class SqlServerRetryPolicy : IRetryPolicy
{
    private int _Tries;
    private readonly int _CutOffPoint;

    public SqlServerRetryPolicy(int cutOffPoint)
    {
        if (cutOffPoint < 1) throw new ArgumentOutOfRangeException("cutOffPoint");
        _CutOffPoint = cutOffPoint;
    }

    public bool PerformRetry(SqlException ex)
    {
        if (ex == null) throw new ArgumentNullException("ex");
        // checks the ErrorCode property on the SqlException
        return SqlServerExceptions.IsThisADeadlock(ex) && ++_Tries < _CutOffPoint;
    }
}

一个帮助程序类,使代码可读性更好。

internal static class SqlServerExceptions
{
    public static bool IsThisADeadlock(SqlException realException)
    {
        return realException.ErrorCode == 1205;
    }
}

不要忘记在 IConnectionFactory 中处理网络故障(可能通过实现 IConnection 进行委托)。


PS:如果您不只是进行阅读,则“每次请求会话”是一种损坏的模式。特别是如果您使用与写入相同的 ISession 进行读取,并且您没有对读取进行排序,使得它们始终位于写入之前。

Note 2 Nowadays I would never put write-transactions inside of the web project - but instead use messaging + queues and have a worker in the background handling messages aiming to cause transactional work to be done.

I would, however, still use transactions for reading to get consistent data; together with MVCC/Snapshot isolation, from web projects. In that case you'll find that session-per-request-per-transaction is perfectly fine.

Note 1 The ideas of this post have been placed in the Castle Transactions framework and my new NHibernate Facility.

OK, here's the general idea. Suppose you want to create a non-finalized order for a customer. You have some sort of GUI, e.g. a browser/MVC app, that create a new data structure with the relevant information (or you get this data structure from the network):

[Serializable]
class CreateOrder /*: IMessage*/
{
    // immutable
    private readonly string _CustomerName;
    private readonly decimal _Total;
    private readonly Guid _CustomerId;

    public CreateOrder(string customerName, decimal total, Guid customerId)
    {
        _CustomerName = customerName;
        _Total = total;
        _CustomerId = customerId;
    }

    // put ProtoBuf attribute
    public string CustomerName
    {
        get { return _CustomerName; }
    }

    // put ProtoBuf attribute
    public decimal Total
    {
        get { return _Total; }
    }

    // put ProtoBuf attribute
    public Guid CustomerId
    {
        get { return _CustomerId; }
    }
}

You need something to handle it. Probably this would be a command handler in a service bus of some sort. The word 'command handler' is one of many and you might as well just call it a 'service' or 'domain service' or 'message handler'. If you were doing functional programming, it would be your message box implementation, or if you were doing Erlang or Akka, it would be an Actor.

class CreateOrderHandler : IHandle<CreateOrder>
{
    public void Handle(CreateOrder command)
    {
        With.Policy(IoC.Resolve<ISession>, s => s.BeginTransaction(), s =>
        {
            var potentialCustomer = s.Get<PotentialCustomer>(command.CustomerId);
            potentialCustomer.CreateOrder(command.Total);
            return potentialCustomer;
        }, RetryPolicies.ExponentialBackOff.RetryOnLivelockAndDeadlock(3));
    }
}

interface IHandle<T> /* where T : IMessage */
{
    void Handle(T command);
}

The above shows an API usage you might choose for this given problem domain (application state/transaction handling).

The implementation of With:

static class With
{
    internal static void Policy(Func<ISession> getSession,
                                       Func<ISession, ITransaction> getTransaction,
                                       Func<ISession, EntityBase /* abstract 'entity' base class */> executeAction,
                                       IRetryPolicy policy)
    {
        //http://fabiomaulo.blogspot.com/2009/06/improving-ado-exception-management-in.html

        while (true)
        {
            using (var session = getSession())
            using (var t = getTransaction(session))
            {
                var entity = executeAction(session);
                try
                {
                    // we might not always want to update; have another level of indirection if you wish
                    session.Update(entity);
                    t.Commit();
                    break; // we're done, stop looping
                }
                catch (ADOException e)
                {
                    // need to clear 2nd level cache, or we'll get 'entity associated with another ISession'-exception

                    // but the session is now broken in all other regards will will throw exceptions
                    // if you prod it in any other way
                    session.Evict(entity);

                    if (!t.WasRolledBack) t.Rollback(); // will back our transaction

                    // this would need to be through another level of indirection if you support more databases
                    var dbException = ADOExceptionHelper.ExtractDbException(e) as SqlException;

                    if (policy.PerformRetry(dbException)) continue;
                    throw; // otherwise, we stop by throwing the exception back up the layers
                }
            }
        }
    }
}

As you can see, we need a new unit of work; the ISession every time something goes wrong. That's why the loop is on the outside of the Using statements/blocks. Having functions are equivalent to having factory instances, except we're invoking directly on an object instance, rather than calling a method on it. It makes for a nicer caller-API imho.

We want fairly smooth handling of how we perform retries, so we have an interface that can be implemented by different handlers, called IRetryHandler. It should be possible to chain these for every aspect (yes, it's very close to AOP) you want to enforce of the control flow. Similar to how AOP works, the return value is used to control control-flow, but only in a true/false fashion, which is our requirement.

interface IRetryPolicy
{
    bool PerformRetry(SqlException ex);
}

The AggregateRoot, PotentialCustomer is an entity with a lifetime. It's what you would be mapping with your *.hbm.xml files/FluentNHibernate.

It has a method that corresponds 1:1 with the sent command. This makes the command handlers completely obvious to read.

Furthermore, with a dynamic language with duck typing, it would allow you to map commands' type names to methods, similar to how Ruby/Smalltalk does it.

If you were doing event sourcing, the transaction handling would be similar, except the transaction wouldn't interface NHibernate's such. The corollary is that you would save the events created through invoking CreateOrder(decimal), and provide your entity with a mechanism for re-reading saved events from store.

A final point to notice is that I'm overriding three methods I have created. This is a requirement from NHibernate's side, as it needs a way of knowing when an entity is equal to another, should they be in sets/bags. More about my implementation here. In any way, this is sample code and I don't care about my customer right now, so I'm not implementing them:

sealed class PotentialCustomer : EntityBase
{
    public void CreateOrder(decimal total)
    {
        // validate total
        // run business rules

        // create event, save into event sourced queue as transient event
        // update private state
    }

    public override bool IsTransient() { throw new NotImplementedException(); }
    protected override int GetTransientHashCode() { throw new NotImplementedException(); }
    protected override int GetNonTransientHashCode() { throw new NotImplementedException(); }
}

We need a method for creating retry policies. Of course we could do this in many ways. Here I'm combining a fluent interface with an instance of the same object of the same type that the static method's type is. I implement the interface explicitly so that no other methods are visible in the fluent interface. This interface only uses my 'example' implementations below.

internal class RetryPolicies : INonConfiguredPolicy
{
    private readonly IRetryPolicy _Policy;

    private RetryPolicies(IRetryPolicy policy)
    {
        if (policy == null) throw new ArgumentNullException("policy");
        _Policy = policy;
    }

    public static readonly INonConfiguredPolicy ExponentialBackOff =
        new RetryPolicies(new ExponentialBackOffPolicy(TimeSpan.FromMilliseconds(200)));

    IRetryPolicy INonConfiguredPolicy.RetryOnLivelockAndDeadlock(int retries)
    {
        return new ChainingPolicy(new[] {new SqlServerRetryPolicy(retries), _Policy});
    }
}

We need an interface for the partially complete invocation to the fluent interface. This gives us type-safety. We hence need two dereference operators (i.e. 'full stop' -- (.)), away from our static type, before finishing configuring the policy.

internal interface INonConfiguredPolicy
{
    IRetryPolicy RetryOnLivelockAndDeadlock(int retries);
}

The chaining policy could be resolved. Its implementation checks that all its children return continue and as it checks that, it also performs the logic in them.

internal class ChainingPolicy : IRetryPolicy
{
    private readonly IEnumerable<IRetryPolicy> _Policies;

    public ChainingPolicy(IEnumerable<IRetryPolicy> policies)
    {
        if (policies == null) throw new ArgumentNullException("policies");
        _Policies = policies;
    }

    public bool PerformRetry(SqlException ex)
    {
        return _Policies.Aggregate(true, (val, policy) => val && policy.PerformRetry(ex));
    }
}

This policy lets the current thread sleep some amount of time; sometimes the database is overloaded, and having multiple readers/writers continuously trying to read would be a de-facto DOS-attack on the database (see what happened a few months ago when facebook crashed because their cache servers all queried their databases at the same time).

internal class ExponentialBackOffPolicy : IRetryPolicy
{
    private readonly TimeSpan _MaxWait;
    private TimeSpan _CurrentWait = TimeSpan.Zero; // initially, don't wait

    public ExponentialBackOffPolicy(TimeSpan maxWait)
    {
        _MaxWait = maxWait;
    }

    public bool PerformRetry(SqlException ex)
    {
        Thread.Sleep(_CurrentWait);
        _CurrentWait = _CurrentWait == TimeSpan.Zero ? TimeSpan.FromMilliseconds(20) : _CurrentWait + _CurrentWait;
        return _CurrentWait <= _MaxWait;
    }
}

Similarly, in any good SQL-based system we need to handle deadlocks. We can't really plan for these in depth, especially when using NHibernate, other than keeping a strict transaction policy -- no implicit transactions; and be careful with Open-Session-In-View. There are also the cartesian product problem/N+1 selects problem you'd need to keep in mind if you are fetching a lot of data. Instead then, you might have Multi-Query, or HQL's 'fetch' keyword.

internal class SqlServerRetryPolicy : IRetryPolicy
{
    private int _Tries;
    private readonly int _CutOffPoint;

    public SqlServerRetryPolicy(int cutOffPoint)
    {
        if (cutOffPoint < 1) throw new ArgumentOutOfRangeException("cutOffPoint");
        _CutOffPoint = cutOffPoint;
    }

    public bool PerformRetry(SqlException ex)
    {
        if (ex == null) throw new ArgumentNullException("ex");
        // checks the ErrorCode property on the SqlException
        return SqlServerExceptions.IsThisADeadlock(ex) && ++_Tries < _CutOffPoint;
    }
}

A helper class to make the code read better.

internal static class SqlServerExceptions
{
    public static bool IsThisADeadlock(SqlException realException)
    {
        return realException.ErrorCode == 1205;
    }
}

Don't forget to handle network failures in the IConnectionFactory as well (by delegating perhaps through implementing IConnection).


PS: Session-per-request is a broken pattern if you are not only doing reading. Especially if you are doing reading with the same ISession that you are writing with and you are not ordering the reads such that they are all, always, before the writes.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文