Skip to main content

4 posts tagged with "flink"

View All Tags

从 Flink Window 到 ProcessFunction:一次内存优化的实战分享

· 5 min read

最近在做一个实时计算项目中,需要对用户行为日志进行聚合分析。起初,同事使用了 Flink 的Window API(如 TumblingEventTimeWindows + WindowFunction)来实现 5 分钟滚动统计。

功能上没问题,但随着数据量的增长,作业频繁触发 GC,内存占用飙升,最终甚至出现了 OOM 错误。经排查后发现:Window 中的数据缓存策略,是性能瓶颈的关键

问题分析

Flink 的 Window API 会在后台自动缓存每个 key、每个窗口内的所有元素:

  • 比如一个 5 分钟窗口,每个用户行为事件都需要被保留在 Flink 的 window state 中;
  • 对于高 QPS 场景,每秒成千上万条事件都要被保存;
  • 随着 key 数量增加(如按用户、页面等分组),缓存压力线性增长。

本质问题:Window 是“批处理”思想的落地,而我们这个需求更适合“事件驱动”处理。

问题优化

为了解决这个问题,我尝试用更轻量的方式重构逻辑 —— 替换 Window,为每个 key 维护一个简单的状态 + 定时器,模拟滚动窗口的功能。

重构后的思路:

  • 每来一条数据,立即更新一个 ValueState(如 count 或 sum);
  • 注册一个定时器(如 ctx.timerService().registerEventTimeTimer(...));
  • 到时间触发处理并清理状态(及时释放内存);
  • 无需保存每条事件,只保存必要的中间值(例如总和、计数等)

示例代码片段

public class RollingCountProcessFunction extends KeyedProcessFunction<String, Event, Result> {

private transient ValueState<Long> countState;

@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("count", Long.class);
countState = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
Long current = Optional.ofNullable(countState.value()).orElse(0L);
countState.update(current + 1);
ctx.timerService().registerEventTimeTimer(value.getEventTime() + 5 * 60 * 1000);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
Long count = countState.value();
out.collect(new Result(ctx.getCurrentKey(), count));
countState.clear(); // 清理状态,释放内存
}
}

性能对比

指标使用 Window API使用 ProcessFunction
内存占用高,频繁 GC显著下降,状态更精简
延迟固定窗口延迟触发实时响应,低延迟
灵活性逻辑固定高度灵活,自定义状态和行为
容错与 Checkpoint支持同样支持

最终效果

在实际部署后,以下指标得到显著优化:

  • TaskManager 内存使用从约 2.5GB 降至 300MB
  • GC 次数减少 90% 以上
  • 窗口统计延迟下降至毫秒级

小结

在高吞吐、低延迟、内存敏感的场景下,Window API 可能不是最优解。它更适合用于“聚合历史数据”,但对于事件驱动类业务,我们可以用 ProcessFunction 构建更轻量的替代方案:

  • 仅保留必要的中间状态;
  • 更快释放内存,避免数据缓存堆积;
  • 同时保持灵活性和一致性。

小结:Flink 是一个高度灵活的平台,正确选择 API 的粒度和机制,远比堆配置参数更有效

官网建议

我们如果有看flink的官方文档,也可以在windows章节看到相关解释和建议,强哥这里直接摘录出来,英文好的可以看看

……引用了半天,公众号每个引用不让超过300字,醉了。那直接给链接大家自己看吧:

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#window-lifecycle

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#window-functions

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#useful-state-size-considerations

是同一页的,但是内容太多,看我发的具体锚点部分就行。

从源码浅看Watermark的打开方式

· 14 min read

赌场失意

老实说,就今天赌场跌成这样,还能有心情给大家分享,甚至早上还去抄了底,此刻大半夜心情毫无波澜的写东西,你们就该好好的拿椅子坐下学习。

不过,我也知道关注我的人里面,很多其实不是学后端的,大数据更别扯蛋了,不过发了推文,没取关的,都是yifu,所以也尽量有的时候分享些别的东西,写文章虽然是想要记录自己的学习和生活,不过既然写了,当然也希望看的人不会觉得太无趣。

watermark

Flink DataStream中流动着不同的元素,统称为StreamElementStreamElement可以是StreamRecordWatermarkStreamStatusLatencyMarker中任何一种类型。

StreamElement
StreamElement是一个抽象类(是Flink 承载消息的基类),其他四种类型继承StreamElement

public abstract class StreamElement {
//判断是否是Watermark
public final boolean isWatermark() {
return getClass() == Watermark.class;
}
//判断是否为StreamStatus
public final boolean isStreamStatus() {
return getClass() == StreamStatus.class;
}
//判断是否为StreamRecord
public final boolean isRecord() {
return getClass() == StreamRecord.class;
}
//判断是否为LatencyMarker
public final boolean isLatencyMarker() {
return getClass() == LatencyMarker.class;
}
//转换为StreamRecord
public final <E> StreamRecord<E> asRecord() {
return (StreamRecord<E>) this;
}
//转换为Watermark
public final Watermark asWatermark() {
return (Watermark) this;
}
//转换为StreamStatus
public final StreamStatus asStreamStatus() {
return (StreamStatus) this;
}
//转换为LatencyMarker
public final LatencyMarker asLatencyMarker() {
return (LatencyMarker) this;
}
}

**Watermark**
Watermark继承了StreamElementWatermark 是和事件一个级别的抽象,其内部包含一个成员变量时间戳timestamp,标识当前数据的时间进度。Watermark实际上作为数据流的一部分随数据流流动。

@PublicEvolving
public final class Watermark extends StreamElement {
/*The watermark that signifies end-of-event-time. */
public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
/* The timestamp of the watermark in milliseconds. */
private final long timestamp;
/* Creates a new watermark with the given timestamp in milliseconds.*/
public Watermarklong timestamp) {
this.timestamp = timestamp;
}
/*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
public long getTimestamp() {
return timestamp;
}
}

回顾下

上周我们分享了flink watermark,用商品过期时间来类比watermark的使用场景。我们来看看我们当时watermark是怎么使用的还记得吗?

SingleOutputStreamOperator<String> resultStream = socketStream
.map((MapFunction<String, Product>) value -> {
String[] parts = value.split(",");
String name = parts[0];
long timestamp = Long.parseLong(parts[1]);
return new Product(name, timestamp);
})
// 指定 Watermark 生成策略(最大允许 10 天的乱序)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)
.keyBy(product -> product.productName) // 按商品名分组
.window(TumblingEventTimeWindows.of(Time.days(1))) // 窗口大小 1 天
.sideOutputLateData(expiredTag) // 过期商品进入侧输出流
.process(new ExpiryCheckProcessWindowFunction()); // 窗口计算逻辑

一大段用了ramda表达式写在了一起,我们就挑设置watermark部分看:

.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)

这里调用了WatermarkStrategyforBoundedOutOfOrdernesswithTimestampAssigner。通过WatermarkStrategy就设置好了watermark,那我们就来看看WatermarkStrategy是干嘛的。

WatermarkStrategy

WatermarkStrategy从调用来看,像是个类,可是看源码,其实它是个接口,里面定义了许多default方法。来看看接口的备注:

The WatermarkStrategy defines how to generate Watermarks in the stream sources. The WatermarkStrategy is a builder/factory for the WatermarkGenerator that generates the watermarks and the TimestampAssigner which assigns the internal timestamp of a record.
This interface is split into three parts: 1) methods that an implementor of this interface needs to implement, 2) builder methods for building a WatermarkStrategy on a base strategy, 3) convenience methods for constructing a WatermarkStrategy for common built-in strategies or based on a WatermarkGeneratorSupplier

内容说的很清楚,WatermarkStrategy是个接口也是工厂,可以用WatermarkGenerator生成Watermarks,还可以用TimestampAssigner来设置数据记录的内部时间戳。接口的两个非default无具体实现的方法:createWatermarkGeneratorcreateTimestampAssigner就是做这两件事用的。

而除此之外的其他有具体实现以default方法或者static静态实现分为两部分,default方法是在基础的strategy之上通过builder模式构建WatermarkStrategy的;static静态实现一种是利用已经内置好的常见strategy,这些常见strategy是预先定义好的,可以直接拿来使用,就像使用现成的工具一样方便;另一种是基于WatermarkGeneratorSupplier来构造,通过它可以根据具体需求来生成相应的watermarkStrategy,这种方式更具灵活性,可以根据不同的场景和要求来定制。

@Public
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

// ------------------------------------------------------------------------
// Methods that implementors need to implement.
// ------------------------------------------------------------------------

/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.
*/
@Override
default TimestampAssigner<T> createTimestampAssigner(
TimestampAssignerSupplier.Context context) {
// By default, this is {@link RecordTimestampAssigner},
// for cases where records come out of a source with valid timestamps, for example from
// Kafka.
return new RecordTimestampAssigner<>();
}

/**
* Provides configuration for watermark alignment of a maximum watermark of multiple
* sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*/
@PublicEvolving
default WatermarkAlignmentParams getAlignmentParameters() {
return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
}

// ------------------------------------------------------------------------
// Builder methods for enriching a base WatermarkStrategy
// ------------------------------------------------------------------------

/**
* Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
* {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}).
*
* <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
* access to the metrics system.
*
* <pre>
* {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
* .forMonotonousTimestamps()
* .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
* }</pre>
*/
default WatermarkStrategy<T> withTimestampAssigner(
TimestampAssignerSupplier<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
}

/**
* Creates a new {@code WatermarkStrategy} that wraps this strategy but instead uses the given
* {@link SerializableTimestampAssigner}.
*
* <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
* function.
*
* <pre>
* {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
* .<CustomObject>forMonotonousTimestamps()
* .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
* }</pre>
*/
default WatermarkStrategy<T> withTimestampAssigner(
SerializableTimestampAssigner<T> timestampAssigner) {
checkNotNull(timestampAssigner, "timestampAssigner");
return new WatermarkStrategyWithTimestampAssigner<>(
this, TimestampAssignerSupplier.of(timestampAssigner));
}

/**
* Creates a new enriched {@link WatermarkStrategy} that also does idleness detection in the
* created {@link WatermarkGenerator}.
*
* <p>Add an idle timeout to the watermark strategy. If no records flow in a partition of a
* stream for that amount of time, then that partition is considered "idle" and will not hold
* back the progress of watermarks in downstream operators.
*
* <p>Idleness can be important if some partitions have little data and might not have events
* during some periods. Without idleness, these streams can stall the overall event time
* progress of the application.
*/
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(
!(idleTimeout.isZero() || idleTimeout.isNegative()),
"idleTimeout must be greater than zero");
return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}

/**
* Creates a new {@link WatermarkStrategy} that configures the maximum watermark drift from
* other sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*
* @param watermarkGroup A group of sources to align watermarks
* @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the
* source/task/partition
*/
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift) {
return withWatermarkAlignment(
watermarkGroup,
maxAllowedWatermarkDrift,
WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);
}

/**
* Creates a new {@link WatermarkStrategy} that configures the maximum watermark drift from
* other sources/tasks/partitions in the same watermark group. The group may contain completely
* independent sources (e.g. File and Kafka).
*
* <p>Once configured Flink will "pause" consuming from a source/task/partition that is ahead of
* the emitted watermark in the group by more than the maxAllowedWatermarkDrift.
*
* @param watermarkGroup A group of sources to align watermarks
* @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the
* source/task/partition
* @param updateInterval How often tasks should notify coordinator about the current watermark
* and how often the coordinator should announce the maximal aligned watermark.
*/
@PublicEvolving
default WatermarkStrategy<T> withWatermarkAlignment(
String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
return new WatermarksWithWatermarkAlignment<T>(
this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
}

// ------------------------------------------------------------------------
// Convenience methods for common watermark strategies
// ------------------------------------------------------------------------

/**
* Creates a watermark strategy for situations with monotonously ascending timestamps.
*
* <p>The watermarks are generated periodically and tightly follow the latest timestamp in the
* data. The delay introduced by this strategy is mainly the periodic interval in which the
* watermarks are generated.
*
* @see AscendingTimestampsWatermarks
*/
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}

/**
* Creates a watermark strategy for situations where records are out of order, but you can place
* an upper bound on how far the events are out of order. An out-of-order bound B means that
* once the an event with timestamp T was encountered, no events older than {@code T - B} will
* follow any more.
*
* <p>The watermarks are generated periodically. The delay introduced by this watermark strategy
* is the periodic interval length, plus the out of orderness bound.
*
* @see BoundedOutOfOrdernessWatermarks
*/
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}

/** Creates a watermark strategy based on an existing {@link WatermarkGeneratorSupplier}. */
static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
return generatorSupplier::createWatermarkGenerator;
}

/**
* Creates a watermark strategy that generates no watermarks at all. This may be useful in
* scenarios that do pure processing-time based stream processing.
*/
static <T> WatermarkStrategy<T> noWatermarks() {
return (ctx) -> new NoWatermarksGenerator<>();
}
}

其实不是很想贴这么长段的源码进来,可是这个类又比较重要。所以各位担待一下。

这里也要注意一点,watermark其实就是处理基于事件时间的触发时机的关键机制,但它比事件时间多了一个含义:乱序处理 + 推进触发计算。同时,watermark主要处理的是延迟而不是排序它能帮助窗口在等待一定时间后触发计算,尽可能覆盖乱序数据,但不会保证窗口内的事件处理顺序,需要你自己排序处理。。

我们在日常使用的时候,其实只要和上面的例子中那样,用withTimestampAssigner告诉flink使用数据的哪个字段作为事件时间,以及使用一个内置的实现类比如forBoundedOutOfOrderness告诉flink,为存在乱序事件但可确定最大无序时间边界的场景创建水位线生成策略就已经足够了。一般要自己调用forGenerator方法来创建WatermarkStrategy的情况比较少。

forGenerator自己实现一个

既然我们是在学习,那么要快速的熟悉一个东西该怎么使用,无疑是自己实现一个上手和了解的更全面。所以,接下来,我们就用forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) 来构造一个WatermarkStrategy

下面,我们以常见的词频统计作为示例,展现一下 Watermark 的实现代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置 Watermark 生成周期为 2s
env.getConfig().setAutoWatermarkInterval(2000);

// 输入数据格式:时间戳,单词,频次
// 例如:1000,a,1
DataStreamSource<String> source = env.socketTextStream("192.168.117.128", 9999);

// 定义 Watermark 策略
WatermarkStrategy<String> strategy =
WatermarkStrategy.<String>forGenerator(new MyWatermarkGeneratorSupplier<>(Duration.ofSeconds(3)))
.withTimestampAssigner(context -> (s, l) -> Long.parseLong(s.split(",")[0]));

// 业务处理:transformation
source
.assignTimestampsAndWatermarks(strategy)
.map(value -> {
String[] splits = value.split(",");
return Tuple2.of(splits[1].trim(), Integer.parseInt(splits[2].trim()));
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) -> {
System.out.println("-----reduce invoked----" + value1.f0 + "==>" + (value1.f1 + value2.f1));
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}, new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {

@Override
public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) {

FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

for (Tuple2<String, Integer> element : iterable) {
collector.collect("[" + format.format(context.window().getStart()) + "==>" +
format.format(context.window().getEnd()) + "], " + element.f0 + "==>" + element.f1);
}
}
})
.print();

env.execute("Watermark App");

在本例中,为了方便 Watermark 的打印,我们实现了自定义的 WatermarkGenerator,其代码如下所示:

public class MyWatermarkGeneratorSupplier<T> implements WatermarkGeneratorSupplier<T> {

private final Duration duration;

public MyWatermarkGeneratorSupplier(Duration duration) {
this.duration = duration;
}

@Override
public WatermarkGenerator<T> createWatermarkGenerator(Context context) {
return new MyWatermarkGenerator<>(duration);
}
}

public class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyWatermarkGenerator.class);

private long maxTimestamp;
private final long outOfOrdernessMillis;

public MyWatermarkGenerator(Duration maxOutOfOrderness) {
Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
this.maxTimestamp = -9223372036854775808L + this.outOfOrdernessMillis + 1L;
}

public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
this.maxTimestamp = Math.max(this.maxTimestamp, eventTimestamp);
}

public void onPeriodicEmit(WatermarkOutput output) {
Watermark watermark = new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L);
output.emitWatermark(watermark);
LOGGER.info("current watermark ==> {}", watermark.getTimestamp());
}
}

好了,代码比较简单,自己看看就差不多了,今天就到这吧。

用商品保质期彻底搞懂Flink Watermark

· 11 min read

为什么要理解 Watermark?

在流处理(Stream Processing)中,数据通常是乱序到达的,也就是说,一个时间戳更早的事件,可能会比一个时间戳更晚的事件更迟到达。

那么问题来了:

  • 我们怎么知道某个时间段的数据已经完整,可以进行计算?
  • 我们如何处理迟到的数据?

这就需要用到Watermark(水位线)机制。可是,初学者在学习Watermark的时候,总是非常的不好理解,到底Watermark什么时候触发计算?而计算的时候,会计算哪些数据?

那么,接下来强哥就举个比较容易理解的例子,让我们快速记住Watermark的机制。

用超市的“商品保质期”来理解 Watermark

我们假设自己是超市的商品管理员,每天需要检查商品的生产日期,决定哪些商品可以上架销售,哪些商品需要下架。

规则

  1. 每个商品都有一个生产日期(相当于事件的 event_time)。
  2. 商品的保质期是 10 天,超过 10 天就必须丢弃(相当于 Watermark)。
  3. 我们每天都会检查所有商品,并决定哪些商品可以销售,哪些商品应该下架。

例子:商品检查流程

假设今天是 2025-03-15 00:00:00,Watermark 设置为10 天前(即 2025-03-05 00:00:00)。
我们收到以下商品数据:

商品编号生产日期(时间戳)生产日期(转换后)
p117417952000002025-03-13 00:00:00
p217416224000002025-03-11 00:00:00
p317407584000002025-03-01 00:00:00
p417410176000002025-03-04 00:00:00
p517411040000002025-03-05 00:00:00

我们用Flink Java来实现这个逻辑:

代码功能

  • 监听Socket 端口(localhost:9999),实时接收商品数据
  • 设置Watermark(允许 10 天的乱序,自动丢弃过期商品
  • 使用1 天窗口,统计保质期内的商品
  • 把过期商品放入侧输出流,并打印到控制台
import com.google.common.collect.Lists;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;

public class FlinkWatermarkExpiryCheck {

// 商品数据结构
public static class Product {
public String productName;
public long productionTimestamp; // 生产时间戳(毫秒)

public Product(String productName, long productionTimestamp) {
this.productName = productName;
this.productionTimestamp = productionTimestamp;
}
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Socket 输入流(监听 localhost:9999)
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

// 侧输出流:存放过期商品
OutputTag<Product> expiredTag = new OutputTag<Product>("expired") {};

// 处理输入数据
SingleOutputStreamOperator<String> resultStream = socketStream
.map((MapFunction<String, Product>) value -> {
String[] parts = value.split(",");
String name = parts[0];
long timestamp = Long.parseLong(parts[1]);
return new Product(name, timestamp);
})
// 指定 Watermark 生成策略(最大允许 10 天的乱序)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)
.keyBy(product -> product.productName) // 按商品名分组
.window(TumblingEventTimeWindows.of(Time.days(1))) // 窗口大小 1 天
.sideOutputLateData(expiredTag) // 过期商品进入侧输出流
.process(new ExpiryCheckProcessWindowFunction()); // 窗口计算逻辑

// 处理过期商品(从侧输出流获取)
DataStream<Product> expiredProducts = resultStream.getSideOutput(expiredTag);
expiredProducts.process(new ProcessFunction<Product, String>() {
@Override
public void processElement(Product product, Context ctx, Collector<String> out) throws Exception {
String formattedTime = DateFormatUtils.format(product.productionTimestamp, "yyyy-MM-dd HH:mm:ss");
aliveProducts.remove(product.productName);
System.out.println(String.format(
"🚫 过期商品: %s | 生产日期: %s (已超过 10 天)",
product.productName, formattedTime
));
}
});

// 输出有效商品
resultStream.print();

env.execute("Flink Watermark Expiry Check");
}

private static List<String> aliveProducts = Lists.newArrayList("p1", "p2", "p3", "p4", "p5");


// 处理窗口计算,输出当前窗口的时间信息
public static class ExpiryCheckProcessWindowFunction
extends ProcessWindowFunction<Product, String, String, TimeWindow> {

@Override
public void process(String key,
Context context,
Iterable<Product> elements,
Collector<String> out) {
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
long currentWatermark = context.currentWatermark();
long systemTime = System.currentTimeMillis();

// 格式化时间
String windowStartTime = DateFormatUtils.format(windowStart, "yyyy-MM-dd HH:mm:ss");
String windowEndTime = DateFormatUtils.format(windowEnd, "yyyy-MM-dd HH:mm:ss");
String watermarkTime = DateFormatUtils.format(currentWatermark, "yyyy-MM-dd HH:mm:ss");
String systemTimeFormatted = DateFormatUtils.format(systemTime, "yyyy-MM-dd HH:mm:ss");

// 输出窗口信息
System.out.println(String.format(
"✅ 商品检查完成 | Watermark: %s | 窗口: [%s ~ %s] | 系统时间: %s",
watermarkTime, windowStartTime, windowEndTime, systemTimeFormatted
));
// 统计有效商品
elements.forEach(product -> {
System.out.println(String.format(
"🚫 过期商品: %s | 生产日期: %s (已超过 10 天)",
product.productName, DateFormatUtils.format(product.productionTimestamp, "yyyy-MM-dd HH:mm:ss")
));
aliveProducts.remove(product.productName);
});



System.out.println("🛒 有效商品:" + aliveProducts);
}
}
}

开启nc,启动flink服务后,我们在控制台先输入今天的日期:

今天,1741968000000

然后开始检查所有商品,在控制台输入:

p1,1741795200000
p2,1741622400000
p3,1740758400000
p4,1741017600000
p5,1741104000000

会看到,flink端打印输出:

🚫 过期商品: p3 | 生产日期: 2025-03-01 00:00:00 (已超过 10 天)
🚫 过期商品: p4 | 生产日期: 2025-03-04 00:00:00 (已超过 10 天)

也就是说,今天检查到了p3和p4两个商品已经过期了。

时间到了第二天,我们在控制台输入:

第二天,1742054400000

会看到,flink端打印输出:

✅ 商品检查完成 | Watermark: 2025-03-05 23:59:59 | 窗口: [2025-03-04 08:00:00 ~ 2025-03-05 08:00:00] | 系统时间: 2025-04-03 18:01:34
🚫 过期商品: p5 | 生产日期: 2025-03-05 00:00:00 (已超过 10 天)
🛒 有效商品:[p1, p2]

商品p5在第二天过期了,为什么呢?我们看到,商品p5的生产日期是2025-03-05 00:00:00,而今天是:2025-03-16 00:00:00,也就是说:2025-03-06 00:00:00之后的商品才是没过期的。我们看到Watermark: 2025-03-05 23:59:59,就是代表只要小于等于2025-03-05 23:59:59的商品就是过期了,所以p5过期了。

时间到了3月21日,我们在控制台输入:

3月21日,1742486400000

发现flink端没有输出

也就是说没有商品过期,因为今天,Watermark: 2025-03-10 23:59:59,没有在这个之前的商品,最老的商品是p2:2025-03-11 00:00:00。所以没有商品过期。

时间到了3月22日,我们在控制台输入:

3月22日,1742572800000

会看到,flink端打印输出:

✅ 商品检查完成 | Watermark: 2025-03-11 23:59:59 | 窗口: [2025-03-10 08:00:00 ~ 2025-03-11 08:00:00] | 系统时间: 2025-04-03 18:26:01
🚫 过期商品: p2 | 生产日期: 2025-03-11 00:00:00 (已超过 10 天)
🛒 有效商品:[p1]

商品p2过期了,为什么呢?因为商品p2的生产日期是2025-03-11 00:00:00,而今天是:2025-03-22 00:00:00,也就是说:2025-03-12 00:00:00之后的商品才是没过期的。我们看到Watermark: 2025-03-11 23:59:59,就是代表只要小于等于2025-03-11 23:59:59的商品就是过期了,所以p2过期了。

水位线与窗口的关系

通过学习了以上例子后,我们再用文字描述总结可能就能更好的理解了。

水位线与窗口的关系:

  • 触发计算:水位线的时间戳用于触发窗口的关闭和计算。当水位线的时间戳超过窗口的结束时间时,窗口会关闭并开始计算。
  • 处理迟到数据:水位线可以帮助窗口处理迟到数据。迟到数据是指在窗口关闭后到达的数据。通过设置水位线的延迟,可以允许一定时间内的迟到数据被窗口收集并处理。
  • 独立性:虽然水位线和窗口紧密配合,但它们是独立的机制。水位线的生成基于事件时间和延迟,而窗口的计算基于窗口的定义和水位线的触发。

最后再看一个示例: 假设有一个流处理任务,需要对每10秒的数据进行聚合计算。具体步骤如下:

设置水位线:假设水位线的延迟为2秒。

创建窗口:定义一个滚动窗口,窗口大小为10秒。

数据到达:

数据1(时间戳:12:00:00)到达,水位线设置为12:00:00 - 2秒 = 11:59:58。

数据2(时间戳:12:00:05)到达,水位线更新为12:00:05 - 2秒 = 12:00:03。

数据3(时间戳:12:00:12)到达,水位线更新为12:00:12 - 2秒 = 12:00:10。

触发计算:当水位线的时间戳达到12:00:10时,触发12:00:00 - 12:00:10窗口的计算。

处理迟到数据:如果在水位线为12:00:10之后,又来了一个时间戳为12:00:04的数据,该数据会被丢弃或放入下一个窗口,具体取决于系统的配置。 通过水位线和窗口的配合,流处理系统可以在处理乱序数据的同时,保证计算的正确性和效率。

好了,通过上面的内容,是否理解了flink的水位线与窗口了呢?那强哥再问个问题:

超市的例子,控制台输入多少,最后的一个商品才会过期呢?

Flink 处理延迟的手段

· 6 min read

牢记

watermark相关的文章写了几篇,今天先做个小节吧。

  • watermark是针对事件时间的,主要的内容是一个时间戳,用来表示当前事件时间的进展
  • watermark解决的是数据延迟问题
  • watermark(t),表示在当前流中事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据。也就是说watermark的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • watermark不解决的窗口计算时的数据顺序
  • watermark其实可以简单的理解为触发窗口计算的卡口。在没有设置watermark的时候,窗口:[窗口开始时间,窗口结束时间)是在处理时间到达窗口结束时间时触发。而设置了watermark后,触发窗口的计算时间就是:窗口结束时间+watermark延迟的时间。只是在flink的实现中,要注意到onEventonPeriodicEmit两个方法。

回顾

上篇文章其实最后的一个例子写完后,时间太晚了,没来的及多解释下,我们再简单回顾下

public class MyWatermarkGenerator<T> implements WatermarkGenerator<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyWatermarkGenerator.class);

