本文使用 canal-adapter v1.1.5 版本。
1.下载并解压 canal-adapter 安装包
2.修改conf/application.yml
# conf/application.yml for canal-adapter.
# Selects Kafka as the message source and registers two outer adapters
# (logger and es7) for the topic "testTopic".
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka  # one of: tcp, kafka, rocketMQ, rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # kafka consumer
    kafka.bootstrap.servers: 150.158.190.205:9093
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: earliest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
  srcDataSources:
    # "testDS" is the name the es7 mapping file refers to via dataSourceKey.
    testDS:
      url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
      username: root
      # NOTE(review): placeholder credential; keep real passwords out of VCS.
      password: pwd
  canalAdapters:
    - instance: testTopic  # canal instance name or MQ topic name
      groups:
        - groupId: testgroup
          outerAdapters:
            - name: logger
            - name: es7
              hosts: http://127.0.0.1:9200  # 127.0.0.1:9200 for rest mode
              properties:
                mode: rest  # transport or rest
                # security.auth: test:123456  # only used for rest mode
                cluster.name: docker-cluster
3.在conf/es7目录下新建test.yml
配置内容如下:
# conf/es7/test.yml: es7 adapter mapping that indexes the rows returned by
# the SQL below, using the "id" column as the document id.
dataSourceKey: testDS   # name of the entry under srcDataSources in application.yml
destination: testTopic  # same value as the canalAdapters instance in application.yml
groupId: testgroup      # same value as the adapter groupId in application.yml
esMapping:
  # NOTE(review): Elasticsearch index names must be lowercase; confirm the
  # real index name before using "testIndex".
  _index: testIndex
  _id: id
  # upsert: true
  # pk: id
  sql: "SELECT id, create_time, update_time, status FROM tb_test"
  # objFields:
  #   _labels: array:;
  # etlCondition: "where id>0"
  commitBatch: 3000
4.启动canal-adapter
# The script path is relative: run this from the directory that contains bin/.
sh bin/startup.sh