Locking based on function input in multithreading

Posted on 2025-02-06 19:09:57

I'm working with a piece of code that processes messages from a queue (using MassTransit). Many messages can be processed in parallel. All messages create or modify an object in Active Directory (in this case). All objects need to be validated against the AD schema definitions. (Though it's not relevant to the problem, I want to note that we have many customers with custom extensions in their AD schema.)

Retrieving the schema information is a slow operation. I want to do it once and then cache the result. But because many messages are processed in parallel, many of them start getting the schema information before the first retrieval succeeds, so too much work is done. For the moment I fixed this with a simple semaphore. See the code below.

But that is not a good solution, as now only one thread at a time can enter this code, regardless of which object it asks for.

I need something that locks the code once per object and holds off other requests for that object until the first retrieval and caching is complete.

What kind of construct will allow me to do that?

private static SemaphoreSlim _lock = new SemaphoreSlim(1, 1);

public ActiveDirectorySchemaObject? GetSchemaObjectFor(string objectClass)
{

    //todo: create better solution
    _lock.Wait();
    try
    {
        if (_activeDirectorySchemaContainer.HasSchemaObjectFor(
            _scopeContext.CustomerId, objectClass) == false)
        {
            _logger.LogInformation($"Getting and caching schema from AD " +
                $"for {objectClass}");
            _activeDirectorySchemaContainer.SetSchemaObjectFor(
                _scopeContext.CustomerId, objectClass,
                GetSchemaFromActiveDirectory(objectClass));
        }
    }
    finally
    {
        _lock.Release();
    }
    return _activeDirectorySchemaContainer.GetSchemaObjectFor(
        _scopeContext.CustomerId, objectClass);
}

The following is a possible simplification of the question. In short, I am looking for the proper construct to lock a piece of code against parallel access, separately for every variation of an input.

A comment mentioned Lazy<T>, which I have not used before. Reading the docs, I see that it defers initialization of an object until it is first needed. Maybe I could refactor for that. But looking at the code as it currently is, I seem to need a lazy "if" or a lazy "function", though maybe I am overcomplicating things. I find that thinking about parallel programming often hurts my head.

As requested, here is the schema container class code containing SetSchemaObjectFor and the other functions. Thanks for all the information provided so far.

public interface IActiveDirectorySchemaContainer
    {
        //Dictionary<string, Dictionary<string, JObject>> schemaStore {  get; }

        bool HasSchemaObjectFor(string customerId, string objectClass);
        ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass);
        void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schema);
    }



    public class ActiveDirectorySchemaContainer : IActiveDirectorySchemaContainer
    {
        private Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>> _schemaStore = new Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>>();

        public bool HasSchemaObjectFor(string customerId, string objectClass)
        {
            if (!_schemaStore.ContainsKey(customerId))
                return false;

            if (!_schemaStore[customerId].ContainsKey(objectClass))
                return false;

            if (_schemaStore[customerId][objectClass] != null)
                return true;
            else
                return false;
        }

        public ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass)
        {
            return _schemaStore[customerId][objectClass];
        }

        public void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schemaObject)
        {
            if (HasSchemaObjectFor(customerId, objectClass))
            {
                _schemaStore[customerId][objectClass] = schemaObject;
            }
            else
            {
                if (!_schemaStore.ContainsKey(customerId))
                {
                    _schemaStore.Add(customerId, new Dictionary<string, ActiveDirectorySchemaObject>());
                }

                if (!_schemaStore[customerId].ContainsKey(objectClass))
                {
                    _schemaStore[customerId].Add(objectClass, schemaObject);
                }
                else
                {
                    _schemaStore[customerId][objectClass] = schemaObject;
                }
            }
        }
    }

The customerId separates the schema information of multiple customers, and the container is provided by dependency injection as a singleton. Every message can have a different customerId and be processed concurrently. Yet I want to retrieve the schema data only once per customer and object class. This architecture might not be ideal, but I am not allowed to change it at this time.

 public static IServiceCollection AddActiveDirectorySchemaService(
             this IServiceCollection services)
        {
            services.AddScoped<IActiveDirectorySchemaService, ActiveDirectorySchemaService>();
            services.AddSingleton<IActiveDirectorySchemaContainer, ActiveDirectorySchemaContainer>();
            return services;
        }

Comments (2)

允世 2025-02-13 19:09:57

Here is how you could use a ConcurrentDictionary<TKey,TValue> that has Lazy<T> objects as values, in order to ensure that the schema of each key will be initialized only once:

private readonly ConcurrentDictionary<(string CustomerId, string ObjectClass),
    Lazy<Schema>> _cachedSchemas = new();

public Schema GetSchemaObjectFor(string objectClass)
{
    var combinedKey = (_scopeContext.CustomerId, objectClass);
    Lazy<Schema> lazySchema = _cachedSchemas.GetOrAdd(combinedKey, key =>
    {
        return new Lazy<Schema>(() =>
        {
            _logger.LogInformation($"Getting schema for {key}");
            return GetSchemaFromActiveDirectory(key.ObjectClass);
        });
    });
    return lazySchema.Value;
}

The key of the ConcurrentDictionary<TKey,TValue> is a ValueTuple<string, string>. The first string is the customer ID, and the second is the object class. A new schema is created for each unique combination of these two strings.

Unfortunately the above suggestion suffers from a major flaw of the Lazy<T> class: its behavior regarding the handling of errors is not configurable. So if the valueFactory fails, all subsequent requests for the Value will receive the cached error. This behavior is a show-stopper for a caching system. Fortunately there are alternative Lazy<T> implementations available that exhibit the correct behavior for caching purposes, which is to retry the valueFactory if it fails. You can find here at least three robust and compact implementations, including one of my own that I posted yesterday.
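As an illustration of the retry behavior described above, here is a minimal sketch that is not one of the implementations the answer refers to. It keeps the ConcurrentDictionary of Lazy<T> values, but removes an entry whose factory threw, so that the next caller gets a fresh attempt. The class name RetryingCache and its GetOrAdd signature are hypothetical, and the TryRemove(KeyValuePair<TKey,TValue>) overload assumes .NET 5 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper, not part of the original answer.
public sealed class RetryingCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, Lazy<TValue>> _entries = new();

    public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
    {
        // The default Lazy<T> mode runs the factory at most once per Lazy instance
        // and caches any exception it throws.
        Lazy<TValue> lazy = _entries.GetOrAdd(key,
            k => new Lazy<TValue>(() => valueFactory(k)));
        try
        {
            return lazy.Value;
        }
        catch
        {
            // Remove only this faulted Lazy instance (the pair must match exactly),
            // so a newer entry added by another thread is left alone.
            _entries.TryRemove(new KeyValuePair<TKey, Lazy<TValue>>(key, lazy));
            throw;
        }
    }
}
```

Callers that were already waiting on the failed Lazy<T> still see the same exception. Only calls made after the removal trigger a new retrieval.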

轮廓§ 2025-02-13 19:09:57

A relatively simple approach would be to use a ConcurrentDictionary to keep a cache of loaded objects. Dictionaries divide items into buckets based on the hash code of their keys, and ConcurrentDictionary guards those buckets with several locks rather than one global lock, while reads do not take a lock at all. Using a dictionary like this will provide an efficiency boost over your current approach.

So as to avoid hammering the AD controller/database/whatever, I'm still going to use a semaphore to ensure that only one thread can request a schema at once. This only takes place when the dictionary doesn't already have the entry, however.

Note that this first option is more or less a complicated version of Theodor's answer, so if this works for you, it's probably best to go with that answer instead. And my second option could probably be optimised by incorporating Theodor's answer.

public class CachedSchemaContainer
{
    private readonly ISchemaRetriever _schemaRetriever;
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
    private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

    public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
    {
        _schemaRetriever = schemaRetriever;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        Schema schema;
        // try and retrieve the value
        if (_schemaCache.TryGetValue(objectClass, out schema))
        {
            return schema;
        }

        // OK, we need to wait our turn and try to load it from the AD controller
        _semaphoreSlim.Wait();
        try
        {
            // There's no point requerying if the last holder of the lock already retrieved it, so check again
            if (_schemaCache.TryGetValue(objectClass, out schema))
            {
                return schema;
            }
                
            // Go and get the schema, add it to the dictionary, and then return it
            schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
            _schemaCache.TryAdd(objectClass, schema);
            return schema;
        }
        finally
        {
            // release the semaphore
            _semaphoreSlim.Release();
        }
    }
}

Another possible optimisation might be to cache a reference to the Schema object per-thread. This would mean that no locking would be required in the case that a given thread has accessed this specific schema before. We still have the thread-safe ConcurrentDictionary to cache the values between threads, but ultimately this will avoid a lot of locking once the caches are warmed up/populated:

public class CachedSchemaContainer : IDisposable
{
    private readonly ISchemaRetriever _schemaRetriever;
    private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
    private readonly ThreadLocal<Dictionary<string, Schema>> _threadSchemaCache = new ThreadLocal<Dictionary<string, Schema>>(() => new Dictionary<string, Schema>());
    private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

    public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
    {
        _schemaRetriever = schemaRetriever;
    }

    public Schema GetSchemaObjectFor(string objectClass)
    {
        Schema schema;

        // try and retrieve the value from the thread's cache
        if (_threadSchemaCache.Value.TryGetValue(objectClass, out schema))
        {
            return schema;
        }

        // try and retrieve the value
        if (_schemaCache.TryGetValue(objectClass, out schema))
        {
            // it was already cached in the shared dictionary, so let's add it to the thread's
            _threadSchemaCache.Value[objectClass] = schema;
            return schema;
        }

        // OK, we need to wait our turn and try to load it from the AD controller
        _semaphoreSlim.Wait();
        try
        {
            // There's no point requerying if the last holder of the lock already retrieved it, so check again
            if (_schemaCache.TryGetValue(objectClass, out schema))
            {
                // it was already cached in the shared dictionary, so let's add it to the thread's
                _threadSchemaCache.Value[objectClass] = schema;
                return schema;
            }
                
            // Go and get the schema, add it to the shared and thread local dictionaries, and then return it
            schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
            _schemaCache.TryAdd(objectClass, schema);
            _threadSchemaCache.Value[objectClass] = schema;
            return schema;
        }
        finally
        {
            // release the semaphore
            _semaphoreSlim.Release();
        }
    }

    public void Dispose()
    {
        _threadSchemaCache.Dispose();
    }
}

Common type definitions used in these examples:

public interface ISchemaRetriever
{
    Schema GetSchemaObjectFor(int customerId, string objectClass);
}

public class Schema
{
}

Note: Schema here is a reference type (a class), so the dictionaries store a reference to one shared Schema object per loaded objectClass. As such, if one thread changes the Schema object, that could break another thread that is using it, unless the Schema object itself is also thread-safe. If you're only reading values and not mutating the Schema objects, then you should have nothing to worry about there.
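One way to make the "read only, never mutate" rule hold by construction is to expose the cached data through a read-only type. The following is only a sketch under assumptions: ReadOnlySchema and its members ObjectClass and AttributeNames are hypothetical and stand in for whatever the real schema object contains.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical read-only variant of the placeholder Schema class.
public sealed class ReadOnlySchema
{
    public string ObjectClass { get; }
    public IReadOnlyList<string> AttributeNames { get; }

    public ReadOnlySchema(string objectClass, IEnumerable<string> attributeNames)
    {
        ObjectClass = objectClass ?? throw new ArgumentNullException(nameof(objectClass));
        // Copy the input so later changes to the caller's collection are not visible here.
        AttributeNames = Array.AsReadOnly(attributeNames.ToArray());
    }
}
```

Because nothing can change after construction, an instance like this can be shared between threads without further locking.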

Also, as Theodor points out, unless you're planning to make this method async in the future, you could potentially do away with the SemaphoreSlim and just use a simple lock (lockingObject) { } instead.
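A sketch of the lock-statement variant could look like the code below. It goes one step beyond the answer by keeping one lock object per key, so that retrievals for different keys do not block each other; that extension is an assumption of this sketch, not something the answer proposes. The names PerKeyLockCache and GetOrLoad are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: double-checked locking with one lock object per key.
public sealed class PerKeyLockCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, TValue> _cache = new();
    private readonly ConcurrentDictionary<TKey, object> _locks = new();

    public TValue GetOrLoad(TKey key, Func<TKey, TValue> load)
    {
        // Fast path: the value is already cached, so no lock is taken.
        if (_cache.TryGetValue(key, out TValue cached))
        {
            return cached;
        }

        // All callers asking for the same key receive the same lock object.
        object gate = _locks.GetOrAdd(key, _ => new object());
        lock (gate)
        {
            // Another thread may have loaded the value while we were waiting.
            if (_cache.TryGetValue(key, out cached))
            {
                return cached;
            }

            TValue value = load(key);
            _cache[key] = value;
            return value;
        }
    }
}
```

With a (customerId, objectClass) value tuple as the key, a slow retrieval for one customer would not hold up requests for another. If load throws, nothing is cached and the next caller simply tries again.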
