昨天在群里一个同学问了这样一个问题,Flink怎么根据数据里面的某个字段动态把数据写入多个kafka的topic.
其实这个问题非常简单,你肯定会想到说写多个sink就可以了,Flink确实也是支持多个sink的,但是他的需求是可能会写入上千个topic(我们这里不去讨论这个需求是否合理或者是否有这样的场景),我们肯定不可能会复制上千遍的代码.
那其实Flink提供了高阶的序列化模式,与FlinkKafkaConsumer类似,FlinkKafkaProducer提供了一个叫KeyedSerializationSchema的高级序列化模式的接口,这个模式允许分开地序列化key和value。同时允许重写目标topic,因此一个FlinkKafkaProducer可以发送数据到多个topic。
下面来看下KeyedSerializationSchema接口的源码
/** @deprecated */
@Deprecated
@PublicEvolving
public interface KeyedSerializationSchema extends Serializable {
byte[] serializeKey(T var1);
byte[] serializeValue(T var1);
String getTargetTopic(T var1);
}
可以看到这个接口里面有3个方法,可以分别对key,value进行序列化,还可以设置一个topic,但是你会发现这个接口已经被标记为Deprecated.我们先来用这个接口实现一下,虽然被标记为Deprecated,但是还是可以用的.
/**
- 自定义序列化 根据数据里面的topic写入到不同的kafka的topic
*/
class MyKeyedSerialization extends KeyedSerializationSchema[Person] {
private lazy val CHARSET = Charset.forName(“UTF-8”) // key的序列化
override def serializeKey(t: Person): Array[Byte] = t.age.toString.getBytes(CHARSET) // value的序列化
override def serializeValue(t: Person): Array[Byte] = t.toString.getBytes(CHARSET) // 从数据里面获取topic名称
override def getTargetTopic(t: Person): String = t.topic
}
既然Flink官方不推荐使用这个接口了,那肯定有相应的替换接口,我们稍微找一下Flink的源码,就会发现有一个KafkaSerializationSchema新的接口.
那我们再来看下KafkaSerializationSchema这个接口的源码
@PublicEvolving
public interface KafkaSerializationSchema extends Serializable {
ProducerRecord serialize(T var1, @Nullable Long var2);
}
可以看到这里就一个serialize方法,有两个参数,第二个参数可以为空,它返回的是一个ProducerRecord对象.这个对象就是我们发送的数据封装成了一个对象,我们稍微看下这个对象的源码.
/**
* Create a record to be sent to Kafka
*
* @param topic The topic the record will be appended to
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
}
/**
* Create a record with no key
*
* @param topic The topic this record should be sent to
* @param value The record contents
*/
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
}
这里面有很多个构造方法,有带key的,也有不带key的,然后我们来实现一下KafkaSerializationSchema这个接口.
class MyKafkaSerialization extends KafkaSerializationSchema[Person] {
override def serialize(t: Person, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](t.topic, t.name.getBytes(), t.toString.getBytes(StandardCharsets.UTF_8))
}
}
这里我设置了key是name,topic是从数据里面取的,为了方便看,我在数据里面添加了一个topic的字段.
下面看一下完成的代码
package flink.streaming
import java.lang
import java.nio.charset.{Charset, StandardCharsets}
import java.text.SimpleDateFormat
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.functions.{Partitioner, RichMapFunction}
import org.apache.flink.api.common.serialization.{SimpleStringSchema}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.MathUtils
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
object FlinkKafkaDemoTest {
private val broker_myself = ""
private val group_id_myself = ""
private val topic_ = ""
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)
env.enableCheckpointing(10 * 1000)
// 设置任务的重启策略,间隔10s,重启3次
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1,Time.seconds(10)))
val producerProps = new Properties()
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_myself)
producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "3")
// 如果下面配置的是exactly-once的语义 这里必须配置为all
producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
val producer = new FlinkKafkaProducer[Person]("",
new MyKafkaSerialization,
producerProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
producer.setLogFailuresOnly(false)
val properties = new Properties()
properties.setProperty("bootstrap.servers", broker_myself)
properties.setProperty("group.id", group_id_myself)
val ds = env.addSource(new FlinkKafkaConsumer[String](topic_, new SimpleStringSchema(), properties).setStartFromLatest())
.filter(!_.isEmpty)
.map(x => JSON.parseObject(x, classOf[Person]))
.addSink(producer)
env.execute(this.getClass.getSimpleName.replace("$",""))
}
}
/**
- 自定义序列化 根据数据里面的topic写入到不同的kafka的topic
*/
class MyKeyedSerialization extends KeyedSerializationSchema[Person] {
private lazy val CHARSET = Charset.forName(“UTF-8”) // key的序列化
override def serializeKey(t: Person): Array[Byte] = t.age.toString.getBytes(CHARSET) // value的序列化
override def serializeValue(t: Person): Array[Byte] = t.toString.getBytes(CHARSET) // 从数据里面获取topic名称
override def getTargetTopic(t: Person): String = t.topic
}
class MyKafkaSerialization extends KafkaSerializationSchema[Person] {
override def serialize(t: Person, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](t.topic, t.name.getBytes(), t.toString.getBytes(StandardCharsets.UTF_8))
}
}
运行一下,看数据是否写入了两个topic(数据里面topic只有两个),代码里面用的是KafkaSerializationSchema这个接口.两个都是测试过的,这里就不打印第一种方式了.
可以看到jason_flink-1,jason_flink-2两个topic的数据都能正常消费.说明写入成功了,这篇文章先介绍到这里.
推荐阅读:
Flink1.10.0 SQL DDL 把计算结果写入kafka
JasonLee,公众号:JasonLee的博客Flink1.10.0 SQL DDL 把计算结果写入kafka
Flink 1.10.0 SQL DDL中如何定义watermark和计算列
JasonLee,公众号:JasonLee的博客Flink 1.10.0 SQL DDL中如何定义watermark和计算列
FlinkSQL使用DDL语句创建kafka源表
JasonLee,公众号:JasonLee的博客FlinkSQL使用DDL语句创建kafka源表
Flink 状态清除的演进之路
JasonLee,公众号:JasonLee的博客Flink 状态清除的演进之路
声明:来自JasonLee实时计算,仅代表创作者观点。链接:https://eyangzhen.com/5710.html