Apache Flink with Apache Kafka
2021-01-15
This post describes how to use Apache Kafka as both the Source and the Sink of a realtime streaming application that runs on top of Apache Flink.
The previous post described how to launch Apache Flink locally and how to put events into the Flink cluster through a socket and process them. Please refer to it to get started with Apache Flink.
Prepare to Use Apache Kafka with Apache Flink
First, edit build.sbt to add the Kafka connector to the project dependencies.
val flinkVersion = "1.11.2"

lazy val root = (project in file("."))
  .settings(
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
    )
  )
Further information is available at Apache Flink 1.12 Documentation: Apache Kafka Connector.
For Kafka, we can use docker-compose to launch a Kafka cluster locally.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${ADVERTISED_HOST_IP}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "flink-topic-in:1:1,flink-topic-out:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
The original YAML can be found in the wurstmeister/kafka-docker repository.
After starting the Kafka cluster with ADVERTISED_HOST_IP="192.168.3.8" docker-compose up -d, you can see that the topics specified by KAFKA_CREATE_TOPICS in docker-compose.yml have been created (see the doc).
$ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --list
flink-topic-in
flink-topic-out
$ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe
Topic: flink-topic-in PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: flink-topic-in Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: flink-topic-out PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: flink-topic-out Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
As their names indicate, flink-topic-in is for the Source and flink-topic-out is for the Sink.
Implement a Job that uses Kafka as Sink and Source
Now we are ready to build streaming applications that run on top of Apache Flink, using the Kafka cluster running locally.
The sample application will look like this: it reads sensor events from a Kafka topic, aggregates them in Flink, and writes the results back to another Kafka topic.
First of all, we need to get the streaming execution environment as usual.
val env = StreamExecutionEnvironment.getExecutionEnvironment
Then, we can configure Source and Sink as below.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val source = new FlinkKafkaConsumer(
  "flink-topic-in", // topic name
  new JSONKeyValueDeserializationSchema(false),
  properties, // `bootstrap.servers` and `group.id` required
)

val sink = new FlinkKafkaProducer(
  "localhost:9092", // bootstrap.servers
  "flink-topic-out", // topic name
  new SimpleStringSchema()
)
Since it's just a Flink job, we can put whatever we want to process in between the source and the sink.
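For example, the consumer and producer above get wired into the job roughly as in the following sketch; the intermediate map is just a placeholder, and it assumes the usual import org.apache.flink.streaming.api.scala._ is in scope so that TypeInformation is available.

// a minimal wiring sketch, reusing the `source` and `sink` values defined above
val rawStream: DataStream[ObjectNode] = env.addSource(source) // JSONKeyValueDeserializationSchema yields ObjectNode
val outputStream: DataStream[String] = rawStream.map { node => node.toString } // placeholder processing
outputStream.addSink(sink) // SimpleStringSchema expects String records

The rest of this post replaces the placeholder with deserialization and windowed aggregation.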
Let's write a deserializer for the messages that come from Kafka. The consumer uses JSONKeyValueDeserializationSchema, which transforms the raw byte array into an ObjectNode, which behaves like a Map<String, Any>, so our deserializer transforms the ObjectNode into whatever we want to represent as a Scala object.
For instance, if each message has three attributes and represents a datum from an IoT device that observes my room's metrics, the case class and its deserializer look like below.
import scala.util.{Failure, Success, Try}

// ObjectNode is the (shaded) Jackson type produced by JSONKeyValueDeserializationSchema
// json: '{"deviceId":1,"temperature":20.5,"humidity":60.0}'
case class SensorData(deviceId: Long, temperature: Double, humidity: Double)

object SensorData {
  def deserialize(obj: ObjectNode): Option[SensorData] = {
    Try {
      val value = obj.get("value")
      SensorData(
        value.get("deviceId").asLong(),
        value.get("temperature").asDouble(),
        value.get("humidity").asDouble(),
      )
    } match {
      case Success(x) => Some(x)
      case Failure(exception) => {
        println(s"failed to parse JSON. obj = $obj, exception = $exception")
        None
      }
    }
  }
}
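With this in place, the raw ObjectNode stream from the consumer can be turned into a typed stream. A minimal sketch, reusing the rawStream name from the wiring sketch above:

// drop messages that fail to deserialize, keep the rest as SensorData
val deserializedStream: DataStream[SensorData] =
  rawStream.flatMap { node => SensorData.deserialize(node).toList } // .toList makes the Option explicit for flatMap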
After deserializing, aggregating events is a typical use case in Flink.
// aggregated result container
case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double]) { ??? }

// deserializedStream receives deserialized events, that is `SensorData`.
val aggregated: DataStream[SensorAggregatedResult] = deserializedStream
  .keyBy { _.deviceId } // grouping events based on `deviceId`
  .timeWindow(Time.seconds(20)) // 20 seconds time-window
  .apply {
    new WindowFunction[SensorData, SensorAggregatedResult, Long, TimeWindow] {
      override def apply(
        key: Long, // == deviceId
        window: TimeWindow,
        input: Iterable[SensorData],
        out: Collector[SensorAggregatedResult]
      ): Unit = {
        // aggregate events
        val (count, temps, hums) = input.foldLeft((0, Seq.empty[Double], Seq.empty[Double])) {
          case ((count, acc_tmp, acc_hum), data) =>
            (count + 1, data.temperature +: acc_tmp, data.humidity +: acc_hum)
        }
        // send a result to the downstream
        out.collect(SensorAggregatedResult(key, count, temps, hums))
      }
    }
  }
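The aggregated results still have to become Strings before they reach the Kafka producer configured with SimpleStringSchema. The body of SensorAggregatedResult is elided above with ???; a hypothetical serialize method could look like the following, building a small JSON string by hand (a real job might use a JSON library instead):

// hypothetical serialize implementation for the elided ??? body above
case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double]) {
  def serialize: String =
    s"""{"deviceId":$deviceId,"count":$count,"temperatures":[${temperatureList.mkString(",")}],"humidities":[${humidityList.mkString(",")}]}"""
}

The final job below maps each result through this serialize method before handing it to the sink.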
Finally, the entire job code is below, with the pieces described so far elided.
package net.petitviolet

object KafkaJob {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val parameterTool = ParameterTool.fromArgs(args)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"))
    properties.setProperty("group.id", parameterTool.getRequired("group.id"))

    // Events come from Kafka
    val source = ???
    // put results to Kafka
    val sink = ???

    // deserialize ObjectNode to SensorData
    val deserializedStream: DataStream[SensorData] = source.flatMap { SensorData.deserialize(_) }
    // windowing, and Iterable[SensorData] to SensorAggregatedResult
    val aggregated: DataStream[SensorAggregatedResult] = ???
    // serialize SensorAggregatedResult to String
    val aggregatedStrings: DataStream[String] = aggregated.map { result => result.serialize }
    val graph: DataStreamSink[String] = aggregatedStrings.addSink(sink)

    // execute program
    env.execute("sensor data calculation from kafka into kafka")
  }
}
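One thing the elided snippets gloss over: the job sets TimeCharacteristic.EventTime, and event-time windows only fire once watermarks advance, so the Kafka source needs a WatermarkStrategy. A minimal sketch, assuming we are happy to rely on the timestamps already attached to the Kafka records:

import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy

// allow records to arrive up to 5 seconds out of order; with no explicit
// timestamp assigner, the timestamps carried by the Kafka records are used
source.assignTimestampsAndWatermarks(
  WatermarkStrategy.forBoundedOutOfOrderness[ObjectNode](Duration.ofSeconds(5))
)

When submitting the job, the bootstrap.servers and group.id values are passed as program arguments, matching the ParameterTool.getRequired calls above.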
After submitting this job and then sending events to the flink-topic-in topic that the job uses as its source, we can see events flowing into the flink-topic-out topic from Flink, for example with the kafka-console-consumer command.
$ ./kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic flink-topic-out --from-beginning
Of course, this sink topic can also be consumed by other realtime streaming applications, not just the console consumer.
Summary
This post explained how to use Kafka as both the source and the sink of a realtime streaming application on Flink. The sample code is available in the petitviolet/flink_practice repository on GitHub.