Skip to main content

Spark 与 Flink 设计相似性分析:快速上手另一种大数据计算引擎

· 7 min read

Apache Spark 和 Apache Flink 是当前最主流的分布式计算引擎,二者都支持批处理(Batch Processing)和流处理(Stream Processing),在 API 设计、执行架构和任务调度等方面存在诸多相似之处。网上的教程很多都是在比较两者的差别,那强哥今天进来说说他们的相似点。如果开发者已经熟悉其中之一,可以通过类比快速上手另一个。

本文将重点分析 架构组件 的相似性,并从 API 设计、流批处理逻辑 等角度对比 Spark 和 Flink,帮助开发者快速理解和迁移。


1. 架构组件对比

Spark 和 Flink 的架构设计有许多共同点,比如都采用了 主从结构(Master-Slave),都具备 任务调度、资源管理、数据存储和计算执行 等核心组件。但两者在 执行模式、调度策略、资源管理方式 上有所不同。

1.1 整体架构对比

组件SparkFlink
集群管理Spark Standalone / YARN / Kubernetes / MesosFlink Standalone / YARN / Kubernetes
主节点Driver(应用程序主控)JobManager(任务调度与协调)
工作节点Executor(执行任务的计算节点)TaskManager(执行 Task 的工作节点)
任务调度基于 DAG 计算图,Stage 划分 Task 并调度执行事件驱动的流式任务调度,Pipeline 计算图
存储系统HDFS / S3 / Delta LakeHDFS / S3 / RocksDB(状态存储)

两者的架构在 主从架构、任务调度方式、资源管理 上存在明显的类比关系,我们逐一拆解其核心组件。

spark架构图:

flink架构图:


1.2 组件详细解析

(1)主控节点(Master)

Spark DriverFlink JobManager
负责解析 Spark 应用程序,构建 DAG 任务执行图负责解析 Flink 作业,构建 Streaming 计算图
任务按照 Stage 进行划分,每个 Stage 内部有多个 Task任务按照 Operator 进行划分,每个 Operator 可以并行执行多个 Task
任务调度依赖 YARN/Kubernetes 或 Spark Standalone任务调度由 JobManager 直接管理
资源分配方式:Driver 向集群请求 Executor资源分配方式:JobManager 请求 TaskManager

核心区别

  • Spark 的 Driver 是单点的,任务执行的 DAG 依赖 Driver 维护,如果 Driver 挂掉,任务失败。
  • Flink 的 JobManager 支持高可用(HA)模式,可主备切换,不会因主控故障导致任务失败。

(2)计算执行节点(Worker)

Spark ExecutorFlink TaskManager
每个 Executor 运行多个 Task,Task 共享 Executor 进程每个 TaskManager 运行多个 Task Slot,每个 Task Slot 承载一个 Task
任务以 RDD 形式存储在内存中,默认情况下不会持久化状态任务可以管理状态,存储在 RocksDB 等存储引擎中
计算任务在 Executor 内部进行批处理计算任务在 TaskManager 内部进行流式计算
如果 Executor 挂掉,该 Executor 上的 Task 需要重新调度TaskManager 支持故障恢复,状态可自动回放

(3)任务调度

Spark DAG 调度Flink Streaming Graph 调度
任务被拆分为多个 Stage,每个 Stage 内部执行多个 Task任务被拆分为 Operator,每个 Operator 由多个 Task 组成
Task 之间的数据传输是基于 Shuffle 的Task 之间的数据传输是流式传输
任务执行是 批处理优先,流处理采用微批(Micro-batch)方式任务执行是 流处理优先,事件驱动(Event-driven)
Stage 执行完毕后,下一 Stage 才能执行Operator 之间可以持续流动,不需要等待批次结束

2. API 设计对比:DataFrame / DataSet

Spark 和 Flink 都提供了高层级的 Table API & SQL,以及低层级的 DataFrame / DataSet / DataStream API

示例对比

Spark DataFrame API 示例

val spark = SparkSession.builder().appName("SparkExample").getOrCreate()
val df = spark.read.json("data.json")
df.select("name", "age").filter($"age" > 18).show()

Flink Table API 示例

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
val table = tableEnv.readJson("data.json")
table.select("name", "age").filter($"age" > 18).execute()

3. 流批处理逻辑对比

Spark 和 Flink 都支持批处理和流处理,但执行方式不同:

  • Spark 的流处理基于微批(Micro-Batch),每隔一定时间处理一批数据。
  • Flink 是真正的流处理(Event-driven),每条数据到达就立即处理。

示例对比

Spark Structured Streaming

val spark = SparkSession.builder().appName("StreamExample").getOrCreate()
val streamDF = spark.readStream.format("kafka").option("subscribe", "topic").load()
streamDF.writeStream.outputMode("append").format("console").start()

Flink DataStream

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), props))
stream.print()
env.execute()

4. 结论

对比维度SparkFlink
批处理RDD / DataFrame / DatasetDataSet / Table API
流处理Structured Streaming(微批)DataStream(事件驱动)
计算模型DAG 任务执行Pipeline 任务执行
时间处理主要基于 Processing Time支持 Event Time & Processing Time
状态管理依赖外部存储(如 HDFS, Delta Lake)内置 RocksDB 状态存储

如果你已经掌握 Spark:

  • 可以用 Flink Table API 处理批任务,类似 Spark SQL。
  • Flink DataStream 需要适应事件驱动模式,但 API 设计很类似。
  • 架构组件(Driver / JobManager, Executor / TaskManager)有明显对应关系,学习曲线较低。

总的来说,Spark 更适合批处理,Flink 更擅长流处理。二者 API 设计相似,掌握一个后,迁移到另一个并不困难!

从“面向 Google 编程”到“AI 辅助编程”:Khoj 让你成为高效开发者!

· 13 min read

这几年写代码下来,积累了不少技术文档、代码片段、学习笔记,但一直有个头疼的问题:知识管理。各种资料散落在不同的 IDE、浏览器书签、本地文件夹里,想用的时候总是找不到,或者找到了也忘了当时的背景,效率特别低。相信很多同行都有类似的困扰吧?

曾经的“代码黑洞”:找不到,记不住,Debug 到崩溃

刚开始,我习惯用 IDE 自带的笔记功能,后来发现太不方便了,就换成在线笔记。但免费版限制太多,同步也慢,体验并不好。更麻烦的是,随着项目增多,我的资料越来越多,各种代码片段、技术文档、Stack Overflow 链接,散落在不同的地方,找起来简直要命。