private long maxTimestamp;
private final long outOfOrdernessMillis;

public MyWatermarkGenerator(Duration maxOutOfOrderness) {
Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
this.maxTimestamp = -9223372036854775808L + this.outOfOrdernessMillis + 1L;
}

public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
this.maxTimestamp = Math.max(this.maxTimestamp, eventTimestamp);
}

public void onPeriodicEmit(WatermarkOutput output) {
Watermark watermark = new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L);
output.emitWatermark(watermark);
LOGGER.info("current watermark ==> {}", watermark.getTimestamp());
}
}

我们自己实现了onEventonPeriodicEmit方法。其实,有看过forBoundedOutOfOrderness源码的小伙伴应该会有些熟悉,因为我们自己实现的MyWatermarkGenerator基本上和它的实现差不多。forBoundedOutOfOrderness的源码是这样的:

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

/** The maximum timestamp encountered so far. */
private long maxTimestamp;

/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;

/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}

// ------------------------------------------------------------------------

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}

onEvent方法是每条数据来的时候都会执行,在代码中可以看到,这里这是设置了maxTimestamp,并没有触发watermark更新。而要触发watermark更新,需要调用:output.emitWatermark方法。

onPeriodicEmit则是flink周期性触发执行的,默认200ms一次。我们看到,在它的实现里面,就触发了watermark更新。如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms。

