0%

rocketmq-connect 解决RocketMQ跨集群复制方案

介绍

Replicator是RocketMQ Connector的别名,用于RocketMQ集群之间的信息同步,Replicator是运行在RocketMQ Runt上的RocketMQ 集群消息同步Connector,其主要实现了Connector的机制,能够同步两个独立的RocketMQ集群之间的消息。

Source Connector

Replicator Connector 包含两种Source Connector:RmqSourceReplicatorRmqMetaReplicator

  • RmqSourceReplicator

    Topic同步

    自动创建topic

    消息白名单

    任务并行度

  • RmqMetaReplicator

    同步消费消费进度和ConsumerGroup

Sink Connector

Replicator没有SinkConnector

准备

  1. Linux/Unix/Mac
  2. 64bit JDK 1.8+;
  3. Maven 3.2.x或以上版本;
  4. 启动 RocketMQ;
  5. 创建测试Topic

sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8

tips : ${ROCKETMQ_HOME} 位置说明

bin-release.zip 版本:/rocketmq-all-4.9.4-bin-release

source-release.zip 版本:/rocketmq-all-4.9.4-source-release/distribution

构建Connect

1
2
3
4
5
6
7
8
官方版本:
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
mvn -Prelease-connect -DskipTests clean install -U

本地改造版本:(基于2022年10月27日)已编译后
git clone https://github.com/hyman0603/rocketmq-connect

运行Worker

1
2
3
4
5
6
7
官方版本:
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

本地改造版本:(基于2022年10月27日)
本地启动:
容器启动或者k8s版本:

tips: 可修改 /bin/runconnect.sh 适当调整 JVM Parameters Configuration

JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m”

runtime启动成功:

The standalone worker boot success.

查看启动日志文件:

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

ctrl + c 退出日志

启动source connector

1
2
3
4
5
6
7
8
9
10
11
12
13
http://10.18.2.18:30525/connectors/rocketmq-replicator-idc-shazure
?config={
"connector.class":"org.apache.rocketmq.replicator.RmqSourceReplicator",
"source-rocketmq":"10.18.2.11:9876",
"target-rocketmq":"10.18.2.18:31157",
"replicator-store-topic":"replicatorTopic",
"taskDivideStrategy":"0",
"target-cluster":"DefaultCluster",
"source-cluster":"DefaultCluster",
"task-parallelism":"2",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter",
"white-list":"TopicTest,TopicTest2,TopicTest3"
}

启动前确认

请确认启动参数中replicator-store-topicwhite-list的topic已经在Source集群创建

停止

发送GET请求,停止rocketmq-replicator

1
http://${runtime-ip}:${runtime-port}/connectors/${replicator-name}/stop