Generic code for handling Azure Tables concurrency conflicts?

Posted 2025-01-07 14:09:43 · 251 words · 4 views · 0 comments

I'm looking at doing some updates into Azure Storage Tables. I want to use the optimistic concurrency mechanism properly. It seems like you'd need to do something like:

  1. Load row to update, possibly retrying failures
  2. Apply updates to row
  3. Save row, possibly retrying network errors
    1. If there is a concurrency conflict, reload the data (possibly retrying failures) and attempt to save again (possibly retrying failures)

Is there some generic class or code sample that handles this? I can code it up, but I have to imagine someone has already invented this particular wheel.

Comments (1)

他夏了夏天 2025-01-14 14:09:44

If someone invented this wheel they're not talking, so I went off and (re)invented it myself. This is intentionally very generic, more of a skeleton than a finished product. It's basically just the algorithm I outlined above. The caller has to wire in delegates to do the actual loading, updating and saving of the data. There is basic retry logic built in, but I would recommend overriding those functions with something more robust.

I believe this will work with tables or BLOBs, and single entities or batches, though I've only actually tried it with single-entity table updates.

Any comments, suggestions, improvements, etc. would be appreciated.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Data.Services.Client;
using Microsoft.WindowsAzure.StorageClient;
using System.Net;

namespace SepiaLabs.Azure
{
    /// <summary>
    /// Attempt to write an update to storage while using optimistic concurrency.
    /// Implements a basic state machine. Data will be fetched (with retries), then mutated, then updated (with retries, and possibly refetching and re-mutating).
    /// Clients may pass in a state object with relevant information. eg, a TableServiceContext object.
    /// </summary>
    /// <remarks>
    /// This object natively implements a very basic retry strategy. 
    /// Clients may want to subclass it and override the ShouldRetryRetrieval() and ShouldRetryPersist() functions to implement more advanced retry strategies. 
    /// 
    /// This class intentionally avoids checking if the row is present before updating it. This is so callers may throw custom exceptions, or attempt to insert the row instead ("upsert" style interaction)
    /// </remarks>
    /// <typeparam name="RowType">The type of data that will be read and updated. Though it is called RowType for clarity, you could manipulate a collection of rows.</typeparam>
    /// <typeparam name="StateObjectType">The type of the user-supplied state object</typeparam>
    public class AzureDataUpdate<RowType, StateObjectType>
        where RowType : class
    {
        /// <summary>
        /// Function to retrieve the data that will be updated. 
        /// This function will be called at least once. It will also be called any time a concurrency update conflict occurs. 
        /// </summary>
        public delegate RowType DataRetriever(StateObjectType stateObj);

        /// <summary>
        /// Function to apply the desired changes to the data.
        /// This will be called after each time the DataRetriever function is called. 
        /// If you are using a TableServiceContext with MergeOption.PreserveChanges set, this function can be a no-op after the first call
        /// </summary>
        public delegate void DataMutator(RowType data, StateObjectType stateObj);

        /// <summary>
        /// Function to persist the modified data. This may be called multiple times.
        /// </summary>
        /// <param name="data">The mutated data to save</param>
        /// <param name="stateObj">The user-supplied state object for this operation</param>
        public delegate void DataPersister(RowType data, StateObjectType stateObj);

        public DataRetriever RetrieverFunction { get; set; }
        public DataMutator MutatorFunction { get; set; }
        public DataPersister PersisterFunction { get; set; }

        public AzureDataUpdate()
        {
        }

        public AzureDataUpdate(DataRetriever retrievalFunc, DataMutator mutatorFunc, DataPersister persisterFunc)
        {
            this.RetrieverFunction = retrievalFunc;
            this.MutatorFunction = mutatorFunc;
            this.PersisterFunction = persisterFunc;
        }

        public RowType Execute(StateObjectType userState)
        {
            if (RetrieverFunction == null)
            {
                throw new InvalidOperationException("Must provide a data retriever function before executing");
            }
            else if (MutatorFunction == null)
            {
                throw new InvalidOperationException("Must provide a data mutator function before executing");
            }
            else if (PersisterFunction == null)
            {
                throw new InvalidOperationException("Must provide a data persister function before executing");
            }

            //Retrieve and modify data
            RowType data = this.DoRetrieve(userState);

            //Call the mutator function. 
            MutatorFunction(data, userState);

            //persist changes
            int attemptNumber = 1;
            while (true)
            {
                bool isPreconditionFailedResponse = false;

                try
                {
                    PersisterFunction(data, userState);
                    return data; //return the mutated data
                }
                catch (DataServiceRequestException dsre)
                {
                    DataServiceResponse resp = dsre.Response;

                    int statusCode = -1;
                    if (resp.IsBatchResponse)
                    {
                        statusCode = resp.BatchStatusCode;
                    }
                    else if (resp.Any())
                    {
                        statusCode = resp.First().StatusCode;
                    }

                    isPreconditionFailedResponse = (statusCode == (int)HttpStatusCode.PreconditionFailed);
                    if (!ShouldRetryPersist(attemptNumber, dsre, isPreconditionFailedResponse, userState))
                    {
                        throw;
                    }
                }
                catch (DataServiceClientException dsce)
                {
                    isPreconditionFailedResponse = (dsce.StatusCode == (int)HttpStatusCode.PreconditionFailed);
                    if (!ShouldRetryPersist(attemptNumber, dsce, isPreconditionFailedResponse, userState))
                    {
                        throw;
                    }
                }
                catch (StorageClientException sce)
                {
                    isPreconditionFailedResponse = (sce.StatusCode == HttpStatusCode.PreconditionFailed);
                    if (!ShouldRetryPersist(attemptNumber, sce, isPreconditionFailedResponse, userState))
                    {
                        throw;
                    }
                }
                catch (Exception ex)
                {
                    if (!ShouldRetryPersist(attemptNumber, ex, false, userState))
                    {
                        throw;
                    }
                }

                if (isPreconditionFailedResponse)
                {
                    //Refetch the data, re-apply the mutator
                    data = DoRetrieve(userState);
                    MutatorFunction(data, userState);
                }

                attemptNumber++;
            }
        }

        /// <summary>
        /// Retrieve the data to be updated, possibly with retries
        /// </summary>
        /// <param name="userState">The UserState for this operation</param>
        private RowType DoRetrieve(StateObjectType userState)
        {
            int attemptNumber = 1;

            while (true)
            {
                try
                {
                    return RetrieverFunction(userState);
                }
                catch (Exception ex)
                {
                    if (!ShouldRetryRetrieval(attemptNumber, ex, userState))
                    {
                        throw;
                    }
                }

                attemptNumber++;
            }
        }

        /// <summary>
        /// Determine whether a data retrieval should be retried. 
        /// Implements a simplistic, constant wait time strategy. Users may override to provide a more complex implementation.
        /// </summary>
        /// <param name="attemptNumber">What number attempt is this. </param>
        /// <param name="ex">The exception that was caught</param>
        /// <param name="userState">The user-supplied state object for this operation</param>
        /// <returns>True to attempt the retrieval again, false to abort the retrieval and fail the update attempt</returns>
        protected virtual bool ShouldRetryRetrieval(int attemptNumber, Exception ex, StateObjectType userState)
        {
            //Simple, basic retry strategy - try 3 times, sleep for 1000msec each time
            if (attemptNumber < 3)
            {
                Thread.Sleep(1000); 
                return true;
            }
            else
            {
                return false;
            }
        }

        /// <summary>
        /// Determine whether a data update should be retried. If the <paramref name="isPreconditionFailedResponse"/> param is true,
        /// then the retrieval and mutation process will be repeated as well.
        /// Implements a simplistic, constant wait time strategy. Users may override to provide a more complex implementation.
        /// </summary>
        /// <param name="attemptNumber">What number attempt is this. </param>
        /// <param name="ex">The exception that was caught</param>
        /// <param name="userState">The user-supplied state object for this operation</param>
        /// <param name="isPreconditionFailedResponse">Indicates whether the exception is a PreconditionFailed response. ie, an optimistic concurrency failure</param>
        /// <returns>True to attempt the update again, false to abort and fail the update attempt</returns>
        protected virtual bool ShouldRetryPersist(int attemptNumber, Exception ex, bool isPreconditionFailedResponse, StateObjectType userState)
        {
            if (isPreconditionFailedResponse)
            {
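                return true; //retry immediately; note that conflicts are not capped here, so override this to bound retries on a heavily contended row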
            }
            else
            {
                //For other failures, wait to retry
                //Simple, basic retry strategy - try 3 times, sleep for 1000msec each time
                if (attemptNumber < 3)
                {
                    Thread.Sleep(1000);
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }
    }
}
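
The answer recommends overriding the retry functions with something more robust, and says the caller has to wire in the delegates. The following is only a minimal sketch of both, not part of the original answer. It assumes the `AzureDataUpdate` class above is compiled into the same project and that the legacy `Microsoft.WindowsAzure.StorageClient` library is referenced. `CounterEntity`, `BackoffDataUpdate`, `CounterUpdater`, the attempt limit and the delay values are all hypothetical names and numbers chosen for illustration.

```csharp
using System;
using System.Data.Services.Client;
using System.Linq;
using System.Threading;
using Microsoft.WindowsAzure.StorageClient;
using SepiaLabs.Azure;

// Hypothetical entity; TableServiceEntity supplies PartitionKey, RowKey and Timestamp.
public class CounterEntity : TableServiceEntity
{
    public int Hits { get; set; }
}

// Hypothetical subclass: exponential backoff with jitter, and a cap on conflict retries.
public class BackoffDataUpdate<TRow, TState> : AzureDataUpdate<TRow, TState>
    where TRow : class
{
    private const int MaxAttempts = 5; // assumed limit, tune for your workload
    private static readonly Random Jitter = new Random();

    // Pure helper: 200 ms, 400 ms, 800 ms ... capped at 6400 ms.
    public static int BaseDelayMs(int attemptNumber)
    {
        return 100 * (1 << Math.Min(attemptNumber, 6));
    }

    private static bool WaitAndRetry(int attemptNumber)
    {
        if (attemptNumber >= MaxAttempts) return false;
        int jitterMs;
        lock (Jitter) { jitterMs = Jitter.Next(0, 100); } // Random is not thread-safe
        Thread.Sleep(BaseDelayMs(attemptNumber) + jitterMs);
        return true;
    }

    protected override bool ShouldRetryRetrieval(int attemptNumber, Exception ex, TState userState)
    {
        return WaitAndRetry(attemptNumber);
    }

    protected override bool ShouldRetryPersist(int attemptNumber, Exception ex,
        bool isPreconditionFailedResponse, TState userState)
    {
        // Unlike the base class, conflicts are bounded so a hot row cannot loop forever.
        if (isPreconditionFailedResponse) return attemptNumber < MaxAttempts;
        return WaitAndRetry(attemptNumber);
    }
}

public static class CounterUpdater
{
    // Increments Hits on a single row, refetching and re-applying on a 412 conflict.
    public static CounterEntity Increment(TableServiceContext context, string table, string pk, string rk)
    {
        // Assumption: OverwriteChanges makes each refetch replace the tracked values
        // and ETag with the server's copy, so the mutator must run again afterwards.
        context.MergeOption = MergeOption.OverwriteChanges;

        var update = new BackoffDataUpdate<CounterEntity, TableServiceContext>
        {
            // Depending on context settings, a missing row may surface as an
            // exception from the query rather than as null.
            RetrieverFunction = ctx =>
                ctx.CreateQuery<CounterEntity>(table)
                   .Where(e => e.PartitionKey == pk && e.RowKey == rk)
                   .AsEnumerable()
                   .FirstOrDefault(),
            MutatorFunction = (row, ctx) =>
            {
                if (row == null) throw new InvalidOperationException("Row not found");
                row.Hits++;
                ctx.UpdateObject(row); // marks the tracked entity as modified
            },
            PersisterFunction = (row, ctx) => ctx.SaveChanges()
        };
        return update.Execute(context);
    }
}
```

A call might look like `CounterUpdater.Increment(context, "Counters", "site", "home")`, where the table and key names are placeholders. Because the mutator throws when the row is missing, that exception propagates out of `Execute` without being retried, which leaves room for the "upsert" style handling mentioned in the class remarks.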