Flink Source Code Analysis: How WindowOperator Works Under the Hood

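The previous article covered the execution flow of Flink's window mechanism. WindowOperator is the core class that actually stores each window's elements and drives its computation. This article analyzes WindowOperator's execution logic.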

The apply method

Picking up where the previous article left off, let's start with the apply method and look at its logic.

private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
// generate the operator's name
final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
// get the key selector
KeySelector<T, K> keySel = input.getKeySelector();

WindowOperator<K, T, Iterable<T>, R, W> operator;

if (evictor != null) {
  @SuppressWarnings({"unchecked", "rawtypes"})
  TypeSerializer<StreamRecord<T>> streamRecordSerializer =
      (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

  ListStateDescriptor<StreamRecord<T>> stateDesc =
      new ListStateDescriptor<>("window-contents", streamRecordSerializer);

  operator =
    new EvictingWindowOperator<>(windowAssigner,
      windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
      keySel,
      input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
      stateDesc,
      function,
      trigger,
      evictor,
      allowedLateness,
      lateDataOutputTag);

} else {
  // state descriptor for the ListState that holds the window contents
  ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
    input.getType().createSerializer(getExecutionEnvironment().getConfig()));

  // construct the WindowOperator
  operator =
    new WindowOperator<>(windowAssigner,
      windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
      keySel,
      input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
      stateDesc,
      function,
      trigger,
      allowedLateness,
      lateDataOutputTag);
}

return input.transform(opName, resultType, operator);

}

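The method first generates the operator's name and gets the key selector. The main logic then branches on whether an evictor is set: if evictor is not null it constructs an EvictingWindowOperator, otherwise a WindowOperator. EvictingWindowOperator is in fact a subclass of WindowOperator that only adds the logic for evicting (removing) elements; note also that its ListState stores StreamRecord<T> instead of T, so each buffered element keeps its timestamp. The rest of this article focuses on WindowOperator.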
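To make the extra eviction step more concrete, here is a small plain-Java model. It is an illustrative sketch, not Flink's API: the hypothetical TimedElement record stands in for StreamRecord<T> (the evicting branch above buffers StreamRecord<T> so timestamps survive), and the hypothetical keepLast helper only mimics the idea of a count-based evictor that drops older elements before the window function sees them. It needs Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamRecord<T>: a value plus its event timestamp.
record TimedElement<T>(T value, long timestamp) {}

class EvictionSketch {
    // Mimics a count-based evictor: keep only the last maxCount buffered
    // elements, dropping older ones before the window function runs.
    static <T> List<TimedElement<T>> keepLast(List<TimedElement<T>> buffer, int maxCount) {
        int from = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }

    // Strips the timestamps: this is what the user's window function would see.
    static <T> List<T> values(List<TimedElement<T>> buffer) {
        List<T> out = new ArrayList<>();
        for (TimedElement<T> e : buffer) {
            out.add(e.value());
        }
        return out;
    }

    public static void main(String[] args) {
        List<TimedElement<String>> buffer = List.of(
            new TimedElement<>("a", 1_000L),
            new TimedElement<>("b", 2_000L),
            new TimedElement<>("c", 3_000L));
        System.out.println(values(keepLast(buffer, 2))); // [b, c]
    }
}
```

Keeping the timestamp next to each value is what lets a real evictor make time-based decisions as well as count-based ones.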

First, let's look at WindowOperator's inheritance hierarchy.

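In brief: WindowOperator extends AbstractUdfStreamOperator (covered in an earlier article; operators that wrap a user-defined function build on it) and also implements the OneInputStreamOperator interface (also covered earlier). AbstractUdfStreamOperator in turn extends AbstractStreamOperator, OneInputStreamOperator extends the StreamOperator interface, and AbstractStreamOperator also implements StreamOperator. The details follow below.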
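The hierarchy just described can be written down as empty Java stubs. These are deliberately stripped-down stand-ins that only reuse the Flink type names: the real types are generic and declare many methods, all omitted here. The sketch only shows which type extends or implements which.

```java
// Empty stand-ins with the same names as the Flink types; the real ones are
// generic and declare many methods, all omitted here.
interface StreamOperator {}

interface OneInputStreamOperator extends StreamOperator {}

abstract class AbstractStreamOperator implements StreamOperator {}

abstract class AbstractUdfStreamOperator extends AbstractStreamOperator {}

class WindowOperator extends AbstractUdfStreamOperator implements OneInputStreamOperator {}

// As described earlier, the evicting variant is a subclass of WindowOperator.
class EvictingWindowOperator extends WindowOperator {}

class HierarchyDemo {
    public static void main(String[] args) {
        StreamOperator op = new EvictingWindowOperator();
        System.out.println(op instanceof OneInputStreamOperator);    // true
        System.out.println(op instanceof AbstractUdfStreamOperator); // true
    }
}
```

StreamOperator is reached along two paths (through AbstractStreamOperator and through OneInputStreamOperator), which Java allows because it is an interface.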

The WindowOperator constructor

/**
 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 */
public WindowOperator(
        WindowAssigner<? super IN, W> windowAssigner,
        TypeSerializer<W> windowSerializer,
        KeySelector<IN, K> keySelector,
        TypeSerializer<K> keySerializer,
        StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
        InternalWindowFunction<ACC, OUT, K, W> windowFunction,
        Trigger<? super IN, ? super W> trigger,
        long allowedLateness,
        OutputTag<IN> lateDataOutputTag) {

    super(windowFunction);

    checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner),
        "The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. " +
            "This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " +
            "the AggregatingProcessingTimeWindowOperator");

    checkArgument(allowedLateness >= 0);

    checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
        "window state serializer is not properly initialized");

    this.windowAssigner = checkNotNull(windowAssigner);
    this.windowSerializer = checkNotNull(windowSerializer);
    this.keySelector = checkNotNull(keySelector);
    this.keySerializer = checkNotNull(keySerializer);
    this.windowStateDescriptor = windowStateDescriptor;
    this.trigger = checkNotNull(trigger);
    this.allowedLateness = allowedLateness;
    this.lateDataOutputTag = lateDataOutputTag;

    setChainingStrategy(ChainingStrategy.ALWAYS);
}

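The WindowOperator is constructed from the given policies (window assigner, trigger, allowed lateness) and the user-defined function. The constructor mainly validates its arguments: the required ones must be non-null, allowedLateness must be non-negative, the window state serializer must already be initialized, and aligned assigners based on BaseAlignedWindowAssigner are rejected. It then initializes the fields and sets the chaining strategy to ChainingStrategy.ALWAYS.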
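These argument checks follow a simple fail-fast pattern. The sketch below reproduces two of them with plain JDK calls; WindowOperatorConfig is a hypothetical class, and Objects.requireNonNull plus an explicit IllegalArgumentException stand in for Flink's Preconditions.checkNotNull and checkArgument.

```java
import java.util.Objects;

// Hypothetical class reproducing two of the constructor's fail-fast checks.
// Flink uses Preconditions.checkArgument/checkNotNull; plain JDK calls stand in here.
class WindowOperatorConfig {
    final Object windowAssigner;
    final Object trigger;
    final long allowedLateness;

    WindowOperatorConfig(Object windowAssigner, Object trigger, long allowedLateness) {
        // like checkArgument(allowedLateness >= 0): throws IllegalArgumentException
        if (allowedLateness < 0) {
            throw new IllegalArgumentException("allowedLateness must be >= 0, got " + allowedLateness);
        }
        // like checkNotNull(...): throws NullPointerException for a missing policy
        this.windowAssigner = Objects.requireNonNull(windowAssigner, "windowAssigner");
        this.trigger = Objects.requireNonNull(trigger, "trigger");
        this.allowedLateness = allowedLateness;
    }

    public static void main(String[] args) {
        try {
            new WindowOperatorConfig("assigner", "trigger", -1L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // allowedLateness must be >= 0, got -1
        }
    }
}
```

Validating in the constructor means a misconfigured window fails when the job graph is built, not later when the first element arrives.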

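WindowOperator has several important methods:

open: the operator's initialization logic

processElement: called whenever a new element enters the window operator

onEventTime: the logic run when an event-time timer fires

onProcessingTime: the logic run when a processing-time timer fires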

The open method

First, let's see what initialization the open method performs.

@Override
public void open() throws Exception {
super.open();

this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
timestampedCollector = new TimestampedCollector<>(output);

internalTimerService =
    getInternalTimerService("window-timers", windowSerializer, this);

triggerContext = new Context(null, null);
processContext = new WindowContext(null);

windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
  @Override
  public long getCurrentProcessingTime() {
    return internalTimerService.currentProcessingTime();
  }
};

// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
// here it is non-null: it is the stateDesc created in WindowedStream above
if (windowStateDescriptor != null) {
  windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}

// create the typed and helper states for merging windows
// only for merging assigners such as session windows: state used when merging windows
if (windowAssigner instanceof MergingWindowAssigner) {

  // store a typed reference for the state of merging windows - sanity check
  if (windowState instanceof InternalMergingState) {
    windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
  }
  // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
  // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
  // TODO activate the sanity check once resolved

  // else if (windowState != null) {
  //   throw new IllegalStateException(
  //     "The window uses a merging assigner, but the window state is not mergeable.");
  // }

  @SuppressWarnings("unchecked")
  final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;

  final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
      typedTuple,
      new TypeSerializer[] {windowSerializer, windowSerializer});

  final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
      new ListStateDescriptor<>("merging-window-set", tupleSerializer);

  // get the state that stores the merging sets
  mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
      getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
  mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
}

}

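Besides creating the late-records counter, the output collector, the internal timer service ("window-timers") and the trigger and process contexts, open mainly creates (or restores) the state that holds the window contents. For merging assigners such as session windows, it additionally creates the "merging-window-set" state used to merge windows.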
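windowState is a single keyed state handle whose contents are scoped both by the current key (managed by the keyed state backend) and by a namespace, which for WindowOperator is the window. The toy model below, with a hypothetical class and methods and no Flink dependency, uses a nested map to show what setCurrentNamespace, add, get and clear mean in that scheme. Its get returns null for an empty scope, mirroring the `contents == null` checks in the operator code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of keyed, namespaced list state such as "window-contents".
// State is scoped by (current key, current namespace); for WindowOperator the namespace is the window.
class NamespacedListState<K, N, T> {
    private final Map<K, Map<N, List<T>>> store = new HashMap<>();
    private K currentKey;
    private N currentNamespace;

    void setCurrentKey(K key) { currentKey = key; }

    void setCurrentNamespace(N namespace) { currentNamespace = namespace; }

    // Appends to the list of the current (key, namespace) pair.
    void add(T value) {
        store.computeIfAbsent(currentKey, k -> new HashMap<>())
             .computeIfAbsent(currentNamespace, n -> new ArrayList<>())
             .add(value);
    }

    // Returns null when the current scope holds nothing.
    List<T> get() {
        Map<N, List<T>> byNamespace = store.get(currentKey);
        return byNamespace == null ? null : byNamespace.get(currentNamespace);
    }

    // Removes only the current (key, namespace) entry; other windows are untouched.
    void clear() {
        Map<N, List<T>> byNamespace = store.get(currentKey);
        if (byNamespace != null) {
            byNamespace.remove(currentNamespace);
        }
    }

    public static void main(String[] args) {
        NamespacedListState<String, String, Integer> state = new NamespacedListState<>();
        state.setCurrentKey("user-1");
        state.setCurrentNamespace("window-[0,5000)");
        state.add(1);
        state.add(2);
        state.setCurrentNamespace("window-[5000,10000)");
        state.add(3);
        System.out.println(state.get()); // [3]
    }
}
```

Because the window is only a namespace, one state handle serves every window of every key, and purging one window leaves the others intact.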

The processElement method
When data arrives at the window, WindowOperator's processElement method is called.

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// the windowAssigner first assigns the element to one or more windows
final Collection<W> elementWindows = windowAssigner.assignWindows(
    element.getValue(), element.getTimestamp(), windowAssignerContext);

//if element is handled by none of assigned elementWindows
// stays true until some assigned window accepts the element, then becomes false
boolean isSkippedElement = true;
// get the current key
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
// check whether this is a merging (session) window
if (windowAssigner instanceof MergingWindowAssigner) {
  MergingWindowSet<W> mergingWindows = getMergingWindowSet();

  for (W window: elementWindows) {

    // adding the new window might result in a merge, in that case the actualWindow
    // is the merged window and we work with that. If we don't merge then
    // actualWindow == window
    W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
      @Override
      public void merge(W mergeResult,
          Collection<W> mergedWindows, W stateWindowResult,
          Collection<W> mergedStateWindows) throws Exception {

        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
          throw new UnsupportedOperationException("The end timestamp of an " +
              "event-time window cannot become earlier than the current watermark " +
              "by merging. Current watermark: " + internalTimerService.currentWatermark() +
              " window: " + mergeResult);
        } else if (!windowAssigner.isEventTime()) {
          long currentProcessingTime = internalTimerService.currentProcessingTime();
          if (mergeResult.maxTimestamp() <= currentProcessingTime) {
            throw new UnsupportedOperationException("The end timestamp of a " +
              "processing-time window cannot become earlier than the current processing time " +
              "by merging. Current processing time: " + currentProcessingTime +
              " window: " + mergeResult);
          }
        }

        triggerContext.key = key;
        triggerContext.window = mergeResult;

        triggerContext.onMerge(mergedWindows);

        for (W m: mergedWindows) {
          triggerContext.window = m;
          triggerContext.clear();
          deleteCleanupTimer(m);
        }

        // merge the merged state windows into the newly resulting state window
        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
      }
    });

    // drop if the window is already late
    if (isWindowLate(actualWindow)) {
      mergingWindows.retireWindow(actualWindow);
      continue;
    }
    isSkippedElement = false;

    W stateWindow = mergingWindows.getStateWindow(actualWindow);
    if (stateWindow == null) {
      throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
    }

    windowState.setCurrentNamespace(stateWindow);
    windowState.add(element.getValue());

    triggerContext.key = key;
    triggerContext.window = actualWindow;

    TriggerResult triggerResult = triggerContext.onElement(element);
    // the window fires
    if (triggerResult.isFire()) {
      // read the window's contents from state
      ACC contents = windowState.get();
      if (contents == null) {
        continue;
      }
      emitWindowContents(actualWindow, contents);
    }

    if (triggerResult.isPurge()) {
      windowState.clear();
    }
    registerCleanupTimer(actualWindow);
  }

  // need to make sure to update the merging state in state
  mergingWindows.persist();
} else {
  // process each assigned window in turn
  for (W window: elementWindows) {

    // drop if the window is already late
    // skip the window if it is already late, i.e. the watermark has passed
    // the window's end (max timestamp) + allowedLateness
    if (isWindowLate(window)) {
      continue;
    }
    // the element is handled by at least one window
    isSkippedElement = false;
    // set the window state's namespace to this window
    windowState.setCurrentNamespace(window);
    // first store the element in windowState
    windowState.add(element.getValue());

    triggerContext.key = key;
    triggerContext.window = window;

    TriggerResult triggerResult = triggerContext.onElement(element);
    // check whether the window fires
    if (triggerResult.isFire()) {
      ACC contents = windowState.get();
      if (contents == null) {
        continue;
      }
      // send the contents to the user-defined function, i.e. run the window computation
      emitWindowContents(window, contents);
    }
    // on purge, clear the window's state
    if (triggerResult.isPurge()) {
      windowState.clear();
    }
    // register a timer that will clean up the window's state later
    registerCleanupTimer(window);
  }
}

// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
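// if no window accepted the element and the element is late (timestamp + allowedLateness <= watermark),
// send it to the side output when lateDataOutputTag is set; otherwise drop it and count it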
if (isSkippedElement && isElementLate(element)) {
  if (lateDataOutputTag != null){
    sideOutput(element);
  } else {
    // count one more dropped late record
    this.numLateRecordsDropped.inc();
  }
}

}

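First, the windowAssigner assigns the element to one or more windows, and the current key is obtained; this is the key from keyBy. The code then branches on whether the assigner is a MergingWindowAssigner (session windows), because session windows are handled differently from the other window types. Here we analyze the non-session case, i.e. the else branch, which loops over every assigned window: a window that is already late is skipped; otherwise the window state's namespace is set to the window, the element is appended to windowState, and the trigger is consulted. If the trigger fires, the window's contents are sent to the user-defined function, which runs the window computation; if it purges, the window's contents are cleared. Finally, a cleanup timer is registered so the window's state is removed later.

After the loop, if no window accepted the element (isSkippedElement is still true) and the element itself is late, it goes to the side output when a lateDataOutputTag has been set; otherwise it is dropped and the numLateRecordsDropped counter is incremented. That is the overall logic. The session window path is similar but additionally merges windows and their state; it is not analyzed further here, and interested readers can look at it on their own.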
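The window assignment and lateness checks used in processElement reduce to a little timestamp arithmetic. The sketch below is a standalone model for event time only, so the isEventTime() checks are dropped. assignTumbling follows the alignment idea behind Flink's TimeWindow.getWindowStartWithOffset, written here with Math.floorMod; cleanupTime, isWindowLate and isElementLate are modeled on the WindowOperator helpers of the same names, with maxTimestamp() being end - 1. The Window record and EventTimeWindowLogic class are illustrative, not Flink's API.

```java
// Hypothetical [start, end) window, modeled on Flink's TimeWindow.
record Window(long start, long end) {
    // Largest timestamp that still belongs to the window (end is exclusive).
    long maxTimestamp() {
        return end - 1;
    }
}

// Event-time-only model of the assignment and lateness checks; names are illustrative.
class EventTimeWindowLogic {
    // Tumbling assignment: align the timestamp down to a multiple of size, shifted by offset.
    static Window assignTumbling(long timestamp, long size, long offset) {
        long start = timestamp - Math.floorMod(timestamp - offset, size);
        return new Window(start, start + size);
    }

    // maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE on overflow.
    static long cleanupTime(Window window, long allowedLateness) {
        long t = window.maxTimestamp() + allowedLateness;
        return t >= window.maxTimestamp() ? t : Long.MAX_VALUE;
    }

    // A window is late once the watermark has reached its cleanup time.
    static boolean isWindowLate(Window window, long allowedLateness, long watermark) {
        return cleanupTime(window, allowedLateness) <= watermark;
    }

    // An element is late if its timestamp plus allowedLateness is not beyond the watermark.
    static boolean isElementLate(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        Window w = assignTumbling(12_345L, 5_000L, 0L);
        System.out.println(w);                                // Window[start=10000, end=15000]
        System.out.println(isWindowLate(w, 1_000L, 15_000L)); // false (cleanup time is 15999)
        System.out.println(isWindowLate(w, 1_000L, 15_999L)); // true
    }
}
```

For example, an element at 12345 ms with a 5000 ms tumbling size lands in [10000, 15000); with 1000 ms of allowed lateness, that window only counts as late once the watermark reaches 15999.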

The onEventTime method
This method is called when an event-time timer fires, i.e. when an event-time based computation is triggered.
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
// take the key and window from the timer
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();

MergingWindowSet<W> mergingWindows;
// for session (merging) windows
if (windowAssigner instanceof MergingWindowAssigner) {
  mergingWindows = getMergingWindowSet();
  // look up the state window: the window under which the actual state is kept
  W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
  if (stateWindow == null) {
    // Timer firing for non-existent window, this can only happen if a
    // trigger did not clean up timers. We have already cleared the merging
    // window and therefore the Trigger state, however, so nothing to do.
    return;
  } else {
    // set the window state's current namespace
    windowState.setCurrentNamespace(stateWindow);
  }
} else {
  // set the window state's current namespace
  windowState.setCurrentNamespace(triggerContext.window);
  mergingWindows = null;
}

TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
// check whether the computation should fire
if (triggerResult.isFire()) {
  // read the window's contents and run the computation
  ACC contents = windowState.get();
  if (contents != null) {
    emitWindowContents(triggerContext.window, contents);
  }
}
// if the trigger result purges, clear the window state
if (triggerResult.isPurge()) {
  windowState.clear();
}
// for event time, a timer firing at the window's cleanup time means the window is done: clear all of its state
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
  clearAllState(triggerContext.window, windowState, mergingWindows);
}
// persist the merging-window state
if (mergingWindows != null) {
  // need to make sure to update the merging state in state
  mergingWindows.persist();
}

}

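Processing-time computation is not analyzed separately here. onProcessingTime follows the same logic as onEventTime above; the differences are that it asks the trigger via triggerContext.onProcessingTime(...) with the timer's timestamp, and that it clears all window state at cleanup time only when the window assigner is not event-time based.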
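The onEventTime sequence (ask the trigger, emit on FIRE, clear on PURGE, drop everything at cleanup time) can be traced with a single-window toy model. TriggerResult here mirrors the four values and two flags of Flink's enum of the same name, triggerOnEventTime imitates the built-in EventTimeTrigger (fire for the timer at window.maxTimestamp()), and everything else, including EventTimeTimerModel, is hypothetical. The onElement path and late firings are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the shape of Flink's TriggerResult: each value is a (fire, purge) pair.
enum TriggerResult {
    CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }

    boolean isPurge() { return purge; }
}

// Hypothetical single-window model of the onEventTime steps.
class EventTimeTimerModel {
    final long windowMaxTimestamp; // window.maxTimestamp()
    final long allowedLateness;
    List<Integer> contents = new ArrayList<>();            // stands in for windowState
    final List<List<Integer>> emitted = new ArrayList<>(); // what the window function received

    EventTimeTimerModel(long windowMaxTimestamp, long allowedLateness) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.allowedLateness = allowedLateness;
    }

    // Imitates EventTimeTrigger: fire only for the timer at the window's max timestamp.
    TriggerResult triggerOnEventTime(long time) {
        return time == windowMaxTimestamp ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    void onEventTime(long timerTimestamp) {
        TriggerResult result = triggerOnEventTime(timerTimestamp);
        if (result.isFire() && contents != null) {
            emitted.add(new ArrayList<>(contents)); // like emitWindowContents(...)
        }
        if (result.isPurge() && contents != null) {
            contents.clear();                       // like windowState.clear()
        }
        // like isCleanupTime(...) for event time (overflow guard omitted)
        if (timerTimestamp == windowMaxTimestamp + allowedLateness) {
            contents = null;                        // like clearAllState(...)
        }
    }

    public static void main(String[] args) {
        EventTimeTimerModel model = new EventTimeTimerModel(14_999L, 1_000L);
        model.contents.add(1);
        model.contents.add(2);
        model.onEventTime(14_999L); // FIRE: emits [1, 2], state is kept
        model.onEventTime(15_999L); // cleanup time: state is dropped
        System.out.println(model.emitted);  // [[1, 2]]
        System.out.println(model.contents); // null
    }
}
```

With 1000 ms of allowed lateness, the first timer fires the window but keeps its state for late data, and the cleanup timer at maxTimestamp + allowedLateness then drops it. In this model, with an allowedLateness of 0 both steps happen in the same call, since the cleanup time equals maxTimestamp.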

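WindowOperator's source code is fairly long and has many more details; the analysis above only covers the main logic, and the details deserve a closer look on your own.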

Disclaimer: from JasonLee实时计算; the views are the author's own. Link: https://eyangzhen.com/8324.html
