1 基础知识
1.1 KSQL
1.1.1 KSQL的概念
1.1.1 KSQL的概念
– KSQL与SQL数据库完全不同,不是基于存储的数据进行按需查询和修改
– KSQL不执行查找,只执行流的连续无限转换(即流处理)
1.1.2 KSQL的范例
– 假设有一个来自用户的点击流和一个关于用户的不断更新的账户信息表
– KSQL允许我们队点击流和用户表进行建模并将两者结合
1.1.3 KSQL的功能
1.1.3.1 实时监控满足实时分析
CREATE TABLE error_counts AS SELECT error_code, count(*)FROM monitoring_stream WINDOW TUMBLING (SIZE 1 MINUTE) WHERE type = 'ERROR'
如上所示,
– 支持创建自定义业务级指标(实时计算的结果)
– 并可以像监视CPU负载一样进行监视和警报
1.1.3.2 安全性和异常检测
CREATE TABLE possible_fraud AS SELECT card_number, count(*) FROM authorization_attempts WINDOW TUMBLING (SIZE 5 SECONDS) GROUP BY card_number HAVING count(*) > 3;
如上所示,
– 将事流转换为数字时间序列聚合的KSQL查询
– 聚合使用Kafka-Elastic连接器写入Elastic并使用Grafana UI实现可视化(像监控分析)
– 通过以上手段查询欺诈、滥用、垃圾邮件、入侵或其他不良行为的模式
1.1.3.3 在线数据集成
CREATE STREAM vip_users AS SELECT userid, page, action FROM clickstream c LEFT JOIN users u ON c.userid = u.user_id WHERE u.level = 'Platinum';
如上所示,
– 公司中的数据库场景,从多个数据库中获取数据且转换后连接然后保存到健值存储
– 长期以来,用于数据集成的ETL(提取、转换和加载)以定期批处理的方式执行
– 例如,实时转存原始数据,然后隔几个小时转换一次的方式不具备实时性
– KSQL与Kafka连接器一起使用时,是实现在线实时数据集成
1.2 ksqlDB
1.2.1 ksqlDB简介
– ksqlDB是KSQL(Apache Kafka的流式SQL引擎)的继承者
– ksqlDB是Confluent Platform的一个组件
– ksqlDB是基于Apache Kafka集群运行
– ksqlDB是Confluent Platform的一部分
– ksqlDB是商业组件
1.2.2 ksqlDB功能
– ksqlDB是Apache Kafka的事件流数据库
– ksqlDB支持使用轻量级的SQL语法实时处理Apache Kafka流
1.2.3 ksqlDB架构
– 架构分三层,分布式Kafka代理层、ksqlDB服务层和ksqlDB CLI客户端层
– ksqlDB CLI一次只能连接一个ksqlDB服务器(ksqlDB CLI本身不支持故障转移)
2 最佳实践
2.1 Kafka集群环境部署
2.2 配置ksqlDB集群
In cfkafka0[2-4],
2.2.1 修改ksqlDB配置文件
cp /etc/ksqldb/ksql-server.properties /etc/ksqldb/ksql-server.properties.default vim /etc/ksqldb/ksql-server.properties
参数修改如下,
listeners=http://0.0.0.0:8088 ksql.logging.processing.topic.auto.create=true ksql.logging.processing.stream.auto.create=true bootstrap.servers=cfkafka02:9092,cfkafka03:9092,cfkafka04:9092 compression.type=snappy state.dir=/var/lib/kafka-streams
2.2.2 启动Kafka Connect服务
systemctl start confluent-ksqldb.service systemctl status confluent-ksqldb.service
如果遇到启动错误,可使用手动启动的方式调试,
export LOG_DIR=/var/log/confluent/ksql sudo -E -u cp-ksql /bin/bash -c '/usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties'
然后,可以使用如下命令查看启动的进程号和端口号,
pgrep -u cp-ksql java netstat -antp | grep `pgrep -u cp-ksql java` | grep :::
最后一条命令显示如下,
tcp6 0 0 :::8083 :::* LISTEN 17536/java tcp6 0 0 :::40347 :::* LISTEN 17536/java
另外,我们还需要使用如下命令配置服务自动启动,
systemctl enable confluent-ksqldb.service
2.3 配置控制中心管理ksqlDB
In cfkafka01,
2.3.1 修改配置文件
vim /etc/confluent-control-center/control-center-production.properties
加入如下参数,
confluent.controlcenter.ksql.ksql-cluster.url=http://cfkafka02:8088,http://cfkafka03:8088,http://cfkafka04:8088
2.3.2 重启服务使配置生效
systemctl restart confluent-control-center
2.3.3 验证配置
http://cfkafka01.cmdschool.org:9021/
登录“控制中心”界面
单击【ksqlDB Clusters】->【ksqlDB】
界面显示如下,
参阅文档
=====================
KSQL的概念
—————-
https://www.confluent.io/en-gb/blog/ksql-streaming-sql-for-apache-kafka/
ksqlDB官网
————-
https://ksqldb.io/
安装ksqkDB
—————-
https://docs.confluent.io/platform/current/ksqldb/installing.html
https://docs.confluent.io/platform/7.1.1/ksqldb/installing.html
Docker ksqkDB
————-
https://hub.docker.com/r/confluentinc/ksqldb-server
环境变量配置
—————-
https://stackoverflow.com/questions/8633461/how-to-keep-environment-variables-when-using-sudo
没有评论