有条件地锁定线程并每次处理一个代码

发布于 2025-02-11 04:23:15 字数 309 浏览 1 评论 0原文

首先,新线程由外部工厂独立于我的代码出生。
我有一些整数密钥变量 - “代码”。我收到的这个变量“代码”是由于大量计算和请求DB的(也许我需要对此“代码”的多​​线程保护)。

我只需要与相同的“代码”锁定线程,如果当前“代码”现在正在处理,则使用其他“代码”线程无法锁定和处理而没有障碍。

现在使用“代码”处理和锁定的线程需要立即完成。

同样,所有这些功能都必须与异步/等待的异步工作。

我正在使用.NET Core 6。

它看起来像是一项非常常见的任务,但是我需要使用什么机制?
INET中是否存在此任务的代码模板?

Firstly, new thread born independently of my code by external factory.
I have something integer key variable - "CODE". This variable "CODE" I received as result of a lot of calculating and request to DB (and maybe I need multi-threading protection of this "CODE").

I need lock thread only for the same "CODE", if currently "CODE" is handling now, thread with other "CODE" can not locking and handling without obstacle.

Thread with "CODE" now handling and locking need to immediately finish.

Also all this function must be working asynchronously with ASYNC/AWAIT.

I am using .NET Core 6.

It looks like a very common task, but what mechanism do I need to use?
Does code template for this task exist in inet?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦旅人picnic 2025-02-18 04:23:15

这是我的解决方案,但我不确定它如何与高上载一起工作。我只尝试使用高并发来测试此代码。如果有人对我有建议。

我意识到解决方案和锁定关键部分的解决方案。要解锁主储物柜并使用辅助储物柜,我创建了新任务。
这是我解决方案的模式。

Public Sub New(ByVal logger As ILogger(...)
....
    WorkingServer = New ConcurrentDictionary(Of Integer, BashJob)()
    WorkingVm = New ConcurrentDictionary(Of Integer, BashJob)()
End Sub

Private WorkingServer As ConcurrentDictionary(Of Integer, BashJob)
Private RequestNextJob As New Object

Public Async Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
    _logger.LogInformation($"Time {Now}")
    Interlocked.Increment(Counter)
    SyncLock RequestNextJob
        ' calculate CODE hidden in NextJob
        ...
        Dim NextJob As BashJob = Res1.Result.Item1(0)
        Dim Val1 As BashJob
        Dim ServerWorking As Boolean = WorkingServer.TryGetValue(NextJob.toServer, Val1)
            If Not ServerWorking Then
               Dim AddSucess1 = WorkingServer.TryAdd(NextJob.toServer, NextJob)
               If AddSucess1 Then
                    Dim ServerThread = New Thread(Sub() ServerJob(NextJob, New ServerClosure))
                    ServerThread.Start()
               Else
                    Exit Function
               End If
            Else
               Exit Function
            End If
    End SyncLock
End Function

Async Sub ServerJob(ByVal Prm As BashJob, ByVal Closure As ServerClosure)
    Try
        Closure.Res = Sql.ExecNonQuery(...)
        ...
        'main processor
        ...
        WorkingServer.TryRemove(Prm.toServer, Prm)
    Catch ex As Exception
        _logger.LogInformation($"ServerSsh ({Counter.ToString}) {Now.ToString} Server:{Prm.toServer} {Prm.i}:[{Prm.Command}] GettingError {ex.Message}")
    Finally
        WorkingServer.TryRemove(Prm.toServer, Prm)
    End Try
End Sub
End Class

Public Class ServerClosure
    Property Res As Integer
    Property Server As ServerBashAsync
    Property Connect As Tuple(Of Renci.SshNet.SshClient, Exception, Exception)
    Property BashRet As Task(Of String)
    Property IsCompleted As Integer
    Property IsCompletedWithErr As Integer
End Class

This is my solution, but I'm not sure how it working with high uploading. I only try to test this code with high concurrency. If anybody has advice for me, please.

I realize solution with ConcurrentDictionary and locking critical section. To unlock main locker and using secondary locker I created new task.
This is schema of my solution.

Public Sub New(ByVal logger As ILogger(...)
....
    WorkingServer = New ConcurrentDictionary(Of Integer, BashJob)()
    WorkingVm = New ConcurrentDictionary(Of Integer, BashJob)()
End Sub

Private WorkingServer As ConcurrentDictionary(Of Integer, BashJob)
Private RequestNextJob As New Object

Public Async Function Execute(context As IJobExecutionContext) As Task Implements IJob.Execute
    _logger.LogInformation(
quot;Time {Now}")
    Interlocked.Increment(Counter)
    SyncLock RequestNextJob
        ' calculate CODE hidden in NextJob
        ...
        Dim NextJob As BashJob = Res1.Result.Item1(0)
        Dim Val1 As BashJob
        Dim ServerWorking As Boolean = WorkingServer.TryGetValue(NextJob.toServer, Val1)
            If Not ServerWorking Then
               Dim AddSucess1 = WorkingServer.TryAdd(NextJob.toServer, NextJob)
               If AddSucess1 Then
                    Dim ServerThread = New Thread(Sub() ServerJob(NextJob, New ServerClosure))
                    ServerThread.Start()
               Else
                    Exit Function
               End If
            Else
               Exit Function
            End If
    End SyncLock
End Function

Async Sub ServerJob(ByVal Prm As BashJob, ByVal Closure As ServerClosure)
    Try
        Closure.Res = Sql.ExecNonQuery(...)
        ...
        'main processor
        ...
        WorkingServer.TryRemove(Prm.toServer, Prm)
    Catch ex As Exception
        _logger.LogInformation(
quot;ServerSsh ({Counter.ToString}) {Now.ToString} Server:{Prm.toServer} {Prm.i}:[{Prm.Command}] GettingError {ex.Message}")
    Finally
        WorkingServer.TryRemove(Prm.toServer, Prm)
    End Try
End Sub
End Class

Public Class ServerClosure
    Property Res As Integer
    Property Server As ServerBashAsync
    Property Connect As Tuple(Of Renci.SshNet.SshClient, Exception, Exception)
    Property BashRet As Task(Of String)
    Property IsCompleted As Integer
    Property IsCompletedWithErr As Integer
End Class
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文