Azure Queue Worker Role Multithreading Example

Posted 2024-12-01 12:29:41


We have 4 Azure Queues which get populated either through the REST API directly or through a WCF service that we provide.

  1. We would like to have ONE worker role monitor all 4 of these queues.
  2. I'm thinking of using multithreading that reads the queue names etc. from config and spins up the process method (which reads a message from the queue and processes it).

Could someone please provide an example or some guidance on how to achieve this in a worker role?

Not too sure if the above can be achieved without multithreading, as I'm quite new to multithreading.

Thank you


Comments (3)

冷月断魂刀 2024-12-08 12:29:41


You can fire off different threads for the different tasks, but also consider the non-threaded approach (which may perform better or worse depending on what you do with the messages):

while (true)
{
    var didSomething = false; // reset each pass so the back-off below works

    var msg = queue1.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue1.DeleteMessage(msg);
    }
    msg = queue2.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue2.DeleteMessage(msg);
    }
    // ...
    if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do
}
岁月静好 2024-12-08 12:29:41


Here is our current implementation to do exactly what you're requesting, but in a better way (or so we think). That said, this code still needs some heavy cleaning up. It's a functional version 0.1, though.

public class WorkerRole : RoleEntryPoint
{
    public override void Run()
    {
        var logic = new WorkerAgent();
        logic.Go(false);
    }

    public override bool OnStart()
    {
        // Initialize our Cloud Storage Configuration.
        AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);

        return base.OnStart();
    }
}

public class WorkerAgent
{
    private const int _resistance_to_scaling_larger_queues = 9;
    private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
                                                       {
                                                           {typeof (Queue1.Processor), 1},
                                                           {typeof (Queue2.Processor), 1},
                                                           {typeof (Queue3.Processor), 1},
                                                           {typeof (Queue4.Processor), 1},
                                                       };

    private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
    private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
    protected TimeSpan CurrentDelay { get; set; }

    public Func<string> GetSpecificQueueTypeToProcess { get; set; }

    /// <summary>
    /// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
    /// </summary>
    public Dictionary<Type, int> QueueWeights
    {
        get
        {
            return _queueWeights;
        }
        set
        {
            _queueWeights = value;
        }
    }

    public static TimeSpan QueueWeightCalibrationDelay
    {
        get { return TimeSpan.FromMinutes(15); }
    }


    protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();


    protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }

    public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
    {
        CurrentDelay = _minDelay;
        GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
    }

    protected IProcessQueues CurrentProcessor { get; set; }

    /// <summary>
    /// Processes queue request(s).
    /// </summary>
    /// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
    public void Go(bool onlyProcessOnce)
    {
        if (onlyProcessOnce)
        {
            ProcessOnce(false);
        }
        else
        {
            ProcessContinuously();
        }
    }

    public void ProcessContinuously()
    {
        while (true)
        {
            // temporary hack to get this started.
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// Attempts to fetch and process a single queued request.
    /// </summary>
    public void ProcessOnce(bool shouldDelay)
    {
        PopulateQueueMetaData(QueueWeightCalibrationDelay);

        if (shouldDelay)
        {
            Thread.Sleep(CurrentDelay);
        }

        var typesToPickFrom = new List<Type>();
        foreach(var item in QueueWeights)
        {
            for (var i = 0; i < item.Value; i++)
            {
                typesToPickFrom.Add(item.Key);
            }
        }

        var randomIndex = new Random().Next(typesToPickFrom.Count); // Next(n) avoids the modulo bias of Next() % n
        var typeToTryAndProcess = typesToPickFrom[randomIndex];

        CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
        CleanQueueDelays();

        if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
        {
            var errors = CurrentProcessor.Go();

            var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
                               ? _maxDelay // the queue was empty
                               : _minDelay; // else

            QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
        }
        else
        {
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// This method populates/refreshes the QueueMetaData collection.
    /// </summary>
    /// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
    private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
    {
        if (QueueMetaData == null)
        {
            QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
        }

        var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
        var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
        var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
        var results = new Dictionary<Type, AzureQueueMetaData>();

        foreach (var queueProcessorType in queuesWithoutMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);

                    QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
                                                  ? 1
                                                  : (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
                }
            }
        }

        foreach (var queueProcessorType in expiredQueueMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);
                }
            }
        }

        QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
    }

    private void CleanQueueDelays()
    {
        QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
    }
}

With this, we have a separate class that knows how to process each queue, and each of those classes implements IProcessQueues. We load the _queueWeights collection with each of the types we want it to process. We set the _resistance_to_scaling_larger_queues constant to control how we want this to scale. Note that it scales logarithmically (see the PopulateQueueMetaData method). No queue has a weight of less than 1, even if it has 0 items. If you set _resistance_to_scaling_larger_queues to 10, then for every order-of-magnitude increase in queue length, that type's "weight" gets increased by 1. For example, if QueueA has 0 items, QueueB has 0 items, and QueueC has 10 items, the respective weights are 1, 1, and 2. This means QueueC has a 50% chance of being processed next while QueueA and QueueB each have only a 25% chance. If QueueC has 100 items, the weights are 1, 1, 3 and the chances of being processed are 20%, 20%, 60%. This ensures that your empty queues don't get forgotten about.
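The weighting described above can be checked with a quick worked example (base 9, matching the _resistance_to_scaling_larger_queues constant in the code; the Weight helper below is just an extraction of that expression, not part of the original):

```csharp
// Extracted from PopulateQueueMetaData: weight = 1 for an empty queue,
// otherwise floor(log_base(count)) + 1, base = _resistance_to_scaling_larger_queues.
static int Weight(int approximateMessageCount, int resistance = 9)
{
    return approximateMessageCount == 0
        ? 1
        : (int)Math.Log(approximateMessageCount, resistance) + 1;
}

// Weight(0)   == 1
// Weight(10)  == 2   (log9(10)  ≈ 1.05)
// Weight(100) == 3   (log9(100) ≈ 2.10)
```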

The other thing this does is use _minDelay and _maxDelay. If this code thinks a queue has at least 1 item in it, it will keep processing that queue as fast as the _minDelay rate allows. However, if the queue last had 0 items, it will not be processed more often than the _maxDelay rate. So if the random number generator pulls up a queue (regardless of weight) that had 0 items, it will simply skip trying to process it and move on to the next iteration. (Some additional optimization could be put into this part for better storage-transaction efficiency, but it's a neat little addition.)

We have a couple of custom classes in here (such as AzureQueue and AzureQueueMetaData) - one is essentially a wrapper for a CloudQueue and the other stores some info, such as the approximate count of the queue - nothing interesting in there (just a way to simplify the code).

Again, I wouldn't call this "pretty" code, but some fairly clever concepts are implemented and functional in it. Use it for whatever purpose you wish. :)

Lastly, writing the code this way allows us to have a single project that can process MANY more queues. If we find it simply isn't keeping up, we can easily scale out to a larger number of instances, and that scales up for ALL queues. In a minimal scenario, you could deploy one instance of this to monitor 3 queues. However, if a 4th queue starts to affect performance (or you need higher availability), increase this to 2 instances. Once you hit 15 queues, add a third. At 25 queues, add a 4th instance. Get a new customer and need to process MANY queue requests across the whole system? That's fine: spin this one role up to 20 instances until it's done and then spin them back down. Have a particularly nasty queue? Comment that queue out of the _queueWeights collection and deploy to manage the rest of your queues; then deploy a build with every queue except that one commented out of _queueWeights to a different set of instances, and do your debugging without a) the other QueueProcessors interfering with your debugging and b) your debugging interfering with the other QueueProcessors. Ultimately, this provides a LOT of flexibility and efficiency.
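The answer never shows IProcessQueues itself; pieced together from how WorkerAgent calls it, it would look roughly like this (a reconstruction, not the original interface, so any members beyond the three used above are guesses):

```csharp
// Reconstructed from usage in WorkerAgent (Go(), NumberProcessed,
// PrimaryQueueName); the original interface is not shown in the answer.
public interface IProcessQueues
{
    string PrimaryQueueName { get; }   // name of the Azure queue this processor drains
    int NumberProcessed { get; }       // messages handled by the most recent Go() call
    IEnumerable<Exception> Go();       // process pending messages, returning any errors
}
```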

你的往事 2024-12-08 12:29:41


Inside the worker role's while loop, start 4 threads just as you would in a multi-threaded C# application. Of course, you need four different thread functions defined, and each of those functions should have its own while loop to poll a queue. At the end of the worker's while loop, just wait for the threads to finish.
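A minimal sketch of that thread-per-queue layout, assuming the classic StorageClient API; the queue names are placeholders:

```csharp
public override void Run()
{
    var queueNames = new[] { "queue1", "queue2", "queue3", "queue4" }; // placeholders
    var threads = queueNames
        .Select(name =>
        {
            var t = new Thread(() => PollQueue(name)) { IsBackground = true };
            t.Start();
            return t;
        })
        .ToList();

    foreach (var t in threads)
        t.Join(); // wait for the threads to finish (they loop forever here)
}

private static void PollQueue(string queueName)
{
    var account = CloudStorageAccount.FromConfigurationSetting("StorageConnectionString");
    var queue = account.CreateCloudQueueClient().GetQueueReference(queueName);
    while (true)
    {
        var msg = queue.GetMessage();
        if (msg == null)
        {
            Thread.Sleep(TimeSpan.FromSeconds(1)); // avoid a tight empty-poll loop
            continue;
        }
        // process msg here, then remove it
        queue.DeleteMessage(msg);
    }
}
```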