最让我崩溃的是,遇到 Bug 的时候,明明之前看过相关的解决方案,但就是想不起来在哪里,只能一遍又一遍地 Google,Debug 到深夜。这种找不到、记不住、用不上的状态,让我对编程产生了深深的无力感。

偶然的转机:告别混乱,拥抱高效

一次偶然的机会,我在 GitHub 上发现了 Khoj 这个开源项目。它是一个开源的个人 AI 助手,可以帮助你管理知识、提高效率。当时也没多想,就下载下来试试。

一开始,我只是抱着试试看的心态,把一些常用的技术文档和代码片段导入进去。导入的过程很简单,支持 Markdown、TXT、PDF 等多种格式。导入之后,我发现这个工具会自动分析我的文档内容和代码结构,并建立索引,方便我快速检索。

告别“大海捞针”:快速检索,精准定位(那些让我直呼“卧槽”的瞬间!)

以前我要找一段代码,要在各个项目里翻来覆去,或者在 Stack Overflow 里输入关键词,但经常搜不到想要的结果。现在,我只需要在这个工具里输入关键词,它就能快速找到相关的代码片段和技术文档,而且还能根据上下文,给出更精准的搜索结果。

“卧槽”瞬间 1: 上个月,我要做一个性能优化的项目,需要用到之前看过的一篇关于“缓存策略”的文章。时间紧迫,我之前虽然看过一些相关的资料,但都散落在不同的地方,根本来不及整理。抱着试试看的心态,我在这个工具里输入“缓存策略”,结果它竟然在几秒钟之内,找到了我之前看过的所有相关资料,包括技术博客、源码分析、性能测试报告等等。更让我震惊的是,它还自动生成了一份性能优化方案,并提取了关键代码片段,让我直接可以开始优化代码!当时我就惊呆了,这效率简直逆天!

知识关联,触类旁通(那些让我直呼“卧槽”的瞬间!)

更让我惊喜的是,这个工具还能自动分析我的文档内容和代码结构,并建立关联。例如,如果我在不同的文档中提到了同一个设计模式,它会自动将它们关联起来,方便我理解和记忆。

“卧槽”瞬间 2: 有一次,我在学习“微服务架构”的时候,发现它和“SOA 架构”有很多相似之处。这个工具自动将我关于“微服务架构”和“SOA 架构”的笔记关联起来,让我对这两个架构有了更深入的理解。更牛逼的是,它还自动生成了一份关于“微服务架构与 SOA 架构的对比分析”的报告,让我对这个领域有了更深入的认识,直接在技术分享会上把那些架构师都给镇住了!

AI 助手,随时待命(那些让我直呼“卧槽”的瞬间!)

除了快速检索和知识关联,这个工具还内置了一个 AI 助手,可以像和朋友聊天一样,向它提问。例如,我可以问它:“什么是 CAP 理论?”,它会搜索我的笔记和互联网,并给出详细的解释和案例。

“卧槽”瞬间 3: 前几天,Leader 突然问我:“你对 Serverless 有什么看法?” 我当时脑子一片空白,根本不知道该怎么回答。情急之下,我打开这个工具,问它:“Serverless 的优缺点是什么?” 结果它竟然在几秒钟之内,给我生成了一份关于“Serverless 的优缺点”的详细报告,包括技术原理、应用场景、最佳实践等等。我直接把这份报告发给 Leader,Leader 看完之后,对我赞不绝口,说我技术扎实,对新技术有敏锐的嗅觉!当时我就在心里默默感谢这个 AI 助手,简直是我的救命稻草!

我的编程知识管理神器:Khoj

说了这么多,相信大家已经猜到我说的这个工具是什么了。没错,它就是 Khoj

自从用了 Khoj,我的编程知识管理方式发生了彻底的改变。以前我总是感觉被知识淹没,现在我终于可以掌控全局,高效利用我的知识资产。

程序员专属“卧槽”时刻,Khoj 还能这么玩!

  • 代码生成与分析: 还在手动写那些重复的 CRUD 代码?让 Khoj 帮你生成!只需要简单描述你的需求,它就能自动生成代码,还能帮你分析代码的性能瓶颈,提出优化建议 1
    • 例子: 比如,我需要一个用户管理的 API 接口,只需要告诉 Khoj,它就能自动生成 Controller、Service、Model 等代码,还能自动生成 Swagger 文档,简直不要太爽!
  • Bug 修复与调试: 遇到疑难杂症的 Bug,不知道从何下手?让 Khoj 帮你分析!只需要把错误信息和相关代码片段输入进去,它就能帮你找出 Bug 的原因,并给出修复建议。
    • 例子: 比如,我遇到一个内存泄漏的问题,Debug 了好几天都没找到原因。后来我把错误信息和相关代码片段输入到 Khoj,它竟然在几秒钟之内,找到了内存泄漏的原因,并给出了修复建议,让我避免了通宵加班的命运!
  • 技术选型与架构设计: 项目需要技术选型,不知道该选择哪个框架?让 Khoj 帮你分析!只需要告诉它你的需求和约束条件,它就能帮你分析各种框架的优缺点,并给出最佳的技术选型方案。
    • 例子: 比如,我需要做一个高并发的 API 服务,不知道该选择 Spring Boot 还是 Go。我把我的需求和约束条件告诉 Khoj,它分析了 Spring Boot 和 Go 的优缺点,并建议我选择 Go,因为 Go 在高并发方面更有优势。后来我采用了 Go,果然性能提升了很多!
  • 自动化代码审查: Khoj 还可以作为代码审查工具,帮助你发现代码中的潜在问题,例如代码风格不一致、安全漏洞等等,提高代码质量.

总而言之,Khoj 不仅仅是一个知识管理工具,更是一个强大的编程助手,可以帮助你提高编程效率,提升技术水平,让你成为一个更优秀的程序员! 赶紧用起来,体验一下那些让你惊呼“卧槽”的瞬间吧!

Khoj vs ChatGPT:程序员的选择

