如何使用电抗器3.x将LIST<T&>;转换为通量<T&>

How to convert Listlt;Tgt; to Fluxlt;Tgt; by using Reactor 3.x(如何使用电抗器3.x将LISTlt;T;转换为通量lt;T)

本文介绍了如何使用电抗器3.x将LIST<T&>;转换为通量<T&>的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Asyn Call Thrift接口:

public CompletableFuture<List<Long>> getFavourites(Long userId){
    CompletableFuture<List<Long>> future = new CompletableFuture();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
    }
    return future;
}
现在它返回CompletableFuture<;列表>;。然后我调用它来使用Flux做一些处理器。
public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random()*100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}
但是,我想从getFavourites方法中获取一个通量,并且可以在getRecommend方法中使用它。
或者,您可以推荐Flux API,我可以将List<Long> recommendList转换为Flux<Long> recommendFlux

推荐答案

要将CompletableFuture<List<T>>转换为Flux<T>,可以使用Mono#fromFutureMono#flatMapMany

var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
    CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));

Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

List<T>在回调中异步接收到的Flux<T>也可以不使用CompletableFuture转换为Flux<T>。 您可以直接使用Mono#createMono#flatMapMany

Flux<Long> flux = Mono.<List<Long>>create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      sink.success(list);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
}).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

或简单使用Flux#create一次多次排放:

Flux<Long> flux = Flux.create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      list.forEach(sink::next);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
});

flux.subscribe(System.out::println);

这篇关于如何使用电抗器3.x将LIST&lt;T&>;转换为通量&lt;T&>的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:如何使用电抗器3.x将LIST&lt;T&>;转换为通量&lt;T&>