342144 发表于 2016-7-25 10:15:59

kafka数据迁移

场景:老集群将不再使用,上边的kafka集群中的数据要导入到新的集群的kafka中

倒入步骤(例如按照天的topic迁移):
因为kafka默认只保留7天的数据,所以只迁移近7天的数据
1. 首先使用kafka-topic.sh客户端列出kafka集群上所有topic
2. 取出近7天的topic名称,写入到一个txt文本中,每个topic名称一行

3. 导出:for i in $(cat txt);do kafka-console-consumer.sh --zookeeper=... --topic=$i --from-beginning >>$i.txt;done
4. 导入: 先创建topic:kafka-topic.sh create topic
    cat $i.txt | kafka-conso-producer.sh --broker-list=... --topic=$i

第二种方案不常用:
借鉴:http://www.cnblogs.com/dycg/p/3922352.html
具体步骤如下:
1.在新节点上搭建kafka服务
原先我有2台机器, broker.id分别为1和2
现在我新机器上broker.id分别设置为3和4

2.启动所有kafka 服务

3.确认要移动的topics

kafka-topics.sh --list --zookeeper 192.168.103.47:2181查看所有主题

复制这些topic,并写成如下格式的文件, 命名为 topics-to-move.json
{"topics": [
{"topic": "fortest1"},
{"topic": "fortest2"},
{"topic": "fortest3"}
],
"version":1
}

4.生成移动脚本

运行bin/kafka-reassign-partitions.sh --zookeeper 192.168.103.47:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4" --generate

其中3,4是你的新节点的broker.id

这样就会生成一串新的json数据
{"version":1,"partitions":[{"topic":"fortest1","partition":0,"replicas":},其他部分省略}

将这一串json写入新文件reassignment-node.json中


5.这时候,万事俱备, 开始迁移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.103.47:2181 --reassignment-json-file reassignment-node.json --execute

6.适当时候, 运行如下命令,查看运行结果

bin/kafka-reassign-partitions.sh --zookeeper 192.168.103.47:2181 --reassignment-json-file reassignment-node.json --verify



假设出现
ERROR: Assigned replicas (3,4,0,1) don't match the list of replicas for reassignment (3,4) for partition
这样的错误, 他并不是真的出错,而是指目前仍在复制数据中.
再过一段时间再运行verify命令,他就会消失(加入完成拷贝)



7.数据完成迁移后, 老的服务先别停.
8.修改所有客户端producer或者consumer连接指向新节点.
9.测试正常后,


页: [1]
查看完整版本: kafka数据迁移