How does a distributed transaction handle multiple connections to the same database in a threaded environment?

Posted 2024-08-20 17:29:43

I'm trying to determine the behaviour of multiple database connections in a distributed transaction.

I've got a long-running process which spawns a series of threads, and each thread is then responsible for managing its own DB connections and such. All of this runs inside a transaction scope, and each thread is enlisted in the transaction via a DependentTransaction object.

When I tried to run this process in parallel I ran into a few issues, namely that there appears to be some sort of lock preventing the queries from executing at the same time within the transaction.

What I would like to know is how the transaction co-ordinator handles queries from multiple connections to the same DB and if it’s even advisable to pass a connection object across threads?

I’ve read that MS SQL only allows one connection per transaction but I am clearly able to create and initialize more than one connection to the same DB in the same transaction. I’m simply not able to execute the threads in parallel without getting a “Transaction context in use by another session” exception when opening the connections. The result is that the connections have to wait to execute instead of running at the same time and in the end the code runs to completion but there is no net gain to threading the app because of this locking issue.

The code looks something like this.

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connections to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

EDIT

My tests have conclusively shown that this just can't be done. Whether more than one connection is used or the same connection is shared, all requests (queries) in the transaction are processed sequentially.

Perhaps they will change this behaviour in the future.

Comments (1)

不即不离 2024-08-27 17:29:43

First, you have to separate what you have read here and there about SQL Server transactions into two distinct cases: local and distributed.

Local SQL transactions:

  • SQL Server allows only one request to execute on each local transaction.
  • By default only one session can enroll in a local transaction. Using sp_getbindtoken and sp_bindsession multiple sessions can be enrolled in a local transaction. The sessions are still restricted to only one executing a request at any time.
  • With Multiple Active Result Sets (MARS) one session can execute multiple requests. All requests have to be enrolled in the same local transaction.

Distributed Transactions:

  • Multiple sessions can have their local transaction enrolled in a single distributed transaction.
  • Each session is still enrolled in a local transaction, subject to all restrictions mentioned above for local transactions.
  • Local transactions enrolled in a distributed transaction are subject to two-phase commit coordinated by the distributed transaction.
  • All local transactions on an instance enrolled in a distributed transaction are still independent local transactions, primarily meaning they have conflicting lock namespaces.
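
To see on the client side whether a TransactionScope is still a lightweight local transaction or has been promoted to a distributed one, one option is to inspect `TransactionInformation.DistributedIdentifier`, which stays `Guid.Empty` until the transaction is escalated. The sketch below is only an illustration: `PromotionCheck` and `IsDistributed` are hypothetical names that do not appear in the original code, and no database is involved.

```csharp
using System;
using System.Transactions;

static class PromotionCheck
{
    // Hypothetical helper: true once the transaction has been escalated
    // to a distributed (two-phase commit) transaction.
    public static bool IsDistributed(Transaction tx)
    {
        if (tx == null) throw new ArgumentNullException("tx");
        // DistributedIdentifier remains Guid.Empty while the transaction is local.
        return tx.TransactionInformation.DistributedIdentifier != Guid.Empty;
    }

    static void Main()
    {
        using (var scope = new TransactionScope())
        {
            // Nothing has enlisted yet, so no escalation should have happened.
            Console.WriteLine("Distributed before any connection: "
                + IsDistributed(Transaction.Current));

            // Open the SqlConnection objects here and call IsDistributed again
            // to observe whether opening the second connection escalates.
            scope.Complete();
        }
    }
}
```

Calling the helper after each `Open()` in your own code shows at which point the escalation happens for your provider and connection strings.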

So when a client creates a .NET TransactionScope and executes multiple requests on the same server under that scope, these requests are all local transactions enrolled in a distributed transaction. A simple example:

    using System;
    using System.Data.SqlClient;
    using System.Threading;
    using System.Transactions;

    class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

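                            // On .NET Framework versions before 4.5, the Begin* methods require
                            // "Asynchronous Processing=true" in the connection string.
                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();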

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

Create a dummy test table:

create table test (id int not null identity(1,1) primary key, a varchar(100));

and run the code in my sample. You will see that both requests execute in parallel, each one inserting 100k rows into the table, and both commit when the transaction scope completes. So the problems you're seeing are not related to SQL Server or to TransactionScope; both can easily handle the scenario you describe. Moreover, the code is simple and straightforward, and there is no need to create dependent transactions, clone anything, or explicitly promote transactions.

Updated

Using explicit threads and dependent transactions:

    private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }
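
The same dependent-clone pattern can be written with tasks, as in the question. The following is a minimal sketch under stated assumptions, not a drop-in replacement: `DependentCloneSketch`, `Run`, `Worker` and the `completed` counter are hypothetical names, and the database work is left as a comment so that the hand-off of the transaction between threads is the only thing shown.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;

static class DependentCloneSketch
{
    static int completed;   // number of workers that finished their scope

    // Hypothetical stand-in for the per-thread database work.
    static void Worker(object state)
    {
        var clone = (DependentTransaction)state;
        try
        {
            // Make the clone the ambient transaction on this thread.
            using (var scope = new TransactionScope(clone))
            {
                // A SqlConnection opened here would enlist in Transaction.Current.
                Interlocked.Increment(ref completed);
                scope.Complete();
            }
            // Release the BlockCommitUntilComplete hold on the root transaction.
            clone.Complete();
        }
        catch (Exception ex)
        {
            // Dooms the root transaction; the root scope is then expected
            // to fail with TransactionAbortedException when it is disposed.
            clone.Rollback(ex);
        }
    }

    public static int Run(int workers)
    {
        completed = 0;
        using (var root = new TransactionScope())
        {
            var tasks = new Task[workers];
            for (int i = 0; i < workers; i++)
            {
                // Create each clone on the thread that owns the root scope.
                DependentTransaction clone = Transaction.Current.DependentClone(
                    DependentCloneOption.BlockCommitUntilComplete);
                tasks[i] = Task.Factory.StartNew(Worker, clone);
            }
            Task.WaitAll(tasks);
            root.Complete();
        }   // the commit is attempted here, after every clone has completed
        return completed;
    }

    static void Main()
    {
        Console.WriteLine("Workers completed: " + Run(4));
    }
}
```

Each worker receives its own clone and calls `Complete()` on it exactly once; the retry loops and the shared `SyncLock` from the question are not needed for the hand-off itself.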