你可能会问,ChatGPT 也能做这些事情,我为什么要选择 Khoj 呢? 让我来告诉你 Khoj 独有的“牛逼”之处:

  • 专属知识库: ChatGPT 是一个通用的 AI 模型,它没有你的个人知识库,无法理解你的代码风格、技术偏好和项目背景。而 Khoj 可以连接你的个人知识库,包括你的笔记、文档、代码片段等等,为你提供更精准、更个性化的服务。
  • 离线可用: ChatGPT 需要联网才能使用,而 Khoj 可以在离线状态下使用,让你随时随地都能访问你的知识库,即使在没有网络的情况下也能高效工作。
  • 开源免费: ChatGPT 是一个商业产品,需要付费才能使用,而 Khoj 是一个开源项目,你可以免费使用,并根据自己的需求进行定制。
  • 代码执行能力: Khoj 能够生成和运行 Python 代码,进行数据分析、生成图表等任务,这对于需要进行数据分析和可视化的程序员来说非常有用 1
  • 自定义 Agent: 可以创建自定义 Agent,根据不同的任务需求,定制 Agent 的行为和知识库,实现更高级的自动化 2
  • 语义搜索: Khoj 的搜索功能更加强大,即使你记不清关键词,也能通过语义搜索找到相关的笔记 3

简单来说,ChatGPT 就像一个知识渊博的老师,可以回答你各种问题,但它不了解你。而 Khoj 就像你的私人助理,它了解你的知识体系、工作习惯和项目需求,为你提供更贴心的服务。

如果你是一个注重效率、追求个性化、热爱开源的程序员,那么 Khoj 绝对是你的不二之选!

真诚推荐:让 Khoj 成为你的编程利器

如果你也面临着编程知识管理的困扰,强烈推荐你试试 Khoj!它绝对会让你眼前一亮,并彻底改变你的工作和学习方式。

传送门:

P.S. Khoj 是一个开源项目,你可以免费使用,并根据自己的需求进行定制。如果你有任何建议或想法,欢迎参与到 Khoj 的开发中来,一起打造更好的 AI 助手!

没经过我同意,flink window就把数据存到state里的了?

· 9 min read

不知道大家在初次使用Flink的时候,是否对Flink中定义本地变量和状态比较好奇,这俩有啥区别?

而且在使用Window API时明明没有显式地创建状态,也没调用getState(),却依然把每个窗口里的所有元素都自动缓存到 StateBackend里,这到底是怎么做到的?它怎么可以自作主张呢。

本地变量 vs Managed State

我们先来看看第一个问题,怎么清楚的区分本地变量和状态的区别呢?我们看下表:

特性本地变量(Local Variable / Field)Managed State (ValueState, ListState…)
生命周期仅在当前 processElement() 或算子实例初始化时有效;Task 重启或 failover 后重置为初始值跨事件、跨 checkpoint 持久化;重启后按最新 checkpoint 恢复
容错不参与 Flink 容错;Task 重启或 Job 恢复后丢失参与 checkpoint/savepoint;保证 Exactly-once 语义
序列化不会被 Flink 自动序列化;只存在 JVM 堆栈或算子对象里通过 TypeSerializer 序列化到 StateBackend(内存或 RocksDB)
使用场景临时计数、方法内临时缓存,无需跨事件保留需要累积、聚合、窗口缓存、跨事件关联时使用

简单写个代码看看

//本地字段无法容错 
public class MyMapFunction extends RichMapFunction<Event, Integer> {
private int counter = 0; // 普通字段

@Override
public Integer map(Event value) {
counter += 1;
return counter; // 失败重启后,counter 会被重置为 0
}
}

//使用 Managed State
public class MyStatefulMap extends RichMapFunction<Event, Integer> {
private transient ValueState<Integer> counterState;

@Override
public void open(Configuration cfg) {
counterState = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Integer.class));
}

@Override
public Integer map(Event value) throws Exception {
Integer cnt = Optional.ofNullable(counterState.value()).orElse(0);
cnt += 1;
counterState.update(cnt); // 这个值会被 checkpoint 序列化并恢复
return cnt;
}
}

Window怎么存元素到state的?

Flink 的 Window 算子并没有让我们在代码里手动 getState(),我们一般都只是这样写:

dataStream
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
@Override
public void process(String key,
Context ctx,
Iterable<Event> elems,
Collector<Result> out) { … }
});

却能自动把 5 分钟窗口里的所有 Event 缓存起来。而且在强哥之前的文章中也提到,如果Window定义的时间跨度太长,缓存在state里面的数据过多,可能会对服务性能造成影响,官网也是提到了的:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

那么,Window背后究竟发生了什么?我们来看关键源码位置(注:以下源码基于flink-streaming-java:1.18.1)。

1. 隐式注册StateDescriptor

我们先看示例代码

source
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) {
return Tuple2.of(value, 1);
}
})
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new WindowFunctionExample())
;

在执行process方法的时候,Flink会调用org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder#apply(org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction<java.lang.Iterable<T>,R,K,W>),创建WindowOperator

   private <R> WindowOperator<K, T, ?, R, W> apply(
InternalWindowFunction<Iterable<T>, R, K, W> function) {
if (evictor != null) {
return buildEvictingWindowOperator(function);
} else {
ListStateDescriptor<T> stateDesc =
new ListStateDescriptor<>(
WINDOW_STATE_NAME, inputType.createSerializer(config));

return buildWindowOperator(stateDesc, function);
}
}

可以看到这里创建了ListStateDescriptor

然后,在执行Window的生命周期方法org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#open(),Flink 会为每个 key + window 分配一个内部缓存state:

// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
if (windowStateDescriptor != null) {
windowState =
(InternalAppendingState<K, W, IN, ACC, ACC>)
getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}

public <N, S extends State, V> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
Preconditions.checkNotNull(this.keySerializer, "State key serializer has not been configured in the config. This operation cannot use partitioned state.");
InternalKvState<K, ?, ?> kvState = (InternalKvState)this.keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
}

kvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled((InternalKvState)TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, this, this.ttlTimeProvider), stateDescriptor, this.latencyTrackingStateConfig);
this.keyValueStatesByName.put(stateDescriptor.getName(), kvState);
this.publishQueryableStateIfEnabled(stateDescriptor, kvState);
}

return kvState;
}

这里的 windowState 就是 Flink 为你隐藏起来的“窗口元素缓存”:

  • windowState 存放当前 window 的所有元素
  • Flink 会在每次触发窗口(watermark 到达或触发器触发)时,把这个windowState 内容取出来,交给你的 process() 方法
  • 当窗口关闭后,会 clear()这个windowState

2. 数据到达:write to state

WindowOperator#processElement() 中,Flink 收到新的事件时,会执行:

// 把元素追加到 windowState
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
……
//触发器触发,则获取窗口状态内容
TriggerResult triggerResult = triggerContext.onElement(element);

if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);

if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}

这段代码每来一条事件,就把它 序列化 并写入到 StateBackend(Memory/RocksDB),而不是保存在 Java内存对象里。

