DataX用来做批量数据迁移很适合,能够保证数据的一致性,性能也很好,结合时间戳字段,用来实现数据定时增量同步也是可以的,如每分钟或每5分钟增量同步一次数据。用DataX这个方案做增量同步要求每个表带一个时间戳字段,删除数据采用逻辑删除,这个要求也比较容易做到。
增量同步实现
实现增量同步需要在表中增加一个时间戳字段,如update_time,在同步配置文件中,通过where条件,根据时间戳字段筛选当前时间向前一段时间内的增量数据。
{
	"job": {
		"content": [{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"column": [
						"doc_id",
						"title",
						"file_path",
						"approval_id",
						"page_count",
						"version"
					],
					"connection": [{
						"jdbcUrl": [
							"jdbc:mysql://192.168.81.1:3306/bootdo?useUnicode=true&characterEncoding=utf8"
						],
						"table": [
							"es_approval_doc"
						]
					}],
					"password": "123456",
					"username": "root",
					"where": "version > FROM_UNIXTIME(${start_time}) and version"
				}
			},
			"writer": {
				"name": "mysqlwriter",
				"parameter": {
					"column": [
						"doc_id",
						"title",
						"file_path",
						"approval_id",
						"page_count",
						"version"
					],
					"writeMode": "update",
					"connection": [{
						"jdbcUrl": "jdbc:mysql://192.168.81.1:3306/bootdo?useUnicode=true&characterEncoding=utf8",
						"table": [
							"es_approval_doc_copy"
						]
					}],
					"password": "123456",
					"username": "root"
				}
			}
		}],
		"setting": {
			"speed": {
				"channel": "1"
			}
		}
	}
}
json文件中,${start_time}和${end_time}为调用datax.py时传入的参数。如:
datax/bin/datax.py ../../mysql2mysql.json -p "-Dstart_time=1546337137 -Dend_time=1546337237"
定时同步实现
定时同步可以采用操作系统的定时任务+shell脚本实现。以下为在linux系统中的方案:
1、编写shell脚本,命名为syntask.sh:#!/bin/bash
# source /etc/profile # 截至时间设置为当前时间戳 end_time=$(date +%s) # 开始时间设置为120s前时间戳 start_time=$(($end_time - 120)) # datax/bin/datax.py ../../mysql2mysql.json -p "-Dstart_time=$start_time -Dend_time=$end_time"
这里通过脚本获取用于筛选条件中的开始时间start_time和结束时间end_time,将两个时间作为参数传给datax.py。
2、在crontab中,添加任务计划:
$crontab -e* */1 * * * /syntask.sh
DataX不适合实时数据同步或太频繁的定时同步,因为同步都需要去读取源表,频率过大对源表会造成压力。
此外,最好每次增量同步的时间段比定时任务时间间隔大一些,以保证所有时间产生的数据都被覆盖到。
异常情况下的补救措施:
如果某段时间内由于服务器、操作系统、网络等原因造成某个时间段内数据没有正常同步,那么可以通过手动执行同步的方式进行补救,执行同步时,将筛选的时间段加大大覆盖异常发生的整个时间段。
多表同步实现
通常我们的业务系统存在有多个表,表之间有外键关系。为实现多表的数据同步,我们需要理清外键依赖关系,为每个表分别编写json同步配置文件,并按外键依赖关系逐个调用datax.py。
如对于主表es_approval和子表es_approval_doc,可以对应写两个json配置文件:mysql2mysql-approval.json和mysql2mysql-approval-doc.json,在syntask.sh中先调用主表配置文件,再调用子表配置文件。
#!/bin/bash source /etc/profile # 截至时间设置为当前时间戳 end_time=$(date +%s) # 开始时间设置为120s前时间戳 start_time=$(($end_time - 3600)) /datax/bin/datax.py /mysql2mysql-approval.json -p "-Dstart_time=$start_time -Dend_time=$end_time" /datax/bin/datax.py /mysql2mysql-approval-doc.json -p "-Dstart_time=$start_time -Dend_time=$end_time"
多级多路同步
要实现多级同步,可以在每两级之间搭建一个datax实例实现这两级之间的数据同步。
要实现多路同步,可以为同一个表编写多个配置文件,向多个目标库同步。
参考: