Getting Started with Apache Flink
2020-12-14
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
https://flink.apache.org/flink-architecture.html
In my own words, it lets us run stateful streaming applications that consume unbounded data from whatever sources we want to integrate with. Apache Flink is a framework for managing the difficulties and complexity that every stateful streaming application faces, and it integrates with external resource management tools such as Kubernetes. Using Apache Flink brings many capabilities to your applications, such as efficient computation, state management, fault tolerance, event-time processing with watermarks, and so on.
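As a toy illustration of what "stateful computation over an unbounded stream" means, here is a plain-Scala sketch (no Flink involved; `RunningWordCount` and `update` are names made up for this example): per-key state is threaded through the stream and updated for every incoming element.

```scala
object RunningWordCount {
  // Update the per-word state (word -> count) for one incoming element.
  // In Flink this kind of state is managed, checkpointed, and restored for you.
  def update(state: Map[String, Int], word: String): Map[String, Int] =
    state.updated(word, state.getOrElse(word, 0) + 1)

  def main(args: Array[String]): Unit = {
    // An Iterator stands in for an unbounded source such as a socket or Kafka topic
    val stream = Iterator("to", "be", "or", "not", "to", "be")
    // scanLeft threads the state through the stream, emitting each intermediate state
    stream.scanLeft(Map.empty[String, Int])(update).foreach(println)
  }
}
```

Flink's value is doing this at scale: the state survives failures (fault tolerance) and the framework decides which parallel task owns which key.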
Launch Flink locally
There are several options to launch Flink.
- Standalone cluster
- Docker
- Apache Hadoop YARN
- Kubernetes
This post describes three of these options: standalone cluster, Docker, and Kubernetes.
Standalone cluster
See the official guide at https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html
Download Flink from Apache Flink: Downloads
$ wget https://<example.com>/.../flink-1.11.2/flink-1.11.2-bin-scala_2.12.tgz
$ tar -xzf flink-1.11.2-bin-scala_2.12.tgz
$ cd flink-1.11.2
$ ./bin/flink --version
Version: 1.11.2, Commit ID: fe36135
Ready to use Flink!
To launch a cluster locally, just use start-cluster.sh.
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host <host>.
Starting taskexecutor daemon on host <host>.
The Flink WebUI is available at http://localhost:8081.
Docker
Use docker-compose to launch a master process (i.e., a JobManager) and a worker process (i.e., a TaskManager).
version: '2'
services:
  jobmanager: # master
    image: "flink:1.11"
    ports:
      - "8081:8081"
    command: "jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  taskmanager: # worker
    image: "flink:1.11"
    command: "taskmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Then, launch docker containers as usual.
$ docker-compose up -d
...
$ docker-compose ps
Name Command State Ports
--------------------------------------------------------------------------------------------------------
my_dir_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp
my_dir_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp
As with the standalone cluster, the Flink WebUI is available at localhost:8081.
Kubernetes
To launch a Kubernetes cluster locally, minikube is handy on macOS.
$ minikube version
minikube version: v1.15.1
commit: 23f40a012abb52eff365ff99a709501a61ac5876
$ minikube start
...
$ minikube status
minikube
type: Control Plane
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured
All the Kubernetes workload and Service definitions are below:
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-master
  labels:
    app: flink-master
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-master
  template:
    metadata:
      labels:
        app: flink-master
    spec:
      containers:
        - name: master
          image: flink:1.11
          args:
            - jobmanager
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 6125
              name: query
            - containerPort: 8081
              name: webui
          env:
            - name: JOB_MANAGER_RPC_ADDRESS
              value: flink-master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-worker
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink-worker
  template:
    metadata:
      labels:
        app: flink-worker
    spec:
      containers:
        - name: worker
          image: flink:1.11
          args:
            - taskmanager
          ports:
            - containerPort: 6121
              name: data
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query
          env:
            - name: JOB_MANAGER_RPC_ADDRESS
              value: flink-master
---
apiVersion: v1
kind: Service
metadata:
  name: flink-master
spec:
  type: ClusterIP
  selector:
    app: flink-master
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: query
      port: 6125
---
apiVersion: v1
kind: Service
metadata:
  name: flink-webui
spec:
  type: NodePort
  selector:
    app: flink-master
  ports:
    - name: webui
      port: 8081
      targetPort: 8081
Apply and see the results using the kubectl command as usual.
$ kubectl apply -f flink-deployment.yaml
deployment.apps/flink-master created
deployment.apps/flink-worker created
service/flink-master created
service/flink-webui created
$ kubectl get all
NAME READY STATUS RESTARTS AGE
pod/flink-master-68d7db6b7f-mf8wz 1/1 Running 4 50m
pod/flink-worker-5499494655-nrtmx 1/1 Running 5 50m
pod/flink-worker-5499494655-xvpgr 1/1 Running 4 50m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/flink-master NodePort 10.110.149.118 <none> 6123:32313/TCP 15m
service/flink-webui NodePort 10.106.211.108 <none> 8081:30287/TCP 41m
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 56m
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/flink-master 1/1 1 1 50m
deployment.apps/flink-worker 2/2 2 2 50m
NAME DESIRED CURRENT READY AGE
replicaset.apps/flink-master-68d7db6b7f 1 1 1 50m
replicaset.apps/flink-worker-5499494655 2 2 2 50m
Note that when you use minikube, even a NodePort service is not accessible directly. Instead, use minikube service --url <service name> to expose a service locally.
$ minikube service --url flink-webui
Starting tunnel for service flink-webui.
|-----------|-------------|-------------|------------------------|
| NAMESPACE | NAME | TARGET PORT | URL |
|-----------|-------------|-------------|------------------------|
| default | flink-webui | | http://127.0.0.1:62315 |
|-----------|-------------|-------------|------------------------|
http://127.0.0.1:62315
Now the Flink WebUI is available on http://127.0.0.1:62315 in this example.
Run Streaming Application
Now that a Flink cluster is available locally, it's time to run a streaming application. A giter8 template for Flink is available: tillrohrmann/flink-project.g8.
$ sbt new tillrohrmann/flink-project.g8
[info] welcome to sbt 1.4.4 (Oracle Corporation Java 14.0.2)
[info] loading settings for project global-plugins from gpg.sbt ...
[info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
[info] set current project to hoge (in build file:path)
[info] set current project to hoge (in build file:path)
A Flink Application Project Using sbt
name [Flink Project]: my-flink-project
organization [org.example]: net.petitviolet
version [0.1-SNAPSHOT]:
scala_version [2.11.12]: 2.12.11
flink_version [1.11.2]: 1.11.2
Template applied in /$PWD/./my-flink-project
$ tree ./my-flink-project/
./my-flink-project/
├── README.md
├── build.sbt
├── idea.sbt
├── project
│   ├── assembly.sbt
│   └── build.properties
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── net
                └── petitviolet
                    ├── Job.scala
                    ├── SocketTextStreamWordCount.scala
                    └── WordCount.scala
SocketTextStreamWordCount.scala, for example, is a simple Flink job that prints word counts computed from a socket stream.
package net.petitviolet

import org.apache.flink.streaming.api.scala._

object SocketTextStreamWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from the socket, split them into words, and keep a running count per word
    val text = env.socketTextStream(hostName, port)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)
    counts.print()
    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
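To see what this pipeline computes, here is a batch-equivalent sketch in plain Scala (no Flink dependency; `WordCountSketch` and its helpers are names invented for this illustration), using the same tokenization as the job above:

```scala
object WordCountSketch {
  // Same tokenization as the Flink job: lowercase, split on non-word chars, drop empties
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Batch equivalent of .map((_, 1)).keyBy(0).sum(1): group by word, sum the 1s
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(tokenize).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit =
    wordCounts(Seq("to be or not to be")).foreach(println)
}
```

The difference is that the Flink job never reaches a final total: the stream is unbounded, so keyBy(0).sum(1) maintains the counts as state and emits updates continuously.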
To run this job on the local Flink cluster, first run sbt assembly to build a fat JAR including the job.
$ sbt assembly
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 14.0.2)
[info] loading settings for project global-plugins from gpg.sbt ...
[info] loading global plugins from /Users/hiroki.komurasaki/.sbt/1.0/plugins
[info] loading settings for project my-flink-project-build from assembly.sbt ...
[info] loading project definition from /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/project
[info] loading settings for project root from idea.sbt,build.sbt ...
[info] set current project to my-flink-project (in build file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/)
[warn] Multiple main classes detected. Run 'show discoveredMainClasses' to see the list
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[info] Assembly up to date: /Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/hoge/my-flink-project/target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar
[success] Total time: 1 s, completed Dec 7, 2020, 9:34:34 PM
Then, the flink CLI can be used to submit the job.
$ ./flink-1.11.2/bin/flink run -c net.petitviolet.SocketTextStreamWordCount ./target/scala-2.12/my-flink-project-assembly-0.1-SNAPSHOT.jar 127.0.0.1 9999
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/hiroki.komurasaki/.ghq/github.com/petitviolet/flink_sandbox/flink-1.11.2/lib/flink-dist_2.12-1.11.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID 4118fde2f00eb57f4af3207078898241
$ nc -lk 9999 # another session
To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,
This is what it looks like on the Flink WebUI:
Note that if you're using minikube to launch the Flink cluster, you need to pass the -m <Flink WebUI URL> option to the flink CLI.
Summary
This post described how to launch Flink locally as a standalone cluster, with Docker, and on Kubernetes, and also how to run a streaming application on top of Flink.