Kafka核心技术与实战
胡夕
人人贷计算平台部总监,Apache Kafka Contributor
立即订阅
8408 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 为什么要学习Kafka?
免费
Kafka入门 (5讲)
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
Kafka的基本使用 (3讲)
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置(上)
08 | 最最最重要的集群参数配置(下)
客户端实践及原理剖析 (14讲)
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
深入Kafka内核 (5讲)
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
管理与监控 (12讲)
28 | 主题管理知多少?
29 | Kafka动态配置了解下?
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
高级Kafka应用之流处理 (3讲)
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
结束语 (1讲)
结束语 | 以梦为马,莫负韶华!
特别放送 (2讲)
加餐 | 搭建开发环境、阅读源码方法、经典学习资料大揭秘
用户故事 | 黄云:行百里者半九十
Kafka核心技术与实战
登录|注册

39 | 从0搭建基于Kafka的企业级实时日志流处理平台

胡夕 2019-09-03
你好,我是胡夕。今天我要和你分享的主题是:从 0 搭建基于 Kafka 的企业级实时日志流处理平台。
简单来说,我们要实现一些大数据组件的组合,就如同玩乐高玩具一样,把它们“插”在一起,“拼”成一个更大一点的玩具。
在任何一个企业中,服务器每天都会产生很多的日志数据。这些数据内容非常丰富,包含了我们的线上业务数据用户行为数据以及后端系统数据。实时分析这些数据,能够帮助我们更快地洞察潜在的趋势,从而有针对性地做出决策。今天,我们就使用 Kafka 搭建一个这样的平台。

流处理架构

如果在网上搜索实时日志流处理,你应该能够搜到很多教你搭建实时流处理平台做日志分析的教程。这些教程使用的技术栈大多是 Flume+Kafka+Storm、Spark Streaming 或 Flink。特别是 Flume+Kafka+Flink 的组合,逐渐成为了实时日志流处理的标配。不过,要搭建这样的处理平台,你需要用到 3 个框架才能实现,这既增加了系统复杂度,也提高了运维成本。
今天,我来演示一下如何使用 Apache Kafka 这一个框架,实现一套实时日志流处理系统。换句话说,我使用的技术栈是 Kafka Connect+Kafka Core+Kafka Streams 的组合。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(9)

  • 老鱼
    老师,上述Kafka Connect+Kafka Core+Kafka Streams例子中,生产者和消费者分别是什么?

    作者回复: 此时,生产者和消费者化身成这个大平台的小组件了。Connect中只有producer,将读取的日志行数据写入到Kafka源主题中。Streams中既有producer也有consumer:producer负责将计算结果实时写入到目标Kafka主题;consumer负责从源主题中读取消息供下游实时计算之用。

    2019-09-03
    2
  • 丁丁历险记
    终于等到了。
    2019-12-11
  • leige
    胡老师,请问对于迟到的数据,os_check主题会生成多条记录吗?此时消费者应用程序应该如何处理?

    作者回复: 不会生成多条记录,但是的确可能会被丢弃(如果late太多)

    2019-12-09
  • AF
    老师,您在处理json串的时候为什么用Gson,而不用Alibaba的fastjson呢?

    作者回复: 只是举个例子而已,没说一定只能用Gson做JSON的序列化

    2019-11-12
  • 老师,请教connect读取mysql数据库中,我的添加connector命令是curl -X POST http://l-lzw:8083/connectors -H "Content-Type: application/json" -d '{"name": "mysql-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "lzw-mysql","database.port": "3306","database.user": "root","database.password": "123456","database.server.id": "1","database.server.name": "pydata","database.whitelist": "employee","topic.prefix": "test-mysql-","database.history.kafka.bootstrap.servers": "l-lzw:9092,l-lzw2:9092","database.history.kafka.topic": "db.history.mysql"}}'

    消费topic时返回如下:
    {"source":{"version":"0.9.5.Final","connector":"mysql","name":"pydata","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000004","pos":1021,"row":0,"snapshot":true,"thread":null,"db":null,"table":null,"query":null},"databaseName":"","ddl":"SET character_set_server=latin1, collation_server=latin1_swedish_ci;"}}

    没有正常可能原因是什么?

    2019-09-29
  • godtrue
    打卡,仅仅使用kafka这一个大数据组件就能实现一个企业级的实时日志流处理平台。
    获取——存储——清洗——转存——展示
    2019-09-24
  • ban
    老师,示例中开启Connect后启动读取的是本机的nginx日志,但如果nginx日志是在其他机器上面,那Connect是不是支持远程读的还是怎么样可以读取到其他机器的日志?

    作者回复: 在Nginx日志机器上开启,因为目前File Connector只支持从本地文件读取

    2019-09-14
  • jyc
    请教一下,集群版的connector是说每个kafka节点都启动一个吗?还有它读取的nginx日志就在本地?谢谢

    作者回复: 在Nginx日志本地

    2019-09-06
    1
  • 写点啥呢
    请问胡老师,console-consumer输出的message,为什么结束时间是一个很大的整数?从开始时间看,它应该是millisecond epoch,原本以为结束时间应该也是开始时间+2 second,但是文章中的例子看着不像:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic os-check --from-beginning --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer --property key.deserializer.default.windowed.key.serde.inner=org.apache.kafka.common.serialization.Serdes\$StringSerde
    [android@1565743788000/9223372036854775807] 1522
    [ios@1565743788000/9223372036854775807] 478
    [ios@1565743790000/9223372036854775807] 1912
    [android@1565743790000/9223372036854775807] 5313
    [ios@1565743792000/9223372036854775807] 780
    [android@1565743792000/9223372036854775807] 1949
    [android@1565743794000/9223372036854775807] 37
    ……

    作者回复: 这里的结束时间在代码中没有指定,因此默认值是Long.MAX_VALUE

    2019-09-03
收起评论
9
返回
顶部