5.2. ProcessFunction#

Note

本教程已出版为《Flink 原理与实践》,感兴趣的读者请在各大电商平台购买!

在继续介绍 Flink 时间和窗口相关操作之前,我们需要先了解一下 ProcessFunction 系列函数。它们是 Flink 体系中最底层的 API,提供了对数据流更细粒度的操作权限。之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的 Processing Time 或者是 Watermark 时间戳,调用起来简单但功能相对受限。如果想获取数据流中 Watermark 的时间戳,或者使用定时器,需要使用 ProcessFunction 系列函数。Flink SQL 是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。

目前,这个系列函数主要包括 KeyedProcessFunctionProcessFunctionCoProcessFunctionKeyedCoProcessFunctionProcessJoinFunctionProcessWindowFunction 等多种函数,这些函数各有侧重,但核心功能比较相似,主要包括两点:

  • 状态:我们可以在这些函数中访问和更新 Keyed State 。

  • 定时器(Timer):像定闹钟一样设置定时器,我们可以在时间维度上设计更复杂的业务逻辑。

状态的介绍可以参考第六章的内容,本节将重点介绍 ProcessFunction 系列函数时间功能上的相关特性。

Timer 的使用方法#

说到时间相关的操作,就不能避开定时器(Timer)。我们可以把 Timer 理解成一个闹钟,使用前先在 Timer 中注册一个未来的时间,当这个时间到达,闹钟会“响起”,程序会执行一个回调函数,回调函数中执行一定的业务逻辑。这里以 KeyedProcessFunction 为例,来介绍 Timer 的注册和使用。

ProcessFunction 有两个重要的方法:processElement()onTimer(),其中 processElement 函数在源码中的 Java 签名如下:

// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)

processElement() 方法处理数据流中的一条类型为 I 的元素,并通过 Collector<O> 输出出来。Context 是它区别于 FlatMapFunction 等普通函数的特色,开发者可以通过 Context 来获取时间戳,访问 TimerService,设置 Timer。

ProcessFunction 类中另外一个接口是 onTimer() 方法:

// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

这是一个回调函数,当到了“闹钟”时间,Flink 会调用 onTimer(),并执行一些业务逻辑。这里也有一个参数 OnTimerContext,它实际上是继承了上面的那个 Context,与 Context 几乎相同。

使用 Timer 的方法主要逻辑为:

  1. processElement() 方法中通过 Context 注册一个未来的时间戳 t。这个时间戳的语义可以是 Processing Time,也可以是 Event Time,根据业务需求来选择。

  2. onTimer() 方法中实现一些逻辑,到达 t 时刻,onTimer() 方法被自动调用。

Context 中,我们可以获取一个 TimerService,这是一个访问时间戳和 Timer 的接口。我们可以通过 Context.timerService.registerProcessingTimeTimer()Context.timerService.registerEventTimeTimer() 这两个方法来注册 Timer,只需要传入一个时间戳即可。我们可以通过 Context.timerService.deleteProcessingTimeTimerContext.timerService.deleteEventTimeTimer 来删除之前注册的 Timer。此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTimeContext.timerService.currentWatermark。这些方法中,名字带有“ProcessingTime”的方法表示该方法基于 Processing Time 语义;名字带有“EventTime”或“Watermark”的方法表示该方法基于 Event Time 语义。

我们只能在 KeyedStream 上注册 Timer。每个 Key 下可以使用不同的时间戳注册不同的 Timer,但是每个 Key 的每个时间戳只能注册一个 Timer。如果想在一个 DataStream 上应用 Timer,可以将所有数据映射到一个伪造的 Key 上,但这样所有数据会流入一个算子子任务。

我们再次以 股票交易 场景来解释如何使用 Timer。一次股票交易包括:股票代号、时间戳、股票价格、成交量。我们现在想看一支股票未来是否一直连续上涨,如果一直上涨,则发送出一个提示。如果新数据比上次数据价格更高且目前没有注册 Timer,则注册一个未来的 Timer,如果在这期间价格降低则把刚才注册的 Timer 删除,如果在这期间价格没有降低,Timer 时间到达后触发 onTimer(),发送一个提示。下面的代码中,intervalMills 表示一个毫秒精度的时间段,如果这个时间段内一支股票价格一直上涨,则会输出文字提示。

