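1 Fundamentals
1.1 Introduction to Kafka Connect
– Kafka Connect is a framework for streaming data into and out of Apache Kafka
– Kafka Connect is a free, open-source component of Apache Kafka
– Kafka Connect supports simple integration with databases, key-value stores, search indexes, and file systems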
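1.2 What Kafka Connect does
– Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems
– Kafka Connect can move large volumes of data into and out of Kafka
– Kafka Connect can collect event streams from databases or application servers into Kafka topics for low-latency stream processing
– Kafka Connect can export data streams to secondary storage, query systems, or batch systems for offline analysis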
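1.3 Characteristics of Kafka Connect
– Data-centric pipelines (similar to the pipes that link processes on Linux)
– Flexible and scalable (runs on anything from a single machine to a multi-machine cluster)
– Reusable and extensible (streaming pipelines can be reused and can run on a distributed cluster)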
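1.4 Architecture of Kafka Connect
– Standalone mode: all stream processing runs in a single process; recommended for testing
– Distributed mode: supports load balancing and horizontal scaling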
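1.5 Kafka Connect working modes
– Source connector: collects data from database tables or application servers and streams it into Kafka topics
– Sink connector: delivers data from Kafka topics to secondary indexes such as Elasticsearch, or to batch systems such as Hadoop, for offline analysis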
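The source/sink pairing can be sketched with the FileStream example connectors that ship with Apache Kafka. This is an illustration only: the helper name write_file_connectors, the file paths, and the topic name are assumptions, and depending on the Kafka version the FileStream connectors may need to be placed on plugin.path first.

```shell
# Write a paired source/sink connector definition, modeled on the
# FileStream example configs shipped with Apache Kafka.
# Helper name, file paths and topic are illustrative assumptions.
write_file_connectors() {
  local dir="$1" topic="$2"

  # Source connector: reads lines from a file and publishes them to one topic.
  cat > "$dir/file-source.properties" <<EOF
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/connect-input.txt
topic=$topic
EOF

  # Sink connector: consumes the same topic and appends records to a file.
  # Sinks use the plural key "topics"; this source uses "topic".
  cat > "$dir/file-sink.properties" <<EOF
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/connect-output.txt
topics=$topic
EOF
}
```

Calling write_file_connectors /tmp connect-test produces two files that a standalone worker could load, with data flowing file -> topic -> file.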
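1.6 Kafka Connect plugins
– A Kafka Connect plugin is a set of JAR files
– A plugin contains the implementation of one or more connectors, transforms, or converters
– Plugins are isolated from one another and do not interfere with each other
– Make sure that only one version of each plugin is installed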
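1.7 Connectors on Confluent Platform
– Confluent Platform ships with several built-in connectors
– Confluent supports streaming data to and from commonly used systems
– Supported systems include relational databases and HDFS
– For the full list of Confluent plugins, see the following link,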
https://www.confluent.io/product/connectors/?_ga=2.18072048.852791858.1655884896-259847667.1655884896
2 Best practices
2.1 Deploying the Kafka cluster environment
2.2 Configuring the Kafka Connect cluster
On cfkafka0[2-4],
2.2.1 Edit the Kafka Connect configuration file
cp /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed.properties.default
vim /etc/kafka/connect-distributed.properties
Modify the following parameters,
bootstrap.servers=cfkafka02:9092,cfkafka03:9092,cfkafka04:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
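Before starting the service it can be worth confirming that none of the core worker settings were lost while editing. The following is a minimal sketch; the helper name missing_connect_keys and the choice of which keys to check are assumptions made for illustration.

```shell
# Print every core distributed-worker key that is absent from a properties
# file. Helper name and key list are illustrative assumptions.
missing_connect_keys() {
  local file="$1" key
  for key in bootstrap.servers group.id key.converter value.converter \
             offset.storage.topic config.storage.topic status.storage.topic \
             plugin.path; do
    # A key counts as present only when it is the exact text before the
    # first "=" on a line, so commented-out entries ("#key=...") are ignored.
    awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$file" \
      || echo "$key"
  done
}
```

Running missing_connect_keys /etc/kafka/connect-distributed.properties prints nothing when all of the listed keys are set.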
In addition, the following parameters are added when the Control Center service is configured; keep them if they are present,
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
2.2.2 Start the Kafka Connect service
vim /usr/lib/systemd/system/confluent-kafka-connect.service
Modify the configuration as follows,
[Unit]
Description=Apache Kafka Connect - distributed
Documentation=http://docs.confluent.io/
After=network.target confluent-server.target

[Service]
Environment="EXTRA_ARGS=-Duser.dir=/data/kafka-connect"
Type=simple
User=cp-kafka-connect
Group=confluent
ExecStart=/usr/bin/connect-distributed /etc/kafka/connect-distributed.properties
TimeoutStopSec=180
Restart=no

[Install]
WantedBy=multi-user.target
Because the unit file sets user.dir to /data/kafka-connect, create that directory with the following commands,
mkdir -p /data/kafka-connect
chown cp-kafka-connect:confluent -R /data/kafka-connect
chmod 770 -R /data/kafka-connect
Then reload systemd so that the change to the unit file takes effect,
systemctl daemon-reload
Then start the service and check its status,
systemctl start confluent-kafka-connect
systemctl status confluent-kafka-connect
If the service fails to start, you can launch the worker manually to debug it,
sudo -u cp-kafka-connect /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties
You also need to grant write permission on the log directory to "confluent", the default group of the run user "cp-kafka-connect",
chmod 775 -R /var/log/kafka/
Only then will the Kafka Connect log be written and viewable with the following command,
tail /var/log/kafka/connect.log
Then check the process ID and the listening ports with the following commands,
pgrep -u cp-kafka-connect java
netstat -antp | grep `pgrep -u cp-kafka-connect java` | grep :::
The last command prints the following,
tcp6 0 0 :::8083 :::* LISTEN 17536/java
tcp6 0 0 :::40347 :::* LISTEN 17536/java
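Port 8083 in this listing is the worker's REST interface, so a quick HTTP request is another way to confirm that the worker is answering. A minimal sketch follows; connect_get is a hypothetical helper name and the base URL is an assumption about where the worker listens.

```shell
# Query a Connect worker's REST interface.
# connect_get is a hypothetical helper; pass the worker's base URL and a path.
connect_get() {
  local base="${1%/}" path="$2"
  # -sS: no progress meter but still report errors
  # --fail: return a non-zero exit status on HTTP error responses
  curl -sS --fail "${base}${path}"
}

# Example calls, to be run on a worker node:
#   connect_get http://localhost:8083 /            # worker version information
#   connect_get http://localhost:8083 /connectors  # names of deployed connectors
```

On a freshly started cluster with no connectors deployed, the /connectors call should return an empty JSON array.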
We also need to enable the service so that it starts at boot,
systemctl enable confluent-kafka-connect
If errors occur while the service is running, you can print its journal with the following command,
journalctl -u confluent-kafka-connect.service --since "1 hour ago"
For detailed information about the process, use the following commands,
jinfo `pgrep -u cp-kafka-connect java`
pgrep -a -u cp-kafka-connect java
2.3 Configuring Control Center to manage Kafka Connect
On cfkafka01,
2.3.1 Edit the configuration file
vim /etc/confluent-control-center/control-center-production.properties
Add the following parameter,
confluent.controlcenter.connect.connect-cluster.cluster=http://cfkafka02:8083,http://cfkafka03:8083,http://cfkafka04:8083
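The value of this parameter is simply the REST URL of every worker, joined with commas. When the list of workers changes often, it can be generated instead of typed by hand. A small sketch, in which the helper name connect_cluster_urls is an assumption:

```shell
# Build the comma-separated worker URL list from a port and host names.
# connect_cluster_urls is a hypothetical helper name.
connect_cluster_urls() {
  local port="$1" host urls=""
  shift
  for host in "$@"; do
    # Prefix a comma only when the list already has an entry.
    urls="${urls:+$urls,}http://${host}:${port}"
  done
  printf '%s\n' "$urls"
}
```

For the three workers in this article, connect_cluster_urls 8083 cfkafka02 cfkafka03 cfkafka04 prints the same value used in the parameter.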
2.3.2 Restart the service to apply the configuration
systemctl restart confluent-control-center
2.3.3 Verify the configuration
http://cfkafka01.cmdschool.org:9021/
Log in to the Control Center interface
Click [Connect Clusters] -> [Connect]
The interface is displayed as follows,
2.4 Installing a Kafka Connect plugin
On cfkafka0[2-4],
2.4.1 Download the plugin package
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-ftps/versions/1.0.3-preview/confluentinc-kafka-connect-ftps-1.0.3-preview.zip
For other versions, download from the following link,
https://www.confluent.io/hub/confluentinc/kafka-connect-ftps
2.4.2 Install the plugin package
confluent-hub install confluentinc-kafka-connect-ftps-1.0.3-preview.zip
The full installation wizard looks like this,
confluent-hub install confluentinc-kafka-connect-ftps-1.0.3-preview.zip
The component can be installed in any of the following Confluent Platform installations:
  1. / (installed rpm/deb package)
  2. / (where this tool is installed)
Choose one of these to continue the installation (1-2): 1
Do you want to install this into /usr/share/confluent-hub-components? (yN) n
Specify installation directory: /usr/share/java
Component's license:
Confluent Software Evaluation License
https://www.confluent.io/software-evaluation-license
I agree to the software license agreement (yN) y
Installing a component Kafka Connect Ftps Connector 1.0.3-preview, provided by Confluent, Inc. from the local file: confluentinc-kafka-connect-ftps-1.0.3-preview.zip into directory: /usr/share/java
Detected Worker's configs:
  1. Standard: /etc/kafka/connect-distributed.properties
  2. Standard: /etc/kafka/connect-standalone.properties
  3. Standard: /etc/schema-registry/connect-avro-distributed.properties
  4. Standard: /etc/schema-registry/connect-avro-standalone.properties
  5. Used by Connect process with PID : /etc/kafka/connect-distributed.properties
Do you want to update all detected configs? (yN) y
Adding installation directory to plugin path in the following files:
  /etc/kafka/connect-distributed.properties
  /etc/kafka/connect-standalone.properties
  /etc/schema-registry/connect-avro-distributed.properties
  /etc/schema-registry/connect-avro-standalone.properties
  /etc/kafka/connect-distributed.properties
Completed
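Besides the Control Center view, the worker's REST interface can report which plugins it has loaded through its /connector-plugins endpoint. The sketch below is only an illustration: connect_has_plugin is a hypothetical helper, and the search pattern is a guess at part of the connector's class name rather than a confirmed value. A worker generally scans plugin.path at startup, so a restart of confluent-kafka-connect may be needed before a newly installed plugin shows up.

```shell
# Succeed if any plugin reported by the worker matches the given pattern.
# connect_has_plugin is a hypothetical helper; the pattern is matched
# case-insensitively against the raw JSON returned by /connector-plugins.
connect_has_plugin() {
  local base="${1%/}" pattern="$2"
  curl -sS --fail "${base}/connector-plugins" | grep -qi -- "$pattern"
}

# Example call, to be run on a worker node:
#   connect_has_plugin http://localhost:8083 ftps && echo "plugin loaded"
```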
http://cfkafka01.cmdschool.org:9021/
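Log in to the Control Center interface
Click [Connect Clusters] -> [Connect]
The interface is displayed as follows,
As shown in the figure above,
click [Connect Clusters] -> [Add connector] to see the connectors that are available for configuration,
Note: the specific connector settings are left for you to explore; this chapter does not cover them in detail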
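As a starting point for that exploration, a connector can also be created without the UI by sending its configuration to the worker's REST interface. The sketch below uses the FileStream source connector from Apache Kafka purely as a stand-in, because the FTPS connector's own settings are not covered here; the helper name connect_put_config, the connector name, the file path, and the topic are all assumptions.

```shell
# Create or update a connector through the Connect REST API.
# PUT /connectors/<name>/config creates the connector when it does not exist
# and updates it otherwise. connect_put_config is a hypothetical helper.
connect_put_config() {
  local base="${1%/}" name="$2" json="$3"
  curl -sS --fail -X PUT -H 'Content-Type: application/json' \
       --data "$json" "${base}/connectors/${name}/config"
}

# Stand-in configuration (assumption): the FileStream source shipped with
# Apache Kafka. A real FTPS connector needs its own class and settings.
demo_config='{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "tasks.max": "1",
  "file": "/tmp/connect-input.txt",
  "topic": "connect-test"
}'

# Example call, to be run on a worker node:
#   connect_put_config http://localhost:8083 demo-file-source "$demo_config"
```

Because the request is a PUT against the connector's config resource, repeating the call with changed settings updates the existing connector rather than failing on a name conflict.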
References
====================
Connector concepts
—————–
https://docs.confluent.io/platform/current/connect/concepts.html
https://docs.confluent.io/platform/current/connect/index.html
Installing connector plugins
——————
https://docs.confluent.io/platform/current/connect/userguide.html#connect-installing-plugins
https://docs.confluent.io/home/connect/self-managed/userguide.html
https://kafka.apache.org/documentation/#connectconfigs
Apache Kafka connectors
——————–
https://kafka.apache.org/documentation/#connect
MQTT Connector fails with MqttException(0)
————————
https://support.confluent.io/hc/en-us/articles/8241836881172-MQTT-Connector-fails-with-MqttException-0-