如何快速安全的插入千万条数据

前言

最近有个需求解析一个订单文件,而且说明文件可达到千万条数据,每条数据大概在20个字段左右,每一个字段使用逗号分隔,须要尽可能在半小时内入库。java

思路

1.估算文件大小

由于告诉文件有千万条,同时每条记录大概在20个字段左右,因此能够大体估算一下整个订单文件的大小,方法也很简单使用FileWriter往文件中插入一千万条数据,查看文件大小,经测试大概在1.5G左右;mysql

2.如何批量插入

由上可知文件比较大,一次性读取内存确定不行,方法是每次从当前订单文件中截取一部分数据,而后进行批量插入,如何批次插入可使用**insert(...)values(...),(...)**的方式,经测试这种方式效率仍是挺高的;git

3.数据的完整性

截取数据的时候须要注意,须要保证数据的完整性,每条记录最后都是一个换行符,须要根据这个标识保证每次截取都是整条数,不要出现半条数据这种状况;github

4.数据库是否支持批次数据

由于须要进行批次数据的插入,数据库是否支持大量数据写入,好比这边使用的mysql,能够经过设置max_allowed_packet来保证批次提交的数据量;sql

5.中途出错的状况

由于是大文件解析,若是中途出现错误,好比数据恰好插入到900w的时候,数据库链接失败,这种状况不可能从新来插一遍,全部须要记录每次插入数据的位置,而且须要保证和批次插入的数据在同一个事务中,这样恢复以后能够从记录的位置开始继续插入。数据库

实现

1.准备数据表

这里须要准备两张表分别是:订单状态位置信息表,订单表;缓存