// 三个泛型分别为 Key、输入、输出
public static class IncreaseAlertFunction
    extends KeyedProcessFunction<String, StockPrice, String> {

    private long intervalMills;
    // 状态句柄
    private ValueState<Double> lastPrice;
    private ValueState<Long> currentTimer;

    public IncreaseAlertFunction(long intervalMills) throws Exception {
      	this.intervalMills = intervalMills;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 从 RuntimeContext 中获取状态
        lastPrice = getRuntimeContext().getState(
          new ValueStateDescriptor<Double>("lastPrice", Types.DOUBLE()));
        currentTimer = getRuntimeContext().getState(
          new ValueStateDescriptor<Long>("timer", Types.LONG()));
    }

    @Override
    public void processElement(StockPrice stock, Context context, Collector<String> out) throws Exception {

        // 状态第一次使用时,未做初始化,返回 null
        if (null == lastPrice.value()) {
          	// 第一次使用 lastPrice,不做任何处理
        } else {
            double prevPrice = lastPrice.value();
            long curTimerTimestamp;
            if (null == currentTimer.value()) {
              	curTimerTimestamp = 0;
            } else {
              	curTimerTimestamp = currentTimer.value();
            }
            if (stock.price < prevPrice) {
                // 如果新流入的股票价格降低,删除 Timer,否则该 Timer 一直保留
                context.timerService().deleteEventTimeTimer(curTimerTimestamp);
                currentTimer.clear();
            } else if (stock.price >= prevPrice && curTimerTimestamp == 0) {
                // 如果新流入的股票价格升高
                // curTimerTimestamp 为 0 表示 currentTimer 状态中是空的,还没有对应的 Timer
                // 新 Timer = 当前时间 + interval
                long timerTs = context.timestamp() + intervalMills;

                context.timerService().registerEventTimeTimer(timerTs);
                // 更新 currentTimer 状态,后续数据会读取 currentTimer,做相关判断
                currentTimer.update(timerTs);
            }
        }
        // 更新 lastPrice
        lastPrice.update(stock.price);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) throws Exception {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        out.collect(formatter.format(ts) + ", symbol: " + ctx.getCurrentKey() +
                    " monotonically increased for " + intervalMills + " millisecond.");
        // 清空 currentTimer 状态
        currentTimer.clear();
    }
}

在主逻辑里,通过下面的 process() 算子调用 KeyedProcessFunction

DataStream<StockPrice> inputStream = ...

DataStream<String> warnings = inputStream
                .keyBy(stock -> stock.symbol)
                // 调用 process 函数
                .process(new IncreaseAlertFunction(3000));

Checkpoint 时,Timer 也会随其他状态数据一起保存起来。如果使用 Processing Time 语义设置一些 Timer,重启时这个时间戳已经过期,那些回调函数会立刻被调用执行。

侧输出#

ProcessFunction 的另一大特色功能是可以将一部分数据发送到另外一个流中,而且输出到的两个流数据类型可以不一样。这个功能被称为为侧输出(Side Output)。我们通过 OutputTag<T> 来标记另外一个数据流:

OutputTag<StockPrice> highVolumeOutput = 
  new OutputTag<StockPrice>("high-volume-trade"){};

ProcessFunction 中,我们可以使用 Context.output 方法将某类数据过滤出来。OutputTag 是这个方法的第一个参数,用来表示输出到哪个数据流。

public static class SideOutputFunction 
  extends KeyedProcessFunction<String, StockPrice, String> {
    @Override
    public void processElement(StockPrice stock, Context context, Collector<String> out) throws Exception {
        if (stock.volume > 100) {
          	context.output(highVolumeOutput, stock);
        } else {
          	out.collect("normal tick data");
        }
    }
}

在主逻辑中,通过下面的方法先调用 ProcessFunction,再获取侧输出:

DataStream<StockPrice> inputStream = ...

