使用canal-adapter同步数据到es

本文使用v1.1.5版本

1.下载并解压

canal.adapter-1.1.5.tar.gz

2.修改conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # kafka consumer
    kafka.bootstrap.servers: 150.158.190.205:9093
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: earliest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000

  srcDataSources:
    testDS:
      url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
      username: root
      password: pwd
  canalAdapters:
  - instance: testTopic # canal instance Name or mq topic name
    groups:
    - groupId: testgroup
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: docker-cluster
PWD/#$HOME/~:
HOSTNAME%%.*:
USER:

3.在conf/es7目录下新建test.yml

配置内容如下:

dataSourceKey: testDS 
destination: testTopic
groupId: testgroup 
esMapping:
  _index: testIndex
  _id: id
#  upsert: true
#  pk: id
  sql: "SELECT id, create_time, update_time, status FROM tb_test"
#  objFields:
#    _labels: array:;
#  etlCondition: "where id>0"
  commitBatch: 3000

4.启动canal-adapter

sh bin/startup.sh

添加新评论