多线程 linq2sql 应用程序 TransactionScope 困难

发布于 2024-11-02 21:09:36 字数 3864 浏览 1 评论 0原文

我创建了一个文件处理服务,它从特定目录读取并导入 xml 文件。

该服务启动多个工作程序,这些工作程序将轮询文件队列以查找新文件,并使用 linq2sql 进行数据访问。每个工作线程都有自己的数据上下文。

正在处理的文件包含多个订单,每个订单包含多个地址(客户/承包商/分包商)

我已经围绕每个文件的处理定义了一个事务范围。这样我想确保整个文件得到正确处理,或者在发生异常时回滚整个文件:

        try
        {
            using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                foreach (var order in orders)
                {
                    HandleType1Order(order);
                }
                tx.Complete();
            }
        }
        catch (SqlException ex)
        {
            if (ex.Number == SqlErrorNumbers.Deadlock)
            {
                throw new FileHandlerException("File Caused a Deadlock, retrying later", ex, true);
            }
            else
                throw;
        }

该服务的要求之一是创建或更新在 xml 文件中找到的地址。所以我创建了一个地址服务来负责地址管理。针对 xml 导入文件中的每个订单(在方法 HandleType1Order() 内)执行以下代码段(因此是整个文件的 TransactionScope 的一部分)。

 using (var tx = new TransactionScope())
            {

                address = GetAddressByReference(number);
                if (address != null) //address is already known
                {
                    Log.Debug("Found address {0} - {1}. Updating...", address.Code, address.Name);
                    UpdateAddress(address, name, number, isContractor, isSubContractor, isCustomer);
                }
                else
                {
                    //address not known, so create it
                    Log.Debug("Address {0} not known, creating address", number);
                    address = CreateAddress(name, number, sourceSystemId, isContractor, isSubContractor,
                                            isCustomer);
                    _addressRepository.Save(address);

                }

                _addressRepository.Flush();
                tx.Complete();
            }

我在这里想做的是创建或更新一个地址,并且该地址的编号是唯一的。

方法 GetAddressByReference(string number) 返回已知地址,如果未找到地址,则返回 null。

 public virtual Address GetAddressByReference(string reference)
 {
     return _addressRepository.GetAll().SingleOrDefault(a=>a.Code==reference);
 }

然而,当我运行该服务时,它会创建多个具有相同号码的地址。方法 GetAddressByReference() get 被并发调用,当第二个线程使用相同的地址号执行该方法时,它应该返回一个已知地址,但它返回 null。我的事务边界或隔离级别可能有问题,但我似乎无法让它工作。

有人能指出我正确的方向吗?非常感谢帮助!

ps 我对事务死锁并导致回滚没有问题,当发生死锁时,文件将被重试。


编辑 1 线程代码:

        public void Work()
    {
        _isRunning = true;
        while (true)
        {
            ImportFileTask task = _queue.Dequeue(); //dequeue blocks on empty queue               
            if (task == null)
                break; //Shutdown worker when a null task is read from the queue

            IFileImporter importer = null;
            try
            {
                using (new LockFile(task.FilePath).Acquire()) //create a filelock to sync access accross all processes to the file
                {
                    importer = _kernel.Resolve<IFileImporter>();
                    Log.DebugFormat("Processing file {0}", task.FilePath);
                    importer.Import(task.FilePath);
                    Log.DebugFormat("Done Processing file {0}", task.FilePath);
                }
            }
            catch(Exception ex)
            {
                Log.Fatal(
                    "A Fatal exception occured while handling {0} --> {1}".FormatWith(task.FilePath, ex.Message), ex);
            }
            finally
            {
                if (importer != null)
                    _kernel.ReleaseComponent(importer);
            }

        }

        _isRunning = false;
    }

上述方法在我们所有的工作线程中运行。它使用 Castle Windsor 来解析 FileImporter,该 FileImporter 具有短暂的生活方式(因此不跨线程共享)。

I've created a file processing service which reads and imports xml files from a specific directory.

