用于执行大规模并行查询的通用类。反馈?

发布于 2024-10-09 05:47:50 字数 2733 浏览 8 评论 0原文

我不明白为什么,但客户端库中似乎没有机制可以并行执行 Windows Azure 表存储的许多查询。我创建了一个模板类,可以用来节省大量时间,欢迎您随意使用它。不过,如果您能将其拆开,并提供有关如何改进这门课的反馈,我将不胜感激。

public class AsyncDataQuery<T> where T: new()
{
    public AsyncDataQuery(bool preserve_order)
    {
        m_preserve_order = preserve_order;
        this.Queries = new List<CloudTableQuery<T>>(1000);
    }

    public void AddQuery(IQueryable<T> query)
    {
        var data_query = (DataServiceQuery<T>)query;
        var uri = data_query.RequestUri; // required

        this.Queries.Add(new CloudTableQuery<T>(data_query));
    }

    /// <summary>
    /// Blocking but still optimized.
    /// </summary>
    public List<T> Execute()
    {
        this.BeginAsync();
        return this.EndAsync();
    }

    public void BeginAsync()
    {
        if (m_preserve_order == true)
        {
            this.Items = new List<T>(Queries.Count);
            for (var i = 0; i < Queries.Count; i++)
            {
                this.Items.Add(new T());
            }
        }
        else
        {
            this.Items = new List<T>(Queries.Count * 2);
        }

        m_wait = new ManualResetEvent(false);

        for (var i = 0; i < Queries.Count; i++)
        {
            var query = Queries[i];
            query.BeginExecuteSegmented(callback, i);
        }
    }

    public List<T> EndAsync()
    {
        m_wait.WaitOne();
        m_wait.Dispose();

        return this.Items;
    }

    private List<T> Items { get; set; }
    private List<CloudTableQuery<T>> Queries { get; set; }

    private bool m_preserve_order;
    private ManualResetEvent m_wait;
    private int m_completed = 0;
    private object m_lock = new object();

    private void callback(IAsyncResult ar)
    {
        int i = (int)ar.AsyncState;
        CloudTableQuery<T> query = Queries[i];
        var response = query.EndExecuteSegmented(ar);
        if (m_preserve_order == true)
        { // preserve ordering only supports one result per query
            lock (m_lock)
            {
                this.Items[i] = response.Results.Single();
            }
        }
        else
        { // add any number of items
            lock (m_lock)
            {
                this.Items.AddRange(response.Results);
            }
        }
        if (response.HasMoreResults == true)
        { // more data to pull
            query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
            return;
        }
        m_completed = Interlocked.Increment(ref m_completed);
        if (m_completed == Queries.Count)
        {
            m_wait.Set();
        }
    }
}

I don't understand why, but there appears to be no mechanism in the client library for performing many queries in parallel for Windows Azure Table Storage. I've created a template class that can be used to save considerable time, and you're welcome to use it however you wish. I would appreciate however, if you could pick it apart, and provide feedback on how to improve this class.

public class AsyncDataQuery<T> where T: new()
{
    public AsyncDataQuery(bool preserve_order)
    {
        m_preserve_order = preserve_order;
        this.Queries = new List<CloudTableQuery<T>>(1000);
    }

    public void AddQuery(IQueryable<T> query)
    {
        var data_query = (DataServiceQuery<T>)query;
        var uri = data_query.RequestUri; // required

        this.Queries.Add(new CloudTableQuery<T>(data_query));
    }

    /// <summary>
    /// Blocking but still optimized.
    /// </summary>
    public List<T> Execute()
    {
        this.BeginAsync();
        return this.EndAsync();
    }

