alibaba / canal

阿里巴巴 MySQL binlog 增量订阅&消费组件
Apache License 2.0
28.34k stars 7.58k forks source link

adapter1.1.6同步es7报错 NullPointerException #4641

Open lenovore opened 1 year ago

lenovore commented 1 year ago

adpater: v1.1.6 es: v7.10.2

canal同步数据到kafka,adapter消费kafka数据同步到es。 多表关联,插入主表1条数据可以同步到es,然后更新这条数据,同步报错(针对下面的adapter2配置):

2023-03-09 11:54:52.294 [Thread-18] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed!  Error sync and rollback, execute times: 9
2023-03-09 11:54:52.797 [pool-12-thread-1] ERROR c.a.o.canal.client.adapter.es.core.service.ESSyncService - sync error, es index: test3_many, DML : Dml{destination='test3_many', database='sbtest', table='tb_parent', type='UPDATE', es=1678334088000, ts=1678334088228, sql='', data=[{type_bigint=3, type_text=huc1, type_varchar=huc, id=1, type_datetime=2023-03-09 11:54:48}], old=[{type_bigint=1, type_text=huc, type_datetime=2023-03-09 11:54:42}]}
2023-03-09 11:54:52.799 [pool-12-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - java.lang.NullPointerException
java.lang.RuntimeException: java.lang.NullPointerException
    at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:112) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:60) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:104) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:83) ~[na:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:140) ~[client-adapter.launcher-1.1.6.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$writeOut$1(AdapterProcessor.java:98) ~[client-adapter.launcher-1.1.6.jar:na]
    at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891) ~[na:1.8.0_212]
    at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$writeOut$2(AdapterProcessor.java:95) ~[client-adapter.launcher-1.1.6.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]
Caused by: java.lang.NullPointerException: null
    at com.alibaba.otter.canal.client.adapter.es7x.support.ES7xTemplate.getESDataFromDmlData(ES7xTemplate.java:321) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.singleTableSimpleFiledUpdate(ESSyncService.java:814) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.update(ESSyncService.java:262) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:97) ~[na:na]
    ... 11 common frames omitted
2023-03-09 11:54:52.801 [Thread-18] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed!  Error sync but ACK!

对应的配置如下: application.yml

canal.conf:
  mode: kafka
  zookeeperHosts: zookeeper:2181
  flatMessage: true
  syncBatchSize: 1000
  consumerProperties:
    kafka.bootstrap.servers: kafka:9092
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.90.178:7306/sbtest?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: test3_single
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7
        hosts: http://192.168.90.178:9200
        properties:
          mode: rest
          security.auth: elastic:elastic
          cluster.name: Dev2ES
      - name: logger
  - instance: test3_many
    groups:
    - groupId: g2
      outerAdapters:
      - name: es7
        hosts: http://192.168.90.178:9200
        properties:
          mode: rest
          security.auth: elastic:elastic
          cluster.name: Dev2ES
      - name: logger

adapter1

dataSourceKey: defaultDS
destination: test3_single
groupId: g1
esMapping:
  _index: test3_single
  _id: _id
  sql: "select a.id as _id,a.id as id,a.type_bigint,a.type_char,a.type_varchar,a.type_text,a.type_decimal,a.type_datetime,a.type_date from sbtest.tb_single as a"
  etlCondition: "where id>{}"
  commitBatch: 1000

adapter2

dataSourceKey: defaultDS
destination: test3_many
groupId: g2
esMapping:
  _index: test3_many
  _id: id
  sql: "select a.id,a.type_bigint,a.type_text,a.type_datetime,
  b.pid as child1_id,b.type_bigint as child1_bigint,
concat(b.type_arr1,';',b.type_arr2) as child1_arr,
concat('\"[',b.type_arrobj1,',',b.type_arrobj2,']\"') as child1_arrobj
from sbtest.tb_parent as a
left join sbtest.tb_child1_1 as b on a.id=b.pid 
"
  objFields:
    child1_arr: array:;
    child1_arrobj: object
  etlCondition: "where id>{}"
  commitBatch: 1000

但是如果把拼接的 child1_arrobj字段及其属性child1_arrobj: object删除,更新又能正常同步了。 所以这个错误应该是和object对象有关,有谁知道怎么解决吗?

lytpe commented 1 year ago

不知道能否提供思路,也在学习中,根据问题,我定位了源码。

截屏2023-03-12 12 49 39 截屏2023-03-12 12 50 42

我觉得SchemaItem这边有问题,或者不使用 child1_arrobj。

modest269 commented 1 month ago

请问你这个问题解决了吗