Apache Flink with Apache Kafka


Apache FlinkApache Kafka

This post describes how to utilize Apache Kafka as Source as well as Sink of realtime streaming application that run on top of Apache Flink.

The previous post describes how to launch Apache Flink locally, and use Socket to put events into Flink cluster and process in it. Please refer to it to get started with Apache Flink.

First, edit build.sbt to include Kafka connector to project dependencies.

val flinkVersion = "1.11.2"

lazy val root = (project in file("."))
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,

Further information are available at Apache Flink 1.12 Documentation: Apache Kafka Connector.

For Kafka, we can use docker-compose to launch a Kafka cluster locally.

version: '2'
    image: wurstmeister/zookeeper
        - "2181:2181"
    image: wurstmeister/kafka
        - zookeeper
      - "9092:9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "flink-topic-in:1:1,flink-topic-out:1:1"
      - /var/run/docker.sock:/var/run/docker.sock

The original yaml can be found at wurstmeister/kafka-docker

After starting Kafka cluster by ADVERTISED_HOST_IP="" docker-compose up -d, you can see the created topics corresponding to KAFKA_CREATE_TOPICS written in the docker-compose.yml(see doc).

$ ./kafka_2.12-2.4.1/bin/ --zookeeper localhost:2181 --list

$ ./kafka_2.12-2.4.1/bin/ --zookeeper localhost:2181 --describe
Topic: flink-topic-in   PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: flink-topic-in   Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: flink-topic-out  PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: flink-topic-out  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

As their name indicate, flink-topic-in is for Source as well as flink-topic-out is for Sink.

Implement a Job that uses Kafka as Sink and Source

Now it’s ready to build streaming applications that run on top of Apache Flink and Kafka running locally.

The sample application will look like


First of all, it needs to get streaming execution environment as usual.

val env = StreamExecutionEnvironment.getExecutionEnvironment

Then, we can configure Source and Sink as below.

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("", "test")

val source = new FlinkKafkaConsumer(
    "flink-topic-in", // topic name
    new JSONKeyValueDeserializationSchema(false),
    properties, // `bootstrap.servers` and `` required

val sink = new FlinkKafkaProducer(
    "localhost:9092", // bootstrap.servers
    "flink-topic-out", // topic name
    new SimpleStringSchema()

Since it’s just a Flinkjob, we can put whatever we want to process in between source and sink. Let’s write a deserializer for messages that come to Kafka. The consumer uses JSONKeyValueDeserializationSchema, which transform byte array into ObjectNode that is a kind of Map<String, Any>, so the desrializer will transform ObjectNode into whatever want to represent as Scala object instance.

For instance, each message has three attributes and represents a datum from an IoT device which observe my room’s metrics, a case class and its deserializer look like below.

// json: '{"deviceId":1,"temperature":20.5,"humidity":60.0}'
case class SensorData(deviceId: Long, temperature: Double, humidity: Double)

object SensorData {
  def deserialize(obj: ObjectNode): Option[SensorData] = {
    Try {
      val value = obj.get("value")
    } match {
      case Success(x) => Some(x)
      case Failure(exception) => {
        println(s"failed to parse JSON. obj = $obj, exception = $exception")

After desrializing, aggregating events is a typical usecase in Flink.

// aggregated result container
case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double]) { ??? }

// deserializedStream receives deserialized events, that is `SensorData`.
val aggregated: DataStream[SensorAggregatedResult] = deserializedStream
    .keyBy { _.deviceId } // grouping events based on `deviceId`
    .timeWindow(Time.seconds(20)) // 20 seconds time-window
    .apply {
        new WindowFunction[SensorData, SensorAggregatedResult, Long, TimeWindow] {
            override def apply(
                key: Long, // == deviceId
                window: TimeWindow,
                input: Iterable[SensorData],
                out: Collector[SensorAggregatedResult]
            ): Unit = {
                // aggregate events
                val (count, temps, hums) = input.foldLeft((0, Seq.empty[Double], Seq.empty[Double])) {
                    case ((count, acc_tmp, acc_hum), data) =>
                    (count + 1, data.temperature +: acc_tmp, data.humidity +: acc_hum)
                // send a result to down stream
                out.collect(SensorAggregatedResult(key, count, temps, hums))

At last, the entire job code is below with eliminating codes that I described thus far.

package net.petitviolet

object KafkaJob {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val parameterTool = ParameterTool.fromArgs(args)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"))
    properties.setProperty("", parameterTool.getRequired(""))

    // Events come from Kafka
    val source = ???

    // put results to Kafka
    val sink = ???

    // deserialize ObjectNode to SensorData
    val deserializedStream: DataStream[SensorData] = source.flatMap { SensorData.deserialize(_) }

    // window-ing, and Iterable[SensorData] to SensorAggregatedResult
    val aggregated: DataStream[SensorAggregatedResult] = ???

    // serialize SensorAggregatedResult to String
    val aggregatedStrings: DataStream[String] = { result => result.serialize }

    val graph: DataStreamSink[String] = aggregatedStrings.addSink(sink)

    // execute program
    env.execute("sensor data calculation from kafka into kafka")

After submitting this Job and then sending events to flink-topic-in topic that the job uses as sink, we can see events going into flink-topic-out topic from Flink by kafka-console-consumer command, for example.

$ ./kafka_2.12-2.4.1/bin/ --bootstrap-server=localhost:9092 --topic flink-topic-out --from-beginning

Of course, we can feed this sink stream from other realtime streaming applications.


This post explained how to use Kafka as a realtime streaming application’s source as well as sink. The sample code is available on petitviolet/flink_practice repository on GitHub.