Java 11在自定义条件下可完整的未来重试,而不是例外

发布于 2025-01-21 23:16:37 字数 1243 浏览 3 评论 0原文

I have a completable future defined below

CompletableFuture<Person> personFutures = personService.getPersons();

Now, based on a particular condition, I need to check and do the call to getPersons until the condition is matched or the number of retries (5 retries, 5seconds apart) have finished.

The condition will be

if(personFutures.get().size() != totalPersonsInOrg){
 retry(personService.getPersons(), 5, 5)
} else {
 return persons
}

I want to use the thenApply and thenCompose to chain these after the first completablefuture.

personFutures.thenApply(persons -> {
     if(persons.size() != totalPersonsOrg){
      retry(personservice,5,5)
     }
})

这就是需要更改的

private boolean allPersonsFound(String id, int retry, int c 
         count) 
{ 
    if (retry > maxRetries) {
        return false;
    }

     CompletableFuture<List<Persons>> personsFuture = personaService.getPersons();
    List<Persons> persons = personsFuture.get();

    if (persons.size() != count) {
        //add delay of 50ms
        return allPersonsFound(id, retry++, count);
    }
    return true;
}

I have a completable future defined below

CompletableFuture<Person> personFutures = personService.getPersons();

Now, based on a particular condition, I need to check and do the call to getPersons until the condition is matched or the number of retries (5 retries, 5seconds apart) have finished.

The condition will be

if(personFutures.get().size() != totalPersonsInOrg){
 retry(personService.getPersons(), 5, 5)
} else {
 return persons
}

I want to use the thenApply and thenCompose to chain these after the first completablefuture.

personFutures.thenApply(persons -> {
     if(persons.size() != totalPersonsOrg){
      retry(personservice,5,5)
     }
})

This is what needs to be changed

private boolean allPersonsFound(String id, int retry, int c 
         count) 
{ 
    if (retry > maxRetries) {
        return false;
    }

     CompletableFuture<List<Persons>> personsFuture = personaService.getPersons();
    List<Persons> persons = personsFuture.get();

    if (persons.size() != count) {
        //add delay of 50ms
        return allPersonsFound(id, retry++, count);
    }
    return true;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

生活了然无味 2025-01-28 23:16:37

假设您的PersonsService是:

interface PersonsService {
   CompletableFuture<Persons> getPersons();
}

您可能想拥有具有其他验证和重试逻辑的代理实现。

一种可能性是使用异步递归。这样的事情(我没有尝试运行它!):

final class ValidatedPersonsService implements PersonsService {
    private final PersonsService upstreamService;
    private final Predicate<Persons> validationPredicate;
    private final int numberOfAttempts;
    private final long backoffDuratioMs;
    private final Executor executor;
    private final Executor delayedExecutor;

    ValidatedPersonsService(final PersonsService upstreamService,
                            final Predicate<Persons> validationPredicate,
                            final int numberOfAttempts,
                            final long backoffDuratioMs,
                            final Executor executor) {
        this.upstreamService = upstreamService;
        this.validationPredicate = validationPredicate;
        this.numberOfAttempts = numberOfAttempts;
        this.backoffDuratioMs = backoffDuratioMs;
        this.executor = executor;
        this.delayedExecutor = CompletableFuture.delayedExecutor(backoffDuratioMs, TimeUnit.MILLISECONDS, executor);
    }

    // this one is needed to track number of passed attempts through the async recursion steps
    private static final class PersonsResponse {
        final Persons persons;
        final int attempt;

        private PersonsResponse(final Persons persons, final int attempt) {
            this.persons = persons;
            this.attempt = attempt;
        }
    }

    @Override
    public CompletableFuture<Persons> getPersons() {
        return submitRequest(1, executor)
                .thenApply(response -> response.persons);
    }

    private CompletableFuture<PersonsResponse> submitRequest(int currentAttempt, Executor executor) {
        if (currentAttempt > numberOfAttempts) {
            return CompletableFuture.failedFuture(new RuntimeException("max number of attempts exceeded"));
        } else {
            return upstreamService
                    .getPersons()
                    .thenApplyAsync(persons -> new PersonsResponse(persons, currentAttempt), executor) // break out into our executor, to not rely on concurrency model of the upstream service
                    .thenCompose(this::validateResponse);
        }
    }

    private CompletableFuture<PersonsResponse> validateResponse(PersonsResponse response) {
        if (validationPredicate.test(response.persons)) {
            return CompletableFuture.completedFuture(response);
        } else {
            return submitRequest(response.attempt + 1, delayedExecutor);
        }
    }
}

Assuming your PersonsService is:

interface PersonsService {
   CompletableFuture<Persons> getPersons();
}

You probably want to have a proxy implementation with additional validation and retry logic.

One possibility is to use asynchronous recursion. Something like this (I have not tried to run it!):

final class ValidatedPersonsService implements PersonsService {
    private final PersonsService upstreamService;
    private final Predicate<Persons> validationPredicate;
    private final int numberOfAttempts;
    private final long backoffDuratioMs;
    private final Executor executor;
    private final Executor delayedExecutor;

    ValidatedPersonsService(final PersonsService upstreamService,
                            final Predicate<Persons> validationPredicate,
                            final int numberOfAttempts,
                            final long backoffDuratioMs,
                            final Executor executor) {
        this.upstreamService = upstreamService;
        this.validationPredicate = validationPredicate;
        this.numberOfAttempts = numberOfAttempts;
        this.backoffDuratioMs = backoffDuratioMs;
        this.executor = executor;
        this.delayedExecutor = CompletableFuture.delayedExecutor(backoffDuratioMs, TimeUnit.MILLISECONDS, executor);
    }

    // this one is needed to track number of passed attempts through the async recursion steps
    private static final class PersonsResponse {
        final Persons persons;
        final int attempt;

        private PersonsResponse(final Persons persons, final int attempt) {
            this.persons = persons;
            this.attempt = attempt;
        }
    }

    @Override
    public CompletableFuture<Persons> getPersons() {
        return submitRequest(1, executor)
                .thenApply(response -> response.persons);
    }

    private CompletableFuture<PersonsResponse> submitRequest(int currentAttempt, Executor executor) {
        if (currentAttempt > numberOfAttempts) {
            return CompletableFuture.failedFuture(new RuntimeException("max number of attempts exceeded"));
        } else {
            return upstreamService
                    .getPersons()
                    .thenApplyAsync(persons -> new PersonsResponse(persons, currentAttempt), executor) // break out into our executor, to not rely on concurrency model of the upstream service
                    .thenCompose(this::validateResponse);
        }
    }

    private CompletableFuture<PersonsResponse> validateResponse(PersonsResponse response) {
        if (validationPredicate.test(response.persons)) {
            return CompletableFuture.completedFuture(response);
        } else {
            return submitRequest(response.attempt + 1, delayedExecutor);
        }
    }
}

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文