沃梦达 / IT编程 / 数据库 / 正文

SpringBoot用多线程批量导入数据库实现方法

下面是 Spring Boot 用多线程批量导入数据库实现方法的详细攻略。

下面是 Spring Boot 用多线程批量导入数据库实现方法的详细攻略。

1. 背景介绍

在实际的软件开发过程中,数据导入操作是一个非常常见的需求。如果数据比较少的时候,通过单线程导入是能够满足需求的。但是如果数据量很大时,单线程导入会非常慢,可能需要几个小时或者几天的时间才能完成。

因此,如果我们能够使用多线程技术来进行批量导入,就可以大大提高导入效率,缩短导入时间。

2. 实现步骤

2.1 创建数据表

在本次示例中,我们使用一个用户表 user 来进行演示。

用户表结构:

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL COMMENT '用户ID',
  `username` varchar(50) NOT NULL COMMENT '用户名',
  `email` varchar(50) DEFAULT NULL COMMENT '电子邮箱',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

2.2 准备数据

为了方便演示,我们使用 faker 库生成一些随机的用户数据。

@Value("${batch.data-count}")
private int dataCount;

@Bean(name = "users")
public BlockingQueue<User> users() {
    BlockingQueue<User> users = new LinkedBlockingDeque<>(dataCount);
    Faker faker = new Faker(Locale.CHINA);
    for (int i = 0; i < dataCount; i++) {
        users.add(new User().setUsername(faker.name().username())
                             .setEmail(faker.internet().emailAddress())
                             .setCreateTime(new Date())
                             .setUpdateTime(new Date()));
    }
    return users;
}

2.3 定义导入任务

接下来,我们需要定义一个导入任务,其中包含两个主要的步骤:数据导入和数据校验。

数据导入步骤如下:

@Transactional(rollbackFor = Exception.class)
public void insertBatch(List<User> users) {
    log.info("insertBatch start, size={}", users.size());
    userDao.insertBatch(users);
    log.info("insertBatch end, size={}", users.size());
}

数据校验步骤如下:

public void validate(List<User> expected, List<User> actual) throws Exception {
    log.info("validate start, expected={}, actual={}", expected.size(), actual.size());
    assertEquals(expected.size(), actual.size());
    Map<Long, User> expectedMap = expected.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    Map<Long, User> actualMap = actual.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    expectedMap.forEach((id, ex) -> {
        User ac = actualMap.get(id);
        assertEquals(ex, ac);
    });
    log.info("validate end, expected={}, actual={}", expected.size(), actual.size());
}

2.4 定义多线程任务及线程池

为了提高导入效率,我们可以使用多线程技术进行批量导入。具体实现思路如下:

  1. 将数据划分成多个分片,每个分片大约包含 1000 条数据。
  2. 每个分片使用不同的线程来进行导入。
  3. 等待所有分片导入完成后,验证导入结果。

具体实现如下:

@Autowired
private UserDao userDao;

@Value("${batch.thread-count}")
private int threadCount;

@Autowired
@Qualifier("users")
private BlockingQueue<User> users;

@Autowired
private Validator validator;

@Async
@Scheduled(fixedDelay = 1000L)
public void batchInsertTask() throws Exception {
    log.info("batchInsertTask start, threadCount={}, dataCount={}", threadCount, users.size());
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    AtomicInteger counter = new AtomicInteger();
    List<User> allExpects = Lists.newArrayList();
    while (true) {
        List<User> batch = Lists.newArrayListWithCapacity(1000);
        for (int i = 0; i < 1000; i++) {
            User user = users.poll();
            if (user != null) {
                batch.add(user);
            }
        }
        if (CollectionUtils.isEmpty(batch)) {
            break;
        }
        List<User> expects = Lists.newArrayList(batch);
        allExpects.addAll(expects);

        executorService.execute(() -> {
            try {
                insertBatch(batch);
            } catch (Exception e) {
                log.error("batchInsertTask error", e);
            } finally {
                counter.incrementAndGet();
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

    List<User> allActuals = userDao.listAll();
    validate(allExpects, allActuals);

    log.info("batchInsertTask end, threadCount={}, dataCount={}, counter={}", threadCount, allActuals.size(), counter.get());
}

2.5 配置文件设置

最后,我们需要在 application.yml 配置文件中设置数据源信息和多线程导入的参数信息。

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update

batch:
  data-count: 1000000
  thread-count: 10

其中,batch.data-count 表示总数据量,batch.thread-count 表示使用的线程数量。

3. 示例演示

为了验证多线程批量导入数据库的效果,我们编写了一个测试用例来模拟实际场景。

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApplicationTest {

    @Autowired
    private BlockingQueue<User> users;

    @Autowired
    private UserDao userDao;

    @Autowired
    private Validator validator;

    @Test
    public void testBatchInsert() throws Exception {
        long start = System.currentTimeMillis();
        List<User> expects = Lists.newArrayList(users);
        Application.applicationContext.getBean(Application.class).batchInsertTask();
        List<User> actuals = userDao.listAll();
        validator.validate(expects, actuals);
        long end = System.currentTimeMillis();
        log.info("testBatchInsert time={}", (end - start) / 1000);
    }

}

在运行测试用例后,控制台输出的执行时间为:testBatchInsert time=16,即批量导入 100 万条数据仅用了 16 秒钟。

4. 总结

通过本次示例演示,我们学习了 Spring Boot 如何通过多线程的方式实现批量导入数据的效果。通过多线程的方式导入数据,能够大幅度缩短数据导入时间,提高工作效率,是软件开发过程中非常实用的技术手段。同时,在实际使用过程中,我们需要注意线程数量的选择、数据划分的合理性和数据校验等方面。

本文标题为:SpringBoot用多线程批量导入数据库实现方法