CREATE TABLE `file_analysis` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `file_type` varchar(255) NOT NULL COMMENT '文件类型 01:类型1,02:类型2',
  `file_name` varchar(255) NOT NULL COMMENT '文件名称',
  `file_path` varchar(255) NOT NULL COMMENT '文件路径',
  `status` varchar(255) NOT NULL COMMENT '文件状态 0初始化;1成功;2失败:3处理中',
  `position` bigint(20) NOT NULL COMMENT '上一次处理完成的位置',
  `crt_time` datetime NOT NULL COMMENT '建立时间',
  `upd_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8
复制代码
CREATE TABLE `file_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `file_id` bigint(20) DEFAULT NULL,
  `field1` varchar(255) DEFAULT NULL,
  `field2` varchar(255) DEFAULT NULL,
  `field3` varchar(255) DEFAULT NULL,
  `field4` varchar(255) DEFAULT NULL,
  `field5` varchar(255) DEFAULT NULL,
  `field6` varchar(255) DEFAULT NULL,
  `field7` varchar(255) DEFAULT NULL,
  `field8` varchar(255) DEFAULT NULL,
  `field9` varchar(255) DEFAULT NULL,
  `field10` varchar(255) DEFAULT NULL,
  `field11` varchar(255) DEFAULT NULL,
  `field12` varchar(255) DEFAULT NULL,
  `field13` varchar(255) DEFAULT NULL,
  `field14` varchar(255) DEFAULT NULL,
  `field15` varchar(255) DEFAULT NULL,
  `field16` varchar(255) DEFAULT NULL,
  `field17` varchar(255) DEFAULT NULL,
  `field18` varchar(255) DEFAULT NULL,
  `crt_time` datetime NOT NULL COMMENT '建立时间',
  `upd_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10000024 DEFAULT CHARSET=utf8
复制代码

2.配置数据库包大小

mysql> show VARIABLES like '%max_allowed_packet%';
+--------------------------+------------+
| Variable_name            | Value      |
+--------------------------+------------+
| max_allowed_packet       | 1048576    |
| slave_max_allowed_packet | 1073741824 |
+--------------------------+------------+
2 rows in set

mysql> set global max_allowed_packet = 1024*1024*10;
Query OK, 0 rows affected
复制代码

经过设置max_allowed_packet,保证数据库可以接收批次插入的数据包大小;否则会出现以下错误:bash

Caused by: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (4980577 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable. at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3915) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2598) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2778) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2834) 复制代码

3.准备测试数据

public static void main(String[] args) throws IOException {
		FileWriter out = new FileWriter(new File("D://xxxxxxx//orders.txt"));
		for (int i = 0; i < 10000000; i++) {
			out.write(
					"vaule1,vaule2,vaule3,vaule4,vaule5,vaule6,vaule7,vaule8,vaule9,vaule10,vaule11,vaule12,vaule13,vaule14,vaule15,vaule16,vaule17,vaule18");
			out.write(System.getProperty("line.separator"));
		}
		out.close();
	}
复制代码

使用FileWriter遍历往一个文件里插入1000w条数据便可,这个速度仍是很快的,不要忘了在每条数据的后面添加换行符(\n\r)session

4.截取数据的完整性

除了须要设置每次读取文件的大小,同时还须要设置一个参数,用来每次获取一小部分数据,从这小部分数据中获取换行符(\n\r),若是获取不到一直累加直接获取为止,这个值设置大小大体同每条数据的大小差很少合适,部分实现以下:mybatis

ByteBuffer byteBuffer = ByteBuffer.allocate(buffSize); // 申请一个缓存区
long endPosition = batchFileSize + startPosition - buffSize;// 子文件结束位置

long startTime, endTime;
for (int i = 0; i < count; i++) {
	startTime = System.currentTimeMillis();
	if (i + 1 != count) {
		int read = inputChannel.read(byteBuffer, endPosition);// 读取数据
		readW: while (read != -1) {
			byteBuffer.flip();// 切换读模式
			byte[] array = byteBuffer.array();
			for (int j = 0; j < array.length; j++) {
				byte b = array[j];
				if (b == 10 || b == 13) { // 判断\n\r
					endPosition += j;
					break readW;
				}
			}
			endPosition += buffSize;
			byteBuffer.clear(); // 重置缓存块指针
			read = inputChannel.read(byteBuffer, endPosition);
		}
	} else {
		endPosition = fileSize; // 最后一个文件直接指向文件末尾
	}
    ...省略,更多能够查看Github完整代码...
}
复制代码

如上代码所示开辟了一个缓冲区,根据每行数据大小来定大概在200字节左右,而后经过遍历查找换行符(\n\r),找到之后将当前的位置加到以前的结束位置上,保证了数据的完整性;

5.批次插入数据

经过**insert(...)values(...),(...)**的方式批次插入数据,部分代码以下:

// 保存订单和解析位置保证在一个事务中
		SqlSession session = sqlSessionFactory.openSession();
		try {
			long startTime = System.currentTimeMillis();
			FielAnalysisMapper fielAnalysisMapper = session.getMapper(FielAnalysisMapper.class);
			FileOrderMapper fileOrderMapper = session.getMapper(FileOrderMapper.class);
			fileOrderMapper.batchInsert(orderList);

			// 更新上次解析到的位置,同时指定更新时间
			fileAnalysis.setPosition(endPosition + 1);
			fileAnalysis.setStatus("3");
			fileAnalysis.setUpdTime(new Date());
			fielAnalysisMapper.updateFileAnalysis(fileAnalysis);
			session.commit();
			long endTime = System.currentTimeMillis();
			System.out.println("===插入数据花费:" + (endTime - startTime) + "ms===");
		} catch (Exception e) {
			session.rollback();
		} finally {
			session.close();
		}
        ...省略,更多能够查看Github完整代码...
复制代码

如上代码在一个事务中同时保存批次订单数据和文件解析位置信息,batchInsert经过使用mybatis的****标签来遍历订单列表,生成values数据;

总结

以上展现了部分代码,完整的代码能够查看Github地址中的batchInsert模块,本地设置每次截取的文件大小为2M,经测试1000w条数据(大小1.5G左右)插入mysql数据库中,大概花费时间在20分钟左右,固然能够经过设置截取的文件大小,花费的时间也会相应的改变。

完整代码

Github