env.getConfig().setAutoWatermarkInterval(400L);

我们知道BoundedOutOfOrdernessWatermarksWatermarkStrategy乱序流中内置水位线的设置。而另外一个forMonotonousTimestamps有序流中内置水位线的设置。其实,从源码来看,forMonotonousTimestamps内部的实现最后还是走了BoundedOutOfOrdernessWatermarks

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

/** Creates a new watermark generator with for ascending timestamps. */
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}

所以,其实理解了BoundedOutOfOrdernessWatermarks乱序流中内置水位线的设置就算是理解了水位线啦。

迟到的数据

从什么的watermark的设置中,我们其实可以看到,watermark只是做了延迟窗口计算的触发,但是当窗口开始计算后,还有延迟的数据,就没法再进入窗口计算了,还是可能存在数据丢失的情况。而如果我们的数据非常重要,不能延迟该怎么办呢?

flink还提供了两个手段处理这些延迟数据:

  • Allow Lateness 窗口已经开始计算时,允许延迟一段时间后再关闭窗口。
  • Side Output对真正延迟的数据,统一放入到另一条输出流中。有种兜底处理的意思。 代码中这么使用就好了,这块就不多描述了。兜底手段,记得有这个就行。
OutputTag<Product> expiredTag = new OutputTag<Product>("expired") {};
socketStream
.map((MapFunction<String, Product>) value -> {
String[] parts = value.split(",");
String name = parts[0];
long timestamp = Long.parseLong(parts[1]);
return new Product(name, timestamp);
})
// 指定 Watermark 生成策略(最大允许 10 天的乱序)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Product>forBoundedOutOfOrderness(Duration.ofDays(10))
.withTimestampAssigner((event, timestamp) -> event.productionTimestamp)
)
.keyBy(product -> product.productName) // 按商品名分组
.window(TumblingEventTimeWindows.of(Time.days(1))) // 窗口大小 1 天
.allowedLateness(Time.seconds(2)) // 推迟2s关窗
.sideOutputLateData(expiredTag) // 过期商品进入侧输出流
.process(new ExpiryCheckProcessWindowFunction()); // 窗口计算逻辑

好啦,watermark相关的知识就这些啦。watermark完结,撒花~