基于当前现状,引入Flinkx同步mongodb的oplog来达到近实时同步数据一方面可以减少集群的压力; 此外Flinkx可以监听数据变化及时将数据同步到clickhouse或者kafka中方便做一些近实时的OLAP分析,以及查询。
由于社区上用flink同步mongodb oplog比较少所以会有一些bug需要修复。在这里介绍我使用过程中修复的bug。
1:包路径缺少flinkx导致找不到类
解决办法:增加flinkx路径,重新导包
2:反序列化获取MongodboplogReader异常
解决办法:修改类名,针对mongooplog直接写死方式
3:mongo游标和client关闭导致空指针异常
解决办法:初期采用去除测试config test成功
后面仔细查看代码,发现是没初始化testconfig,所以在这里初始化就可以解决。
4:flinkx使用模糊匹配找库表,这不符合我们场景,我们需要精确匹配
更改后模板的写法变为:
"reader": {
"name": "mongodboplogreader",
"parameter": {
"hostPorts": "10.212.32.84:20000,10.212.32.82:20000,10.212.32.10:30000",
"username": "root123",
"password": "root123",
"database": "admin",
"clusterMode": "REPLICA_SET",
"authenticationMechanism": "SCRAM-SHA-1",
"monitorDatabases": ["YM"],
"monitorCollections":["YW.FACT_POSP","YW.FACT_DOWN"],
"operateType":["insert","update","delete"],
"pavingData":true,
"excludeDocId": false
}
}
[注意] monitorCollections 使用库名.表名的形式即可,多个使用逗号分割。
目前大致修复的就是以上的问题。
cp ../flink-mongo-jar/mongo-java-driver-3.12.6.jar ./plugins/mongodboplogreader/ cp flinkx-mongodb/flinkx-mongodb-core/target/flinkx-mongodb-core-1.6.jar ./plugins/mongodboplogreader/ cp flinkx-mongodb/flinkx-mongodb-oplog-reader/target/* ./plugins/mongodboplogreader/ |
和waterdrop思想史类似的,reader是源端,writer为目标端
{ "job": { "content": [ { "reader": { "name": "mongodboplogreader", "parameter": { "hostPorts": "10.213.32.86:20000,10.213.32.83:20000,10.213.32.10:30000", "username": "root", "password": "root", "database": "admin", "clusterMode": "REPLICA_SET", "authenticationMechanism": "SCRAM-SHA-1", "monitorDatabases": ["YSDW"], "monitorCollections":["YSDW.FACT_TRADE_POSP","YSDW.FACT_TRADE_POSP_DOWN"], "operateType":["insert","update","delete"], "pavingData":true, "excludeDocId": false } }, "writer": { "name": "streamwriter", "parameter": { "print": true } } } ], "setting": { "speed": { "channel": 1, "bytes": 1048576 }, "errorLimit": { "record": 100 }, "restore" : { "isRestore" : true, "isStream" : true } } } }
|
3:运行
bin/flinkx -mode local -job /root/test/flinkx-1.8.5/examples/stream_mongo_op.json -pluginRoot /root/test/flinkx-1.8.5/plugins -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/tmp/flink-checkpoint/\"}" |
运行后可以看到同步数据的输出
目前还需要测试点是flink.checkpoint.stateBackend参数,checkpoint生效就可以做到从上次位置读取,从而避免重复拉取数据。