How can I call Hive asynchronously in Java?

Posted 2024-08-20 10:14:42

I would like to execute a Hive query on the server in an asynchronous manner. The Hive query will likely take a long time to complete, so I would prefer not to block on the call. I am currently using Thrift to make a blocking call (blocks on client.execute()), but I have not seen an example of how to make a non-blocking call. Here is the blocking code:

        // Assumed imports (old HiveServer Thrift client):
        //   org.apache.thrift.transport.TSocket, org.apache.thrift.protocol.TBinaryProtocol,
        //   org.apache.hadoop.hive.service.ThriftHive, org.apache.hadoop.hive.service.ThriftHive.Client
        TSocket transport = new TSocket("hive.example.com", 10000);
        transport.setTimeout(999999999);
        TBinaryProtocol protocol = new TBinaryProtocol(transport);
        Client client = new ThriftHive.Client(protocol);
        transport.open();
        client.execute(hql);  // Omitted HQL

        List<String> rows;
        while ((rows = client.fetchN(1000)) != null) {
            for (String row : rows) {
                // Do stuff with row
            }
        }

        transport.close();

The code above is missing try/catch blocks to keep it short.

Does anyone have any ideas how to do an async call? Can Hive/Thrift support it? Is there a better way?

Thanks!


Comments (6)

笛声青案梦长安 2024-08-27 10:14:42

AFAIK, at the time of writing Thrift does not generate asynchronous clients. The reason, as explained in this link (search the text for "asynchronous"), is that Thrift was designed for the data centre, where latency is assumed to be low.

Unfortunately as you know the latency experienced between call and result is not always caused by the network, but by the logic being performed! We have this problem calling into the Cassandra database from a Java application server where we want to limit total threads.

Summary: for now all you can do is make sure you have sufficient resources to handle the required numbers of blocked concurrent threads and wait for a more efficient implementation.
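
In practice that means bounding how many threads can be parked in blocking Hive calls at once, for example with a fixed-size pool. A minimal sketch (uses java.util.concurrent; the pool size and the `queries` collection are placeholders):

    // Cap how many threads can be blocked inside Hive/Thrift calls at any one time.
    ExecutorService hivePool = Executors.newFixedThreadPool(8);  // assumed limit; tune for your server

    for (final String hql : queries) {
        hivePool.submit(new Runnable() {
            public void run() {
                // run the blocking execute()/fetchN() sequence here;
                // at most 8 of these are ever in flight (and blocked) together
            }
        });
    }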

守望孤独 2024-08-27 10:14:42

It is now possible to make an asynchronous call in a Java thrift client after this patch was put in:
https://issues.apache.org/jira/browse/THRIFT-768

Generate the async Java client using the new Thrift compiler and initialize your client as follows:

    // Assumed Thrift runtime imports: org.apache.thrift.async.TAsyncClientManager,
    // org.apache.thrift.protocol.TProtocolFactory / TBinaryProtocol,
    // org.apache.thrift.transport.TNonblockingTransport / TNonblockingSocket
    TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9160);
    TAsyncClientManager clientManager = new TAsyncClientManager();
    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
    Hive.AsyncClient client = new Hive.AsyncClient(protocolFactory, clientManager, transport);

Now you can execute methods on this client as you would on a synchronous interface. The only change is that every method takes an additional callback parameter.
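
For example, a call with a callback might look roughly like the sketch below. `Hive.AsyncClient`, its `execute` method and the generated `execute_call` type are assumptions about the generated code; `AsyncMethodCallback` is Thrift's own `org.apache.thrift.async` interface (older async-client pattern):

    // Sketch only: the generated type names below are assumptions.
    client.execute(hql, new AsyncMethodCallback<Hive.AsyncClient.execute_call>() {
        public void onComplete(Hive.AsyncClient.execute_call response) {
            // invoked on the TAsyncClientManager's selector thread when the call finishes;
            // fetch rows here or hand the work off to another executor
        }

        public void onError(Exception e) {
            // invoked if the call fails
        }
    });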

枫林﹌晚霞¤ 2024-08-27 10:14:42

I know nothing about Hive, but as a last resort, you can use Java's concurrency library:

    // Assumed imports: java.util.concurrent.Callable / Future / ExecutorService / Executors
    ExecutorService executorService = Executors.newFixedThreadPool(4);  // size to taste

    Callable<SomeResult> c = new Callable<SomeResult>() {
        public SomeResult call() throws Exception {
            // your Hive code here (the blocking Thrift execute()/fetchN() sequence)
            return hiveResult;  // whatever "SomeResult" is for you
        }
    };

    Future<SomeResult> result = executorService.submit(c);

    // when you need the result, this will block
    // (get() throws ExecutionException if the Hive code failed)
    result.get();

Or, if you do not need to wait for the result, use Runnable instead of Callable.
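
For the fire-and-forget case, that could simply be:

    // No result needed, so submit a Runnable to the same executor.
    executorService.submit(new Runnable() {
        public void run() {
            // your Hive code here
        }
    });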

凌乱心跳 2024-08-27 10:14:42

After asking on the Hive mailing list: Hive does not support async calls using Thrift.

瞳孔里扚悲伤 2024-08-27 10:14:42

I don't know about Hive in particular, but any blocking call can be turned into an async call by spawning a new thread and using a callback. You could look at java.util.concurrent.FutureTask, which has been designed to allow easy handling of such asynchronous operations.
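
A minimal sketch of that idea, using java.util.concurrent.FutureTask and Callable, and assuming a hypothetical `runHiveQuery(hql)` method that wraps the blocking Thrift code and returns the rows:

    // Wrap the blocking Hive call in a FutureTask and run it on its own thread.
    FutureTask<List<String>> task = new FutureTask<List<String>>(new Callable<List<String>>() {
        public List<String> call() throws Exception {
            return runHiveQuery(hql);  // hypothetical: the blocking execute()/fetchN() logic
        }
    });
    new Thread(task).start();  // or submit the task to an ExecutorService

    // ... do other work while the query runs ...

    List<String> rows = task.get();  // blocks only when the result is actually needed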

木緿 2024-08-27 10:14:42

We fire off asynchronous calls to AWS Elastic MapReduce. AWS Elastic MapReduce can run Hadoop/Hive jobs on Amazon's cloud with a call to the AWS Elastic MapReduce web services.

You can also monitor the status of your jobs and grab the results off S3 once the job is completed.

Since the calls to the web services are asynchronous in nature, we never block our other operations. We continue to monitor the status of our jobs in a separate thread and grab the results when the job is complete.
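
A rough sketch of that status-polling thread, assuming the AWS SDK for Java 1.x EMR client; the credentials, `jobFlowId`, terminal state names and polling interval are all placeholders:

    // Poll the EMR job flow state from a background thread (AWS SDK for Java 1.x assumed).
    final AmazonElasticMapReduceClient emr =
            new AmazonElasticMapReduceClient(new BasicAWSCredentials(accessKey, secretKey));

    new Thread(new Runnable() {
        public void run() {
            while (true) {
                DescribeJobFlowsResult status = emr.describeJobFlows(
                        new DescribeJobFlowsRequest().withJobFlowIds(jobFlowId));
                String state = status.getJobFlows().get(0).getExecutionStatusDetail().getState();
                if ("COMPLETED".equals(state) || "FAILED".equals(state)) {
                    // pull the job's output from S3 here, then stop polling
                    break;
                }
                try { Thread.sleep(30000); } catch (InterruptedException e) { return; }
            }
        }
    }).start();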
