How to short-circuit a reduce() operation on a Stream?
Question
This is essentially the same question as "How to short-circuit reduce on Stream?". However, that question focuses on a Stream of boolean values, and its answer cannot be generalized to other types and reduce operations, so I'd like to ask the more general question.
How can we perform a reduce on a stream so that it short-circuits when it encounters an absorbing element for the reducing operation?
The typical mathematical case would be 0 for multiplication. This Stream:
int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .reduce(1, (a, b) -> a * b);
will consume the last two elements (7 and 8) even though the product is already known once 0 has been encountered.
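To make the behaviour described above observable, here is a minimal sketch that counts how many elements actually reach the reduction. The class name ReduceConsumesAll and the helper demo() are illustrative names only and are not part of the original question.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ReduceConsumesAll {
    // Hypothetical helper: runs the reduction from the question and
    // reports the product together with the number of elements visited.
    static String demo() {
        AtomicInteger visited = new AtomicInteger();
        int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
                .peek(x -> visited.incrementAndGet()) // count every element that flows through
                .reduce(1, (a, b) -> a * b);
        return "product=" + product + ", visited=" + visited.get();
    }

    public static void main(String[] args) {
        // All seven elements are visited, including 7 and 8 after the 0.
        System.out.println(demo()); // prints "product=0, visited=7"
    }
}
```

The plain reduce() has no way of knowing that 0 is absorbing for multiplication, so it keeps pulling elements until the source is exhausted.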
Recommended answer
Unfortunately, the Stream API offers only limited means of creating your own short-circuiting operations. A not-so-clean solution is to throw a RuntimeException and catch it. Here's an implementation for IntStream, but it can be generalized to other stream types as well:
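import java.util.function.IntBinaryOperator;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

public static int reduceWithCancelEx(IntStream stream, int identity,
        IntBinaryOperator combiner, IntPredicate cancelCondition) {
    // Local exception type that carries the final value out of the reduction
    class CancelException extends RuntimeException {
        private final int val;

        CancelException(int val) {
            this.val = val;
        }
    }

    try {
        return stream.reduce(identity, (a, b) -> {
            int res = combiner.applyAsInt(a, b);
            // Abort the whole reduction as soon as the condition holds
            if (cancelCondition.test(res))
                throw new CancelException(res);
            return res;
        });
    } catch (CancelException e) {
        return e.val;
    }
}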
Usage example:
int product = reduceWithCancelEx(
        IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println),
        1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: " + product);
Output:
2
3
4
5
0
Result: 0
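The answer notes that the IntStream version can be generalized to other stream types. One possible generalization to Stream<T> is sketched below; it is an illustration rather than code from the original answer. The class name GenericCancelReduce, the top-level CancelException holding an Object, and the demo() helper are all assumptions made for this sketch. A generic class may not extend Throwable, which is why the exception stores the value as Object and the result is cast back. The sketch also disables stack trace collection for the exception, an optional choice that the original does not make.

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class GenericCancelReduce {
    // Hypothetical carrier exception (not in the original answer).
    // The value is kept as Object because exception classes cannot be generic.
    private static final class CancelException extends RuntimeException {
        final Object val;

        CancelException(Object val) {
            // no message, no cause, suppression disabled, no stack trace
            super(null, null, false, false);
            this.val = val;
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T reduceWithCancelEx(Stream<T> stream, T identity,
            BinaryOperator<T> combiner, Predicate<T> cancelCondition) {
        try {
            return stream.reduce(identity, (a, b) -> {
                T res = combiner.apply(a, b);
                if (cancelCondition.test(res))
                    throw new CancelException(res); // abort the reduction
                return res;
            });
        } catch (CancelException e) {
            return (T) e.val; // only values of type T are ever stored
        }
    }

    // Illustrative helper: BigInteger product that stops at the first zero result.
    static String demo() {
        AtomicInteger visited = new AtomicInteger();
        BigInteger product = reduceWithCancelEx(
                Stream.of(2L, 3L, 0L, 7L)
                        .peek(x -> visited.incrementAndGet())
                        .map(BigInteger::valueOf),
                BigInteger.ONE, BigInteger::multiply, v -> v.signum() == 0);
        return "product=" + product + ", visited=" + visited.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "product=0, visited=3"
    }
}
```

On this sequential stream the element 7 is never visited, because the exception leaves the pipeline as soon as the running product becomes zero.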
Note that even though this works with parallel streams, there is no guarantee that the other parallel tasks will finish as soon as one of them throws an exception. Sub-tasks that have already started will likely run to completion, so you may process more elements than expected.
Update: an alternative solution that is much longer but more parallel-friendly. It is based on a custom spliterator that returns at most one element, which is the result of accumulating all underlying elements. When you use it in sequential mode, it does all the work in a single tryAdvance call. When you split it, each part generates the corresponding single partial result, and these partial results are reduced by the Stream engine using the combiner function. Here's the generic version, but primitive specializations are possible as well.
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;

static final class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
        Consumer<T>, Cloneable {
    private Spliterator<T> source;
    private final BiFunction<A, ? super T, A> accumulator;
    private final Predicate<A> cancelPredicate;
    // clone() copies this reference, so all split parts share one flag
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private A acc;

    CancellableReduceSpliterator(Spliterator<T> source, A identity,
            BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
        this.source = source;
        this.acc = identity;
        this.accumulator = accumulator;
        this.cancelPredicate = cancelPredicate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super A> action) {
        if (source == null || cancelled.get()) {
            source = null;
            return false;
        }
        // Accumulate the whole underlying source, stopping early on cancellation
        while (!cancelled.get() && source.tryAdvance(this)) {
            if (cancelPredicate.test(acc)) {
                cancelled.set(true);
                break;
            }
        }
        source = null;
        // Emit the single (partial) result
        action.accept(acc);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super A> action) {
        tryAdvance(action);
    }

    @Override
    public Spliterator<A> trySplit() {
        if (source == null || cancelled.get()) {
            source = null;
            return null;
        }
        Spliterator<T> prefix = source.trySplit();
        if (prefix == null)
            return null;
        try {
            @SuppressWarnings("unchecked")
            CancellableReduceSpliterator<T, A> result =
                    (CancellableReduceSpliterator<T, A>) this.clone();
            result.source = prefix;
            return result;
        } catch (CloneNotSupportedException e) {
            throw new InternalError();
        }
    }

    @Override
    public long estimateSize() {
        // let's pretend we have the same number of elements
        // as the source, so the pipeline engine parallelizes it in the same way
        return source == null ? 0 : source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source == null ? SIZED : source.characteristics() & ORDERED;
    }

    @Override
    public void accept(T t) {
        this.acc = accumulator.apply(this.acc, t);
    }
}
Methods analogous to Stream.reduce(identity, accumulator, combiner) and Stream.reduce(identity, combiner), but with a cancelPredicate:
import java.util.function.BinaryOperator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
        BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
        Predicate<U> cancelPredicate) {
    return StreamSupport
            .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
                    accumulator, cancelPredicate), stream.isParallel())
            .reduce(combiner)
            .orElse(identity);
}

public static <T> T reduceWithCancel(Stream<T> stream, T identity,
        BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}
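The core idea of the spliterator above, a spliterator that emits at most one element holding the accumulated result, may be easier to see in a stripped-down form. The following is a sequential-only sketch under that assumption: it leaves out the splitting and the shared cancellation flag that make the real class parallel-friendly. The names SingleResultSpliteratorSketch, foldToSingle and demo are hypothetical and do not appear in the original answer.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SingleResultSpliteratorSketch {
    // Hypothetical, sequential-only simplification: wraps the source in a
    // spliterator that produces exactly one element, the accumulated value.
    static <T, A> Stream<A> foldToSingle(Stream<T> stream, A identity,
            BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
        Spliterator<T> source = stream.spliterator();
        Spliterator<A> single = new Spliterators.AbstractSpliterator<A>(1, 0) {
            private A acc = identity;
            private boolean done;

            @Override
            public boolean tryAdvance(Consumer<? super A> action) {
                if (done)
                    return false;
                done = true;
                // Pull source elements one at a time and stop as soon as
                // the accumulated value satisfies the cancel predicate.
                while (source.tryAdvance(t -> acc = accumulator.apply(acc, t))) {
                    if (cancelPredicate.test(acc))
                        break;
                }
                action.accept(acc);
                return true;
            }
        };
        return StreamSupport.stream(single, false);
    }

    // Illustrative helper: product of the question's elements, stopping at zero.
    static String demo() {
        AtomicInteger visited = new AtomicInteger();
        int result = foldToSingle(
                Stream.of(2, 3, 4, 5, 0, 7, 8).peek(x -> visited.incrementAndGet()),
                1, (a, b) -> a * b, x -> x == 0)
                .findFirst().get();
        return "result=" + result + ", visited=" + visited.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "result=0, visited=5"
    }
}
```

Because the work happens inside a single tryAdvance call that pulls from the source lazily, the elements after the 0 are never requested. The full class adds trySplit so that each split part yields its own partial result.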
Let's test both versions and count how many elements are actually processed. Let's put the 0 close to the end. Exception version:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()), 1,
        (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Typical output:
product: 0/count: 281721
product: 0/count: 500001
So although the result is returned when only some of the elements have been processed, the tasks continue working in the background and the counter keeps increasing. Here's the spliterator version:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()).boxed(),
        1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Typical output:
product: 0/count: 281353
product: 0/count: 281353
All the tasks have actually finished by the time the result is returned.