同时,如果是触发了触发器,则会返回窗口内容。还会创建一个定时器,定时执行窗口计算。

3. 定时触发:read from state

当定时器触发时,WindowOperator#onEventTime() 会调用:

TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
//触发器触发
if (triggerResult.isFire()) {
// 读取并反序列化所有元素
ACC contents = windowState.get();
if (contents != null) {
emitWindowContents(triggerContext.window, contents);
}
}

// 清空 state
if (triggerResult.isPurge()) {
windowState.clear();
}

add())到 get())再到 clear()),整个过程都是通过 Flink 的 StateBackend 完成的——这保证了:

  1. Exactly-once 语义:即使 TaskManager 宕机或网络抖动,StateBackend 里缓存的窗口数据都能在恢复后自动重新加载。
  2. 水平扩容/重分区:当做 rescale 操作或 job 重启时,Flink 会把 state 分片迁移到新的并行度实例,保证窗口数据完整。

为什么 Window 必须序列化?

  1. 分布式容错
    Window 算子可能在多个 TM 上并行,节点故障、网络分区、作业重调度都可能发生,只有把窗口数据写到 StateBackend,才能在恢复时不丢失任何事件。

  2. Checkpoint & Savepoint
    Flink 借助 Kafka、文件系统做 checkpoint/savepoint;所有 state(包括窗口元素)都会被打包到 checkpoint 里,保证 Exactly-once 与故障恢复。

  3. 弹性伸缩
    扩容、缩容时需要重新分配并行任务,StateBackend 会把各个 key + window 的数据按照新的并行度迁移到对应实例。

总结

源码分析完了,写个小总结吧

  • 本地变量 只能在当前算子实例、当前方法调用中生存,不会参与序列化;重启或缩容后会丢失。
  • Managed State(包括我们手动声明的 ValueState、也包括 WindowOperator 背后隐式的 ListState)会被 Flink 序列化到 StateBackend,参与 checkpoint/savepoint、支持容错恢复和重分区。
  • 虽然 Window API 没让你在代码里 getState(),但其核心实现却在算子初始化时自动注册了 ListStateDescriptor,并在 processElement() / onTimer() 里读写、清理这个 state。

了解了这套机制,你就能:

  1. 在自定义算子里灵活选择到底用本地变量还是 Managed State;
  2. 明白为什么 Window API 自带的“隐式状态”一定要序列化到后端,以及如何通过 StateBackend 配置(Memory vs RocksDB)来优化性能

从 Flink Window 到 ProcessFunction:一次内存优化的实战分享

· 5 min read

最近在做一个实时计算项目中,需要对用户行为日志进行聚合分析。起初,同事使用了 Flink 的Window API(如 TumblingEventTimeWindows + WindowFunction)来实现 5 分钟滚动统计。

功能上没问题,但随着数据量的增长,作业频繁触发 GC,内存占用飙升,最终甚至出现了 OOM 错误。经排查后发现:Window 中的数据缓存策略,是性能瓶颈的关键

问题分析

Flink 的 Window API 会在后台自动缓存每个 key、每个窗口内的所有元素:

  • 比如一个 5 分钟窗口,每个用户行为事件都需要被保留在 Flink 的 window state 中;
  • 对于高 QPS 场景,每秒成千上万条事件都要被保存;
  • 随着 key 数量增加(如按用户、页面等分组),缓存压力线性增长。

本质问题:Window 是“批处理”思想的落地,而我们这个需求更适合“事件驱动”处理。

问题优化

为了解决这个问题,我尝试用更轻量的方式重构逻辑 —— 替换 Window,为每个 key 维护一个简单的状态 + 定时器,模拟滚动窗口的功能。

重构后的思路:

  • 每来一条数据,立即更新一个 ValueState(如 count 或 sum);
  • 注册一个定时器(如 ctx.timerService().registerEventTimeTimer(...));
  • 到时间触发处理并清理状态(及时释放内存);
  • 无需保存每条事件,只保存必要的中间值(例如总和、计数等)

示例代码片段

public class RollingCountProcessFunction extends KeyedProcessFunction<String, Event, Result> {

private transient ValueState<Long> countState;

@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("count", Long.class);
countState = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
Long current = Optional.ofNullable(countState.value()).orElse(0L);
countState.update(current + 1);
ctx.timerService().registerEventTimeTimer(value.getEventTime() + 5 * 60 * 1000);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
Long count = countState.value();
out.collect(new Result(ctx.getCurrentKey(), count));
countState.clear(); // 清理状态,释放内存
}
}

性能对比

指标使用 Window API使用 ProcessFunction
内存占用高,频繁 GC显著下降,状态更精简
延迟固定窗口延迟触发实时响应,低延迟
灵活性逻辑固定高度灵活,自定义状态和行为
容错与 Checkpoint支持同样支持

最终效果

在实际部署后,以下指标得到显著优化:

  • TaskManager 内存使用从约 2.5GB 降至 300MB
  • GC 次数减少 90% 以上
  • 窗口统计延迟下降至毫秒级

小结

在高吞吐、低延迟、内存敏感的场景下,Window API 可能不是最优解。它更适合用于“聚合历史数据”,但对于事件驱动类业务,我们可以用 ProcessFunction 构建更轻量的替代方案:

  • 仅保留必要的中间状态;
  • 更快释放内存,避免数据缓存堆积;
  • 同时保持灵活性和一致性。

小结:Flink 是一个高度灵活的平台,正确选择 API 的粒度和机制,远比堆配置参数更有效

官网建议

我们如果有看flink的官方文档,也可以在windows章节看到相关解释和建议,强哥这里直接摘录出来,英文好的可以看看

……引用了半天,公众号每个引用不让超过300字,醉了。那直接给链接大家自己看吧:

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#window-lifecycle

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#window-functions

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#useful-state-size-considerations

是同一页的,但是内容太多,看我发的具体锚点部分就行。

从源码浅看Watermark的打开方式

· 14 min read

赌场失意

老实说,就今天赌场跌成这样,还能有心情给大家分享,甚至早上还去抄了底,此刻大半夜心情毫无波澜的写东西,你们就该好好的拿椅子坐下学习。

不过,我也知道关注我的人里面,很多其实不是学后端的,大数据更别扯蛋了,不过发了推文,没取关的,都是yifu,所以也尽量有的时候分享些别的东西,写文章虽然是想要记录自己的学习和生活,不过既然写了,当然也希望看的人不会觉得太无趣。

watermark

