Partition a Stream by a discriminator function
Question
One of the missing features in the Streams API is the "partition by" transformation, as defined for example in Clojure. Say I want to reproduce Hibernate's fetch join: I want to issue a single SQL SELECT statement and build objects of this kind from the result:
class Family {
    String surname;
    List<String> members;

    Family(String surname) {
        this.surname = surname;
    }
}
I issue:
SELECT f.name, m.name
FROM Family f JOIN Member m on m.family_id = f.id
ORDER BY f.name
and I retrieve a flat stream of (f.name, m.name) records. Now I need to transform it into a stream of Family objects, each with a list of its members inside. Assume I already have a Stream<ResultRow>; now I need to transform it into a Stream<List<ResultRow>> and then act upon that with a mapping transformation which turns it into a Stream<Family>.
The semantics of the transformation are as follows: keep collecting the stream into a List for as long as the provided discriminator function keeps returning the same value; as soon as the value changes, emit the List as an element of the output stream and start collecting a new List.
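To pin down these semantics, here is a minimal eager sketch over an in-memory list. PartitionSemantics and its partitionBy method are hypothetical names used only for illustration; because it materializes its input, it does not meet the laziness requirement, and it only shows exactly when a new List is started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class PartitionSemantics {

    // Eager illustration of "partition by": start a new group whenever the
    // discriminator value differs from that of the previous element.
    public static <E> List<List<E>> partitionBy(List<E> input, Function<? super E, ?> key) {
        List<List<E>> result = new ArrayList<>();
        List<E> current = null;
        Object currentKey = null;
        for (E e : input) {
            Object k = key.apply(e);
            if (current == null || !Objects.equals(k, currentKey)) {
                current = new ArrayList<>(); // key changed: open a new group
                result.add(current);
                currentKey = k;
            }
            current.add(e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Only adjacent equal keys are merged, so "a:3" starts a group of its own.
        List<String> rows = List.of("a:1", "a:2", "b:1", "a:3");
        System.out.println(partitionBy(rows, r -> r.substring(0, 1)));
        // prints [[a:1, a:2], [b:1], [a:3]]
    }
}
```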
I hope to be able to write this kind of code (I already have the resultStream method):
Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
        "SELECT f.name, m.name"
        + " FROM Family f JOIN Member m on m.family_id = f.id"
        + " ORDER BY f.name"));
// Group consecutive rows that share the same family name (column 0).
Stream<List<ResultRow>> partitioned = partitionBy(r -> r.string(0), dbStream);
// Turn each group of rows into one Family holding its member names (column 1).
Stream<Family> families = partitioned.map(rs -> {
    Family f = new Family(rs.get(0).string(0));
    f.members = rs.stream().map(r -> r.string(1)).collect(toList());
    return f;
});
Needless to say, I expect the resulting stream to stay lazy (non-materialized), as I want to be able to process a result set of any size without hitting any O(n) memory limits. Without this crucial requirement I would be happy with the provided groupingBy collector.
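For contrast, a groupingBy version might look like the following sketch, with rows modeled as hypothetical two-element String arrays (family name, member name) rather than ResultRow. collect is a terminal operation, so the whole map is built in memory before the first group can be used. Unlike "partition by", groupingBy also merges non-adjacent rows that share a key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingByContrast {

    // Collects the entire stream into a map before returning, so memory use
    // grows with the size of the result set.
    public static Map<String, List<String>> membersBySurname(Stream<String[]> rows) {
        return rows.collect(Collectors.groupingBy(
                row -> row[0],                 // family name
                LinkedHashMap::new,            // keep first-seen key order
                Collectors.mapping(row -> row[1], Collectors.toList())));
    }

    public static void main(String[] args) {
        Stream<String[]> rows = Stream.of(
                new String[] {"Smith", "Ann"},
                new String[] {"Smith", "Bob"},
                new String[] {"Jones", "Cy"});
        System.out.println(membersBySurname(rows));
        // prints {Smith=[Ann, Bob], Jones=[Cy]}
    }
}
```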
Recommended answer
The solution requires us to define a custom Spliterator which can be used to construct the partitioned stream. We shall need to access the input stream through its own spliterator and wrap it into ours. The output stream is then constructed from our custom spliterator.
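The wrapping pattern can be illustrated on a simpler transformation first. The sketch below is not the partition-by spliterator: it batches elements by a fixed count, and BatchingSpliterator and batches are hypothetical names. It wraps the input stream's spliterator in an AbstractSpliterator, pulls source elements one tryAdvance call at a time, and builds the output with StreamSupport.stream; because nothing is read beyond the current batch, it also works on an infinite stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BatchingSpliterator<E> extends AbstractSpliterator<List<E>> {
    private final Spliterator<E> source;
    private final int size;

    public BatchingSpliterator(Spliterator<E> source, int size) {
        // The number of batches is unknown up front; only ORDERED is carried over.
        super(Long.MAX_VALUE, source.characteristics() & ORDERED);
        this.source = source;
        this.size = size;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        List<E> batch = new ArrayList<>(size);
        // Pull at most `size` elements from the wrapped spliterator, one at a time.
        while (batch.size() < size && source.tryAdvance(batch::add)) {
            // empty body: tryAdvance has already appended the element
        }
        if (batch.isEmpty()) {
            return false; // the source is exhausted
        }
        action.accept(batch);
        return true;
    }

    public static <E> Stream<List<E>> batches(Stream<E> in, int size) {
        return StreamSupport.stream(new BatchingSpliterator<>(in.spliterator(), size), false)
                .onClose(in::close); // closing the result also closes the source
    }

    public static void main(String[] args) {
        // Works on an infinite stream because elements are pulled on demand.
        List<List<Integer>> first = batches(Stream.iterate(0, i -> i + 1), 3)
                .limit(2)
                .collect(Collectors.toList());
        System.out.println(first); // prints [[0, 1, 2], [3, 4, 5]]
    }
}
```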
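The following Spliterator will turn any Stream<E> into a Stream<List<E>> provided a Function<E, ?> as the discriminator function. Note that for this operation to make sense, the input stream must already be grouped by the discriminator (for example, sorted by it, as the ORDER BY clause does here); elements with equal keys that are not adjacent end up in separate lists.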
import java.util.*;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.Comparator.naturalOrder;

public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
    private final Spliterator<E> spliterator;
    private final Function<? super E, ?> partitionBy;
    // Holds the element read ahead from the wrapped spliterator, i.e. the first
    // element of the next partition. Null when nothing has been read ahead.
    private HoldingConsumer<E> holder;
    private Comparator<List<E>> comparator;

    public PartitionBySpliterator(
            Spliterator<E> toWrap,
            Function<? super E, ?> partitionBy
    ) {
        // The number of partitions is not known in advance, so SIZED and
        // SUBSIZED are cleared; the emitted lists are never null.
        super(Long.MAX_VALUE, toWrap.characteristics() & ~(SIZED | SUBSIZED) | NONNULL);
        this.spliterator = toWrap;
        this.partitionBy = partitionBy;
    }

    public static <E> Stream<List<E>> partitionBy(
            Function<E, ?> partitionBy, Stream<E> in
    ) {
        return StreamSupport.stream(
                new PartitionBySpliterator<>(in.spliterator(), partitionBy), false)
                .onClose(in::close); // closing the result also closes the source
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final HoldingConsumer<E> h;
        if (holder == null) {
            // Nothing read ahead: fetch the first element, or report exhaustion.
            h = new HoldingConsumer<>();
            if (!spliterator.tryAdvance(h)) {
                return false;
            }
            holder = h;
        } else {
            h = holder;
        }
        final List<E> partition = new ArrayList<>();
        final Object partitionKey = partitionBy.apply(h.value);
        boolean didAdvance;
        // Collect elements while the discriminator keeps returning the same key.
        // The first element with a different key stays in h for the next call.
        do {
            partition.add(h.value);
        } while ((didAdvance = spliterator.tryAdvance(h))
                && Objects.equals(partitionBy.apply(h.value), partitionKey));
        if (!didAdvance) {
            holder = null; // the wrapped spliterator is exhausted
        }
        action.accept(partition);
        return true;
    }

    // Consumer that simply remembers the last element passed to it.
    static final class HoldingConsumer<T> implements Consumer<T> {
        T value;

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    // Only consulted when the wrapped spliterator reports SORTED (that flag is
    // passed through), in which case the partitions are sorted as well.
    @Override
    public Comparator<? super List<E>> getComparator() {
        final Comparator<List<E>> c = this.comparator;
        return c != null ? c : (this.comparator = comparator());
    }

    // Orders partitions by their first element, then by their last element,
    // using the source's comparator, or natural order if it has none.
    private Comparator<List<E>> comparator() {
        @SuppressWarnings({"unchecked", "rawtypes"})
        final Comparator<? super E> innerComparator =
                Optional.ofNullable(spliterator.getComparator())
                        .orElse((Comparator) naturalOrder());
        return (left, right) -> {
            final int c = innerComparator.compare(left.get(0), right.get(0));
            return c != 0 ? c : innerComparator.compare(
                    left.get(left.size() - 1), right.get(right.size() - 1));
        };
    }
}
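The same lazy grouping can also be sketched without subclassing Spliterator, by writing an Iterator and adapting it with Spliterators.spliteratorUnknownSize. This is an alternative technique, not the answer's code: PartitionByIterator and the Family stand-in are hypothetical names, rows are modeled as String arrays instead of ResultRow, and, as noted above, equal keys must be adjacent. The main method mirrors the mapping step from the question.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PartitionByIterator {

    // Lazily groups runs of adjacent elements that share the same key.
    public static <E> Stream<List<E>> partitionBy(Function<? super E, ?> key, Stream<E> in) {
        Iterator<E> source = in.iterator();
        Iterator<List<E>> groups = new Iterator<List<E>>() {
            private E pending;          // first element of the next group, read ahead
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                if (!hasPending && source.hasNext()) {
                    pending = source.next();
                    hasPending = true;
                }
                return hasPending;
            }

            @Override
            public List<E> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<E> group = new ArrayList<>();
                Object groupKey = key.apply(pending);
                group.add(pending);
                hasPending = false;
                // Keep reading until the key changes; the element that breaks
                // the run is kept as the start of the next group.
                while (source.hasNext()) {
                    E e = source.next();
                    if (!Objects.equals(key.apply(e), groupKey)) {
                        pending = e;
                        hasPending = true;
                        break;
                    }
                    group.add(e);
                }
                return group;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(groups, Spliterator.ORDERED | Spliterator.NONNULL),
                false)
                .onClose(in::close);
    }

    // Hypothetical stand-in for the question's Family class.
    static final class Family {
        final String surname;
        final List<String> members;

        Family(String surname, List<String> members) {
            this.surname = surname;
            this.members = members;
        }

        @Override
        public String toString() {
            return surname + members;
        }
    }

    public static void main(String[] args) {
        // (family name, member name) rows, already ordered by family name.
        Stream<String[]> rows = Stream.of(
                new String[] {"Jones", "Cy"},
                new String[] {"Smith", "Ann"},
                new String[] {"Smith", "Bob"});
        List<Family> families = partitionBy(r -> r[0], rows)
                .map(rs -> new Family(rs.get(0)[0],
                        rs.stream().map(r -> r[1]).collect(Collectors.toList())))
                .collect(Collectors.toList());
        System.out.println(families); // prints [Jones[Cy], Smith[Ann, Bob]]

        // Laziness: an infinite input works because groups are pulled on demand.
        System.out.println(partitionBy(i -> i / 3, Stream.iterate(0, i -> i + 1))
                .limit(2)
                .collect(Collectors.toList())); // prints [[0, 1, 2], [3, 4, 5]]
    }
}
```

The iterator keeps one look-ahead element, which plays the same role as HoldingConsumer in the spliterator version, so memory use is bounded by the largest group rather than by the whole result set. The trade-off is that this stream reports only ORDERED and NONNULL and does not pass on the source's SORTED or DISTINCT characteristics.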