alibaba / canal

阿里巴巴 MySQL binlog 增量订阅&消费组件
Apache License 2.0
28.52k stars 7.62k forks source link

多表关联的数据该如何使用canal同步至es? #4070

Open ice-llz opened 2 years ago

ice-llz commented 2 years ago

近期我的需求里需要将一部分数据库里的数据同步到 elasticSearch,然后从 es 进行搜索。 需要同步的表结构如下:

image

我需要将resourceId, resourceName, authorNamecategoryName这四个字段的数据同步到 es 中。sql 查询语句如下:

select r.resouceId, rf.resourceName, a.authorName, cr.categoryId, c.categoryName 
from resource as r
left join resourcefu as rf on r.resourceId = rf.resourceId
left join author as a on r.authorId = a.authorId
left join category_rel as cr on r.resourceId = cr.resourceId
left join category as c on cr.caetgoryId = c.categoryId

首先想到的就是监听 binlog,进而查到了 canal. 于是开始使用 canal-deployer 监听 binlog, canal adapter 将监听的表的数据同步给 Es. 那么就需要在 adpter/config/es7文件里将上述 sql 写入进去。 一切配置好后,

  1. 执行 etl 同步全量数据,没有问题。
  2. 修改 author 表的数据,改动同步至 es。
  3. 修改资源表的数据,改动同步至 es。
  4. 修改分类-资源关系表的数据,log 里打印出了 update 的日志,但改动并未同步到 es。
  5. 分类表新增一条数据,并修改分类-资源关系表的映射关系指向这条新数据,大bug出现了,映射关系里关联的资源在 es 中只剩下 2 个字段,就是 categoryIdresourceId, designerNamecategoryName 为空。

查了发现不止我一个人有这个问题,已经有人指出是canal对这种关联关系表数据同步的支持不太友好。

image

于是我就想将两张表合为一张再进行查询,sql 如下

select r.resouceId, rf.resourceName, a.authorName, d.categoryId, d.categoryName 
from resource as r
left join resourcefu as rf on r.resourceId = rf.resourceId
left join author as a on r.authorId = a.authorId
left join (select cr.categoryId, c.categoryName from category_rel as cr left join category as c) d

还是不行,这下直接在启动 adpater 阶段就报了sqlParseExcpetion 错误 经查发现 canal adapter 虽然能支持关联表,但是对关联表由诸多限制,比如:

如果关联表是子查询则关联表只能有一个。好嘛,这下子查询也不能用了。 这下就感觉使用 canal_adapter 的路被完全堵死了。 只剩下其他方法,比如:

  1. 使用canal-deploy监听binlog后,在代码里解析进行深度定制后再写入ES。
  2. 使用其他同步工具如Flink-CDC, logstash。但是也不知道它们对多表关联的支持情况如何。

各位牛爷爷们,我觉得我这个同步场景应该不复杂,为什么 canal 这么不友好呢,还是我没学会如何写 sql? 如果不使用canal的话,有没有什么其他工具能够完美适配我这种场景? 求各位大佬指教

jianbo2018 commented 2 years ago

可能和阿里的开发手册规约有关系,关于索引规约中有一条【强制】要求是这么说的:

【强制】超过三个表禁止 join。需要 join 的字段,数据类型必须绝对一致;多表关联查询时,
保证被关联的字段需要有索引。

你上面的例子中有5张表,除了最后的分类表,其他4张表都以“资源主表”为主表,所以如果不考虑最后的“分类表”的话,可以创建3个sql映射的yml文件,并使用同一个_index应该就可以了。 反正对每个sql映射来说,如果es的mapping中没有的字段在es的doc中也仅仅是展示为null而已,当所有的sql映射都执行完毕后,应该就可以补齐es mapping中定义的所有field了。

我自己本地做了一个简单的3张表的join测试:

两个yml文件使用相同的_index:report_sheet sql1.yml:

dataSourceKey: ds_slave_01
destination: example
outerAdapterKey: mysql1
groupId: g1
esMapping:
  _index: report_sheet
  _id: _id
  sql: "select g.id as _id, g.payer_PNR, g.payer_name, g.payer_telno, o.amount, g.payer_from, g.payer_to, o.gmt_create
        from my_biz_gateway_order g
        left join my_order o on g.id=o.biz_id"
  commitBatch: 3000

