Flinkx同步mongodb oplog

 

基于当前现状,引入Flinkx同步mongodb的oplog来达到近实时同步数据一方面可以减少集群的压力; 此外Flinkx可以监听数据变化及时将数据同步到clickhouse或者kafka中方便做一些近实时的OLAP分析,以及查询。

  1. BUG修复

由于社区上用flink同步mongodb oplog比较少所以会有一些bug需要修复。在这里介绍我使用过程中修复的bug。

1:包路径缺少flinkx导致找不到类

 

解决办法:增加flinkx路径,重新导包

2:反序列化获取MongodboplogReader异常

 

解决办法:修改类名,针对mongooplog直接写死方式

3:mongo游标和client关闭导致空指针异常

 

解决办法:初期采用去除测试config test成功

 

后面仔细查看代码,发现是没初始化testconfig,所以在这里初始化就可以解决。

4:flinkx使用模糊匹配找库表,这不符合我们场景,我们需要精确匹配

 

更改后模板的写法变为:

"reader": {

          "name": "mongodboplogreader",

          "parameter": {

            "hostPorts": "10.212.32.84:20000,10.212.32.82:20000,10.212.32.10:30000",

            "username": "root123",

            "password": "root123",

            "database": "admin",

            "clusterMode": "REPLICA_SET",

            "authenticationMechanism": "SCRAM-SHA-1",

            "monitorDatabases": ["YM"],

            "monitorCollections":["YW.FACT_POSP","YW.FACT_DOWN"],

            "operateType":["insert","update","delete"],

            "pavingData":true,

            "excludeDocId": false

          }

        }

[注意] monitorCollections 使用库名.表名的形式即可,多个使用逗号分割。

目前大致修复的就是以上的问题。

  1. flinkx使用
  1. 导入依赖包到plugins/mongodboplogreader/下面                                                                                              

 

cp ../flink-mongo-jar/mongo-java-driver-3.12.6.jar ./plugins/mongodboplogreader/

cp flinkx-mongodb/flinkx-mongodb-core/target/flinkx-mongodb-core-1.6.jar ./plugins/mongodboplogreader/

cp flinkx-mongodb/flinkx-mongodb-oplog-reader/target/* ./plugins/mongodboplogreader/

 

  1. 建模板-->

和waterdrop思想史类似的,reader是源端,writer为目标端

{

  "job": {

    "content": [

      {

        "reader": {

          "name": "mongodboplogreader",

          "parameter": {

            "hostPorts": "10.213.32.86:20000,10.213.32.83:20000,10.213.32.10:30000",

            "username": "root",

            "password": "root",

            "database": "admin",

            "clusterMode": "REPLICA_SET",

            "authenticationMechanism": "SCRAM-SHA-1",

            "monitorDatabases": ["YSDW"],

            "monitorCollections":["YSDW.FACT_TRADE_POSP","YSDW.FACT_TRADE_POSP_DOWN"],

            "operateType":["insert","update","delete"],

            "pavingData":true,

            "excludeDocId": false

          }

        },

        "writer": {

          "name": "streamwriter",

          "parameter": {

            "print": true

          }

        }

      }

    ],

    "setting": {

      "speed": {

        "channel": 1,

        "bytes": 1048576

      },

      "errorLimit": {

        "record": 100

      },

      "restore" : {

        "isRestore" : true,

        "isStream" : true

      }

    }

  }

}

 

3:运行

bin/flinkx -mode local -job /root/test/flinkx-1.8.5/examples/stream_mongo_op.json -pluginRoot /root/test/flinkx-1.8.5/plugins -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/tmp/flink-checkpoint/\"}"

 

运行后可以看到同步数据的输出

目前还需要测试点是flink.checkpoint.stateBackend参数,checkpoint生效就可以做到从上次位置读取,从而避免重复拉取数据。