Wrapping an asynchronous computation as a synchronous (blocking) computation

Posted 2024-08-20 02:07:19

I have an object with a method I would like to expose to library clients (especially scripting clients) as something like:

interface MyNiceInterface
{
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
    public Future<Baz> doSomething(Foo fooArg, Bar barArg);
    // doSomethingAndBlock is the straightforward way;
    // doSomething has more control but deals with
    // a Future and that might be too much hassle for
    // scripting clients
}

but the primitive "stuff" I have available is a set of event-driven classes:

interface BazComputationSink
{
    public void onBazResult(Baz result);
}

class ImplementingThing
{
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}

where ImplementingThing takes inputs, does some arcane stuff like enqueueing things on a task queue, and then later, when a result occurs, sink.onBazResult() gets called on a thread that may or may not be the same thread on which ImplementingThing.doSomethingAsync() was called.

Is there a way I can use the event-driven functions I have, along with concurrency primitives, to implement MyNiceInterface so scripting clients can happily wait on a blocking thread?

edit: can I use FutureTask for this?

Comments (7)

清醇 2024-08-27 02:07:19

Using your own Future implementation:

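import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BazComputationFuture implements Future<Baz>, BazComputationSink {

    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    // cancel() and onBazResult() are synchronized so that exactly one of them wins.
    @Override
    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        }
        // This only releases waiters; it cannot stop the underlying computation.
        cancelled = true;
        countDownLatch.countDown();
        return true;
    }

    @Override
    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // await() returns false when the timeout elapses before the count reaches zero.
        if (!countDownLatch.await(timeout, unit)) {
            throw new TimeoutException();
        }
        if (cancelled) {
            throw new CancellationException();
        }
        return result;
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    @Override
    public synchronized void onBazResult(final Baz result) {
        if (isDone()) {
            return; // already cancelled or completed; ignore the late result
        }
        this.result = result;
        countDownLatch.countDown();
    }

}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        throw new IllegalStateException("Interrupted while waiting for Baz", e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
    }
}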

The solution creates a CountDownLatch internally, which is counted down to zero once the callback is received. If the user calls get(), the latch blocks the calling thread until the computation completes and the onBazResult callback is invoked. The latch also ensures that if the callback occurs before get() is called, get() returns immediately with the result.
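As a possible alternative to a hand-written Future, the JDK's own CompletableFuture (Java 8 and later) can play the same role. The sketch below is not part of the answer above; Foo, Bar, Baz, BazComputationSink and doSomethingAsync are stand-ins modelled on the question so that the example compiles on its own, and the thread-based doSomethingAsync is purely hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class CompletableFutureAdapter {
    // Stand-ins for the question's types (assumptions, not the real classes).
    static class Foo {}
    static class Bar {}
    static class Baz {}
    interface BazComputationSink { void onBazResult(Baz result); }

    // Hypothetical async source: delivers the result on another thread.
    static void doSomethingAsync(Foo foo, Bar bar, BazComputationSink sink) {
        new Thread(() -> sink.onBazResult(new Baz())).start();
    }

    static Future<Baz> doSomething(Foo foo, Bar bar) {
        CompletableFuture<Baz> future = new CompletableFuture<>();
        // complete() has no effect if the future was already completed or cancelled.
        doSomethingAsync(foo, bar, future::complete);
        return future;
    }

    static Baz doSomethingAndBlock(Foo foo, Bar bar)
            throws InterruptedException, ExecutionException {
        // Blocks the caller until the callback has completed the future.
        return doSomething(foo, bar).get();
    }

    public static void main(String[] args) throws Exception {
        Baz baz = doSomethingAndBlock(new Foo(), new Bar());
        System.out.println(baz != null); // prints "true"
    }
}
```

The design trade-off is the usual one: CompletableFuture already handles cancellation, timeouts and the callback-before-get() ordering, while a custom class gives full control over those semantics.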

胡渣熟男 2024-08-27 02:07:19

Well, there is the simple solution of doing something like:

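// requires: import java.util.concurrent.atomic.AtomicReference;
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
  final AtomicReference<Baz> notifier = new AtomicReference<>();
  doSomethingAsync(fooArg, barArg, new BazComputationSink() {
    public void onBazResult(Baz result) {
      synchronized (notifier) {
        notifier.set(result);
        notifier.notify();
      }
    }
  });
  synchronized (notifier) {
    try {
      // Loop to guard against spurious wakeups.
      while (notifier.get() == null)
        notifier.wait();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
      throw new IllegalStateException("Interrupted while waiting for Baz", e);
    }
  }
  return notifier.get();
}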

Of course, this assumes that your Baz result will never be null…
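If a null result has to be supported, one way to drop that assumption is to track completion in a separate flag instead of testing the value itself. This is only a sketch: the Holder class, the Sink interface and computeAsync are hypothetical names introduced for illustration, not part of the question's API.

```java
public class NullSafeBlocking {
    // Hypothetical callback interface, analogous to BazComputationSink.
    interface Sink<T> { void onResult(T result); }

    // The 'done' flag is tracked separately from the value, so null is a valid result.
    static final class Holder<T> {
        private T value;
        private boolean done;

        synchronized void set(T v) {
            value = v;
            done = true;
            notifyAll();
        }

        synchronized T await() throws InterruptedException {
            // Loop to guard against spurious wakeups.
            while (!done) {
                wait();
            }
            return value;
        }
    }

    // Hypothetical async source that legitimately produces null.
    static void computeAsync(Sink<String> sink) {
        new Thread(() -> sink.onResult(null)).start();
    }

    static String computeAndBlock() throws InterruptedException {
        Holder<String> holder = new Holder<>();
        computeAsync(holder::set);
        return holder.await();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(computeAndBlock()); // prints "null"
    }
}
```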

巷子口的你 2024-08-27 02:07:19

The Google Guava library has an easy-to-use SettableFuture that makes this problem very simple (around 10 lines of code).

import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;

public class ImplementingThing {

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
        try {
            return doSomething(fooArg, barArg).get();
        } catch (Exception e) {
            // Keep the original exception as the cause.
            throw new RuntimeException("Oh dear", e);
        }
    }

    public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
        // SettableFuture has no public constructor; use the factory method.
        final SettableFuture<Baz> future = SettableFuture.create();
        doSomethingAsync(fooArg, barArg, new BazComputationSink() {
            @Override
            public void onBazResult(Baz result) {
                future.set(result);
            }
        });
        return future;
    }

    // Everything below here is just mock stuff to make the example work,
    // so you can copy it into your IDE and see it run.

    public static class Baz {}
    public static class Foo {}
    public static class Bar {}

    public static interface BazComputationSink {
        public void onBazResult(Baz result);
    }

    public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Baz baz = new Baz();
                sink.onBazResult(baz);
            }
        }).start();
    }

    public static void main(String[] args) {
        System.err.println("Starting Main");
        System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
        System.err.println("Ending Main");
    }
}

云柯 2024-08-27 02:07:19

This is dead simple with RxJava 2.x:

try {
    Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
            doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
            .toFuture().get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Or without Lambda notation:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
                @Override
                public void subscribe(SingleEmitter<Baz> emitter) {
                    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                        @Override
                        public void onBazResult(Baz result) {
                            emitter.onSuccess(result);
                        }
                    });
                }
            }).toFuture().get();

Even simpler:

Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                .blockingGet();

Kotlin Version:

val baz = Single.create<Baz> { emitter -> 
    doSomethingAsync(fooArg, barArg) { result -> emitter.onSuccess(result) } 
}.blockingGet()

苯莒 2024-08-27 02:07:19

A very simple example, just to understand CountDownLatch without any extra code.

A java.util.concurrent.CountDownLatch is a concurrency construct that allows one or more threads to wait for a given set of operations to complete.

A CountDownLatch is initialized with a given count. This count is decremented by calls to the countDown() method. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero.

Below is a simple example. After the Decrementer has called countDown() 3 times on the CountDownLatch, the waiting Waiter is released from the await() call.

You can also pass a timeout to await().

CountDownLatch latch = new CountDownLatch(3);

Waiter      waiter      = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();

Thread.sleep(4000);

public class Waiter implements Runnable {

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

//--------------

public class Decrementer implements Runnable {

    CountDownLatch latch = null;

    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
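On the timeout variant mentioned above: CountDownLatch.await(long, TimeUnit) returns a boolean, true if the count reached zero and false if the waiting time elapsed first. A minimal sketch of that behaviour follows; the class and helper names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedAwaitDemo {
    // Returns true if the latch opened in time, false if the timeout elapsed first.
    static boolean awaitWithTimeout(CountDownLatch latch, long millis)
            throws InterruptedException {
        return latch.await(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverOpened = new CountDownLatch(1);
        System.out.println(awaitWithTimeout(neverOpened, 100)); // prints "false"

        CountDownLatch alreadyOpen = new CountDownLatch(1);
        alreadyOpen.countDown();
        System.out.println(awaitWithTimeout(alreadyOpen, 100)); // prints "true"
    }
}
```

Ignoring the return value is an easy mistake: the caller then cannot tell a completed operation from a timed-out one.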

If you don't want to use a CountDownLatch, or if your requirement resembles Facebook's like/unlike functionality, meaning that while one method is running the other must not be called, you can declare a flag:

private volatile boolean isInprocessOfLikeOrUnLike = false;

Then check the flag at the beginning of each method: if it is false, proceed with the call; otherwise return. The details depend on your implementation.
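One caveat with a plain volatile flag is that checking it and then setting it are two separate steps, so two threads can both see false and both proceed. A hedged sketch of the same idea using AtomicBoolean.compareAndSet, which makes the check and the update a single atomic step, is shown below; LikeGuard and tryRun are hypothetical names, not from the answer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LikeGuard {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /** Runs the action unless another call is already in progress; returns whether it ran. */
    public boolean tryRun(Runnable action) {
        // Atomically flip false -> true; fails if some call is already running.
        if (!inProgress.compareAndSet(false, true)) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            inProgress.set(false); // always release the guard
        }
    }

    public static void main(String[] args) {
        LikeGuard guard = new LikeGuard();
        boolean[] nested = new boolean[1];
        // The nested call is attempted while the outer call still holds the guard.
        boolean outer = guard.tryRun(() -> nested[0] = guard.tryRun(() -> {}));
        System.out.println(outer);     // prints "true"
        System.out.println(nested[0]); // prints "false"
    }
}
```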

雾里花 2024-08-27 02:07:19

Here's a more generic solution based on Paul Wagland's answer:

public abstract class AsyncRunnable<T> {
    protected abstract void run(AtomicReference<T> notifier);

    protected final void finish(AtomicReference<T> notifier, T result) {
        synchronized (notifier) {
            notifier.set(result);
            notifier.notify();
        }
    }

    public static <T> T wait(AsyncRunnable<T> runnable) {
        final AtomicReference<T> notifier = new AtomicReference<>();

        // run the asynchronous code
        runnable.run(notifier);

        // wait for the asynchronous code to finish
        synchronized (notifier) {
            while (notifier.get() == null) {
                try {
                    notifier.wait();
                } catch (InterruptedException ignore) {}
            }
        }

        // return the result of the asynchronous code
        return notifier.get();
    }
}

Here's an example of how to use it:

    String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
        @Override
        public void run(final AtomicReference<String> notifier) {
            // here goes your async code, e.g.:
            new Thread(new Runnable() {
                @Override
                public void run() {
                    finish(notifier, "This was an asynchronous call!");
                }
            }).start();
        }
    });

A more verbose version of the code can be found here: http://pastebin.com/hKHJUBqE

EDIT:
The example related to the question would be:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
        @Override
        protected void run(final AtomicReference<Baz> notifier) {
            doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                public void onBazResult(Baz result) {
                    synchronized (notifier) {
                        notifier.set(result);
                        notifier.notify();
                    }
                }
            });
        }
    });
}

一百个冬季 2024-08-27 02:07:19

The simplest way (which works for me) is to

  1. Create a blocking queue
  2. Call the asynchronous method - use a handler that offers the result to that blocking queue.
  3. Poll the queue (that's where you block) for the result.

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
        final BlockingQueue<Baz> blocker = new LinkedBlockingQueue<>();
        doSomethingAsync(fooArg, barArg, blocker::offer);
        // Now block until response or timeout
        return blocker.poll(30, TimeUnit.SECONDS);
    }
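Note that BlockingQueue.poll(timeout, unit) returns null when the timeout elapses, so a caller of the method above cannot tell a timeout from a missing value (and a BlockingQueue rejects null elements anyway). A sketch of one way to surface the timeout explicitly follows; the generic Sink interface and the blockFor helper are hypothetical names introduced here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class QueueBlocking {
    // Hypothetical callback interface, analogous to BazComputationSink.
    interface Sink<T> { void onResult(T result); }

    // Starts the async call, then blocks until a result arrives or the timeout elapses.
    static <T> T blockFor(Consumer<Sink<T>> asyncCall, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        BlockingQueue<T> queue = new ArrayBlockingQueue<>(1);
        asyncCall.accept(queue::offer);
        T result = queue.poll(timeout, unit);
        if (result == null) {
            // poll() returns null only when the waiting time elapsed.
            throw new TimeoutException("no result within the timeout");
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        String value = QueueBlocking.<String>blockFor(
                sink -> new Thread(() -> sink.onResult("done")).start(),
                1, TimeUnit.SECONDS);
        System.out.println(value); // prints "done"
    }
}
```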
    