sql2.yml

dataSourceKey: ds_slave_01
destination: example
outerAdapterKey: mysql1
groupId: g1
esMapping:
  _index: report_sheet
  _id: _id
  sql: "select g.id as _id, c.hahaha
        from my_biz_gateway_order g
        left join my_hahaha c on g.id=c.biz_id"
  commitBatch: 3000

es mapping:

{
  "report_sheet" : {
    "mappings" : {
      "properties" : {
        "amount" : {
          "type" : "long"
        },
        "gmt_create" : {
          "type" : "date"
        },
        "hahaha" : {
          "type" : "text"
        },
        "payer_PNR" : {
          "type" : "text"
        },
        "payer_from" : {
          "type" : "text"
        },
        "payer_name" : {
          "type" : "text"
        },
        "payer_telno" : {
          "type" : "long"
        },
        "payer_to" : {
          "type" : "text"
        }
      }
    }
  }
}

测试结果可以正常运行。

jianbo2018 commented 2 years ago

另外,在这里追问一个问题,我就犯个懒了。不知道用视图可不可以解决,虽然canal不能监控视图,只能监控视图背后的表,但sql映射文件貌似是直接执行的“基于sql映射拼接的查询语句”,所以感觉应该也可以,不知道有没有人测试过....

ice-llz commented 2 years ago

另外,在这里追问一个问题,我就犯个懒了。不知道用视图可不可以解决,虽然canal不能监控视图,只能监控视图背后的表,但sql映射文件貌似是直接执行的“基于sql映射拼接的查询语句”,所以感觉应该也可以,不知道有没有人测试过....

你好,认真看了你的回答。 先回答第一点:业务需要必须携带分类表的字段,不能不考虑。 第二点:视图我试过,结果是canal无法监控视图的变化。因为canal是基于sql里的table名判断是否需要将监控到的数据改动推送至es。如果创建成为视图的话,监控的表名为视图名,而binlog监听到的表名永远都不会是视图名,而是视图的组成表的表名们。虽然它们在我们看来是一样的,但是canal是可以监控到任何改动,但是无法将任意一个改动推送至es。

jianbo2018 commented 2 years ago

那如果不想修改canal源码重编译的话,我感觉可以试试单独创建一个服务:监听canal instance在MQ的topic,然后用es的high level api自己将数据从mysql中查出来再索引到es中了。 持续关注有没有大神有更好的办法

ice-llz commented 2 years ago

那如果不想修改canal源码重编译的话,我感觉可以试试单独创建一个服务:监听canal instance在MQ的topic,然后用es的high level api自己将数据从mysql中查出来再索引到es中了。 持续关注有没有大神有更好的办法

你说的这个就是自己写代码了,那也没必要再监听mq,直接从canal deploy里可以拿到数据

MrLucy commented 1 year ago

几个表全部开启监听 es-mapping _index 指向同一个索引,开启upsert 应该能满足你的要求,我也有个类似的一个需求多表同步到一个索引,因为对数据实时性要求不是很高 就是这么实现的,目前没有发现什么问题

补充一下 如果从表时逻辑删除的可以,物理删除的话不行

1512033796 commented 1 year ago

你上面的例子中有5张表,除了最后的分类表,其他4张表都以“资源主表”为主表,所以如果不考虑最后的“分类表”的话,可以创建3个sql映射的yml文件,并使用同一个_index应该就可以了。 反正对每个sql映射来说,如果es的mapping中没有的字段在es的doc中也仅仅是展示为null而已,当所有的sql映射都执行完毕后,应该就可以补齐es mapping中定义的所有field了。 测试结果可以正常运行。

大佬,测试案例中增量同步是可以的,但是全量同步时,第二个yum文件全量同步后会将前面的操作数据全覆盖,这种情况如何解决呢?

Fata1errror commented 1 year ago

那如果不想修改canal源码重编译的话,我感觉可以试试单独创建一个服务:监听canal instance在MQ的topic,然后用es的high level api自己将数据从mysql中查出来再索引到es中了。 持续关注有没有大神有更好的办法

你说的这个就是自己写代码了,那也没必要再监听mq,直接从canal deploy里可以拿到数据

我也打算这么做了,大神们还有其他更好的方法吗