• 老鱼
    2019-09-03
    老师,上述Kafka Connect+Kafka Core+Kafka Streams例子中,生产者和消费者分别是什么?

    作者回复: 此时,生产者和消费者化身成这个大平台的小组件了。Connect中只有producer,将读取的日志行数据写入到Kafka源主题中。Streams中既有producer也有consumer:producer负责将计算结果实时写入到目标Kafka主题;consumer负责从源主题中读取消息供下游实时计算之用。

    
     2
  • Ball
    2020-01-14
    老师我有问题要请教下,添加 Connector 步骤里面是用 http REST 接口新建的,那新建的 Connector 是跑在 Broker 里面还是说又启动了一个新的 Java 进程执行 Connector?

    作者回复: 新的java进程

    
     1
  • 陈国林
    2020-02-03
    老师好,说下我这边的一些实践。19年一直在做容器日志平台,我们目前的方案是 Fluentd + Kafka + ELK。使用Fluentd做为容器平台的采集器实时采集数据,采集完之后数据写入Kafka通过Kafka进行解耦,使用Logstash消费后写入ES。这套方案目前在容器环境下应该可以说是标配

    作者回复: 棒棒:)

    
    
  • 丁丁历险记
    2019-12-11
    终于等到了。
    
    
  • leige
    2019-12-09
    胡老师,请问对于迟到的数据,os_check主题会生成多条记录吗?此时消费者应用程序应该如何处理?

    作者回复: 不会生成多条记录,但是的确可能会被丢弃(如果late太多)

    
    
  • aof
    2019-11-12
    老师,您在处理json串的时候为什么用Gson,而不用Alibaba的fastjson呢?

    作者回复: 只是举个例子而已,没说一定只能用Gson做JSON的序列化

    
    
  • 伟
    2019-09-29
    老师,请教connect读取mysql数据库中,我的添加connector命令是curl -X POST http://l-lzw:8083/connectors -H "Content-Type: application/json" -d '{"name": "mysql-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "lzw-mysql","database.port": "3306","database.user": "root","database.password": "123456","database.server.id": "1","database.server.name": "pydata","database.whitelist": "employee","topic.prefix": "test-mysql-","database.history.kafka.bootstrap.servers": "l-lzw:9092,l-lzw2:9092","database.history.kafka.topic": "db.history.mysql"}}'

    消费topic时返回如下:
    {"source":{"version":"0.9.5.Final","connector":"mysql","name":"pydata","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000004","pos":1021,"row":0,"snapshot":true,"thread":null,"db":null,"table":null,"query":null},"databaseName":"","ddl":"SET character_set_server=latin1, collation_server=latin1_swedish_ci;"}}

    没有正常可能原因是什么?

    展开
    
    
  • godtrue
    2019-09-24
    打卡,仅仅使用kafka这一个大数据组件就能实现一个企业级的实时日志流处理平台。
    获取——存储——清洗——转存——展示
    
    
  • ban
    2019-09-14
    老师,示例中开启Connect后启动读取的是本机的nginx日志,但如果nginx日志是在其他机器上面,那Connect是不是支持远程读的还是怎么样可以读取到其他机器的日志?

    作者回复: 在Nginx日志机器上开启,因为目前File Connector只支持从本地文件读取

    
    
  • jyc
    2019-09-06
    请教一下,集群版的connector是说每个kafka节点都启动一个吗?还有它读取的nginx日志就在本地?谢谢

    作者回复: 在Nginx日志本地

     1
    
  • 写点啥呢
    2019-09-03
    请问胡老师,console-consumer输出的message,为什么结束时间是一个很大的整数?从开始时间看,它应该是millisecond epoch,原本以为结束时间应该也是开始时间+2 second,但是文章中的例子看着不像:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic os-check --from-beginning --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer --property key.deserializer.default.windowed.key.serde.inner=org.apache.kafka.common.serialization.Serdes\$StringSerde
    [android@1565743788000/9223372036854775807] 1522
    [ios@1565743788000/9223372036854775807] 478
    [ios@1565743790000/9223372036854775807] 1912
    [android@1565743790000/9223372036854775807] 5313
    [ios@1565743792000/9223372036854775807] 780
    [android@1565743792000/9223372036854775807] 1949
    [android@1565743794000/9223372036854775807] 37
    ……
    展开

    作者回复: 这里的结束时间在代码中没有指定,因此默认值是Long.MAX_VALUE

    
    
我们在线,来聊聊吧