Getting Started with Apache Flink
2020-12-14
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
In my words, we can run stateful streaming applications on top of it that consume unbounded data from whatever sources you want to integrate with. Apache Flink is a framework for managing the difficulties and complexities that every stateful streaming application has, integrating with external resource management tools such as Kubernetes. Using Apache Flink brings lots of capabilities to your applications, such as efficient computation, state management, fault tolerance, event-time processing with watermarks, and so on.
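To make "stateful computations over unbounded data streams" concrete, here is a toy sketch in plain Scala (not the Flink API): a stateful computation is essentially a fold that carries state across the elements of a stream, emitting an updated result for each element. Flink's value is doing this per key, distributed across workers, with the state made fault tolerant via checkpoints.

```scala
object RunningWordCount {
  // Toy illustration only: carry a per-word counter across an unbounded
  // stream of words (modeled as an Iterator) and emit the updated count
  // for each incoming word, the way a keyed running sum in Flink does.
  def runningCounts(words: Iterator[String]): Iterator[(String, Int)] =
    words
      .scanLeft((Map.empty[String, Int], ("", 0))) { case ((state, _), word) =>
        val n = state.getOrElse(word, 0) + 1    // update this word's counter
        (state.updated(word, n), (word, n))      // new state + emitted record
      }
      .drop(1)    // drop the initial (empty-state) element
      .map(_._2)  // keep only the emitted (word, runningCount) pairs
}
```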
Launch Flink locally
There are several options to launch Flink.
- Standalone cluster
- Docker
- Apache Hadoop YARN
- Kubernetes
This post describes three of these options: standalone cluster, Docker, and Kubernetes.
Standalone cluster
See the official guide at https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html
Download Flink from Apache Flink: Downloads
$ wget https://<example.com>/.../flink-1.11.2/flink-1.11.2-bin-scala_2.12.tgz
$ tar -xzf flink-1.11.2-bin-scala_2.12.tgz
$ cd flink-1.11.2
$ ./bin/flink --version
Version: 1.11.2, Commit ID: fe36135
Flink is ready to use!
To launch a cluster locally, just run start-cluster.sh.
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host <host>.
Starting taskexecutor daemon on host <host>.
The Flink WebUI is available at http://localhost:8081.
Docker
Use docker-compose to launch a master process (i.e. JobManager) and a worker process (i.e. TaskManager).
version: '2'
services:
  jobmanager: # master
    image: "flink:1.11"
    ports:
      - "8081:8081"
    command:
      "jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  taskmanager: # worker
    image: "flink:1.11"
    command:
      "taskmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Then, launch docker containers as usual.
$ docker-compose up -d
...
$ docker-compose ps
            Name                          Command               State                Ports
--------------------------------------------------------------------------------------------------------
my_dir_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp
my_dir_taskmanager_1   /docker-entrypoint.sh task ...   Up      6123/tcp, 8081/tcp
As with the standalone cluster, the Flink WebUI is available at localhost:8081.
Kubernetes
To launch a Kubernetes cluster locally, Minikube is handy on macOS.
$ minikube version
minikube version: v1.15.1
commit: 23f40a012abb52eff365ff99a709501a61ac5876
$ minikube start
...
$ minikube status
minikube
type: Control Plane
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured
All of the Kubernetes workload and service definitions are below:
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-master
  labels:
    app: flink-master
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-master
  template:
    metadata:
      labels:
        app: flink-master
    spec:
      containers:
      - name: master
        image: flink:1.11
        args:
          - jobmanager
        ports:
          - containerPort: 6123
            name: rpc
          - containerPort: 6124
            name: blob
          - containerPort: 6125
            name: query
          - containerPort: 8081
            name: webui
        env:
          - name: JOB_MANAGER_RPC_ADDRESS
            value: flink-master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-worker
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink-worker
  template:
    metadata:
      labels:
        app: flink-worker
    spec:
      containers:
      - name: worker
        image: flink:1.11
        args:
          - taskmanager
        ports:
          - containerPort: 6121
            name: data
          - containerPort: 6122
            name: rpc
          - containerPort: 6125
            name: query
        env:
          - name: JOB_MANAGER_RPC_ADDRESS
            value: flink-master
---
apiVersion: v1
kind: Service
metadata:
  name: flink-master
spec:
  type: ClusterIP
  selector:
    app: flink-master
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
---
apiVersion: v1
kind: Service
metadata:
  name: flink-webui
spec:
  type: NodePort
  selector:
    app: flink-master
  ports:
  - name: webui
    port: 8081
    targetPort: 8081
Apply them and check the results with the kubectl command as usual.
$ kubectl apply -f flink-deployment.yaml
deployment.apps/flink-master created
deployment.apps/flink-worker created
service/flink-master created
service/flink-webui created
$ kubectl get all
NAME                                READY   STATUS    RESTARTS   AGE
pod/flink-master-68d7db6b7f-mf8wz   1/1     Running   4          50m
pod/flink-worker-5499494655-nrtmx   1/1     Running   5          50m
pod/flink-worker-5499494655-xvpgr   1/1     Running   4          50m
NAME                   TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
service/flink-master   NodePort    10.110.149.118   <none>        6123:32313/TCP   15m
service/flink-webui    NodePort    10.106.211.108   <none>        8081:30287/TCP   41m
service/kubernetes     ClusterIP   10.96.0.1        <none>        443/TCP          56m
NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-master   1/1     1            1           50m
deployment.apps/flink-worker   2/2     2            2           50m
NAME                                      DESIRED   CURRENT   READY   AGE
replicaset.apps/flink-master-68d7db6b7f   1         1         1       50m
replicaset.apps/flink-worker-5499494655   2         2         2       50m
Note that when you use Minikube, even a NodePort service is not directly accessible. Instead, use minikube service --url <service name> to expose a service locally.
$ minikube service --url flink-webui
Starting tunnel for service flink-webui.
|-----------|-------------|-------------|------------------------|
| NAMESPACE |    NAME     | TARGET PORT |          URL           |
|-----------|-------------|-------------|------------------------|
| default   | flink-webui |             | http://127.0.0.1:62315 |
|-----------|-------------|-------------|------------------------|
http://127.0.0.1:62315
Now the Flink WebUI is available on http://127.0.0.1:62315 in this example.
Run Streaming Application
Now that a Flink cluster is available locally, it's time to run a streaming application. A giter8 template for Flink is available at tillrohrmann/flink-project.g8.
$ sbt new tillrohrmann/flink-project.g8
[info] welcome to sbt 1.4.4 (Oracle Corporation Java 14.0.2)
[info] loading settings for project global-plugins from gpg.sbt ...
[info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
[info] set current project to hoge (in build file:path)
[info] set current project to hoge (in build file:path)
A Flink Application Project Using sbt
name [Flink Project]: my-flink-project
organization [org.example]: net.petitviolet
version [0.1-SNAPSHOT]:
scala_version [2.11.12]: 2.12.11
flink_version [1.11.2]: 1.11.2
Template applied in /$PWD/./my-flink-project
$ tree ./my-flink-project/
./my-flink-project/
├── README.md
├── build.sbt
├── idea.sbt
├── project
│   ├── assembly.sbt
│   └── build.properties
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── net
                └── petitviolet
                    ├── Job.scala
                    ├── SocketTextStreamWordCount.scala
                    └── WordCount.scala
SocketTextStreamWordCount.scala, for example, is a simple Flink job that shows word counts for text coming from a socket.
package net.petitviolet
import org.apache.flink.streaming.api.scala._
object SocketTextStreamWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Create streams for names and ages by mapping the inputs to the corresponding objects
    val text = env.socketTextStream(hostName, port)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)
    counts.print()
    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
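For intuition, the same tokenize-and-count logic can be written against a plain Scala collection. This is the batch equivalent of the pipeline above: flatMap/split tokenizes the lines, groupBy plays the role of keyBy, and the group size is the final sum.

```scala
object WordCountLogic {
  // Batch equivalent of the streaming job's pipeline: tokenize each line
  // into lowercase words, group by word, and count occurrences per group.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty)) // tokenize
      .groupBy(identity)                                       // like keyBy
      .map { case (word, occurrences) => (word, occurrences.size) } // like sum
}
```

The difference in the streaming version is that the keyed sum never "finishes": each incoming word emits an updated running count, which is why counts.print() keeps producing output as lines arrive on the socket.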
To run this job on the local Flink cluster, first run sbt assembly to build a fat JAR containing the job.
$ sbt assembly
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 14.0.2)
[info] loading settings for project global-plugins from gpg.sbt ...
[info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
[info] loading settings for project my-flink-project-build from assembly.sbt ...
[info] loading project definition from /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/project
[info] loading settings for project root from idea.sbt,build.sbt ...
[info] set current project to my-flink-project (in build file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/)
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[info] Assembly up to date: /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar
[success] Total time: 1 s, completed Dec 7, 2020, 9:34:34 PM
Then, the flink CLI can be used to submit the job.
$ ./flink-1.11.2/bin/flink run -c net.petitviolet.SocketTextStreamWordCount ./target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar 127.0.0.1 9999
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/flink-1.11.2/lib/flink-dist_2.12-1.11.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID 4118fde2f00eb57f4af3207078898241
$ nc -lk 9999 # another session
To be, or not to be,--that is the question:--
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune
Or to take arms against a sea of troubles,
The submitted job and its output can also be seen on the Flink WebUI.
Note that if you're using Minikube to launch the Flink cluster, you need to pass the -m <Flink WebUI url> option to the flink CLI.
Summary
This post described how to launch Flink locally as a standalone cluster, with Docker, and on Kubernetes, and also how to run a streaming application on top of Flink.