A roundup of canal-adapter v1.1.5 pitfalls

1. canal.adapter fails to start with "Could not resolve placeholder 'HOSTNAME%%.*'":

```
Caused by: java.lang.IllegalArgumentException: Could not resolve placeholder 'HOSTNAME%%.*' in value "history -a; printf "\033]0;%s@%s:%s\007" "${USER}" "${HOSTNAME%%.*}" "${PWD/#$HOME/~}""
```

The configuration never references HOSTNAME%%.* at all; the problem is that the shell's `${}` variable syntax collides with Spring Boot's placeholder resolution. The fix is to declare the offending variables in conf/application.yml with empty values (leave everything after the colon blank):

```yaml
PWD/#$HOME/~:
HOSTNAME%%.*:
USER:
```

2. java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

This is caused by a druid jar conflict. To fix it, download the source package:

```sh
wget https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz
```

Extract it, open it in IDEA, locate the pom.xml of the client-adapter.escore module, and change the druid dependency to provided scope:

```xml
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <scope>provided</scope>
</dependency>
```

Then run `mvn clean package` in the project root, go to canal-canal-1.1.5/client-adapter/es7x/target, and replace the original jar under canal-adapter/plugin with the freshly built client-adapter.es7x-1.1.5-jar-with-dependencies.jar. Restart Canal Adapter; the error no longer appears in the log.

Reference: https://www.cnblogs.com/agilestyle/p/15075936.html
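For item 2, a minimal sketch of the rebuild-and-replace flow, assuming the default directory names from the steps above; the canal-adapter install path and the restart scripts are placeholders, not from the original post:

```sh
# Rebuild the es7x adapter after scoping druid as provided
cd canal-canal-1.1.5
mvn clean package -DskipTests

# Swap in the rebuilt plugin jar (adjust the adapter path to your install)
cp client-adapter/es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar \
   /opt/canal-adapter/plugin/

# Restart the adapter
sh /opt/canal-adapter/bin/stop.sh
sh /opt/canal-adapter/bin/startup.sh
```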

November 11, 2021 · 1 min

Syncing data to ES with canal-adapter

This article uses v1.1.5.

1. Download and extract canal.adapter-1.1.5.tar.gz

2. Edit conf/application.yml:

```yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # kafka consumer
    kafka.bootstrap.servers: 150.158.190.205:9093
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: earliest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
  srcDataSources:
    testDS:
      url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
      username: root
      password: pwd
  canalAdapters:
  - instance: testTopic # canal instance Name or mq topic name
    groups:
    - groupId: testgroup
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: docker-cluster

PWD/#$HOME/~:
HOSTNAME%%.*:
USER:
```

3. Create test.yml under conf/es7 with the following content: ...
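The post is truncated before the test.yml contents. As orientation only, here is a hypothetical minimal mapping in the shape the es7 adapter expects; the table, columns, and index name are invented, while dataSourceKey, destination, and groupId must match the values in application.yml above:

```yaml
dataSourceKey: testDS        # a key under srcDataSources in application.yml
destination: testTopic       # canal instance name or MQ topic
groupId: testgroup           # adapter group id
esMapping:
  _index: test               # hypothetical target index
  _id: _id                   # document id taken from the _id column alias
  sql: "select t.id as _id, t.name, t.create_time from test_table t"
  commitBatch: 3000          # batch size for bulk commits
```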

November 11, 2021 · 1 min

Syncing MySQL data to Kafka with canal-admin and canal-deployer

This article uses v1.1.5 of both canal-admin and canal-deployer. See the official documentation.

1. Download and extract canal-admin (canal.admin-1.1.6-SNAPSHOT.tar.gz)

2. Create a canal_manager database and run the canal_manager.sql script from the canal-admin/conf directory

3. Edit canal-admin/conf/application.yml:

```yaml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: pwd
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
```

4. Start canal-admin:

```sh
sh bin/startup.sh
```

5. Open the management console at http://127.0.0.1:8089 (default account/password: admin/123456)

6. Cluster management: create a new cluster, e.g. test, and set its main configuration:

```properties
#################################################
#########       common argument      #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size, default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire, default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########       destinations        #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########         MQ Properties      #############
##################################################
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########            Kafka           #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9093
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
```

7. Start canal-deployer

Download and extract canal.deployer-1.1.5.tar.gz, then edit conf/canal_local.properties:

```properties
# register ip
canal.register.ip = 127.0.0.1

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = test
canal.admin.register.name =
```

Start canal-deployer:

```sh
sh bin/startup.sh local
```

canal-deployer registers itself with canal-admin and shows up under Server management; a quick end-to-end check is sketched below. ...
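A minimal sketch of that end-to-end check, assuming the broker address from the Kafka section above and the testTopic topic from the adapter post (both are deployment-specific; kafka-console-consumer.sh ships in Kafka's bin directory):

```sh
# Consume the topic canal-deployer publishes to; after an INSERT/UPDATE in
# the watched MySQL database, flat JSON change messages should appear here
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 \
  --topic testTopic --from-beginning
```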

November 11, 2021 · 4 min

Installing ES + Kibana with Docker

I. Install ES

1. Pull and run the image:

```sh
docker pull docker.io/elasticsearch:7.14.2
docker run -d --name elasticsearch \
  -v /home/es/:/usr/share/elasticsearch/ \
  -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  docker.io/elasticsearch:7.14.2
```

2. Enter the container:

```sh
docker exec -it elasticsearch bash
```

II. Install Kibana

```sh
docker pull kibana:7.14.2
docker run -d --name kibana -p 5601:5601 kibana:7.14.2
```

Edit the configuration file:

```sh
docker exec -it kibana bash
vi /usr/share/kibana/config/kibana.yml
```

Change the host to your own IP and add i18n.locale: "zh-CN" for a Chinese-language UI (a sketch follows below). Then open ip:5601 in a browser to reach Kibana.
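A sketch of the kibana.yml edits described above; server.host and i18n.locale are standard Kibana 7.x settings, and the Elasticsearch address is a placeholder for your own host IP:

```yaml
# Listen on all interfaces so the mapped container port is reachable
server.host: "0.0.0.0"
# Point Kibana at the Elasticsearch container (replace with your IP)
elasticsearch.hosts: ["http://192.168.1.100:9200"]
# Display the UI in Chinese
i18n.locale: "zh-CN"
```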

November 11, 2021 · 1 min