petitviolet blog

    Getting Started with Apache Flink

    2020-12-14

    Apache Flink

    Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

    https://flink.apache.org/flink-architecture.html

    In my words: Flink lets us run stateful streaming applications that consume unbounded data from whatever sources you integrate with. Apache Flink is a framework for managing the difficulties and complexity that every stateful streaming application has, while integrating with external resource management tools such as Kubernetes. Using Apache Flink brings lots of capabilities to your applications, such as efficient computation, state management, fault tolerance, event-time processing with watermarks, and so on.
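    To make "stateful computation over a stream" concrete, here is a minimal plain-Scala sketch (no Flink API involved; the object and method names are mine) of the kind of keyed running state that Flink manages, partitions, and checkpoints for you:

```scala
// Plain Scala, not the Flink API: a running per-word count over a
// stream of words. Flink maintains this kind of keyed state for you,
// distributed across workers and restored after failures.
object RunningCount {
  def counts(words: Iterator[String]): Iterator[(String, Int)] = {
    // the "state": current count per key
    val state = scala.collection.mutable.Map.empty[String, Int]
    words.map { word =>
      val n = state.getOrElse(word, 0) + 1
      state.update(word, n)
      (word, n) // emit the updated count for this key
    }
  }
}
```

    The point of Flink is doing exactly this at scale: the per-key state survives process failures via checkpoints and is partitioned across TaskManagers instead of living in one local map.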

    Launch Flink locally

    There are several options to launch Flink.

    • Standalone cluster
    • Docker
    • Apache Hadoop YARN
    • Kubernetes

    This post describes three of these options: standalone cluster, Docker, and Kubernetes.

    Standalone cluster

    See the official guide at https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html

    Download Flink from Apache Flink: Downloads

    $ wget https://<example.com>/.../flink-1.11.2/flink-1.11.2-bin-scala_2.12.tgz
    $ tar -xzf flink-1.11.2-bin-scala_2.12.tgz
    $ cd flink-1.11.2
    $ ./bin/flink --version
    Version: 1.11.2, Commit ID: fe36135
    

    Ready to use Flink!

    To launch a cluster locally, just use start-cluster.sh.

    $ ./bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host <host>.
    Starting taskexecutor daemon on host <host>.
    

    The Flink WebUI is available on http://localhost:8081.

    flink webui

    Docker

    Use docker-compose to launch a master process (i.e. JobManager) and a worker process (i.e. TaskManager).

    docker-compose.yaml
    version: '2'
    services:
      jobmanager: # master
        image: "flink:1.11"
        ports:
          - "8081:8081"
        command:
          "jobmanager"
        environment:
          JOB_MANAGER_RPC_ADDRESS: jobmanager
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    
      taskmanager: # worker
        image: "flink:1.11"
        command:
          "taskmanager"
        environment:
          JOB_MANAGER_RPC_ADDRESS: jobmanager
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    

    Then, launch docker containers as usual.

    $ docker-compose up -d
    ...
    
    $ docker-compose ps
                Name                          Command               State                Ports
    --------------------------------------------------------------------------------------------------------
    my_dir_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp
    my_dir_taskmanager_1   /docker-entrypoint.sh task ...   Up      6123/tcp, 8081/tcp
    

    As with the standalone cluster, the Flink WebUI is available on localhost:8081.

    Kubernetes

    To launch a Kubernetes cluster locally, minikube is handy on macOS.

    $ minikube version
    minikube version: v1.15.1
    commit: 23f40a012abb52eff365ff99a709501a61ac5876
    
    $ minikube start
    ...
    
    $ minikube status
    minikube
    type: Control Plane
    host: Running
    kubelet: Running
    apiserver: Running
    kubeconfig: Configured
    

    All Kubernetes workload and service definitions are below:

    flink-deployment.yaml
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-master
      labels:
        app: flink-master
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink-master
      template:
        metadata:
          labels:
            app: flink-master
        spec:
          containers:
          - name: master
            image: flink:1.11
            args:
              - jobmanager
            ports:
              - containerPort: 6123
                name: rpc
              - containerPort: 6124
                name: blob
              - containerPort: 6125
                name: query
              - containerPort: 8081
                name: webui
            env:
              - name: JOB_MANAGER_RPC_ADDRESS
                value: flink-master
    
    ---
    
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-worker
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink-worker
      template:
        metadata:
          labels:
            app: flink-worker
        spec:
          containers:
          - name: worker
            image: flink:1.11
            args:
              - taskmanager
            ports:
              - containerPort: 6121
                name: data
              - containerPort: 6122
                name: rpc
              - containerPort: 6125
                name: query
            env:
              - name: JOB_MANAGER_RPC_ADDRESS
                value: flink-master
    
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-master
    spec:
      type: ClusterIP
      selector:
        app: flink-master
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124
      - name: query
        port: 6125
    
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-webui
    spec:
      type: NodePort
      selector:
        app: flink-master
      ports:
      - name: webui
        port: 8081
        targetPort: 8081
    

    Apply the manifests and check the results with the kubectl command as usual.

    $ kubectl apply -f flink-deployment.yaml
    deployment.apps/flink-master created
    deployment.apps/flink-worker created
    service/flink-master created
    service/flink-webui created
    
    $ kubectl get all
    NAME                                READY   STATUS    RESTARTS   AGE
    pod/flink-master-68d7db6b7f-mf8wz   1/1     Running   4          50m
    pod/flink-worker-5499494655-nrtmx   1/1     Running   5          50m
    pod/flink-worker-5499494655-xvpgr   1/1     Running   4          50m
    
    NAME                   TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
    service/flink-master   NodePort    10.110.149.118   <none>        6123:32313/TCP   15m
    service/flink-webui    NodePort    10.106.211.108   <none>        8081:30287/TCP   41m
    service/kubernetes     ClusterIP   10.96.0.1        <none>        443/TCP          56m
    
    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/flink-master   1/1     1            1           50m
    deployment.apps/flink-worker   2/2     2            2           50m
    
    NAME                                      DESIRED   CURRENT   READY   AGE
    replicaset.apps/flink-master-68d7db6b7f   1         1         1       50m
    replicaset.apps/flink-worker-5499494655   2         2         2       50m
    

    Note that with Minikube, even a NodePort service is not directly accessible from the host. Instead, use minikube service --url <service name> to expose the service locally.

    $ minikube service --url flink-webui
    šŸƒ  Starting tunnel for service flink-webui.
    |-----------|-------------|-------------|------------------------|
    | NAMESPACE |    NAME     | TARGET PORT |          URL           |
    |-----------|-------------|-------------|------------------------|
    | default   | flink-webui |             | http://127.0.0.1:62315 |
    |-----------|-------------|-------------|------------------------|
    http://127.0.0.1:62315
    

    Now the Flink WebUI is available on http://127.0.0.1:62315 in this example.

    Run Streaming Application

    Now that a Flink cluster is available locally, it's time to run a streaming application. A giter8 template for Flink is available at tillrohrmann/flink-project.g8.

    $ sbt new tillrohrmann/flink-project.g8
    [info] welcome to sbt 1.4.4 (Oracle Corporation Java 14.0.2)
    [info] loading settings for project global-plugins from gpg.sbt ...
    [info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
    [info] set current project to hoge (in build file:path)
    [info] set current project to hoge (in build file:path)
    
    A Flink Application Project Using sbt
    
    
    name [Flink Project]: my-flink-project
    organization [org.example]: net.petitviolet
    version [0.1-SNAPSHOT]:
    scala_version [2.11.12]: 2.12.11
    flink_version [1.11.2]: 1.11.2
    
    Template applied in /$PWD/./my-flink-project
    
    $ tree ./my-flink-project/
    ./my-flink-project/
    ├── README.md
    ├── build.sbt
    ├── idea.sbt
    ├── project
    │   ├── assembly.sbt
    │   └── build.properties
    └── src
        └── main
            ├── resources
            │   └── log4j.properties
            └── scala
                └── net
                    └── petitviole
                        ├── Job.scala
                        ├── SocketTextStreamWordCount.scala
                        └── WordCount.scala
    

    SocketTextStreamWordCount.scala, for example, is a simple Flink job that prints word counts for text coming in over a socket.

    SocketTextStreamWordCount.scala
    package net.petitviolet
    
    import org.apache.flink.streaming.api.scala._
    
    object SocketTextStreamWordCount {
    
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
          return
        }
    
        val hostName = args(0)
        val port = args(1).toInt
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // Create a stream of words from the socket text input and count occurrences per word
        val text = env.socketTextStream(hostName, port)
        val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
          .map { (_, 1) }
          .keyBy(0)
          .sum(1)
    
        counts.print()
    
        env.execute("Scala SocketTextStreamWordCount Example")
      }
    
    }
    
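    On a bounded input, the transformations in the job above reduce to ordinary Scala collection operations. Here is a minimal sketch (plain Scala, no Flink; the object and method names are mine) of the final counts the pipeline converges to:

```scala
// Plain Scala, not the Flink API: the same tokenize-and-count logic
// applied to a bounded input, yielding the final totals per word.
object WordCountSketch {
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty)) // tokenize
      .groupBy(identity)                                       // key by word
      .map { case (word, occurrences) => (word, occurrences.size) }
}
```

    One difference to keep in mind: the Flink job emits a running count on every incoming record (you would see ("be", 1) and later ("be", 2) on the stream), whereas this batch version produces only the final totals.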

    To run this job on the local Flink cluster, first run sbt assembly to build a fat JAR containing the job.

    $ sbt assembly
    [info] welcome to sbt 1.3.13 (Oracle Corporation Java 14.0.2)
    [info] loading settings for project global-plugins from gpg.sbt ...
    [info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
    [info] loading settings for project my-flink-project-build from assembly.sbt ...
    [info] loading project definition from /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/project
    [info] loading settings for project root from idea.sbt,build.sbt ...
    [info] set current project to my-flink-project (in build file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/)
    [warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
    [info] Checking every *.class/*.jar file's SHA-1.
    [info] Merging files...
    [info] Assembly up to date: /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar
    [success] Total time: 1 s, completed Dec 7, 2020, 9:34:34 PM
    

    Then, the flink CLI can be used to submit the job.

    $ ./flink-1.11.2/bin/flink run -c net.petitviolet.SocketTextStreamWordCount ./target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar 127.0.0.1 9999
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/flink-1.11.2/lib/flink-dist_2.12-1.11.2.jar) to field java.lang.String.value
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    Job has been submitted with JobID 4118fde2f00eb57f4af3207078898241
    
    $ nc -lk 9999 # another session
    To be, or not to be,--that is the question:--",
          "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
          "Or to take arms against a sea of troubles,
    

    This is what it looks like on the Flink WebUI:

    job result

    stdout

    Note that if you're using Minikube to launch the Flink cluster, you need to pass the -m <Flink WebUI url> option to the flink CLI so that the job is submitted to the tunneled address.

    Summary

    This post described how to launch Flink locally as a standalone cluster, with Docker, and on Kubernetes, and also how to run a streaming application on top of Flink.