Transforming two dataframes in Spark SQL
Question
I have two dataframes in Spark (Scala), registered as tables.
Table 1:
+-----+--------+
| id  | values |
+-----+--------+
| 0   | v1     |
| 0   | v2     |
| 1   | v3     |
| 1   | v1     |
+-----+--------+
Table 2:
+-----+----+----+----+
| id  | v1 | v2 | v3 |
+-----+----+----+----+
| 0   | a1 | b1 | -  |
| 1   | a2 | -  | c2 |
+-----+----+----+----+
From these two tables I want to generate a new table in which, for each row of Table 1, `field` holds the value from the Table 2 column named by `values`.
Table 3:
+-----+--------+--------+
|id |values | field |
+-----+--------+--------+
| 0 | v1 | a1 |
| 0 | v2 | b1 |
| 1 | v3 | c2 |
| 1 | v1 | a2 |
+-----+--------+--------+
Here v1 is a struct of the form:
v1: struct (nullable = true)
| |-- level1: string (nullable = true)
| |-- level2: string (nullable = true)
| |-- level3: string (nullable = true)
| |-- level4: string (nullable = true)
| |-- level5: string (nullable = true)
I am using Spark SQL in Scala.
Is it possible to do this by writing a SQL query or by using Spark functions on the dataframes?
Recommended answer
Here is sample code you can use; it produces the desired output.
The code is as follows:
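// In spark-shell, `sc` and the implicits needed for toDF are already in scope.
val df1 = sc.parallelize(Seq((0, "v1"), (0, "v2"), (1, "v3"), (1, "v1"))).toDF("id", "values")
val df2 = sc.parallelize(Seq((0, "a1", "b1", "-"), (1, "a2", "-", "b2"))).toDF("id", "v1", "v2", "v3")
// Join on id so that every (id, values) row carries all candidate columns v1..v3.
val joinedDF = df1.join(df2, "id")
// For each row, use the string stored in `values` as the name of the column to read.
val resultDF = joinedDF.rdd.map { row =>
  val id = row.getAs[Int]("id")
  val values = row.getAs[String]("values")
  val feilds = row.getAs[String](values)
  (id, values, feilds)
}.toDF("id", "values", "feilds")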
When tested in the console:
scala> val df1=sc.parallelize(Seq((0,"v1"),(0,"v2"),(1,"v3"),(1,"v1"))).toDF("id","values")
df1: org.apache.spark.sql.DataFrame = [id: int, values: string]
scala> df1.show
+---+------+
| id|values|
+---+------+
| 0| v1|
| 0| v2|
| 1| v3|
| 1| v1|
+---+------+
scala> val df2=sc.parallelize(Seq((0,"a1","b1","-"),(1,"a2","-","b2"))).toDF("id","v1","v2","v3")
df2: org.apache.spark.sql.DataFrame = [id: int, v1: string ... 2 more fields]
scala> df2.show
+---+---+---+---+
| id| v1| v2| v3|
+---+---+---+---+
| 0| a1| b1| -|
| 1| a2| -| b2|
+---+---+---+---+
scala> val joinedDF=df1.join(df2,"id")
joinedDF: org.apache.spark.sql.DataFrame = [id: int, values: string ... 3 more fields]
scala> joinedDF.show
+---+------+---+---+---+
| id|values| v1| v2| v3|
+---+------+---+---+---+
| 1| v3| a2| -| b2|
| 1| v1| a2| -| b2|
| 0| v1| a1| b1| -|
| 0| v2| a1| b1| -|
+---+------+---+---+---+
scala> val resultDF=joinedDF.rdd.map{row=>
| val id=row.getAs[Int]("id")
| val values=row.getAs[String]("values")
| val feilds=row.getAs[String](values)
| (id,values,feilds)
| }.toDF("id","values","feilds")
resultDF: org.apache.spark.sql.DataFrame = [id: int, values: string ... 1 more field]
scala>
scala> resultDF.show
+---+------+------+
| id|values|feilds|
+---+------+------+
| 1| v3| b2|
| 1| v1| a2|
| 0| v1| a1|
| 0| v2| b1|
+---+------+------+
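The question also asks whether this can be done with a SQL query or with Spark functions alone. The following is a minimal sketch of both, not part of the original answer: it replaces the `rdd.map` step with a chain of `when` expressions (DataFrame API) or a `CASE` expression (Spark SQL), so the rows never leave the DataFrame. It reuses the answer's sample data; the view names `table1` and `table2`, the output column name `field`, and the local master setting are assumptions made for illustration. Because the lookup is an ordinary column expression rather than `getAs[String]`, it should also carry over to the struct-typed columns mentioned in the question, assuming all candidate columns share the same type.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, when}

// In spark-shell a session already exists and getOrCreate() returns it;
// the local master is only an assumption for standalone runs.
val spark = SparkSession.builder().master("local[*]").appName("pick-column").getOrCreate()
import spark.implicits._

// Same sample data as in the answer.
val df1 = Seq((0, "v1"), (0, "v2"), (1, "v3"), (1, "v1")).toDF("id", "values")
val df2 = Seq((0, "a1", "b1", "-"), (1, "a2", "-", "b2")).toDF("id", "v1", "v2", "v3")

// DataFrame API: build one `when` per candidate column. A `when` without
// `otherwise` yields null when it does not match, so coalesce keeps the
// single branch whose column name equals the string in `values`.
val candidates = df2.columns.filter(_ != "id")
val picked = coalesce(candidates.map(c => when(col("values") === c, col(c))): _*)
val viaApi = df1.join(df2, "id").select(col("id"), col("values"), picked.as("field"))

// Spark SQL: the same lookup written as a CASE expression over temp views.
// `values` is quoted with backticks because VALUES is also a SQL keyword.
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
val viaSql = spark.sql(
  """SELECT t1.id, t1.`values`,
    |       CASE t1.`values` WHEN 'v1' THEN t2.v1
    |                        WHEN 'v2' THEN t2.v2
    |                        WHEN 'v3' THEN t2.v3 END AS field
    |FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id""".stripMargin)

viaApi.show()
viaSql.show()
```

The DataFrame variant derives the branches from `df2.columns`, so it adapts if more `v*` columns are added; the SQL variant lists them by hand and would need to be regenerated in that case.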
I hope this solves your problem. Thanks!