blog.petitviolet.net

Getting Started with Apache Flink

2020-12-14

Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

https://flink.apache.org/flink-architecture.html

In my words, we can run stateful streaming applications on top of it that consume unbounded data come from wherever you want to integrate with. Apache Flink is a framework for managing the difficulties and complicatedness that every stateful streaming applications have with using external resource management tools, such as Kubernetes. Using Apache Flink brings lots of capabilities into your applications, such as efficient computations, state managements, fault tolerance, event-time processing with watermarks, and so on.

There are several options to launch Flink.

  • Standalone cluster
  • Docker
  • Apache Hadoop YARN
  • Kubernetes

This post describes the first 3 options, standalone cluster, Docker, and Kubernetes.

Standalone cluster

You should see the official guide from https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html

Download Flink from Apache Flink: Downloads

$ wget https://<example.com>/.../flink-1.11.2/flink-1.11.2-bin-scala_2.12.tgz
$ tar -xzf flink-1.11.2-bin-scala_2.12.tgz
$ cd flink-1.11.2
$ ./bin/flink --version
Version: 1.11.2, Commit ID: fe36135

Ready to use Flink!

To launch a cluster locally, just use start-cluster.sh.

$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host <host>.
Starting taskexecutor daemon on host <host>.

The Flink WebUI is available on http://localhost:8081.

flink webui

Docker

Use docker-compose to launch master process(i.e. JobManager) and another worker process(i.e. TaskManager).

docker-compose.yaml
version: '2'
services:
  jobmanager: # master
    image: "flink:1.11"
    ports:
      - "8081:8081"
    command:
      "jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  taskmanager: # worker
    image: "flink:1.11"
    command:
      "taskmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Then, launch docker containers as usual.

$ docker-compose up -d
...

$ docker-compose ps
            Name                          Command               State                Ports
--------------------------------------------------------------------------------------------------------
my_dir_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp
my_dir_taskmanager_1   /docker-entrypoint.sh task ...   Up      6123/tcp, 8081/tcp

As the same as standalone cluster, Flink WebUI is available on localhost:8081.

Kubernetes

To launch a kubernetes cluster locally, minikube is handy for MacOS.

$ minikube version
minikube version: v1.15.1
commit: 23f40a012abb52eff365ff99a709501a61ac5876

$ minikube start
...

$ minikube status
minikube
type: Control Plane
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured

All kubernetes workload and service definitions are below:

flink-deployment.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-master
  labels:
    app: flink-master
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-master
  template:
    metadata:
      labels:
        app: flink-master
    spec:
      containers:
      - name: master
        image: flink:1.11
        args:
          - jobmanager
        ports:
          - containerPort: 6123
            name: rpc
          - containerPort: 6124
            name: blob
          - containerPort: 6125
            name: query
          - containerPort: 8081
            name: webui
        env:
          - name: JOB_MANAGER_RPC_ADDRESS
            value: flink-master

---

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-worker
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink-worker
  template:
    metadata:
      labels:
        app: flink-worker
    spec:
      containers:
      - name: worker
        image: flink:1.11
        args:
          - taskmanager
        ports:
          - containerPort: 6121
            name: data
          - containerPort: 6122
            name: rpc
          - containerPort: 6125
            name: query
        env:
          - name: JOB_MANAGER_RPC_ADDRESS
            value: flink-master

---
apiVersion: v1
kind: Service
metadata:
  name: flink-master
spec:
  type: ClusterIP
  selector:
    app: flink-master
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125

---
apiVersion: v1
kind: Service
metadata:
  name: flink-webui
spec:
  type: NodePort
  selector:
    app: flink-master
  ports:
  - name: webui
    port: 8081
    targetPort: 8081

Apply and see the results using kubectl command as usual.

$ kubectl apply -f flink-deployment.yaml
deployment.apps/flink-master created
deployment.apps/flink-worker created
service/flink-master created
service/flink-webui created

$ kubectl get all
NAME                                READY   STATUS    RESTARTS   AGE
pod/flink-master-68d7db6b7f-mf8wz   1/1     Running   4          50m
pod/flink-worker-5499494655-nrtmx   1/1     Running   5          50m
pod/flink-worker-5499494655-xvpgr   1/1     Running   4          50m

