• never leave
    2019-06-03
    官网上说inner join的watermark是可选的,outer join的watermark是必选的。但是我感觉应该都是必选的吧,就像案例中的inner join一样,如果不是必须的话,旧数据一直保存在内存中,有可能导致内存不够。

    作者回复: never leave同学,感谢提问。现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。

    Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。

    由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    
     10
  • Poleness
    2019-06-04
    请问下,这里解析kafka的value的时候,自定义了schema,但真正生产中很多数据的类型结构是很复杂的,徒手写schema的方式不一定可行。不知道有没有更优雅的方式?
    (看了源码,如果是json等格式好像可以自动推断,但是对于kafka,他的sourceSchema好像是写死的,不知大家有没有好的建议或者经验?)
    
     2
  • jon
    2019-06-03
    不支持完全输出是因为join的只是一个时间窗口内的数据
    在这个例子中inner join使用watermark 是必须的,left joinwatermark不是必须的

    作者回复: jon,感谢提问。

    现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。

    Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。

    由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    
     2
  • lhk
    2019-09-17
    老师你好,请教个watermark的问题:水印是为了解决数据出现延迟时,流处理程序要等待多久。那超过这个时间的数据就丢弃了吗?程序不会再处理他们了吗?比如水印设置30分钟,那31分钟到来的数据就不管了是吧?
    
     1
  • Feng.X
    2019-06-03
    老师,请问join操作里有riderId了,为什么要加上endTime > startTime AND endTime <= startTime + interval 2 hours?

    作者回复: Feng.X,感谢提问。

    这个限制目的在于抛弃任何长于2个小时的出租车出行数据。

    对于这个例子来说,这样一个对事件时间的限制确实不是必须的。加入它其实是为了告诉你,在基于事件时间来join两个流时,我们一般不考虑时间跨度过大的情况,因为它们没有普遍意义,还会影响数据分析的结果。

    举例,对于一个网页广告,我们需要知道用户看到一个广告后要多长时间才会去点击它,从而评估广告的效果。这里显然有两个流:一个代表用户看到广告的事件,另一个代表用户点击广告的事件。尽管我们可以通过用户的ID来Join这两个流,但是我们需要加一个限制,就是点击广告的时间不能比看到广告的时间晚太久,否则Join的结果很可能是不准确的。比如,用户可能在1:00和2:00都看到了广告,但是只在2:01点击了它,我们应该把2:00和2:01Join起来,而不应该Join1:00和2:01,因为1:00看到的广告并没有促使他点击。

    
     1
  • 刘万里
    2019-06-03
    老师 您好,最近好久没用spark,有个问题请教一下,现在最新spark是否已经支持cep了
    
     1
  • 北京知府
    2019-06-12
    这部分数据有个不太直观的地方,那就是同一次出行会有两个记录,…
    为何会出现两个记录?用一条记录也能记录出发和到达吧?
    
    
  • Ming
    2019-06-03
    我猜:

    对于inner join来说,用不用watermark只是纯粹的一个性能考量,不影响单条数据的正确性,只影响最终分析的样本大小。
    对于outer join来说,用watermark会影响单条数据正确性,所以在逻辑上看应该是不推荐的,除非会有内存泄漏的风险。

    我倒是好奇为啥spark把这个特性叫水印
    展开

    作者回复: Ming,感谢提问。

    现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。

    Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。

    由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    
    
我们在线,来聊聊吧