Google Dataflow (Apache beam) JdbcIO bulk insert into mysql database(Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库)
问题描述
我正在使用 Dataflow SDK 2.X Java API (Apache Beam SDK) 将数据写入 mysql.我已经基于 Apache Beam SDK 文档 使用数据流将数据写入 mysql.它一次插入单行,因为我需要实现批量插入.我在官方文档中找不到任何启用批量插入模式的选项.
I'm using Dataflow SDK 2.X Java API ( Apache Beam SDK) to write data into mysql. I've created pipelines based on Apache Beam SDK documentation to write data into mysql using dataflow. It inserts single row at a time where as I need to implement bulk insert. I do not find any option in official documentation to enable bulk inset mode.
想知道是否可以在数据流管道中设置批量插入模式?如果是,请让我知道我需要在下面的代码中更改什么.
Wondering, if it's possible to set bulk insert mode in dataflow pipeline? If yes, please let me know what I need to change in below code.
.apply(JdbcIO.<KV<Integer, String>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withStatement("insert into Person values(?, ?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
public void setParameters(KV<Integer, String> element, PreparedStatement query) {
query.setInt(1, kv.getKey());
query.setString(2, kv.getValue());
}
})
推荐答案
EDIT 2018-01-27:
事实证明,这个问题与 DirectRunner 有关.如果您使用 DataflowRunner 运行相同的管道,您应该获得实际上多达 1,000 条记录的批次.DirectRunner 总是在分组操作后创建大小为 1 的包.
It turns out that this issue is related to the DirectRunner. If you run the same pipeline using the DataflowRunner, you should get batches that are actually up to 1,000 records. The DirectRunner always creates bundles of size 1 after a grouping operation.
原答案:
我在使用 Apache Beam 的 JdbcIO 写入云数据库时遇到了同样的问题.问题是,虽然 JdbcIO 确实支持批量写入多达 1,000 条记录,但我从未真正见过它一次写入超过 1 行(我不得不承认:这总是在开发环境中使用 DirectRunner).
I've run into the same problem when writing to cloud databases using Apache Beam's JdbcIO. The problem is that while JdbcIO does support writing up to 1,000 records in one batch, in I have never actually seen it write more than 1 row at a time (I have to admit: This was always using the DirectRunner in a development environment).
因此,我在 JdbcIO 中添加了一个功能,您可以通过将数据分组在一起并将每个组写为一个批次来自己控制批次的大小.下面是基于 Apache Beam 原始 WordCount 示例的如何使用此功能的示例.
I have therefore added a feature to JdbcIO where you can control the size of the batches yourself by grouping your data together and writing each group as one batch. Below is an example of how to use this feature based on the original WordCount example of Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
与 JdbcIO 的普通写入方法的不同之处在于新方法 writeIterable()
将 PCollection
作为输入而不是 <代码>PCollection
The difference with the normal write-method of JdbcIO is the new method writeIterable()
that takes a PCollection<Iterable<RowT>>
as input instead of PCollection<RowT>
. Each Iterable is written as one batch to the database.
可以在此处找到具有此附加功能的 JdbcIO 版本:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
The version of JdbcIO with this addition can be found here: https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
可以在此处找到包含上述示例的整个示例项目:https://github.com/olavloite/spanner-beam-example
The entire example project containing the example above can be found here: https://github.com/olavloite/spanner-beam-example
(Apache Beam 上还有一个拉取请求未决,以将其包含在项目中)
(There is also a pull request pending on Apache Beam to include this in the project)
这篇关于Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库
- 如何将 SonarQube 6.7 从 MySQL 迁移到 postgresql 2022-01-01
- 在SQL中,如何为每个组选择前2行 2021-01-01
- SQL 临时表问题 2022-01-01
- 使用 Oracle PL/SQL developer 生成测试数据 2021-01-01
- 远程 mySQL 连接抛出“无法使用旧的不安全身份验证连接到 MySQL 4.1+"来自 XAMPP 的错误 2022-01-01
- 导入具有可变标题的 Excel 文件 2021-01-01
- 更改自动增量起始编号? 2021-01-01
- 以一个值为轴心,但将一行上的数据按另一行分组? 2022-01-01
- 如何使用 pip 安装 Python MySQLdb 模块? 2021-01-01
- 如何将 Byte[] 插入 SQL Server VARBINARY 列 2021-01-01