
Reactor 错误重试使用分享
在使用 Reactor 进行响应式编程时,错误处理和重试机制是非常重要的部分。Reactor 提供了多种方式来处理错误和实现重试机制。【1】使用 retry 操作符retry 操作符可以在发生错误时重新订阅序列。你可以指定重试的次数。.retry(3);上面的代码会在发生错误时重试 3 次。【2】使用 retryWhen 操作符retryWhen 操作符提供了更灵活的重试机制,可以根据特定条件或延
优质博文:IT-BLOG-CN
在使用 Reactor 进行响应式编程时,错误处理和重试机制是非常重要的部分。Reactor 提供了多种方式来处理错误和实现重试机制。
要记住的是,它通过重新订阅上游来工作Flux。这实际上是一个不同的序列,原始序列仍然终止。为了验证这一点,我们可以重新使用前面的示例,并附加一个retry(1)以重试一次,而不是使用onErrorReturn。以下示例显示了如何执行此操作:
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed()// elapsed将每个值与自发出前一个值以来的持续时间相关联。
.subscribe(System.out::println, System.err::println);// 我们还想看看什么时候有onError。
Thread.sleep(2100);//确保我们有足够的时间完成 4x2 刻度。
上述示例产生以下输出:
259,tick 0
249,tick 1
251,tick 2
506,tick 0 // 从第 0 个滴答开始,开始了一个新的滴答。interval额外的 250 毫秒持续时间来自第 4 个滴答,这个滴答会导致异常和随后的重试。
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
从前面的例子可以看出,retry(1)仅仅重新订阅了interval 一次原始订阅,从 0 重新开始计时。第二次,由于仍然发生异常,它放弃并将错误传播到下游。
有一个更高级的版本retry(称为retryWhen),它使用“同伴” Flux来判断特定故障是否应该重试。此同伴Flux由操作员创建,但由用户装饰,以便自定义重试条件。
CompanionFlux是Flux传递给Retry策略/函数的,作为 的唯一参数提供retryWhen。作为用户,您可以定义该函数并使其返回新的 Publisher<?>。该类是一个抽象类,但如果您想使用简单的 lambda ( )Retry转换 Companion,它会提供工厂方法。Retry.from(Function)
重试周期如下:
1、每次发生错误(可能重试)时,RetrySignal都会将 发送到 Companion 中Flux,该 Companion 已由您的函数修饰。Flux这里有 ,可以一览迄今为止的所有尝试。RetrySignal可以访问错误及其周围的元数据。
2、如果伴随设备Flux发出一个值,就会发生重试。
3、如果伴随进程Flux完成,错误将被忽略,重试循环停止,并且生成的序列也将完成。
4、如果伴随进程Flux产生错误(e),则重试循环停止并且结果序列出现错误e。
前两种情况的区别很重要。简单地完成伴随操作实际上会吞掉错误。考虑 retry(3)使用以下方式进行模拟retryWhen:
Flux<String> flux = Flux
.<String>error(new IllegalArgumentException()) // 这会不断产生错误,需要重试。
.doOnError(System.out::println) // doOnError重试之前让我们记录并查看所有失败。
.retryWhen(Retry.from(companion -> // 这Retry是改编自一个非常简单的Functionlambda
companion.take(3))); // 在这里,我们将前三个错误视为可重试(take(3)),然后放弃。
实际上,前面的示例生成了一个空的Flux,但它成功完成了。由于 会以最新的错误终止, retry(3)因此此示例与 并不完全相同。FluxretryWhenretry(3)
要达到同样的行为还需要一些额外的技巧:
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.doOnError(e -> errorCount.incrementAndGet())
.retryWhen(Retry.from(companion -> // 我们Retry通过改编Functionlambda 来定制,而不是提供具体的类
companion.map(rs -> { // 同伴发出RetrySignal对象,其中包含迄今为止的重试次数和最后一次失败
if (rs.totalRetries() < 3) return rs.totalRetries();// 为了允许三次重试,我们考虑索引 < 3 并返回要发出的值(这里我们只是返回索引)。
else throw Exceptions.propagate(rs.failure());// 为了以错误的方式终止序列,我们在三次重试之后抛出原始异常。
})
));
可以使用 中公开的构建器Retry以更流畅的方式实现相同目标,以及更精细调整的重试策略。例如:errorFlux.retryWhen(Retry.max(3));。
核心提供的Retry助手RetrySpec和RetryBackoffSpec都允许高级自定义,例如:
1、设置filter(Predicate)可以触发重试的异常
2、通过修改之前设置的过滤器modifyErrorFilter(Function)
3、触发副作用,例如记录重试触发器周围的日志(即延迟之前和之后的退避),前提是重试经过验证(doBeforeRetry()并且doAfterRetry()是附加的)
4、在重试触发器周围触发异步Mono,这允许在基本延迟之上添加异步行为,但从而进一步延迟触发器(doBeforeRetryAsync并且doAfterRetryAsync是附加的)
5、通过 自定义在达到最大尝试次数时出现的异常onRetryExhaustedThrow(BiFunction)。默认情况下,Exceptions.retryExhausted(…)使用 ,可以使用 来区分Exceptions.isRetryExhausted(Throwable)
出现暂时错误时重试
一些长期存在的源可能会出现零星的错误,随后会在较长的一段时间内一切运行正常。本文档将这种错误模式称为瞬态错误。
在这种情况下,最好单独处理每个突发,以便下一个突发不会继承前一个突发的重试状态。例如,使用指数退避策略,每个后续突发应从最小退避开始延迟重试尝试,Duration而不是从不断增长的退避开始。
RetrySignal表示状态的接口具有retryWhen可用totalRetriesInARow()于此目的的值。与通常的单调递增totalRetries()索引不同,此辅助索引每次通过重试恢复错误时都会重置为 0(即,当重试尝试导致传入onNext而不是onError再次时)。
transientErrors(boolean)当将配置参数设置为true或 时RetrySpec,RetryBackoffSpec生成的策略将使用该totalRetriesInARow()索引,从而有效地处理瞬态错误。这些规范根据索引计算重试模式,因此实际上规范的所有其他配置参数都独立应用于每次突发错误。
AtomicInteger errorCount = new AtomicInteger(); // 为了说明起见,我们将计算重试序列中的错误数量。
Flux<Integer> transientFlux = httpRequest.get() // 我们假设一个 http 请求源,例如一个流端点,有时会连续失败两次,然后恢复。
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true))// 我们retryWhen在该源上使用它,配置最多 2 次重试,但处于transientErrors模式。
.blockLast();
assertThat(errorCount).hasValue(6); // 最后,在注册尝试transientFlux后,获得了有效的响应并且 成功完成。6errorCount
如果没有transientErrors(true),则配置的最大尝试次数2将被第二次突发所超过,整个序列最终将失败。
如果您想在没有实际的 http 远程端点的情况下在本地尝试此操作,您可以实现一个伪httpRequest方法Supplier,如下所示:
final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
Flux.generate(sink -> { // 我们generate有一个出现大量错误的来源。
int i = transientHelper.getAndIncrement();
if (i == 10) {// 当计数器达到 10 时,它将成功完成。
sink.next(i);
sink.complete();
}
else if (i % 3 == 0) {// 如果transientHelper原子是的倍数3,我们就发射onNext,从而结束当前爆发。
sink.next(i);
}
else {
sink.error(new IllegalStateException("Transient error at " + i));// 在其他情况下,我们会发出onError。这是 3 次中有 2 次,因此 2 次的突发onError被 1 次打断onNext。
}
});
总结
在使用 Reactor 进行响应式编程时,错误处理和重试机制是非常重要的部分。Reactor 提供了多种方式来处理错误和实现重试机制。以下是一些常见的错误重试使用方法:
【1】使用 retry 操作符
retry 操作符可以在发生错误时重新订阅序列。你可以指定重试的次数。
Flux<String> flux = Flux.<String>error(new RuntimeException("Error"))
.retry(3);
flux.subscribe(
System.out::println,
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
上面的代码会在发生错误时重试 3 次。
【2】使用 retryWhen 操作符
retryWhen 操作符提供了更灵活的重试机制,可以根据特定条件或延迟策略进行重试。
Flux<String> flux = Flux.<String>error(new RuntimeException("Error"))
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)));
flux.subscribe(
System.out::println,
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
上面的代码会在发生错误时,每隔 1 秒重试一次,共重试 3 次。
【3】使用 retryBackoff 操作符
retryBackoff 操作符提供了指数退避(Exponential Backoff)策略,可以在每次重试时增加延迟时间。
Flux<String> flux = Flux.<String>error(new RuntimeException("Error"))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
flux.subscribe(
System.out::println,
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
上面的代码会在发生错误时,按照指数退避策略进行重试,初始延迟为 1 秒,共重试 3 次。
【4】使用 onErrorResume 和 onErrorReturn
onErrorResume 和 onErrorReturn 操作符可以在发生错误时提供备用数据或恢复策略。
Flux<String> flux = Flux.<String>error(new RuntimeException("Error"))
.onErrorResume(e -> {
System.err.println("Error: " + e);
return Flux.just("Recovered");
});
flux.subscribe(
System.out::println,
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
上面的代码会在发生错误时,切换到备用数据流 “Recovered”。
【5】自定义重试逻辑
你可以通过 retryWhen 操作符自定义复杂的重试逻辑,例如根据错误类型或条件进行重试。
Flux<String> flux = Flux.<String>error(new RuntimeException("Error"))
.retryWhen(companion -> companion
.zipWith(Flux.range(1, 4), (error, index) -> {
if (index < 4) {
return index;
} else {
throw Exceptions.propagate(error);
}
})
.flatMap(index -> {
System.out.println("Retrying in " + index + " seconds...");
return Mono.delay(Duration.ofSeconds(index));
}));
flux.subscribe(
System.out::println,
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
上面的代码会在发生错误时,按照自定义的重试逻辑进行重试,每次重试的延迟时间逐渐增加。
更多推荐
所有评论(0)