    public void BeginAsync()
    {
        if (m_preserve_order == true)
        {
            this.Items = new List<T>(Queries.Count);
            for (var i = 0; i < Queries.Count; i++)
            {
                this.Items.Add(new T());
            }
        }
        else
        {
            this.Items = new List<T>(Queries.Count * 2);
        }

        m_wait = new ManualResetEvent(false);

        for (var i = 0; i < Queries.Count; i++)
        {
            var query = Queries[i];
            query.BeginExecuteSegmented(callback, i);
        }
    }

    public List<T> EndAsync()
    {
        m_wait.WaitOne();
        m_wait.Dispose();

        return this.Items;
    }

    private List<T> Items { get; set; }
    private List<CloudTableQuery<T>> Queries { get; set; }

    private bool m_preserve_order;
    private ManualResetEvent m_wait;
    private int m_completed = 0;
    private object m_lock = new object();

    private void callback(IAsyncResult ar)
    {
        int i = (int)ar.AsyncState;
        CloudTableQuery<T> query = Queries[i];
        var response = query.EndExecuteSegmented(ar);
        if (m_preserve_order == true)
        { // preserve ordering only supports one result per query
            lock (m_lock)
            {
                this.Items[i] = response.Results.Single();
            }
        }
        else
        { // add any number of items
            lock (m_lock)
            {
                this.Items.AddRange(response.Results);
            }
        }
        if (response.HasMoreResults == true)
        { // more data to pull
            query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
            return;
        }
        m_completed = Interlocked.Increment(ref m_completed);
        if (m_completed == Queries.Count)
        {
            m_wait.Set();
        }
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

雨夜星沙 2024-10-16 05:47:50

我想我参加聚会迟到了。我要添加两件事:

  1. ManualResetEvent 是 IDisposable。所以你需要确保它被处理在某个地方。
  2. 错误处理 - 如果其中一个查询失败,则整个查询可能会失败。您可能应该重试失败的请求。或者,您可以返回确实返回的值,并附带一些查询失败的指示,以便调用者可以重试查询。
  3. 客户端超时 - 没有。如果服务器端超时,这不是问题,但如果失败(例如网络问题),客户端将永远挂起。

另外,我认为这实际上是比任务并行库更好的方法。在此之前我尝试过“每个查询任务”方法。该代码实际上更尴尬,并且往往会导致拥有大量活动线程。我还没有对你的代码进行广泛的测试,但乍一看似乎效果更好。

更新

我已经对上面的代码进行了或多或少的重写。我的重写删除了所有锁定,支持挂起事务的客户端超时(很少见,但确实会发生,并且确实会毁了你的一天),以及一些异常处理逻辑。有一个完整的解决方案,已在 Bitbucket 上进行了测试。最相关的代码位于一个文件中,尽管它确实需要项目其他部分的一些帮助者。

Guess I'm late to the party. I would add two things:

  1. ManualResetEvent is IDisposable. So you need to make sure it gets disposed somewhere.
  2. Error handling - if one of the queries fails it'll probably fail the whole thing. You should probably retry failed requests. Alternatively you could return the values you did get back with some indication of which queries failed, so that the caller could retry the queries.
  3. Client side timeouts - there are none. This isn't a problem if the server side times out for you, but if that ever fails (eg, network issues) the client will hang forever.

Also, I think this is actually a better approach that the Task Parallel Library. I tried the Task-per-query approach before this. The code was actually more awkward, and it tended to result in having a lot of active threads. I still haven't tested extensively with your code, but it seems to work better on first blush.

Update

I've put some work into a more-or-less rewrite of the code above. My rewrite removes all locking, supports client-side timeouts of hung transactions (rare, but it does happen, and can really ruin your day), and some exception handling logic. There is a full solution with tests up on Bitbucket. The most relevant code lives in one file, though it does require some helpers that are in other parts of the project.

吹泡泡o 2024-10-16 05:47:50

您是否考虑过使用任务并行库?

http://msdn.microsoft.com/en-us/library/dd537609.aspx

Have you considered using the Task Parallel Library?

http://msdn.microsoft.com/en-us/library/dd537609.aspx

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文