Flink DataStream中流动着不同的元素,统称为StreamElementStreamElement可以是StreamRecordWatermarkStreamStatusLatencyMarker中任何一种类型。

StreamElement
StreamElement是一个抽象类(是Flink 承载消息的基类),其他四种类型继承StreamElement

public abstract class StreamElement {
//判断是否是Watermark
public final boolean isWatermark() {
return getClass() == Watermark.class;
}
//判断是否为StreamStatus
public final boolean isStreamStatus() {
return getClass() == StreamStatus.class;
}
//判断是否为StreamRecord
public final boolean isRecord() {
return getClass() == StreamRecord.class;
}
//判断是否为LatencyMarker
public final boolean isLatencyMarker() {
return getClass() == LatencyMarker.class;
}
//转换为StreamRecord
public final <E> StreamRecord<E> asRecord() {
return (StreamRecord<E>) this;
}
//转换为Watermark
public final Watermark asWatermark() {
return (Watermark) this;
}
//转换为StreamStatus
public final StreamStatus asStreamStatus() {
return (StreamStatus) this;
}
//转换为LatencyMarker
public final LatencyMarker asLatencyMarker() {
return (LatencyMarker) this;
}
}

**Watermark**
Watermark继承了StreamElementWatermark 是和事件一个级别的抽象,其内部包含一个成员变量时间戳timestamp,标识当前数据的时间进度。Watermark实际上作为数据流的一部分随数据流流动。

@PublicEvolving
public final class Watermark extends StreamElement {
/*The watermark that signifies end-of-event-time. */
public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
/* The timestamp of the watermark in milliseconds. */
private final long timestamp;
/* Creates a new watermark with the given timestamp in milliseconds.*/
public Watermarklong timestamp) {
this.timestamp = timestamp;
}
/*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
public long getTimestamp() {
return timestamp;
}
}

回顾下

上周我们分享了flink watermark,用商品过期时间来类比watermark的使用场景。我们来看看我们当时watermark是怎么使用的还记得吗?

SingleOutputStreamOperator<String> resultStream = socketStream
.map((MapFunction<String, Product>) value -> {
String[] parts = value.split(",");
String name = parts[0];
long timestamp = Long.parseLong(parts[1]);
return new Product(name, timestamp);
})
// 指定 Watermark 生成策略(最大允许 10 天的乱序)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)
.keyBy(product -> product.productName) // 按商品名分组
.window(TumblingEventTimeWindows.of(Time.days(1))) // 窗口大小 1 天
.sideOutputLateData(expiredTag) // 过期商品进入侧输出流
.process(new ExpiryCheckProcessWindowFunction()); // 窗口计算逻辑

一大段用了ramda表达式写在了一起,我们就挑设置watermark部分看:

.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)

这里调用了WatermarkStrategyforBoundedOutOfOrdernesswithTimestampAssigner。通过WatermarkStrategy就设置好了watermark,那我们就来看看WatermarkStrategy是干嘛的。

WatermarkStrategy

WatermarkStrategy从调用来看,像是个类,可是看源码,其实它是个接口,里面定义了许多default方法。来看看接口的备注:

The WatermarkStrategy defines how to generate Watermarks in the stream sources. The WatermarkStrategy is a builder/factory for the WatermarkGenerator that generates the watermarks and the TimestampAssigner which assigns the internal timestamp of a record.
This interface is split into three parts: 1) methods that an implementor of this interface needs to implement, 2) builder methods for building a WatermarkStrategy on a base strategy, 3) convenience methods for constructing a WatermarkStrategy for common built-in strategies or based on a WatermarkGeneratorSupplier

内容说的很清楚,WatermarkStrategy是个接口也是工厂,可以用WatermarkGenerator生成Watermarks,还可以用TimestampAssigner来设置数据记录的内部时间戳。接口的两个非default无具体实现的方法:createWatermarkGeneratorcreateTimestampAssigner就是做这两件事用的。

而除此之外的其他有具体实现以default方法或者static静态实现分为两部分,default方法是在基础的strategy之上通过builder模式构建WatermarkStrategy的;static静态实现一种是利用已经内置好的常见strategy,这些常见strategy是预先定义好的,可以直接拿来使用,就像使用现成的工具一样方便;另一种是基于WatermarkGeneratorSupplier来构造,通过它可以根据具体需求来生成相应的watermarkStrategy,这种方式更具灵活性,可以根据不同的场景和要求来定制。

@Public
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

// ------------------------------------------------------------------------
// Methods that implementors need to implement.
// ------------------------------------------------------------------------

/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.
*/
@Override
default TimestampAssigner<T> createTimestampAssigner(
TimestampAssignerSupplier.Context context) {
// By default, this is {@link RecordTimestampAssigner},
// for cases where records come out of a source with valid timestamps, for example from
// Kafka.
return new RecordTimestampAssigner<>();
}

/**
* Provides configuration for watermark alignment of a maximum watermark of multiple
* sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*/
@PublicEvolving
default WatermarkAlignmentParams getAlignmentParameters() {
return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
}

// ------------------------------------------------------------------------
// Builder methods for enriching a base WatermarkStrategy
// ------------------------------------------------------------------------

/**
* Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
* {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}).
*
* <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
* access to the metrics system.
*
* <pre>
* {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
* .forMonotonousTimestamps()
* .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
* }</pre>
*/
default WatermarkStrategy<T> withTimestampAssigner(
TimestampAssignerSupplier<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
}

/**
* Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
* {@link SerializableTimestampAssigner}.
*
* <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
* function.
*
* <pre>
* {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
* .<CustomObject>forMonotonousTimestamps()
* .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
* }</pre>
*/
default WatermarkStrategy<T> withTimestampAssigner(
SerializableTimestampAssigner<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(
this, TimestampAssignerSupplier.of(timestampAssigner));
}

/**
* Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
* created {@link WatermarkGenerator}.
*
* <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
* stream for that amount of time, then that partition is considered "idle" and will not hold
* back the progress of watermarks in downstream operators.
*
* <p>Idleness can be important if some partitions have little data and might not have events
* during some periods. Without idleness, these streams can stall the overall event time
* progress of the application.
*/
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(
!(idleTimeout.isZero() || idleTimeout.isNegative()),
"idleTimeout must be greater than zero");
return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}