The service starts several workers which will poll a filequeue for new files and uses linq2sql for dataaccess. Each workerthread has its own datacontext.

The files being processed contain several orders and each order contains several addresses (Customer/Contractor/Subcontractor)

I've defined a transactionscope around the handling of each file. This way I want to ensure that the whole file is handled correctly, or that the whole file is rolled back when an exception occurs:

        try
        {
            using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                foreach (var order in orders)
                {
                    HandleType1Order(order);
                }
                tx.Complete();
            }
        }
        catch (SqlException ex)
        {
            if (ex.Number == SqlErrorNumbers.Deadlock)
            {
                throw new FileHandlerException("File Caused a Deadlock, retrying later", ex, true);
            }
            else
                throw;
        }

One of the requirements for the service is that is creates or updates found addresses in the xml files. So I've created an address service which is responsible for address management. The following piece of code gets executed for each order (within the method HandleType1Order()) in the xml importfile (And thus is part of the TransactionScope for the entire file).

 using (var tx = new TransactionScope())
            {

                address = GetAddressByReference(number);
                if (address != null) //address is already known
                {
                    Log.Debug("Found address {0} - {1}. Updating...", address.Code, address.Name);
                    UpdateAddress(address, name, number, isContractor, isSubContractor, isCustomer);
                }
                else
                {
                    //address not known, so create it
                    Log.Debug("Address {0} not known, creating address", number);
                    address = CreateAddress(name, number, sourceSystemId, isContractor, isSubContractor,
                                            isCustomer);
                    _addressRepository.Save(address);

                }

                _addressRepository.Flush();
                tx.Complete();
            }

What I'm trying to do here, is to create or update an address, with the number being unique.

The method GetAddressByReference(string number) returns a known address or null when an address is not found.

 public virtual Address GetAddressByReference(string reference)
 {
     return _addressRepository.GetAll().SingleOrDefault(a=>a.Code==reference);
 }

When I run the service it however creates multiple addresses with the same number. The method GetAddressByReference() get's called concurrently and should return a known address when a second thread executes the method with the same addressnumber, however it returns null. There is propably something wrong with my transaction boundaries, or isolationlevel, but I can't seem to get it to work.

Can someone point me in the right direction? Help is much appreciated!!

p.s. I've no problem with the transactions being deadlocked and causing a rollback, the file will just be retried when a deadlock occurs.


Edit 1 Threading code:

        public void Work()
    {
        _isRunning = true;
        while (true)
        {
            ImportFileTask task = _queue.Dequeue(); //dequeue blocks on empty queue               
            if (task == null)
                break; //Shutdown worker when a null task is read from the queue

            IFileImporter importer = null;
            try
            {
                using (new LockFile(task.FilePath).Acquire()) //create a filelock to sync access accross all processes to the file
                {
                    importer = _kernel.Resolve<IFileImporter>();
                    Log.DebugFormat("Processing file {0}", task.FilePath);
                    importer.Import(task.FilePath);
                    Log.DebugFormat("Done Processing file {0}", task.FilePath);
                }
            }
            catch(Exception ex)
            {
                Log.Fatal(
                    "A Fatal exception occured while handling {0} --> {1}".FormatWith(task.FilePath, ex.Message), ex);
            }
            finally
            {
                if (importer != null)
                    _kernel.ReleaseComponent(importer);
            }

        }

        _isRunning = false;
    }

The above method runs in all of our worker threads. It uses Castle Windsor to resolve the FileImporter, which has a transient lifestyle (thus not shared accross threads).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

灼疼热情 2024-11-09 21:09:37

您没有发布线程代码,因此很难说出问题是什么。我假设您已经启动了 DTC(分布式事务协调器)?

你使用线程池吗?您使用“lock”关键字吗?

http://msdn.microsoft.com/en-us/library/c5kehkcz.aspx

You didn't post your threading code, so its difficult to say what the issue is. I'm assuming you have started DTC (Distributed Transaction Coordinator)?

Are you using a ThreadPool? Are you using the "lock" keyword?

http://msdn.microsoft.com/en-us/library/c5kehkcz.aspx

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文