alibaba / canal

阿里巴巴 MySQL binlog 增量订阅&消费组件
Apache License 2.0
28.37k stars 7.59k forks source link

canal-adapter消费dynamicTopic问题 #3402

Open jackzlz opened 3 years ago

jackzlz commented 3 years ago

canal配置dynamicTopic后实现单表单topic,canal-adapter应该怎么配置来实现每个表消费对应的topic呢 ?如果配置canalAdapters多个instance,看了下代码会导致ESAdapter重复初始化

jackzlz commented 3 years ago

目前的方案是: 如果多个mq同步到同一个es时,保证es只初始化一次即可。即保证adapter的init方法只执行一次。 改动点:

  1. ExtensionLoader增加isExtensionExists方法

    public boolean isExtensionExists(String name, String key) {
    
        String extKey = name + "-" + StringUtils.trimToEmpty(key);
        Holder<Object> holder = cachedInstances.get(extKey);
        return holder != null && holder.get() != null;
    
    }
  2. CanalAdapterLoaderloadAdapter的时候判断

图片

  1. canalAdapters配置
    canalAdapters:
    - instance: schema_table1 # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es7
              key: es7
              hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
              properties:
                mode: rest # or rest
                security.auth: elastic:12345678 #  only used for rest mode
                scheme: https
                certificate.type: pem
                certificate.path: /crt/elasticsearch-test.crt
    - instance: schema_table2 # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es7
              key: es7
    - instance: schema_table3 # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es7
              key: es7

配置的时候保证outerAdapters配置相同的name和key,这样同一个name+key只会初始化一次。而es的连接信息只需要配置第一个,因为是按顺序加载的

jackzlz commented 3 years ago

上面的写法有问题。改成了下面的: 图片 图片

jackzlz commented 3 years ago

最终采用的方案是,多个表就配置多个instance,因为多个表配置同一个esconnection偶尔报错。ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception,

  canalAdapters:
    - instance: schema_table1 # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es7
              key: table1
              hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
              properties:
                mode: rest # or rest
                security.auth: elastic:12345678 #  only used for rest mode
                scheme: https
                certificate.type: pem
                certificate.path: /crt/elasticsearch-test.crt
    - instance: schema_table2 # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es7
              key: table2
              hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
              properties:
                mode: rest # or rest
                security.auth: elastic:12345678 #  only used for rest mode
                scheme: https
                certificate.type: pem
                certificate.path: /crt/elasticsearch-test.crt
    - instance: schema_table3 # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            - name: es7
              key: table3
              hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
              properties:
                mode: rest # or rest
                security.auth: elastic:12345678 #  only used for rest mode
                scheme: https
                certificate.type: pem
                certificate.path: /crt/elasticsearch-test.crt