petitviolet blog

    Apache Flink with Apache Kafka



    This post describes how to use Apache Kafka as both the Source and the Sink of a realtime streaming application running on top of Apache Flink.

    The previous post described how to launch Apache Flink locally and use a socket to put events into the Flink cluster and process them there. Please refer to it to get started with Apache Flink.

    Prepare to Use Apache Kafka with Apache Flink

    First, edit build.sbt to add the Kafka connector to the project dependencies.

    val flinkVersion = "1.11.2"
    lazy val root = (project in file("."))
      .settings(
        libraryDependencies ++= Seq(
          "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
          "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
          "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
          "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
        )
      )

    Further information is available at Apache Flink 1.12 Documentation: Apache Kafka Connector.

    For Kafka, we can use docker-compose to launch a Kafka cluster locally.

    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: ${ADVERTISED_HOST_IP}
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_CREATE_TOPICS: "flink-topic-in:1:1,flink-topic-out:1:1"
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock

    The original yaml can be found at wurstmeister/kafka-docker

    After starting the Kafka cluster by ADVERTISED_HOST_IP="" docker-compose up -d, you can see the created topics corresponding to KAFKA_CREATE_TOPICS written in the docker-compose.yml (see doc).

    $ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --list
    $ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe
    Topic: flink-topic-in   PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: flink-topic-in   Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: flink-topic-out  PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: flink-topic-out  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

    As their names indicate, flink-topic-in is for the Source and flink-topic-out is for the Sink.

    Implement a Job that uses Kafka as Sink and Source

    Now we're ready to build a streaming application that runs on top of Apache Flink, with Kafka running locally.

    The sample application reads events from Kafka, aggregates them in Flink, and writes the results back to Kafka.


    First of all, we need to get a streaming execution environment, as usual.

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    Then, we can configure Source and Sink as below.

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val source = new FlinkKafkaConsumer(
      "flink-topic-in", // topic name
      new JSONKeyValueDeserializationSchema(false),
      properties // `bootstrap.servers` and `group.id` are required
    )
    val sink = new FlinkKafkaProducer(
      "localhost:9092", // bootstrap.servers
      "flink-topic-out", // topic name
      new SimpleStringSchema()
    )

    Since it's just a Flink job, we can put whatever processing we want between the source and the sink. Let's write a deserializer for messages that come through Kafka. The consumer uses JSONKeyValueDeserializationSchema, which transforms a byte array into an ObjectNode, a kind of Map<String, Any>, so the deserializer will transform an ObjectNode into whatever we want to represent as a Scala object instance.

    For instance, if each message has three attributes and represents a datum from an IoT device which observes my room's metrics, the case class and its deserializer look like below.

    // json: '{"deviceId":1,"temperature":20.5,"humidity":60.0}'
    case class SensorData(deviceId: Long, temperature: Double, humidity: Double)
    object SensorData {
      def deserialize(obj: ObjectNode): Option[SensorData] = {
        Try {
          val value = obj.get("value")
          SensorData(
            value.get("deviceId").asLong(),
            value.get("temperature").asDouble(),
            value.get("humidity").asDouble()
          )
        } match {
          case Success(data) => Some(data)
          case Failure(exception) =>
            println(s"failed to parse JSON. obj = $obj, exception = $exception")
            None
        }
      }
    }
    After deserializing, aggregating events is a typical use case in Flink.

    // aggregated result container
    case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double]) { ??? }
    // deserializedStream receives deserialized events, that is, `SensorData`.
    val aggregated: DataStream[SensorAggregatedResult] = deserializedStream
      .keyBy { _.deviceId } // grouping events based on `deviceId`
      .timeWindow(Time.seconds(20)) // 20 seconds time-window
      .apply {
        new WindowFunction[SensorData, SensorAggregatedResult, Long, TimeWindow] {
          override def apply(
              key: Long, // == deviceId
              window: TimeWindow,
              input: Iterable[SensorData],
              out: Collector[SensorAggregatedResult]
          ): Unit = {
            // aggregate events
            val (count, temps, hums) = input.foldLeft((0, Seq.empty[Double], Seq.empty[Double])) {
              case ((count, accTemp, accHum), data) =>
                (count + 1, data.temperature +: accTemp, data.humidity +: accHum)
            }
            // send a result to the downstream
            out.collect(SensorAggregatedResult(key, count, temps, hums))
          }
        }
      }
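    The foldLeft inside the WindowFunction can be checked in isolation, with a plain in-memory collection standing in for a window pane. A minimal sketch with no Flink dependency; the `aggregate` helper here is illustrative, not part of the job.

```scala
case class SensorData(deviceId: Long, temperature: Double, humidity: Double)
case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double])

// the same foldLeft as in the WindowFunction, applied to an in-memory "window"
def aggregate(deviceId: Long, input: Iterable[SensorData]): SensorAggregatedResult = {
  val (count, temps, hums) =
    input.foldLeft((0, Seq.empty[Double], Seq.empty[Double])) {
      case ((c, accTemp, accHum), data) =>
        // prepend, so the resulting lists end up in reverse arrival order
        (c + 1, data.temperature +: accTemp, data.humidity +: accHum)
    }
  SensorAggregatedResult(deviceId, count, temps, hums)
}
```

    Note that prepending with `+:` keeps the fold cheap but leaves the lists in reverse arrival order, which doesn't matter here since the job only needs the collected values.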

    Finally, the entire job code is shown below, omitting the pieces I described thus far.

    package net.petitviolet

    object KafkaJob {
      def main(args: Array[String]): Unit = {
        // set up the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val parameterTool = ParameterTool.fromArgs(args)
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"))
        properties.setProperty("group.id", parameterTool.getRequired("group.id"))
        // Events come from Kafka
        val source = ???
        // put results to Kafka
        val sink = ???
        // deserialize ObjectNode to SensorData
        val deserializedStream: DataStream[SensorData] = env.addSource(source).flatMap { SensorData.deserialize(_) }
        // window-ing, and Iterable[SensorData] to SensorAggregatedResult
        val aggregated: DataStream[SensorAggregatedResult] = ???
        // serialize SensorAggregatedResult to String
        val aggregatedStrings: DataStream[String] = aggregated.map { result => result.serialize }
        val graph: DataStreamSink[String] = aggregatedStrings.addSink(sink)
        // execute program
        env.execute("sensor data calculation from kafka into kafka")
      }
    }
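    The `serialize` method used by the map step above is elided from the job. One hypothetical implementation (my assumption, not the post's actual code) could emit per-window averages as a JSON string for the downstream topic:

```scala
case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double]) {
  // hypothetical serializer: averages per window, emitted as a JSON string
  def serialize: String = {
    // guard against an empty window to avoid division by zero
    val avgTemp = if (count == 0) 0.0 else temperatureList.sum / count
    val avgHum = if (count == 0) 0.0 else humidityList.sum / count
    s"""{"deviceId":$deviceId,"count":$count,"avgTemperature":$avgTemp,"avgHumidity":$avgHum}"""
  }
}
```

    Whatever shape it takes, it must produce a String, because the sink was built with SimpleStringSchema.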

    After submitting this Job and then sending events to the flink-topic-in topic that the job uses as its source, we can see events going into the flink-topic-out topic from Flink by the kafka-console-consumer command, for example.

    $ ./kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic flink-topic-out --from-beginning

    Of course, this sink stream can in turn feed other realtime streaming applications.


    This post explained how to use Kafka as a realtime streaming application's source as well as sink. The sample code is available on petitviolet/flink_practice repository on GitHub.