petitviolet blog

    Apache Flink with Apache Kafka

    2021-01-15

    Apache Flink, Apache Kafka

    This post describes how to use Apache Kafka as both the Source and the Sink of a realtime streaming application running on top of Apache Flink.

    The previous post described how to launch Apache Flink locally and use a socket to put events into the Flink cluster and process them there. Please refer to it to get started with Apache Flink.

    Prepare to Use Apache Kafka with Apache Flink

    First, edit build.sbt to add the Kafka connector to the project dependencies.

    build.sbt
    val flinkVersion = "1.11.2"
    
    lazy val root = (project in file("."))
      .settings(
        libraryDependencies ++= Seq(
          "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
          "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
          "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
          "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
        )
      )
    

    Further information is available at Apache Flink 1.12 Documentation: Apache Kafka Connector.

    For Kafka, we can use docker-compose to launch a Kafka cluster locally.

    docker-compose.yml
    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
            - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        depends_on:
            - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: ${ADVERTISED_HOST_IP}
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_CREATE_TOPICS: "flink-topic-in:1:1,flink-topic-out:1:1"
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    

    The original yaml can be found in the wurstmeister/kafka-docker repository.

    After starting the Kafka cluster with ADVERTISED_HOST_IP="192.168.3.8" docker-compose up -d, you can see the topics created according to the KAFKA_CREATE_TOPICS setting in docker-compose.yml (see doc).

    $ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --list
    flink-topic-in
    flink-topic-out
    
    $ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe
    Topic: flink-topic-in   PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: flink-topic-in   Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: flink-topic-out  PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: flink-topic-out  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    

    As their names indicate, flink-topic-in is used as the Source and flink-topic-out as the Sink.

    Implement a Job that uses Kafka as Source and Sink

    Now we are ready to build a streaming application that runs on top of Apache Flink with Kafka running locally.

    The sample application will look like this:

    flink-kafka.gif

    First of all, we need to get the streaming execution environment as usual.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    

    Then, we can configure Source and Sink as below.

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    
    val source = new FlinkKafkaConsumer(
        "flink-topic-in", // topic name
        new JSONKeyValueDeserializationSchema(false),
        properties, // `bootstrap.servers` and `group.id` required
    )
    
    val sink = new FlinkKafkaProducer(
        "localhost:9092", // bootstrap.servers
        "flink-topic-out", // topic name
        new SimpleStringSchema()
    )
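
    For reference, the snippets in this post omit import statements. They assume something along the lines of the sketch below, based on the Flink 1.11 Scala API and flink-connector-kafka; the exact package paths may differ slightly in other versions. The consumer and producer also still need to be wired into the job with env.addSource and addSink, which the full job at the end of this post does.

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    // ObjectNode comes from Flink's shaded Jackson; the path may vary by Flink version
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

    import scala.util.{Failure, Success, Try}

    // attach the consumer to the job graph; the transformed stream is later
    // written back to Kafka with addSink (see the full job at the end of the post)
    val sourceStream = env.addSource(source)
    // ... transformations on sourceStream ...
    // transformedStream.addSink(sink)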
    

    Since it's just a Flink job, we can put whatever processing we want in between the source and the sink. Let's write a deserializer for the messages that come from Kafka. The consumer uses JSONKeyValueDeserializationSchema, which transforms byte arrays into ObjectNode, a kind of Map<String, Any>, so the deserializer will transform an ObjectNode into whatever we want to represent as a Scala object.

    For instance, if each message has three attributes and represents a datum from an IoT device that observes my room's metrics, the case class and its deserializer look like below.

    // json: '{"deviceId":1,"temperature":20.5,"humidity":60.0}'
    case class SensorData(deviceId: Long, temperature: Double, humidity: Double)
    
    object SensorData {
      def deserialize(obj: ObjectNode): Option[SensorData] = {
        Try {
          val value = obj.get("value")
          SensorData(
            value.get("deviceId").asLong(),
            value.get("temperature").asDouble(),
            value.get("humidity").asDouble(),
          )
        } match {
          case Success(x) => Some(x)
          case Failure(exception) => {
            println(s"failed to parse JSON. obj = $obj, exception = $exception")
            None
          }
        }
      }
    }
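
    Hooking the deserializer into the stream is then a one-liner. A minimal sketch, assuming sourceStream is the DataStream obtained from env.addSource(source) as sketched earlier; passing a function that returns an Option to flatMap works because Scala implicitly converts Option to Iterable, so events that fail to parse are simply dropped.

    // drop unparseable events, keep the rest as SensorData
    val deserializedStream: DataStream[SensorData] =
      sourceStream.flatMap { obj => SensorData.deserialize(obj) }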
    

    After deserializing, aggregating events is a typical use case in Flink.

    // aggregated result container
    case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double]) { ??? }
    
    // deserializedStream receives deserialized events, that is `SensorData`.
    val aggregated: DataStream[SensorAggregatedResult] = deserializedStream
        .keyBy { _.deviceId } // grouping events based on `deviceId`
        .timeWindow(Time.seconds(20)) // 20 seconds time-window
        .apply {
            new WindowFunction[SensorData, SensorAggregatedResult, Long, TimeWindow] {
                override def apply(
                    key: Long, // == deviceId
                    window: TimeWindow,
                    input: Iterable[SensorData],
                    out: Collector[SensorAggregatedResult]
                ): Unit = {
                    // aggregate events
                    val (count, temps, hums) = input.foldLeft((0, Seq.empty[Double], Seq.empty[Double])) {
                        case ((count, acc_tmp, acc_hum), data) =>
                            (count + 1, data.temperature +: acc_tmp, data.humidity +: acc_hum)
                    }
                    // send a result to down stream
                    out.collect(SensorAggregatedResult(key, count, temps, hums))
                }
            }
        }
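
    The body of SensorAggregatedResult is elided (???) above, but the full job below calls result.serialize to turn each aggregate into a String for the Kafka sink. A minimal, hypothetical sketch of such a method, rendering the aggregate as a JSON string that SimpleStringSchema can write to the sink topic, could look like this:

    case class SensorAggregatedResult(deviceId: Long, count: Int, temperatureList: Seq[Double], humidityList: Seq[Double]) {
      // hypothetical serializer: build the JSON string by hand for simplicity
      def serialize: String =
        s"""{"deviceId":$deviceId,"count":$count,""" +
          s""""temperatures":[${temperatureList.mkString(",")}],""" +
          s""""humidities":[${humidityList.mkString(",")}]}"""
    }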
    

    Finally, the entire job code is shown below, with the pieces described above elided.

    kafkajob.scala
    package net.petitviolet
    
    object KafkaJob {
      def main(args: Array[String]): Unit = {
        // set up the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val parameterTool = ParameterTool.fromArgs(args)
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"))
        properties.setProperty("group.id", parameterTool.getRequired("group.id"))
    
        // Events come from Kafka
        val source = ???
    
        // put results to Kafka
        val sink = ???
    
        // deserialize ObjectNode to SensorData
        val deserializedStream: DataStream[SensorData] = env.addSource(source).flatMap { SensorData.deserialize(_) }
    
        // window-ing, and Iterable[SensorData] to SensorAggregatedResult
        val aggregated: DataStream[SensorAggregatedResult] = ???
    
        // serialize SensorAggregatedResult to String
        val aggregatedStrings: DataStream[String] = aggregated.map { result => result.serialize }
    
        val graph: DataStreamSink[String] = aggregatedStrings.addSink(sink)
    
        // execute program
        env.execute("sensor data calculation from kafka into kafka")
      }
    }
    

    After submitting this job and then sending events to the flink-topic-in topic that the job uses as its source, we can see events going into the flink-topic-out topic from Flink, for example with the kafka-console-consumer command.

    $ ./kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic flink-topic-out --from-beginning
    

    Of course, this sink topic can in turn be consumed by other realtime streaming applications.

    Summary

    This post explained how to use Kafka as both the source and the sink of a realtime streaming application. The sample code is available in the petitviolet/flink_practice repository on GitHub.