/**
* Creates a new {@link WatermarkStrategy} that configures the maximum watermark drift from
* other sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*
* @param watermarkGroup A group of sources to align watermarks
* @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the
* source/task/partition
*/
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift) {
return withWatermarkAlignment(
watermarkGroup,
maxAllowedWatermarkDrift,
WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
}

/**
* Creates a new {@link WatermarkStrategy} that configures the maximum watermark drift from
* other sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*
* @param watermarkGroup A group of sources to align watermarks
* @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the
* source/task/partition
* @param updateInterval How often tasks should notify coordinator about the current watermark
* and how often the coordinator should announce the maximal aligned watermark.
*/
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
return new WatermarksWithWatermarkAlignment<T>(
this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
}

// ------------------------------------------------------------------------
// Convenience methods for common watermark strategies
// ------------------------------------------------------------------------

/**
* Creates a watermark strategy for situations with monotonously ascending timestamps.
*
* <p>The watermarks are generated periodically and tightly follow the latest timestamp in the
* data. The delay introduced by this strategy is mainly the periodic interval in which the
* watermarks are generated.
*
* @see AscendingTimestampsWatermarks
*/
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}

/**
* Creates a watermark strategy for situations where records are out of order, but you can place
* an upper bound on how far the events are out of order. An out-of-order bound B means that
* once the an event with timestamp T was encountered, no events older than {@code T - B} will
* follow any more.
*
* <p>The watermarks are generated periodically. The delay introduced by this watermark strategy
* is the periodic interval length, plus the out of orderness bound.
*
* @see BoundedOutOfOrdernessWatermarks
*/
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}

/** Creates a watermark strategy based on an existing {@link WatermarkGeneratorSupplier}. */
static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
return generatorSupplier::createWatermarkGenerator;
}

/**
* Creates a watermark strategy that generates no watermarks at all. This may be useful in
* scenarios that do pure processing-time based stream processing.
*/
static <T> WatermarkStrategy<T> noWatermarks() {
return (ctx) -> new NoWatermarksGenerator<>();
}
}

其实不是很想贴这么长段的源码进来,可是这个类又比较重要。所以各位担待一下。

这里也要注意一点,watermark其实就是处理基于事件时间的触发时机的关键机制,但它比事件时间多了一个含义:乱序处理 + 推进触发计算。同时,watermark主要处理的是延迟而不是排序它能帮助窗口在等待一定时间后触发计算,尽可能覆盖乱序数据,但不会保证窗口内的事件处理顺序,需要你自己排序处理。。

我们在日常使用的时候,其实只要和上面的例子中那样,用withTimestampAssigner告诉flink使用数据的哪个字段作为事件时间,以及使用一个内置的实现类比如forBoundedOutOfOrderness告诉flink,为存在乱序事件但可确定最大无序时间边界的场景创建水位线生成策略就已经足够了。一般要自己调用forGenerator方法来创建WatermarkStrategy的情况比较少。

forGenerator自己实现一个

既然我们是在学习,那么要快速的熟悉一个东西该怎么使用,无疑是自己实现一个上手和了解的更全面。所以,接下来,我们就用forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) 来构造一个WatermarkStrategy

下面,我们以常见的词频统计作为示例,展现一下 Watermark 的实现代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置 Watermark 生成周期为 2s
env.getConfig().setAutoWatermarkInterval(2000);

// 输入数据格式:时间戳,单词,频次
// 例如:1000,a,1
DataStreamSource<String> source = env.socketTextStream("192.168.117.128", 9999);

// 定义 Watermark 策略
WatermarkStrategy<String> strategy =
WatermarkStrategy.<String>forGenerator(new MyWatermarkGeneratorSupplier<>(Duration.ofSeconds(3)))
.withTimestampAssigner(context -> (s, l) -> Long.parseLong(s.split(",")[0]));

// 业务处理:transformation
source
.assignTimestampsAndWatermarks(strategy)
.map(value -> {
String[] splits = value.split(",");
return Tuple2.of(splits[1].trim(), Integer.parseInt(splits[2].trim()));
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) -> {
System.out.println("-----reduce invoked----" + value1.f0 + "==>" + (value1.f1 + value2.f1));
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}, new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {

@Override
public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) {

FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

for (Tuple2<String, Integer> element : iterable) {
collector.collect("[" + format.format(context.window().getStart()) + "==>" +
format.format(context.window().getEnd()) + "], " + element.f0 + "==>" + element.f1);
}
}
})
.print();

env.execute("Watermark App");

在本例中,为了方便 Watermark 的打印,我们实现了自定义的 WatermarkGenerator,其代码如下所示:

public class MyWatermarkGeneratorSupplier<T> implements WatermarkGeneratorSupplier<T> {

private final Duration duration;

public MyWatermarkGeneratorSupplier(Duration duration) {
this.duration = duration;
}

@Override
public WatermarkGenerator<T> createWatermarkGenerator(Context context) {
return new MyWatermarkGenerator<>(duration);
}
}

public class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyWatermarkGenerator.class);

private long maxTimestamp;
private final long outOfOrdernessMillis;

public MyWatermarkGenerator(Duration maxOutOfOrderness) {
Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
this.maxTimestamp = -9223372036854775808L + this.outOfOrdernessMillis + 1L;
}

public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
this.maxTimestamp = Math.max(this.maxTimestamp, eventTimestamp);
}

public void onPeriodicEmit(WatermarkOutput output) {
Watermark watermark = new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L);
output.emitWatermark(watermark);
LOGGER.info("current watermark ==> {}", watermark.getTimestamp());
}
}

好了,代码比较简单,自己看看就差不多了,今天就到这吧。

用商品保质期彻底搞懂Flink Watermark

· 11 min read

为什么要理解 Watermark?

在流处理(Stream Processing)中,数据通常是乱序到达的,也就是说,一个时间戳更早的事件,可能会比一个时间戳更晚的事件更迟到达。

那么问题来了:

  • 我们怎么知道某个时间段的数据已经完整,可以进行计算?
  • 我们如何处理迟到的数据?

这就需要用到Watermark(水位线)机制。可是,初学者在学习Watermark的时候,总是非常的不好理解,到底Watermark什么时候触发计算?而计算的时候,会计算哪些数据?

那么,接下来强哥就举个比较容易理解的例子,让我们快速记住Watermark的机制。

用超市的“商品保质期”来理解 Watermark

我们假设自己是超市的商品管理员,每天需要检查商品的生产日期,决定哪些商品可以上架销售,哪些商品需要下架。

规则

  1. 每个商品都有一个生产日期(相当于事件的 event_time)。
  2. 商品的保质期是 10 天,超过 10 天就必须丢弃(相当于 Watermark)。
  3. 我们每天都会检查所有商品,并决定哪些商品可以销售,哪些商品应该下架。

例子:商品检查流程

