对于流计算程序来说,肯定会用到状态(state),假如状态不自动清除,并且随着作业运行的时间越来越久,就会累积越多越多的状态,就会影响任务的性能,为了有效的控制状态的大小,Flink从1.6.0开始引入了状态的生存时间(TTL)功能,这样就可以实现自动清理状态,控制状态的大小.本文主要介绍一下Flink从1.6.0开始到1.9.1的状态清理不断的演进之路.
Flink1.6.0状态清除
Apache Flink 的 1.6.0 版本引入了状态生存时间特性。它使流处理应用程序的开发人员能够配置算子的状态,使其在定义的生存时间超时后被清除。
在 Flink 的DataStream API 中,应用程序状态是由状态描述符(state descriptor)来定义的。状态生存时间是通过将StateTtlConfiguration对象传递给状态描述符来配置的。下面的 Java 示例演示了如何创建状态生存时间的配置,并将其提供给状态描述符,该状态描述符将用户的上次登录时间保存为Long值.
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Long> lastUserLogin =
new ValueStateDescriptor<>("lastUserLogin", Long.class);
lastUserLogin.enableTimeToLive(ttlConfig);
Flink提供了多个选项来配置生存时间:
1,什么时候重置生存时间?
默认情况下,当状态被修改时,生存时间就会被更新。我们也可以在读操作访问状态时更新相关项的生存时间,但这样要花费额外的写操作来更新时间戳。
2,已经过期的数据是否可以访问?
状态生存时间机制使用的是惰性策略来清除过期状态。这可能导致应用程序会尝试读取过期但尚未删除的状态。用户可以配置对这样的读取请求是否返回过期状态。无论哪种情况,过期状态都会在之后立即被删除。虽然返回已经过期的状态有利于数据可用性,但不返回过期状态更符合相关数据保护法规的要求。
3,哪种时间语义被用于定义生存时间?
在目前的版本中,用户只能根据处理时间(Processing Time)定义状态生存时间。未来的 Flink 版本中计划支持事件时间(Event Time)。
4,快照中的过期数据如何清除?
Flink 1.6.0 已经支持在创建检查点(Checkpoint)或保存点(Savepoint)的完整快照时不包含过期状态。需要注意的是,创建增量快照时并不支持剔除过期状态。完整快照时的过期状态剔除必须如下例所示进行显示启用:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupFullSnapshot()
.build();
上述配置并不会影响本地状态存储的大小,但是整个作业的完整快照的大小将会减小。只有当用户从快照重新加载其状态到本地时,才会清除用户的本地状态。
Flink1.8.0状态清除
由于上述这些限制,在 Flink 1.6.0 中程序仍需要过期后主动删除状态。为了改善用户体验, Flink 1.8.0 引入了两种自主清理策略,分别针对两种状态后端类型:
堆内存状态后端的增量清除
此方法只适用于堆内存状态后端(FsStateBackend和MemoryStateBackend)其基本思路是在存储后端的所有状态条目上维护一个全局的惰性迭代器。某些事件(例如状态访问)会触发增量清理,而每次触发增量清理时,迭代器都会向前遍历删除已遍历的过期数据。以下代码示例展示了如何启用增量清理:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
// check 10 keys for every state access
.cleanupIncrementally(10, false)
.build();
如果启用该功能,则每次状态访问都会触发清除。
而每次清理时,都会检查一定数量的状态条目是否过期。
其中有两个调整参数。
第一个定义了每次清理时要检查的状态条目数。
第二个参数是一个标志位,用于表示是否在每条记录处理(Record processed)之后(而不仅仅是访问状态,State accessed),都还额外触发清除逻辑。
关于这种方法有两个重要的注意事项:首先是增量清理所花费的时间会增加记录处理的延迟。其次,如果没有状态被访问(State accessed)或者没有记录被处理(Record processed),过期的状态也将不会被删除。
RocksDB 状态后端利用后台压缩来清理过期状态
如果使用 RocksDB 状态后端,则可以启用另一种清理策略,该策略基于 Flink 定制的 RocksDB 压缩过滤器(Compaction filter)。RocksDB 会定期运行异步的压缩流程以合并数据并减少相关存储的数据量,该定制的压缩过滤器使用生存时间检查状态条目的过期时间戳,并丢弃所有过期值。
使用此功能的第一步,需要设置以下配置选项:state.backend.rocksdb.ttl.compaction.filter.enabled。一旦配置使用 RocksDB 状态后端后,如以下代码示例将会启用压缩清理策略:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupInRocksdbCompactFilter()
.build();
需要注意的是启用 Flink 的生存时间压缩过滤机制后,会放缓 RocksDB 的压缩速度。
Flink1.9.0状态的新功能(State Processor API)
能够在外部直接访问Flink任务的状态是一个社区呼声比较高的需求,在Flink的最新版本1.9.0中就引入了State Processor API,该 API 让用户可以通过 Flink DataSet 作业来灵活读取、写入和修改 Flink 的 Savepoint 和 Checkpoint。
Apache Flink的状态处理器API提供了强大的功能,可使用Flink的批处理DataSet API读取,写入和修改保存点和检查点。这对于诸如分析有趣模式的状态,通过检查差异进行故障排除或审核作业以及为新应用程序引导状态之类的任务很有用。
可查询的状态流
有的时候我们希望从第三方访问流的状态,目前Flink支持的状态查询的功能Queryable State Stream,但是在 Flink 中目前该功能只支持根据 Key 查找,并且不能保证返回值的一致性。所以目前的适用场景还是比较有限的.
在KeyedStream上调用.asQueryableState(stateName,stateDescriptor)会返回一个QueryableStateStream,它提供其值作为可查询状态。根据状态的类型,asQueryableState()方法有以下变体:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
注意:没有可查询的ListState接收器,因为它会导致列表不断增长,可能无法清除,因此最终会消耗太多内存。
读取已有的 Savepoint
读取状态首先指定到有效保存点或检查点的路径以及应用于还原数据的StateBackend。恢复状态的兼容性保证与恢复DataStream应用程序时的保证相同。
val bEnv = ExecutionEnvironment.getExecutionEnvironment()
val savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend())
读取操作员状态时,只需指定操作员uid,状态名称和类型信息。
val listState = savepoint.readListState(
"my-uid",
"list-state",
Types.INT)
val unionState = savepoint.readUnionState(
"my-uid",
"union-state",
Types.INT)
val broadcastState = savepoint.readBroadcastState(
"my-uid",
"broadcast-state",
Types.INT,
Types.INT)
如果在StateDescriptor中为该状态使用了一个自定义TypeSerializer,则也可以指定它。
val listState = savepoint.readListState(
"uid",
"list-state",
Types.INT,
new MyCustomIntSerializer())
当读取键状态时,用户指定KeyedStateReaderFunction来读取任意列和复杂的状态类型,例如ListState,MapState和AggregatingState。这意味着如果操作员包含有状态的过程函数,例如:
public class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, Void] {
var state: ValueState[Integer];
override def open(parameters: Configuration) {
val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);
}
override def processElement(value: Integer, ctx: Context, out: Collector[Void]) {
state.update(value + 1);
}
}
然后,可以通过定义输出类型和相应的KeyedStateReaderFunction进行读取。
case class KeyedState(key: Int, value: Int)
class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
var state: ValueState[Integer];
override def open(parameters: Configuration) {
val stateDescriptor = new ValueStateDescriptor("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);
}
override def processKey(
key: Int,
ctx: Context,
out: Collector[Keyedstate]) throws Exception {
val data = KeyedState(key, state.value())
out.collect(data);
}
}
val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
需要注意的是:使用KeyedStateReaderFunction时,必须在open内部急切地注册所有状态描述符。任何尝试调用RuntimeContext#getState,RuntimeContext#getListState或RuntimeContext#getMapState的尝试都将导致RuntimeException。
写入新的 Savepoint
状态编写器基于Savepoint的抽象,其中一个Savepoint可能具有许多运算符,并且使用BootstrapTransformation创建任何特定运算符的状态。
BootstrapTransformation从包含要写入状态的值的DataSet开始。根据您是否正在编写键控或操作员状态,可以选择是否对键控转换进行键控。最后,根据转换应用引导函数;Flink提供了KeyedStateBootstrapFunction(用于编写键状态),StateBootstrapFunction(用于编写非键状态)和BroadcastStateBootstrapFunction(用于广播状态)。
case class Account(id: Int, amount: Double, timestamp: Long)
class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] {
var state: ValueState[Double]
override def open(parameters: Configuration): Unit = {
val descriptor = new ValueStateDescriptor[Double]("total",Types.DOUBLE)
state = getRuntimeContext().getState(descriptor)
}
@throws[Exception]
override def processElement(value: Account, ctx: Context): Unit = {
state.update(value.amount)
}
}
val bEnv = ExecutionEnvironment.getExecutionEnvironment()
val accountDataSet = bEnv.fromCollection(accounts)
val transformation = OperatorTransformation
.bootstrapWith(accountDataSet)
.keyBy(acc -> acc.id)
.transform(new AccountBootstrapper())
KeyedStateBootstrapFunction支持设置事件时间和处理时间计时器。计时器不会在引导功能内触发,只有在DataStream应用程序中还原后才激活。如果设置了处理时间计时器,但直到经过该时间后才恢复状态,计时器将在启动后立即触发。
一旦创建了一个或多个转换,就可以将它们合并为一个保存点。保存点是使用状态后端和最大并行度创建的,它们可以包含任意数量的运算符。
Savepoint
.create(backend, 128)
.withOperator("uid1", transformation1)
.withOperator("uid2", transformation2)
.write(savepointPath)
除了从头开始创建保存点之外,您还可以基于现有保存点,例如在为现有作业引导单个新操作员时。
Savepoint
.load(backend, oldPath)
.withOperator("uid", transformation)
.write(newPath)
需要注意的是:当基于现有状态创建新的保存点时,状态处理器api将指向现有运算符的指针进行浅表复制。这意味着两个保存点共享状态,并且一个保存点不能在不破坏另一个保存点的情况下被删除
声明:文中观点不代表本站立场。本文传送门:https://eyangzhen.com/247123.html