用于执行大规模并行查询的通用类。反馈?
我不明白为什么,但客户端库中似乎没有机制可以并行执行 Windows Azure 表存储的许多查询。我创建了一个模板类,可以用来节省大量时间,欢迎您随意使用它。不过,如果您能将其拆开,并提供有关如何改进这门课的反馈,我将不胜感激。
public class AsyncDataQuery<T> where T: new()
{
public AsyncDataQuery(bool preserve_order)
{
m_preserve_order = preserve_order;
this.Queries = new List<CloudTableQuery<T>>(1000);
}
public void AddQuery(IQueryable<T> query)
{
var data_query = (DataServiceQuery<T>)query;
var uri = data_query.RequestUri; // required
this.Queries.Add(new CloudTableQuery<T>(data_query));
}
/// <summary>
/// Blocking but still optimized.
/// </summary>
public List<T> Execute()
{
this.BeginAsync();
return this.EndAsync();
}
public void BeginAsync()
{
if (m_preserve_order == true)
{
this.Items = new List<T>(Queries.Count);
for (var i = 0; i < Queries.Count; i++)
{
this.Items.Add(new T());
}
}
else
{
this.Items = new List<T>(Queries.Count * 2);
}
m_wait = new ManualResetEvent(false);
for (var i = 0; i < Queries.Count; i++)
{
var query = Queries[i];
query.BeginExecuteSegmented(callback, i);
}
}
public List<T> EndAsync()
{
m_wait.WaitOne();
m_wait.Dispose();
return this.Items;
}
private List<T> Items { get; set; }
private List<CloudTableQuery<T>> Queries { get; set; }
private bool m_preserve_order;
private ManualResetEvent m_wait;
private int m_completed = 0;
private object m_lock = new object();
private void callback(IAsyncResult ar)
{
int i = (int)ar.AsyncState;
CloudTableQuery<T> query = Queries[i];
var response = query.EndExecuteSegmented(ar);
if (m_preserve_order == true)
{ // preserve ordering only supports one result per query
lock (m_lock)
{
this.Items[i] = response.Results.Single();
}
}
else
{ // add any number of items
lock (m_lock)
{
this.Items.AddRange(response.Results);
}
}
if (response.HasMoreResults == true)
{ // more data to pull
query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
return;
}
m_completed = Interlocked.Increment(ref m_completed);
if (m_completed == Queries.Count)
{
m_wait.Set();
}
}
}
I don't understand why, but there appears to be no mechanism in the client library for performing many queries in parallel for Windows Azure Table Storage. I've created a template class that can be used to save considerable time, and you're welcome to use it however you wish. I would appreciate however, if you could pick it apart, and provide feedback on how to improve this class.
public class AsyncDataQuery<T> where T: new()
{
public AsyncDataQuery(bool preserve_order)
{
m_preserve_order = preserve_order;
this.Queries = new List<CloudTableQuery<T>>(1000);
}
public void AddQuery(IQueryable<T> query)
{
var data_query = (DataServiceQuery<T>)query;
var uri = data_query.RequestUri; // required
this.Queries.Add(new CloudTableQuery<T>(data_query));
}
/// <summary>
/// Blocking but still optimized.
/// </summary>
public List<T> Execute()
{
this.BeginAsync();
return this.EndAsync();
}
public void BeginAsync()
{
if (m_preserve_order == true)
{
this.Items = new List<T>(Queries.Count);
for (var i = 0; i < Queries.Count; i++)
{
this.Items.Add(new T());
}
}
else
{
this.Items = new List<T>(Queries.Count * 2);
}
m_wait = new ManualResetEvent(false);
for (var i = 0; i < Queries.Count; i++)
{
var query = Queries[i];
query.BeginExecuteSegmented(callback, i);
}
}
public List<T> EndAsync()
{
m_wait.WaitOne();
m_wait.Dispose();
return this.Items;
}
private List<T> Items { get; set; }
private List<CloudTableQuery<T>> Queries { get; set; }
private bool m_preserve_order;
private ManualResetEvent m_wait;
private int m_completed = 0;
private object m_lock = new object();
private void callback(IAsyncResult ar)
{
int i = (int)ar.AsyncState;
CloudTableQuery<T> query = Queries[i];
var response = query.EndExecuteSegmented(ar);
if (m_preserve_order == true)
{ // preserve ordering only supports one result per query
lock (m_lock)
{
this.Items[i] = response.Results.Single();
}
}
else
{ // add any number of items
lock (m_lock)
{
this.Items.AddRange(response.Results);
}
}
if (response.HasMoreResults == true)
{ // more data to pull
query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
return;
}
m_completed = Interlocked.Increment(ref m_completed);
if (m_completed == Queries.Count)
{
m_wait.Set();
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我想我参加聚会迟到了。我要添加两件事:
另外,我认为这实际上是比任务并行库更好的方法。在此之前我尝试过“每个查询任务”方法。该代码实际上更尴尬,并且往往会导致拥有大量活动线程。我还没有对你的代码进行广泛的测试,但乍一看似乎效果更好。
更新
我已经对上面的代码进行了或多或少的重写。我的重写删除了所有锁定,支持挂起事务的客户端超时(很少见,但确实会发生,并且确实会毁了你的一天),以及一些异常处理逻辑。有一个完整的解决方案,已在 Bitbucket 上进行了测试。最相关的代码位于一个文件中,尽管它确实需要项目其他部分的一些帮助者。
Guess I'm late to the party. I would add two things:
Also, I think this is actually a better approach that the Task Parallel Library. I tried the Task-per-query approach before this. The code was actually more awkward, and it tended to result in having a lot of active threads. I still haven't tested extensively with your code, but it seems to work better on first blush.
Update
I've put some work into a more-or-less rewrite of the code above. My rewrite removes all locking, supports client-side timeouts of hung transactions (rare, but it does happen, and can really ruin your day), and some exception handling logic. There is a full solution with tests up on Bitbucket. The most relevant code lives in one file, though it does require some helpers that are in other parts of the project.
您是否考虑过使用任务并行库?
http://msdn.microsoft.com/en-us/library/dd537609.aspx
Have you considered using the Task Parallel Library?
http://msdn.microsoft.com/en-us/library/dd537609.aspx