假设今天是 2025-03-15 00:00:00,Watermark 设置为10 天前(即 2025-03-05 00:00:00)。
我们收到以下商品数据:

商品编号生产日期(时间戳)生产日期(转换后)
p117417952000002025-03-13 00:00:00
p217416224000002025-03-11 00:00:00
p317407584000002025-03-01 00:00:00
p417410176000002025-03-04 00:00:00
p517411040000002025-03-05 00:00:00

我们用Flink Java来实现这个逻辑:

代码功能

  • 监听Socket 端口(localhost:9999),实时接收商品数据
  • 设置Watermark(允许 10 天的乱序,自动丢弃过期商品
  • 使用1 天窗口,统计保质期内的商品
  • 把过期商品放入侧输出流,并打印到控制台
import com.google.common.collect.Lists;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;

public class FlinkWatermarkExpiryCheck {

// 商品数据结构
public static class Product {
public String productName;
public long productionTimestamp; // 生产时间戳(毫秒)

public Product(String productName, long productionTimestamp) {
this.productName = productName;
this.productionTimestamp = productionTimestamp;
}
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Socket 输入流(监听 localhost:9999)
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

// 侧输出流:存放过期商品
OutputTag<Product> expiredTag = new OutputTag<Product>("expired") {};

// 处理输入数据
SingleOutputStreamOperator<String> resultStream = socketStream
.map((MapFunction<String, Product>) value -> {
String[] parts = value.split(",");
String name = parts[0];
long timestamp = Long.parseLong(parts[1]);
return new Product(name, timestamp);
})
// 指定 Watermark 生成策略(最大允许 10 天的乱序)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)
.keyBy(product -> product.productName) // 按商品名分组
.window(TumblingEventTimeWindows.of(Time.days(1))) // 窗口大小 1 天
.sideOutputLateData(expiredTag) // 过期商品进入侧输出流
.process(new ExpiryCheckProcessWindowFunction()); // 窗口计算逻辑

// 处理过期商品(从侧输出流获取)
DataStream<Product> expiredProducts = resultStream.getSideOutput(expiredTag);
expiredProducts.process(new ProcessFunction<Product, String>() {
@Override
public void processElement(Product product, Context ctx, Collector<String> out) throws Exception {
String formattedTime = DateFormatUtils.format(product.productionTimestamp, "yyyy-MM-dd HH:mm:ss");
aliveProducts.remove(product.productName);
System.out.println(String.format(
"🚫 过期商品: %s | 生产日期: %s (已超过 10 天)",
product.productName, formattedTime
));
}
});

// 输出有效商品
resultStream.print();

env.execute("Flink Watermark Expiry Check");
}

private static List<String> aliveProducts = Lists.newArrayList("p1", "p2", "p3", "p4", "p5");


// 处理窗口计算,输出当前窗口的时间信息
public static class ExpiryCheckProcessWindowFunction
extends ProcessWindowFunction<Product, String, String, TimeWindow> {

@Override
public void process(String key,
Context context,
Iterable<Product> elements,
Collector<String> out) {
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
long currentWatermark = context.currentWatermark();
long systemTime = System.currentTimeMillis();

// 格式化时间
String windowStartTime = DateFormatUtils.format(windowStart, "yyyy-MM-dd HH:mm:ss");
String windowEndTime = DateFormatUtils.format(windowEnd, "yyyy-MM-dd HH:mm:ss");
String watermarkTime = DateFormatUtils.format(currentWatermark, "yyyy-MM-dd HH:mm:ss");
String systemTimeFormatted = DateFormatUtils.format(systemTime, "yyyy-MM-dd HH:mm:ss");

// 输出窗口信息
System.out.println(String.format(
"✅ 商品检查完成 | Watermark: %s | 窗口: [%s ~ %s] | 系统时间: %s",
watermarkTime, windowStartTime, windowEndTime, systemTimeFormatted
));
// 统计有效商品
elements.forEach(product -> {
System.out.println(String.format(
"🚫 过期商品: %s | 生产日期: %s (已超过 10 天)",
product.productName, DateFormatUtils.format(product.productionTimestamp, "yyyy-MM-dd HH:mm:ss")
));
aliveProducts.remove(product.productName);
});



System.out.println("🛒 有效商品:" + aliveProducts);
}
}
}

开启nc,启动flink服务后,我们在控制台先输入今天的日期:

今天,1741968000000

然后开始检查所有商品,在控制台输入:

p1,1741795200000
p2,1741622400000
p3,1740758400000
p4,1741017600000
p5,1741104000000

会看到,flink端打印输出:

🚫 过期商品: p3 | 生产日期: 2025-03-01 00:00:00 (已超过 10 天)
🚫 过期商品: p4 | 生产日期: 2025-03-04 00:00:00 (已超过 10 天)

也就是说,今天检查到了p3和p4两个商品已经过期了。

时间到了第二天,我们在控制台输入:

第二天,1742054400000

会看到,flink端打印输出:

✅ 商品检查完成 | Watermark: 2025-03-05 23:59:59 | 窗口: [2025-03-04 08:00:00 ~ 2025-03-05 08:00:00] | 系统时间: 2025-04-03 18:01:34
🚫 过期商品: p5 | 生产日期: 2025-03-05 00:00:00 (已超过 10 天)
🛒 有效商品:[p1, p2]

商品p5在第二天过期了,为什么呢?我们看到,商品p5的生产日期是2025-03-05 00:00:00,而今天是:2025-03-16 00:00:00,也就是说:2025-03-06 00:00:00之后的商品才是没过期的。我们看到Watermark: 2025-03-05 23:59:59,就是代表只要小于等于2025-03-05 23:59:59的商品就是过期了,所以p5过期了。

时间到了3月21日,我们在控制台输入:

3月21日,1742486400000

发现flink端没有输出

也就是说没有商品过期,因为今天,Watermark: 2025-03-10 23:59:59,没有在这个之前的商品,最老的商品是p2:2025-03-11 00:00:00。所以没有商品过期。

时间到了3月22日,我们在控制台输入:

3月22日,1742572800000

会看到,flink端打印输出:

✅ 商品检查完成 | Watermark: 2025-03-11 23:59:59 | 窗口: [2025-03-10 08:00:00 ~ 2025-03-11 08:00:00] | 系统时间: 2025-04-03 18:26:01
🚫 过期商品: p2 | 生产日期: 2025-03-11 00:00:00 (已超过 10 天)
🛒 有效商品:[p1]

商品p2过期了,为什么呢?因为商品p2的生产日期是2025-03-11 00:00:00,而今天是:2025-03-22 00:00:00,也就是说:2025-03-12 00:00:00之后的商品才是没过期的。我们看到Watermark: 2025-03-11 23:59:59,就是代表只要小于等于2025-03-11 23:59:59的商品就是过期了,所以p2过期了。

水位线与窗口的关系

通过学习了以上例子后,我们再用文字描述总结可能就能更好的理解了。

水位线与窗口的关系:

  • 触发计算:水位线的时间戳用于触发窗口的关闭和计算。当水位线的时间戳超过窗口的结束时间时,窗口会关闭并开始计算。
  • 处理迟到数据:水位线可以帮助窗口处理迟到数据。迟到数据是指在窗口关闭后到达的数据。通过设置水位线的延迟,可以允许一定时间内的迟到数据被窗口收集并处理。
  • 独立性:虽然水位线和窗口紧密配合,但它们是独立的机制。水位线的生成基于事件时间和延迟,而窗口的计算基于窗口的定义和水位线的触发。

最后再看一个示例: 假设有一个流处理任务,需要对每10秒的数据进行聚合计算。具体步骤如下:

设置水位线:假设水位线的延迟为2秒。

创建窗口:定义一个滚动窗口,窗口大小为10秒。

数据到达:

数据1(时间戳:12:00:00)到达,水位线设置为12:00:00 - 2秒 = 11:59:58。

数据2(时间戳:12:00:05)到达,水位线更新为12:00:05 - 2秒 = 12:00:03。

数据3(时间戳:12:00:12)到达,水位线更新为12:00:12 - 2秒 = 12:00:10。

触发计算:当水位线的时间戳达到12:00:10时,触发12:00:00 - 12:00:10窗口的计算。

处理迟到数据:如果在水位线为12:00:10之后,又来了一个时间戳为12:00:04的数据,该数据会被丢弃或放入下一个窗口,具体取决于系统的配置。 通过水位线和窗口的配合,流处理系统可以在处理乱序数据的同时,保证计算的正确性和效率。

好了,通过上面的内容,是否理解了flink的水位线与窗口了呢?那强哥再问个问题:

超市的例子,控制台输入多少,最后的一个商品才会过期呢?

Flink 处理延迟的手段

· 6 min read

牢记

watermark相关的文章写了几篇,今天先做个小节吧。

  • watermark是针对事件时间的,主要的内容是一个时间戳,用来表示当前事件时间的进展
  • watermark解决的是数据延迟问题
  • watermark(t),表示在当前流中事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据。也就是说watermark的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • watermark不解决的窗口计算时的数据顺序
  • watermark其实可以简单的理解为触发窗口计算的卡口。在没有设置watermark的时候,窗口:[窗口开始时间,窗口结束时间)是在处理时间到达窗口结束时间时触发。而设置了watermark后,触发窗口的计算时间就是:窗口结束时间+watermark延迟的时间。只是在flink的实现中,要注意到onEventonPeriodicEmit两个方法。

回顾

上篇文章其实最后的一个例子写完后,时间太晚了,没来的及多解释下,我们再简单回顾下

public class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyWatermarkGenerator.class);

