Why does Spark Streaming stop working when I send two input streams?(为什么当我发送两个输入流时,Spark Streaming停止工作?)
问题描述
我正在开发一个Spark流应用程序,其中我需要使用来自两个服务器的输入流,每个服务器每秒向Spark上下文发送一条JSON消息。
我的问题是,如果我只在一个流上执行操作,一切都运行得很好。但如果我有来自不同服务器的两个流,那么Spark在可以打印任何东西之前冻结,并且只有在两个服务器都发送了它们必须发送的所有JSON消息时(当它检测到socketTextStream
没有接收数据时)才开始重新工作。
以下是我的代码:
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);
return streamPair;
}
});
JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);
return streamPair;
}
});
dataStream2.print(); //for example
请注意,没有错误消息,Spark Simple在启动上下文后冻结,虽然我从端口收到JSON消息,但它没有显示任何内容。
非常感谢。
推荐答案
查看Spark Streaming documentation中的这些警告,并查看它们是否适用:
要记住的要点
- 在本地运行Spark Streaming程序时,请勿使用local或local1为主URL。这两种情况都意味着只有一个线程将用于本地运行任务。如果您使用的是基于接收器的输入DStream(例如Sockets、Kafka、Flume等),则将使用单个线程来运行接收器,而不会留下处理接收到的数据的线程。因此,在本地运行时,请始终使用"local[n]"作为主URL,其中n>要运行的接收器的数量(有关如何设置主URL的信息,请参阅Spark属性)。
- 将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数必须多于接收器数。否则,系统将接收数据,但无法处理数据。
这篇关于为什么当我发送两个输入流时,Spark Streaming停止工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:为什么当我发送两个输入流时,Spark Streaming停止工作?
- GC_FOR_ALLOC 是否更“严重"?在调查内存使用情况时? 2022-01-01
- 获取数字的最后一位 2022-01-01
- 未找到/usr/local/lib 中的库 2022-01-01
- 如何使 JFrame 背景和 JPanel 透明且仅显示图像 2022-01-01
- 在 Java 中,如何将 String 转换为 char 或将 char 转换 2022-01-01
- 将 Java Swing 桌面应用程序国际化的最佳实践是什么? 2022-01-01
- Eclipse 的最佳 XML 编辑器 2022-01-01
- java.lang.IllegalStateException:Bean 名称“类别"的 BindingResult 和普通目标对象都不能用作请求属性 2022-01-01
- 如何指定 CORS 的响应标头? 2022-01-01
- 转换 ldap 日期 2022-01-01