NAME                   TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
service/flink-master   NodePort    10.110.149.118   <none>        6123:32313/TCP   15m
service/flink-webui    NodePort    10.106.211.108   <none>        8081:30287/TCP   41m
service/kubernetes     ClusterIP   10.96.0.1        <none>        443/TCP          56m

NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-master   1/1     1            1           50m
deployment.apps/flink-worker   2/2     2            2           50m

NAME                                      DESIRED   CURRENT   READY   AGE
replicaset.apps/flink-master-68d7db6b7f   1         1         1       50m
replicaset.apps/flink-worker-5499494655   2         2         2       50m

Note that when you use Minikube, even NodePort is not accessible directly. Instead, use minikube service --url <service name> to expose a service to local.

$ minikube service --url flink-webui
🏃  Starting tunnel for service flink-webui.
|-----------|-------------|-------------|------------------------|
| NAMESPACE |    NAME     | TARGET PORT |          URL           |
|-----------|-------------|-------------|------------------------|
| default   | flink-webui |             | http://127.0.0.1:62315 |
|-----------|-------------|-------------|------------------------|
http://127.0.0.1:62315

Now the Flink WebUI is available on http://127.0.0.1:62315 in this example.

Run Streaming Application

As a Flink cluster is available locally, it’s time to run streaming applications. A giter8 template for Flink is available tillrohrmann/flink-project.g8.

$ sbt new tillrohrmann/flink-project.g8
[info] welcome to sbt 1.4.4 (Oracle Corporation Java 14.0.2)
[info] loading settings for project global-plugins from gpg.sbt ...
[info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
[info] set current project to hoge (in build file:path)
[info] set current project to hoge (in build file:path)

A Flink Application Project Using sbt


name [Flink Project]: my-flink-project
organization [org.example]: net.petitviolet
version [0.1-SNAPSHOT]:
scala_version [2.11.12]: 2.12.11
flink_version [1.11.2]: 1.11.2

Template applied in /$PWD/./my-flink-project

$ tree ./my-flink-project/
./my-flink-project/
├── README.md
├── build.sbt
├── idea.sbt
├── project
│   ├── assembly.sbt
│   └── build.properties
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── net
                └── petitviole
                    ├── Job.scala
                    ├── SocketTextStreamWordCount.scala
                    └── WordCount.scala

SocketTextStreamWordCount.scala, for example, is a simple Flink job which show wordcounts that come from socket.

SocketTextStreamWordCount.scala
package net.petitviolet

import org.apache.flink.streaming.api.scala._

object SocketTextStreamWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Create streams for names and ages by mapping the inputs to the corresponding objects
    val text = env.socketTextStream(hostName, port)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)

    counts.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }

}

To run this job on local Flink, first it needs do sbt assembly to build a fat JAR including the job.

$ sbt assembly
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 14.0.2)
[info] loading settings for project global-plugins from gpg.sbt ...
[info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
[info] loading settings for project my-flink-project-build from assembly.sbt ...
[info] loading project definition from /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/project
[info] loading settings for project root from idea.sbt,build.sbt ...
[info] set current project to my-flink-project (in build file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/)
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[info] Assembly up to date: /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar
[success] Total time: 1 s, completed Dec 7, 2020, 9:34:34 PM

Then, flink CLI can be used to submit a job.

$ ./flink-1.11.2/bin/flink run -c net.petitviolet.SocketTextStreamWordCount ./target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar 127.0.0.1 9999
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/flink-1.11.2/lib/flink-dist_2.12-1.11.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID 4118fde2f00eb57f4af3207078898241

$ nc -lk 9999 # another session
To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,

What it looks like on the Flink WebUI are:

job result

stdout

Note that if you’re using Minikube to launch a Flink cluster, it’s necessary to give -m <Flink WebUI url> option to flink CLI.

Summary

This post described how to launch Flink locally using CLI, Docker, and Kubernetes, and also how to run a streaming application that runs on top of Flink.