private long maxTimestamp;
private final long outOfOrdernessMillis;

public MyWatermarkGenerator(Duration maxOutOfOrderness) {
Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
this.maxTimestamp = -9223372036854775808L + this.outOfOrdernessMillis + 1L;
}

public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
this.maxTimestamp = Math.max(this.maxTimestamp, eventTimestamp);
}

public void onPeriodicEmit(WatermarkOutput output) {
Watermark watermark = new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L);
output.emitWatermark(watermark);
LOGGER.info("current watermark ==> {}", watermark.getTimestamp());
}
}

我们自己实现了onEventonPeriodicEmit方法。其实,有看过forBoundedOutOfOrderness源码的小伙伴应该会有些熟悉,因为我们自己实现的MyWatermarkGenerator基本上和它的实现差不多。forBoundedOutOfOrderness的源码是这样的:

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

/** The maximum timestamp encountered so far. */
private long maxTimestamp;

/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;

/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}

// ------------------------------------------------------------------------

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}

onEvent方法是每条数据来的时候都会执行,在代码中可以看到,这里这是设置了maxTimestamp,并没有触发watermark更新。而要触发watermark更新,需要调用:output.emitWatermark方法。

onPeriodicEmit则是flink周期性触发执行的,默认200ms一次。我们看到,在它的实现里面,就触发了watermark更新。如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms。

env.getConfig().setAutoWatermarkInterval(400L);

我们知道BoundedOutOfOrdernessWatermarksWatermarkStrategy乱序流中内置水位线的设置。而另外一个forMonotonousTimestamps有序流中内置水位线的设置。其实,从源码来看,forMonotonousTimestamps内部的实现最后还是走了BoundedOutOfOrdernessWatermarks

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

/** Creates a new watermark generator with for ascending timestamps. */
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}

所以,其实理解了BoundedOutOfOrdernessWatermarks乱序流中内置水位线的设置就算是理解了水位线啦。

迟到的数据

从什么的watermark的设置中,我们其实可以看到,watermark只是做了延迟窗口计算的触发,但是当窗口开始计算后,还有延迟的数据,就没法再进入窗口计算了,还是可能存在数据丢失的情况。而如果我们的数据非常重要,不能延迟该怎么办呢?

flink还提供了两个手段处理这些延迟数据:

  • Allow Lateness 窗口已经开始计算时,允许延迟一段时间后再关闭窗口。
  • Side Output对真正延迟的数据,统一放入到另一条输出流中。有种兜底处理的意思。 代码中这么使用就好了,这块就不多描述了。兜底手段,记得有这个就行。
OutputTag<Product> expiredTag = new OutputTag<Product>("expired") {};
socketStream
.map((MapFunction<String, Product>) value -> {
String[] parts = value.split(",");
String name = parts[0];
long timestamp = Long.parseLong(parts[1]);
return new Product(name, timestamp);
})
// 指定 Watermark 生成策略(最大允许 10 天的乱序)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)
.keyBy(product -> product.productName) // 按商品名分组
.window(TumblingEventTimeWindows.of(Time.days(1))) // 窗口大小 1 天
.allowedLateness(Time.seconds(2)) // 推迟2s关窗
.sideOutputLateData(expiredTag) // 过期商品进入侧输出流
.process(new ExpiryCheckProcessWindowFunction()); // 窗口计算逻辑

好啦,watermark相关的知识就这些啦。watermark完结,撒花~