mysql大数量批量插入方案 LOAD DATA LOCAL INFILE实现及相关问题解决
- 业务场景
- 过程
- 代码
- 性能对比
- 问题及解决办法
业务场景
项目跟文件内容相关,上传文件将句子内容提取出来后,将每条句子信息插入到数据库,文件中提取出的句子数量较大,几k-几十w不等,为了将数据快速插入数据库,寻找方案。
过程
项目中使用mybatis做持久存储,首先尝试 批量sql的方式:
insert into table() values()()()()()()()...
由于数据量大,遇到两个问题:
1.values后面拼接数据,本地机器内存不够
2.插入效率仍然很慢
思考:使用多线程分批插入
结果:
1.文件大及数量多,线程数量太多
2.效率仍提升不明显
借鉴前人肩膀,但是仍然遇到许多问题,将问题一一解决后,对此过程中做一些记录,方便大家查阅。
原理:
正如原创博客作者所说:
MySQL使用 LOAD DATA LOCAL INFILE 从文件中导入数据比insert语句要快,MySQL文档上说要快20倍左右。
但是这个方法有个缺点,就是导入数据之前,必须要有文件,也就是说从文件中导入。这样就需要去写文件,以及文件删除等维护。某些情况下,比如数据源并发的话,还会出现写文件并发问题,很难处理。
而且如果先写文件,又增加了IO操作,所以为了避免写文件:
MySQL社区提供这样一个方法:setLocalInfileInputStream(),此方法位于com.mysql.jdbc.PreparedStatement 类中。通过使用 MySQL JDBC 的setLocalInfileInputStream 方法实现从Java InputStream中load data local infile 到MySQL数据库中。可以达到同样的效率,直接从内存(IO流中)中导入数据,而不需要写文件
代码
代码(springboot),根据自己业务改造表及字段参数:
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
/**
* @Description :大数量量插入数据库,快速插入方法
* @Author :
* @Date : 2021/3/4
**/
@Component
public class LoadDataInFileUtil {
private static final Logger logger = LoggerFactory.getLogger(LoadDataInFileUtil.class);
private Connection conn = null;
@Value("${spring.shardingsphere.datasource.shardingdb.driver-class-name}")
private String driverName;
@Value("${spring.shardingsphere.datasource.shardingdb.url}")
private String url;
@Value("${spring.shardingsphere.datasource.shardingdb.username}")
private String userName;
@Value("${spring.shardingsphere.datasource.shardingdb.password}")
private String password;
@Value("${database-name}")
private String database;
/**
* 获取原生jdbc连接
*
* @return
* @throws ClassNotFoundException
* @throws SQLException
*/
private Connection getConnection() throws ClassNotFoundException, SQLException {
Class.forName(driverName);
conn = DriverManager.getConnection(url, userName, password);
return conn;
}
/**
* 将数据从输入流加载到MySQL。
*
* @param loadDataSql SQL语句。
* @param dataStream 输入流。
* @return int 成功插入的行数。
*/
private int bulkLoadFromInputStream(String loadDataSql,
InputStream dataStream) throws SQLException, ClassNotFoundException {
if (null == dataStream) {
logger.info("输入流为NULL,没有数据导入。");
return 0;
}
//做测试发现连接池好像不支持,所以获取原生的jdbc连接
conn = getConnection();
PreparedStatement statement = null;
int result = 0;
try {
statement = conn.prepareStatement(loadDataSql);
// mysql8 使用下面的方式
/*if (statement.isWrapperFor(com.mysql.cj.jdbc.JdbcStatement.class)) {
com.mysql.cj.jdbc.ClientPreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.ClientPreparedStatement.class);
mysqlStatement.setLocalInfileInputStream(dataStream);
mysqlStatement.executeUpdate();
}*/
// mysql5 使用下面的方式
if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
mysqlStatement.setLocalInfileInputStream(dataStream);
result = mysqlStatement.executeUpdate();
}
} finally {
if (statement != null) {
statement.close();
}
}
return result;
}
/**
* 组装 SQL 语句
*
* @param dataBaseName 数据库名。
* @param tableName 表名。
* @param columnName 要插入数据的列名。
*/
public String assembleSql(String dataBaseName, String tableName, String[] columnName) {
String insertColumnName = StringUtils.join(columnName, ",");
//插入方案的核心就是下面这个语句
String sql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE " + dataBaseName + "." + tableName + " (" + insertColumnName + ")";
return sql;
}
/**
* 通过 LOAD DATA LOCAL INFILE 大批量导入数据到 MySQL。
*
* 原理是使用 setLocalInfileInputStream 会忽略 sql.csv 文件名,不从文件读取,直接从输入流读取数据
* @param sql SQL语句。
* @param builder 组装好的数据。
*/
public int fastInsertData(String sql, StringBuilder builder) {
int rows = 0;
InputStream is = null;
try {
byte[] bytes = builder.toString().getBytes();
is = new ByteArrayInputStream(bytes);
//批量插入数据。
long beginTime = System.currentTimeMillis();
rows = bulkLoadFromInputStream(sql, is);
long endTime = System.currentTimeMillis();
logger.info(" LOAD DATA LOCAL INFILE :【插入" + rows + "行数据至MySql中,耗时" + (endTime - beginTime) + "ms。】");
} catch (SQLException | ClassNotFoundException e) {
e.printStackTrace();
} finally {
try {
if (null != is) {
is.close();
}
if (null != conn) {
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
return rows;
}
/**
* 批量插入方法
*
* @return
*/
public void insertData(String dataBaseName, String tableName, String[] columns, StringBuilder tableColumnValue, int size) {
//插入语句
String sql = assembleSql(dataBaseName, tableName, columns);
//待插入行数,为了判断是否全部插入
int mainTableInsertRows = fastInsertData(sql, tableColumnValue);
//插入数据未全部成功视为全部失败
if (mainTableInsertRows != size)
System.println.out("插入失败!");
}
}
测试代码:
@RunWith(SpringRunner.class)
@SpringBootTest
public class LoadDataInFileUtilTest {
@Autowired
LoadDataInFileUtil loadDataInFileUtil;
@Test
public void testInsert() {
//表列值
StringBuilder tableColumnValue = new StringBuilder();
//数据库名
String dataBaseName = "dataBaseName ";
//待插入的表
String tableName = "tableName";
// 要插入数据的列名。(必须与插入的数据一一对应)
String[] columns = new String[]{"id","name","age"};
int number = 500000;
int id = 1;
for (int i = 0; i < number; i++) {
//顺序对应上面的列名
builderAppend(tableColumnValue, id);
builderAppend(tableColumnValue ,"测试名字");
builderEnd(tableColumnValue, 10);
}
long startTime = System.currentTimeMillis();
loadDataInFileUtil.insertData(dataBaseName , tableName , columns, tableColumnValue, number);
long endTime = System.currentTimeMillis();
System.out.println("插入数据,所用时间:" + (endTime - startTime));
}
/**
* 往 StringBuilder 里追加数据。
*
* @param builder StringBuilder。
* @param object 数据。
*/
public void builderAppend(StringBuilder builder, Object object) {
builder.append(object);
builder.append("\t");
}
/**
* 往 StringBuilder 里追加一条数据的最后一个字段。
*
* @param builder StringBuilder。
* @param object 数据。
*/
public void builderEnd(StringBuilder builder, Object object) {
builder.append(object);
builder.append("\n");
}
}
执行顺序:
从 insertData() 方法进入,首先通过assembleSql() 方法组织导入数据的sql语句“LOAD DATA LOCAL INFILE ‘sql.csv’ IGNORE INTO TABLE tableName()”,然后通过fastInsertData() 方法传入sql语句及需要导入的数据转化为流,最后通过setLocalInfileInputStream(dataStream) 将数据插入数据库。
性能对比
自己做了一些测试来对比效率,对比的是拼装批量插入方式和此方式,对比结果:
本机做的测试,若在服务器上,效率更佳。
此方案数据量越大,差距越明显。
问题及解决办法
1.mysql版本问题
一般我们使用 mysql5 或mysql8 版本的驱动包,对应的预编译 prepareStatement 对象为:
// mysql8 使用下面的方式
/*if (statement.isWrapperFor(com.mysql.cj.jdbc.JdbcStatement.class)) {
com.mysql.cj.jdbc.ClientPreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.ClientPreparedStatement.class);
mysqlStatement.setLocalInfileInputStream(dataStream);
mysqlStatement.executeUpdate();
}*/
// mysql5 使用下面的方式
/* if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
mysqlStatement.setLocalInfileInputStream(dataStream);
result = mysqlStatement.executeUpdate();
}*/
代码中有注释说明
2.字段拼接问题
因为字段值是通过 “\t” 来区别字段和 “\n” 来区别一条数据的,所以在字段的值中如果含有 Tab
3.值得问题
因为拼接的字段值都是已字符串的形式识别的,所以不存在true/false 这种数据,如果model类默认将数据库的tinyint类型转化为boolean了,需要单独建立model,使用0/1插入,其他字段同理。
4.是否成功
一批数据,如有无法插入的会跳过,不会报错,但会返回插入的条数,只能根据返回结果判定是否全部插入,来处理数据是否
5.数据库连接
因为我自己测试的时候,从dataSource获取的连接,似乎无法执行,所以获取了原生的jdbc连接来操作,注意操作完成时正确的关闭,否则造成资源浪费。
6.字段为空
如果字段为空,需要转化为“”,否则插入数据库以 null形式存在、
7.权限
操作数据库的账户需要开启 Excute权限
8.无法引用 com.mysql.jdbc.ClientPreparedStatement问题
因为之前在项目中引入jar包时,mysql驱动是以运行时编译的状态引入的,所以修改pom文件的scope
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<!--去掉运行时依赖,以便能使用com.mysql.jdbc.ClientPreparedStatement -->
<!-- <scope>runtime</scope>-->
</dependency>
以上为整个过程的使用及问题梳理,欢迎补充!