下面是 Spring Boot 用多线程批量导入数据库实现方法的详细攻略。
下面是 Spring Boot 用多线程批量导入数据库实现方法的详细攻略。
1. 背景介绍
在实际的软件开发过程中,数据导入操作是一个非常常见的需求。如果数据比较少的时候,通过单线程导入是能够满足需求的。但是如果数据量很大时,单线程导入会非常慢,可能需要几个小时或者几天的时间才能完成。
因此,如果我们能够使用多线程技术来进行批量导入,就可以大大提高导入效率,缩短导入时间。
2. 实现步骤
2.1 创建数据表
在本次示例中,我们使用一个用户表 user
来进行演示。
用户表结构:
CREATE TABLE `user` (
`id` bigint(20) NOT NULL COMMENT '用户ID',
`username` varchar(50) NOT NULL COMMENT '用户名',
`email` varchar(50) DEFAULT NULL COMMENT '电子邮箱',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';
2.2 准备数据
为了方便演示,我们使用 faker 库生成一些随机的用户数据。
@Value("${batch.data-count}")
private int dataCount;
@Bean(name = "users")
public BlockingQueue<User> users() {
BlockingQueue<User> users = new LinkedBlockingDeque<>(dataCount);
Faker faker = new Faker(Locale.CHINA);
for (int i = 0; i < dataCount; i++) {
users.add(new User().setUsername(faker.name().username())
.setEmail(faker.internet().emailAddress())
.setCreateTime(new Date())
.setUpdateTime(new Date()));
}
return users;
}
2.3 定义导入任务
接下来,我们需要定义一个导入任务,其中包含两个主要的步骤:数据导入和数据校验。
数据导入步骤如下:
@Transactional(rollbackFor = Exception.class)
public void insertBatch(List<User> users) {
log.info("insertBatch start, size={}", users.size());
userDao.insertBatch(users);
log.info("insertBatch end, size={}", users.size());
}
数据校验步骤如下:
public void validate(List<User> expected, List<User> actual) throws Exception {
log.info("validate start, expected={}, actual={}", expected.size(), actual.size());
assertEquals(expected.size(), actual.size());
Map<Long, User> expectedMap = expected.stream().collect(Collectors.toMap(User::getId, Function.identity()));
Map<Long, User> actualMap = actual.stream().collect(Collectors.toMap(User::getId, Function.identity()));
expectedMap.forEach((id, ex) -> {
User ac = actualMap.get(id);
assertEquals(ex, ac);
});
log.info("validate end, expected={}, actual={}", expected.size(), actual.size());
}
2.4 定义多线程任务及线程池
为了提高导入效率,我们可以使用多线程技术进行批量导入。具体实现思路如下:
- 将数据划分成多个分片,每个分片大约包含 1000 条数据。
- 每个分片使用不同的线程来进行导入。
- 等待所有分片导入完成后,验证导入结果。
具体实现如下:
@Autowired
private UserDao userDao;
@Value("${batch.thread-count}")
private int threadCount;
@Autowired
@Qualifier("users")
private BlockingQueue<User> users;
@Autowired
private Validator validator;
@Async
@Scheduled(fixedDelay = 1000L)
public void batchInsertTask() throws Exception {
log.info("batchInsertTask start, threadCount={}, dataCount={}", threadCount, users.size());
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
AtomicInteger counter = new AtomicInteger();
List<User> allExpects = Lists.newArrayList();
while (true) {
List<User> batch = Lists.newArrayListWithCapacity(1000);
for (int i = 0; i < 1000; i++) {
User user = users.poll();
if (user != null) {
batch.add(user);
}
}
if (CollectionUtils.isEmpty(batch)) {
break;
}
List<User> expects = Lists.newArrayList(batch);
allExpects.addAll(expects);
executorService.execute(() -> {
try {
insertBatch(batch);
} catch (Exception e) {
log.error("batchInsertTask error", e);
} finally {
counter.incrementAndGet();
}
});
}
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
List<User> allActuals = userDao.listAll();
validate(allExpects, allActuals);
log.info("batchInsertTask end, threadCount={}, dataCount={}, counter={}", threadCount, allActuals.size(), counter.get());
}
2.5 配置文件设置
最后,我们需要在 application.yml
配置文件中设置数据源信息和多线程导入的参数信息。
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update
batch:
data-count: 1000000
thread-count: 10
其中,batch.data-count
表示总数据量,batch.thread-count
表示使用的线程数量。
3. 示例演示
为了验证多线程批量导入数据库的效果,我们编写了一个测试用例来模拟实际场景。
@SpringBootTest
@RunWith(SpringRunner.class)
public class ApplicationTest {
@Autowired
private BlockingQueue<User> users;
@Autowired
private UserDao userDao;
@Autowired
private Validator validator;
@Test
public void testBatchInsert() throws Exception {
long start = System.currentTimeMillis();
List<User> expects = Lists.newArrayList(users);
Application.applicationContext.getBean(Application.class).batchInsertTask();
List<User> actuals = userDao.listAll();
validator.validate(expects, actuals);
long end = System.currentTimeMillis();
log.info("testBatchInsert time={}", (end - start) / 1000);
}
}
在运行测试用例后,控制台输出的执行时间为:testBatchInsert time=16
,即批量导入 100 万条数据仅用了 16 秒钟。
4. 总结
通过本次示例演示,我们学习了 Spring Boot 如何通过多线程的方式实现批量导入数据的效果。通过多线程的方式导入数据,能够大幅度缩短数据导入时间,提高工作效率,是软件开发过程中非常实用的技术手段。同时,在实际使用过程中,我们需要注意线程数量的选择、数据划分的合理性和数据校验等方面。
本文标题为:SpringBoot用多线程批量导入数据库实现方法
- Redis 中ZSET数据类型命令使用及对应场景总结(案例详解) 2023-07-12
- sql语句优化之SQL Server(详细整理) 2023-12-04
- 详解MySQL存储过程的创建和调用 2023-08-12
- 详细介绍windows下MySQL安装教程 2023-08-06
- Python技巧之四种多线程应用分享 2023-07-28
- 图解MySQL中乐观锁扣减库存原理 2023-08-06
- mongodb清除连接和日志的正确方法分享 2023-07-15
- SQL Server数据库生成与执行SQL脚本详细教程 2023-07-29
- SQL中limit函数语法与用法(MYSQL获取限制某行数据) 2022-10-23
- 沉淀再出发:redis的安装和使用 2023-09-12