SingleOutputStreamOperator<String> mainStream = inputStream
    .keyBy(stock -> stock.symbol)
    // 调用 process 函数,包含侧输出逻辑
    .process(new SideOutputFunction());

DataStream<StockPrice> sideOutputStream = mainStream.getSideOutput(highVolumeOutput);

其中,SingleOutputStreamOperatorDataStream 的一种,它只有一种输出。下面是它在 Flink 源码中的定义:

public class SingleOutputStreamOperator<T> extends DataStream<T> {
  	...
}

这个例子中,KeyedProcessFunction 的输出类型是 String,而 SideOutput 的输出类型是 StockPrice,两者可以不同。

在两个流上使用 ProcessFunction {#process-on-two-streams}#

我们在 DataStream API 部分曾提到使用 connect() 将两个数据流的合并,如果想从更细的粒度在两个数据流进行一些操作,可以使用 CoProcessFunctionKeyedCoProcessFunction。这两个函数都有 processElement1()processElement2() 方法,分别对第一个数据流和第二个数据流的每个元素进行处理。第一个数据流类型、第二个数据流类型和经过函数处理后的输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现两个数据流上的 Join:

  • 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态 a 为例。

  • processElement1() 方法处理第一个数据流,更新状态 a。

  • processElement2() 方法处理第二个数据流,根据状态 a 中的数据,生成相应的输出。

我们这次将股票价格结合媒体评价两个数据流一起讨论,假设对于某支股票有一个媒体评价数据流,媒体评价数据流包含了对该支股票的正负评价。两支数据流一起流入 KeyedCoProcessFunctionprocessElement2() 方法处理流入的媒体数据,将媒体评价更新到状态 mediaState 上,processElement1() 方法处理流入的股票交易数据,获取 mediaState 状态,生成到新的数据流。两个方法分别处理两个数据流,共享一个状态,通过状态来通信。

在主逻辑中,我们将两个数据流 connect(),然后按照股票代号进行 keyBy(),进而使用 process()

// 读入股票数据流
DataStream<StockPrice> stockStream = ...

// 读入媒体评价数据流
DataStream<Media> mediaStream = ...

DataStream<StockPrice> joinStream = stockStream.connect(mediaStream)
    .keyBy("symbol", "symbol")
    // 调用 process 函数
    .process(new JoinStockMediaProcessFunction());

KeyedCoProcessFunction 的具体实现:

/**
  * 四个泛型:Key,第一个流类型,第二个流类型,输出。
  */
public static class JoinStockMediaProcessFunction extends KeyedCoProcessFunction<String, StockPrice, Media, StockPrice> {
    // mediaState
    private ValueState<String> mediaState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 从 RuntimeContext 中获取状态
        mediaState = getRuntimeContext().getState(
          	new ValueStateDescriptor<String>("mediaStatusState", Types.STRING));
    }

    @Override
    public void processElement1(StockPrice stock, Context context, Collector<StockPrice> collector) throws Exception {
        String mediaStatus = mediaState.value();
        if (null != mediaStatus) {
            stock.mediaStatus = mediaStatus;
            collector.collect(stock);
        }
    }

    @Override
    public void processElement2(Media media, Context context, Collector<StockPrice> collector) throws Exception {
        // 第二个流更新 mediaState
        mediaState.update(media.status);
    }
}

这个例子比较简单,没有使用 Timer,实际的业务场景中状态一般用到 Timer 将过期的状态清除。两个数据流的中间数据放在状态中,为避免状态的无限增长,需要使用 Timer 清除过期数据。

很多互联网 APP 的机器学习样本拼接都可能依赖这个函数来实现:服务端的机器学习特征是实时生成的,用户在 APP 上的行为是交互后产生的,两者属于两个不同的数据流,用户行为是机器学习所需要标注的正负样本,因此可以按照这个逻辑来将两个数据流拼接起来,通过拼接更快得到下一轮机器学习的样本数据。

使用 Event Time 时,两个数据流必须都设置好 Watermark,只设置一个流的 Event Time 和 Watermark,无法在 CoProcessFunctionKeyedCoProcessFunction 中使用 Timer 功能,因为 process 算子无法确定自己应该以怎样的时间来处理数据。