如何部署Confluent ksqlDB集群?

Confluent

1 基础知识

1.1 KSQL

1.1.1 KSQL的概念

– KSQL与SQL数据库完全不同,不是基于存储的数据进行按需查询和修改
– KSQL不执行查找,只执行流的连续无限转换(即流处理)

1.1.2 KSQL的范例

– 假设有一个来自用户的点击流和一个关于用户的不断更新的账户信息表
– KSQL允许我们队点击流和用户表进行建模并将两者结合

1.1.3 KSQL的功能

1.1.3.1 实时监控满足实时分析

CREATE TABLE error_counts AS
SELECT error_code, count(*)FROM monitoring_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
WHERE type = 'ERROR'

如上所示,
– 支持创建自定义业务级指标(实时计算的结果)
– 并可以像监视CPU负载一样进行监视和警报

1.1.3.2 安全性和异常检测

CREATE TABLE possible_fraud AS
SELECT card_number, count(*)
FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY card_number
HAVING count(*) > 3;

如上所示,
– 将事流转换为数字时间序列聚合的KSQL查询
– 聚合使用Kafka-Elastic连接器写入Elastic并使用Grafana UI实现可视化(像监控分析)
– 通过以上手段查询欺诈、滥用、垃圾邮件、入侵或其他不良行为的模式

1.1.3.3 在线数据集成

CREATE STREAM vip_users AS
SELECT userid, page, action 
FROM clickstream c 
LEFT JOIN users u ON c.userid = u.user_id
WHERE u.level = 'Platinum';

如上所示,
– 公司中的数据库场景,从多个数据库中获取数据且转换后连接然后保存到健值存储
– 长期以来,用于数据集成的ETL(提取、转换和加载)以定期批处理的方式执行
– 例如,实时转存原始数据,然后隔几个小时转换一次的方式不具备实时性
– KSQL与Kafka连接器一起使用时,是实现在线实时数据集成

1.2 ksqlDB

1.2.1 ksqlDB简介

– ksqlDB是KSQL(Apache Kafka的流式SQL引擎)的继承者
– ksqlDB是Confluent Platform的一个组件
– ksqlDB是基于Apache Kafka集群运行
– ksqlDB是Confluent Platform的一部分
– ksqlDB是商业组件

1.2.2 ksqlDB功能

– ksqlDB是Apache Kafka的事件流数据库
– ksqlDB支持使用轻量级的SQL语法实时处理Apache Kafka流

1.2.3 ksqlDB架构


– 架构分三层,分布式Kafka代理层、ksqlDB服务层和ksqlDB CLI客户端层
– ksqlDB CLI一次只能连接一个ksqlDB服务器(ksqlDB CLI本身不支持故障转移)

2 最佳实践

2.1 Kafka集群环境部署

如何部署Confluent Kafka集群?

2.2 配置ksqlDB集群

In cfkafka0[2-4],

2.2.1 修改ksqlDB配置文件

cp /etc/ksqldb/ksql-server.properties /etc/ksqldb/ksql-server.properties.default
vim /etc/ksqldb/ksql-server.properties

参数修改如下,

listeners=http://0.0.0.0:8088
ksql.logging.processing.topic.auto.create=true
ksql.logging.processing.stream.auto.create=true
bootstrap.servers=cfkafka02:9092,cfkafka03:9092,cfkafka04:9092
compression.type=snappy
state.dir=/var/lib/kafka-streams

2.2.2 启动Kafka Connect服务

systemctl start confluent-ksqldb.service
systemctl status confluent-ksqldb.service

如果遇到启动错误,可使用手动启动的方式调试,

export LOG_DIR=/var/log/confluent/ksql
sudo -E -u cp-ksql /bin/bash -c '/usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties'

然后,可以使用如下命令查看启动的进程号和端口号,

pgrep -u cp-ksql java
netstat -antp | grep `pgrep -u cp-ksql java` | grep :::

最后一条命令显示如下,

tcp6       0      0 :::8083                 :::*                    LISTEN      17536/java
tcp6       0      0 :::40347                :::*                    LISTEN      17536/java

另外,我们还需要使用如下命令配置服务自动启动,

systemctl enable confluent-ksqldb.service

2.3 配置控制中心管理ksqlDB

In cfkafka01,

2.3.1 修改配置文件

vim /etc/confluent-control-center/control-center-production.properties

加入如下参数,

confluent.controlcenter.ksql.ksql-cluster.url=http://dn2cfkafka02:8088,http://dn2cfkafka03:8088,http://dn2cfkafka04:8088

2.3.2 重启服务使配置生效

systemctl restart confluent-control-center

2.3.3 验证配置

http://cfkafka01.cmdschool.org:9021/
登录“控制中心”界面
单击【ksqlDB Clusters】->【ksqlDB】
界面显示如下,

参阅文档
=====================

KSQL的概念
—————-
https://www.confluent.io/en-gb/blog/ksql-streaming-sql-for-apache-kafka/

ksqlDB官网
————-
https://ksqldb.io/

安装ksqkDB
—————-
https://docs.confluent.io/platform/current/ksqldb/installing.html
https://docs.confluent.io/platform/7.1.1/ksqldb/installing.html

Docker ksqkDB
————-
https://hub.docker.com/r/confluentinc/ksqldb-server

环境变量配置
—————-
https://stackoverflow.com/questions/8633461/how-to-keep-environment-variables-when-using-sudo

没有评论

发表回复

Confluent
如何更新Kafka topic的Replication Factor?

1 前言 一个问题,一篇文章,一出故事。 笔者部署完Confluent Kafka集群后发现Kafk …

Confluent
如何修复Kafka主题connect-configs报错?

1 前言 一个问题,一篇文章,一出故事。 笔者安装完成Kafka connect后发现Kafka疯狂 …

Confluent
如何修复Kafka connect集群启动错误?

1 前言 一个问题,一篇文章,一出故事。 笔者从新初始化Kafka集群后出现Kafka connec …