
    Apache Flink Basics

    2021-02-10

    Apache Flink

    The last 2 posts described how to launch Apache Flink and run realtime streaming applications on it with Apache Kafka.

    In order to understand Apache Flink better, this post describes the basics of Apache Flink: its architecture, its concepts, and how to configure it. The official documentation is available here.

    Components

    See Flink Architecture.

    The Apache Flink runtime consists of two kinds of components: a JobManager and one or more TaskManagers.

    JobManager

    The JobManager in a Flink cluster is the master process that controls and coordinates the distributed execution of Flink applications. This process consists of three components: a ResourceManager, a Dispatcher, and JobMasters.

    • ResourceManager is responsible for managing task slots, the unit of resource scheduling in a Flink cluster.
    • Dispatcher provides a REST interface and WebUI to interact with the Flink cluster.
      • starts a new JobMaster for each submitted job
    • JobMaster manages the execution of a single JobGraph.

    TaskManager

    The TaskManager is a worker process of a Flink cluster that executes tasks. The number of task slots in a TaskManager equals the number of tasks it can process concurrently.
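
    The number of slots per TaskManager is configured with taskmanager.numberOfTaskSlots in flink-conf.yaml, and the parallelism of an application determines how many slots it occupies. A minimal sketch using the Scala API:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // each parallel subtask occupies one task slot, so the cluster
    // needs at least 4 free slots to run this application
    env.setParallelism(4)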

    Checkpoint

    Flink is able to recover from failures thanks to its consistent checkpoint mechanism.

    // take a checkpoint every 10 seconds with at-least-once guarantees
    env.enableCheckpointing(10000L, CheckpointingMode.AT_LEAST_ONCE)
    

    This configuration makes Flink take a snapshot every 10 seconds so that it can restart from the latest checkpoint in case of a failure. For details about CheckpointingMode, please see the document.
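
    Checkpointing can be tuned further through the CheckpointConfig; a small sketch with illustrative values:

    val checkpointConfig = env.getCheckpointConfig
    // fail a checkpoint if it doesn't complete within 1 minute
    checkpointConfig.setCheckpointTimeout(60000L)
    // keep at least 500ms between the end of one checkpoint and the start of the next
    checkpointConfig.setMinPauseBetweenCheckpoints(500L)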

    A savepoint is a special checkpoint whose creation a user has to trigger explicitly. We can start a compatible application from a savepoint.
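
    For example, with the flink CLI (the job ID and paths below are placeholders):

    # trigger a savepoint for a running job
    $ ./bin/flink savepoint <jobId> /tmp/flink-savepoints

    # start a compatible application from that savepoint
    $ ./bin/flink run -s /tmp/flink-savepoints/savepoint-<id> <jarFile>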

    StateBackend

    A StateBackend is responsible for managing states and checkpoints. By default it stores all states in memory, which is MemoryStateBackend. Other types of StateBackend can be used as well, such as FsStateBackend and RocksDBStateBackend.

    // keep all states in RocksDB, writing checkpoints under the given URI
    val stateBackend: StateBackend = new RocksDBStateBackend("file:///tmp/flink-states")
    env.setStateBackend(stateBackend)
    

    StateBackends other than MemoryStateBackend can use the local filesystem as well as remote filesystems like HDFS and AWS S3 by specifying a URI. According to the document, FsStateBackend keeps states in memory until writing them into files. On the other hand, RocksDBStateBackend keeps all states in RocksDB. Thus, the former shows relatively high performance (throughput/latency), while the latter can hold a much larger volume of state. For more detailed information about remote StateBackends, please see the documents about filesystems and State Backends.
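
    For example, switching to a remote filesystem is just a matter of the URI scheme; a sketch, where the bucket name is a placeholder and the matching filesystem plugin has to be available:

    // keep working state in memory, but write checkpoints to S3
    val fsStateBackend: StateBackend = new FsStateBackend("s3://my-flink-bucket/checkpoints")
    env.setStateBackend(fsStateBackend)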

    Time semantics

    The document is available here.

    In the world of realtime stream processing, there are a couple of very important concepts for understanding what "time" stands for: timestamps and watermarks.

    Timestamp

    A timestamp of an event represents when the event happened. Due to various reasons, such as network delays and inconsistent machine clocks, it's not easy to process events based on when they actually occurred. For example, if you want to know how many events were observed in the last 5 minutes, how do you determine which events should be counted? There are two different notions of time: processing-time and event-time.

    • ProcessingTime
      • a machine that processes incoming events within a given time-window generates and assigns a timestamp to each event using its local clock
      • low latency, but the results depend on the processing performance and are not deterministic
    • EventTime
      • the time when an event actually happened
      • a timestamp that is attached to the event
      • higher latency, but the results don't depend on the processing performance and are deterministic
        • able to deal with out-of-order events

    As the choice of how to deal with time affects the outcome of a streaming application, we have to choose carefully.
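
    In Flink 1.11 the choice is made explicit on the environment; since Flink 1.12 event-time is the default and this setter is deprecated. A minimal sketch:

    import org.apache.flink.streaming.api.TimeCharacteristic

    // process streams based on the timestamps attached to the events
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)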

    Watermark

    The document is here: Apache Flink 1.12 Documentation: Generating Watermarks

    A watermark is a special timestamp that Flink uses to determine the point in time when to trigger an event-time window. In order to generate watermarks in a streaming application, Flink expects a WatermarkStrategy, which consists of a TimestampAssigner and a WatermarkGenerator. Flink offers several built-in WatermarkStrategy factories, such as forBoundedOutOfOrderness, so it's often unnecessary to implement your own WatermarkGenerator. A very simple example is below.

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

    case class SensorData(deviceId: Long, temperature: Double, humidity: Double, timestamp: Option[Long])

    val watermarkStrategy: WatermarkStrategy[SensorData] = {
      // extract the event-time timestamp from each event, falling back to the record timestamp
      val timestampAssigner = new SerializableTimestampAssigner[SensorData] {
        override def extractTimestamp(element: SensorData, recordTimestamp: Long): Long = {
          element.timestamp getOrElse recordTimestamp
        }
      }

      // tolerate events that arrive up to 5 seconds out of order
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(timestampAssigner)
    }
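
    The strategy is then applied to a stream with assignTimestampsAndWatermarks; a sketch, where sensorStream is a hypothetical DataStream[SensorData]:

    val withTimestampsAndWatermarks = sensorStream.assignTimestampsAndWatermarks(watermarkStrategy)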
    

    Be sure that each event's timestamp is in milliseconds since the epoch. I faced a problematic situation where the WatermarkStrategy didn't generate watermarks as I expected, because each event's timestamp was in seconds since the epoch and obviously they were too old to be used as watermarks.
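
    If a source emits seconds, converting inside the TimestampAssigner is enough; a sketch based on the example above:

    override def extractTimestamp(element: SensorData, recordTimestamp: Long): Long = {
      // convert seconds since the epoch into the milliseconds Flink expects
      element.timestamp.map(_ * 1000L) getOrElse recordTimestamp
    }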

    Deployment

    Flink offers three types of deployment modes: Session mode, Application mode, and Per-Job mode, which are described at Apache Flink 1.12 Documentation: Deployment.

    Session mode

    This sounds the most straightforward to me because a Flink cluster running in session mode can be considered a server that is able to receive more than one job from clients. Because of this server-client manner, there can be a noisy-neighbor problem where, for example, one job consumes lots of resources and blocks other submitted jobs. As described in the previous post, a standalone cluster can be launched in session mode with ./bin/start-cluster.sh.

    $ ./bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host <host>.
    Starting taskexecutor daemon on host <host>.
    

    Once ./bin/start-cluster.sh has been run, a session mode Flink cluster is running on the machine, and you're able to submit jobs to the cluster.
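
    For example, a job can be submitted with the flink CLI (the JAR name here is a placeholder):

    $ ./bin/flink run -c net.petitviolet.KafkaJob ./my-streaming-app.jar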

    Application mode

    Application mode launches a dedicated and isolated Flink cluster to run a single streaming application. After the application completes its execution, the Application mode Flink cluster shuts down. What is confusing and hard to understand about Application mode is the separation of JobManager and TaskManager in Flink: even if a Flink cluster with an embedded application is running as an Application mode cluster, there still have to be available TaskManagers to run the application. Thus, an Application mode cluster can be considered the combination of a JobManager and a streaming application.

    A standalone cluster can run in Application mode with ./bin/standalone-job.sh and ./bin/taskmanager.sh.

    $ ./flink-1.11.2/bin/standalone-job.sh start --job-classname net.petitviolet.KafkaJob --bootstrap.servers "$(ip):9092" --kafka-topic-in flink-topic-in --kafka-topic-out flink-topic-out --group.id test
    
    # on other session
    $ ./flink-1.11.2/bin/taskmanager.sh start
    

    If there are no available TaskManagers, the Application mode cluster dies after searching for available TaskManagers for a while. In addition, using start-foreground was helpful for figuring out what made the JobManager fail, because all logs are shown in the console.
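
    For example, reusing the same arguments as above:

    # run the JobManager in the foreground so its logs stay in the console
    $ ./flink-1.11.2/bin/standalone-job.sh start-foreground --job-classname net.petitviolet.KafkaJob --bootstrap.servers "$(ip):9092" --kafka-topic-in flink-topic-in --kafka-topic-out flink-topic-out --group.id test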

    Per-Job mode

    Per-Job mode launches a dedicated Flink cluster for each submitted job. Per-Job mode is rather similar to Application mode, but the granularity of resource isolation is a bit different: in Per-Job mode, resource provider frameworks such as YARN and Kubernetes manage the lifecycle of the clusters. According to the document, it's unfortunately impossible to use Per-Job mode with a standalone deployment.
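
    For example, with a working YARN setup, a per-job cluster can be launched via the yarn-per-job execution target (a sketch; the JAR name is a placeholder):

    $ ./bin/flink run -t yarn-per-job -c net.petitviolet.KafkaJob ./my-streaming-app.jar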