petitviolet blog

    ksqlDB first impression

    2020-05-08

    Apache Kafka

    Recently, I've listened to a couple of podcast episodes about ksqlDB.

    @MichaelDrogalis talked about ksqlDB, its architecture, use cases, etc. It sounds really exciting to me :)

    What is ksqlDB?

    GitHub repository: confluentinc/ksql
    ksqlDB is a streaming SQL processor for Apache Kafka, built by Confluent as the successor to KSQL. According to the show notes of the episodes:

    Kafka's pubsub interface for writing and reading topics is not ideal for all of these applications, which has led to the creation of ksqlDB, a database system built for streaming applications that uses Kafka as the underlying infrastructure for storing data.

    Building applications on top of unbounded event streams is a complex endeavor, requiring careful integration of multiple disparate systems that were engineered in isolation. The ksqlDB project was created to address this state of affairs by building a unified layer on top of the Kafka ecosystem for stream processing. Developers can work with the SQL constructs that they are familiar with while automatically getting the durability and reliability that Kafka offers.

    Also see ksqlDB Concepts, which describes its core concepts: collections, events, queries, etc. The internal architecture is described in more detail in ksqlDB Architecture.

    Getting started

    The official quickstart document is a good resource to get started.
    ksqlDB: The event streaming database purpose-built for stream processing applications.
    ksqlDB Examples has further examples.

    The quickstart uses docker-compose to launch ksqlDB together with Kafka and ZooKeeper.

    $ docker-compose ps
        Name                 Command            State                     Ports
    ----------------------------------------------------------------------------------------------
    broker          /etc/confluent/docker/run   Up      0.0.0.0:29092->29092/tcp, 9092/tcp
    ksqldb-cli      /bin/sh                     Up
    ksqldb-server   /usr/bin/docker/run         Up      0.0.0.0:8088->8088/tcp
    zookeeper       /etc/confluent/docker/run   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
    
    $ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
    
                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =  Event Streaming Database purpose-built =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2019 Confluent Inc.
    
    CLI v0.8.1, Server v0.8.1 located at http://ksqldb-server:8088
    
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    
    ksql>
    

    Install the Kafka command-line tools, and then create a topic.

    $ wget https://downloads.apache.org/kafka/2.4.1/kafka_2.12-2.4.1.tgz
    $ tar -xzf kafka_2.12-2.4.1.tgz
    $ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic weblogs --partitions 1 --replication-factor 1
    
    Created topic weblogs.
    $ ./kafka_2.12-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic weblogs
    
    Topic: weblogs  PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: weblogs  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    

    Actually, this was my first time launching a Kafka cluster.

    Collections in ksqlDB

    ksqlDB offers two kinds of collections for managing data: streams, which are immutable, and tables, which are mutable.

    Streams are immutable, append-only collections. They're useful for representing a series of historical facts. Adding multiple events with the same key means that they are simply appended to the end of the stream.

    Tables are mutable collections. They let you represent the latest version of each value per key. They're helpful for modeling change over time, and they're often used to represent aggregations.

    https://docs.ksqldb.io/en/latest/

    We might say that Streams are similar to Kafka topics and Tables are similar to RDBMS tables; both are built on top of Kafka topics.
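
    As a rough mental model (plain Python containers, not ksqlDB's actual implementation), the difference between the two collection types can be sketched like this. The keys and values are made up for illustration.

```python
# Conceptual model only: Python containers standing in for ksqlDB collections.
events = [
    ("user-1", "/index"),
    ("user-2", "/users"),
    ("user-1", "/about"),  # same key as the first event
]

stream = []  # stream: append-only, every event is kept in arrival order
table = {}   # table: mutable, only the latest value per key is kept

for key, value in events:
    stream.append((key, value))
    table[key] = value

print(len(stream))  # 3
print(table)        # {'user-1': '/about', 'user-2': '/users'}
```

    The stream keeps both events for user-1, while the table only keeps the latest one.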

    Create stream

    To create a ksqlDB stream, use a create stream statement and specify 'kafka_topic'. A stream represents an immutable, append-only sequence of events, as described in the previous section.

    ksql> create stream weblogs(time bigint, user_agent varchar, path varchar, status int) 
          with (kafka_topic='weblogs', value_format='json', timestamp='time');
    
     Message
    ----------------
     Stream created
    ----------------
    
    ksql> describe extended weblogs;
    
    Name                 : WEBLOGS
    Type                 : STREAM
    Key field            :
    Timestamp field      : TIME
    Key format           : KAFKA
    Value format         : JSON
    Kafka topic          : weblogs (partitions: 1, replication: 1)
    Statement            : create stream weblogs(time bigint, user_agent varchar, path varchar, status int) with (kafka_topic='weblogs', value_format='json', timestamp='time');
    
     Field      | Type
    ----------------------------------------
     ROWTIME    | BIGINT           (system)
     ROWKEY     | VARCHAR(STRING)  (system)
     TIME       | BIGINT
     USER_AGENT | VARCHAR(STRING)
     PATH       | VARCHAR(STRING)
     STATUS     | INTEGER
    ----------------------------------------
    
    Local runtime statistics
    ------------------------
    
    
    (Statistics of the local KSQL server interaction with the Kafka topic weblogs)
    

    If the underlying Kafka topic does not exist, specifying partitions and replicas in the with clause of the create stream statement creates the topic at the same time. Then, we can read the stream using a select statement, just as in SQL.

    ksql> select * from weblogs where status != 200 emit changes;
    +-----------+-------+-----------+-----------+-------+-------+
    |ROWTIME    |ROWKEY |TIME       |USER_AGENT |PATH   |STATUS |
    

    Adding the emit changes clause to the query makes it keep reading the stream. Next, launch another session and insert some records into the stream using insert into statements.

    ksql> insert into weblogs(time, user_agent, path, status) values (1586665380, 'Firefox', '/index', 500);
    ksql> insert into weblogs(time, user_agent, path, status) values (1586665380, 'Firefox', '/index', 200);
    ksql> insert into weblogs(time, user_agent, path, status) values (1586665381, 'Chrome', '/index', 404);
    

    Those records are inserted into the 'weblogs' stream, so the reading session looks like the following.

    ksql> select * from weblogs where status != 200 emit changes;
    +-----------+-------+-----------+-----------+-------+-------+
    |ROWTIME    |ROWKEY |TIME       |USER_AGENT |PATH   |STATUS |
    +-----------+-------+-----------+-----------+-------+-------+
    |1586665380 |null   |1586665380 |Firefox    |/index |500    |
    |1586665381 |null   |1586665381 |Chrome     |/index |404    |
    

    We can hit Ctrl-C to stop reading the stream.
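
    One thing to watch for: as far as I understand, a bigint column used as the stream's timestamp is interpreted as epoch milliseconds, while the values inserted above look like epoch seconds. The sketch below (the helper from_epoch_millis is just a name I made up) shows how the same number reads under each interpretation.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def from_epoch_millis(ms: int) -> str:
    """Format epoch milliseconds as a UTC timestamp string."""
    return (EPOCH + timedelta(milliseconds=ms)).strftime("%Y-%m-%d %H:%M:%S")

value = 1586665380  # the `time` value used in the insert statements

as_millis = from_epoch_millis(value)          # value read as milliseconds
as_seconds = from_epoch_millis(value * 1000)  # value read as seconds

print(as_millis)   # 1970-01-19 08:44:25
print(as_seconds)  # 2020-04-12 04:23:00
```

    The records produced later in this post use millisecond values such as 1585699200000.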

    Create table from a stream

    To create a table, which is a mutable collection, use a create table statement.
    Create a ksqlDB Table
    Data sources for ksqlDB tables are:

    • A Kafka topic, whether or not it already exists
      • The underlying Kafka topic can be created at the same time
    • A streaming query result
    • A ksqlDB stream

    I use the second option in this post.

    ksql> create table ok_weblogs as select user_agent, path, status, count(*) as count 
          from weblogs window tumbling (size 3 minutes) 
          where status >= 200 and status < 400 
          group by user_agent, path, status 
          emit changes;
    
     Message
    -----------------------------------------
     Created query with ID CTAS_OK_WEBLOGS_0
    -----------------------------------------
    
    ksql> describe extended ok_weblogs;
    
    Name                 : OK_WEBLOGS
    Type                 : TABLE
    Key field            :
    Timestamp field      : Not set - using <ROWTIME>
    Key format           : KAFKA
    Value format         : JSON
    Kafka topic          : OK_WEBLOGS (partitions: 1, replication: 1)
    Statement            : CREATE TABLE OK_WEBLOGS WITH (KAFKA_TOPIC='OK_WEBLOGS', PARTITIONS=1, REPLICAS=1) AS SELECT
      WEBLOGS.USER_AGENT USER_AGENT,
      WEBLOGS.PATH PATH,
      WEBLOGS.STATUS STATUS,
      COUNT(*) COUNT
    FROM WEBLOGS WEBLOGS
    WINDOW TUMBLING ( SIZE 3 MINUTES )
    WHERE ((WEBLOGS.STATUS >= 200) AND (WEBLOGS.STATUS < 400))
    GROUP BY WEBLOGS.USER_AGENT, WEBLOGS.PATH, WEBLOGS.STATUS
    EMIT CHANGES;
    
     Field      | Type
    ----------------------------------------------------------------
     ROWTIME    | BIGINT           (system)
     ROWKEY     | VARCHAR(STRING)  (system) (Window type: TUMBLING)
     USER_AGENT | VARCHAR(STRING)
     PATH       | VARCHAR(STRING)
     STATUS     | INTEGER
     COUNT      | BIGINT
    ----------------------------------------------------------------
    
    Queries that write from this TABLE
    -----------------------------------
    CTAS_OK_WEBLOGS_0 (RUNNING) : CREATE TABLE OK_WEBLOGS WITH (KAFKA_TOPIC='OK_WEBLOGS', PARTITIONS=1, REPLICAS=1) AS SELECT  WEBLOGS.USER_AGENT USER_AGENT,  WEBLOGS.PATH PATH,  WEBLOGS.STATUS STATUS,  COUNT(*) COUNTFROM WEBLOGS WEBLOGSWINDOW TUMBLING ( SIZE 3 MINUTES ) WHERE ((WEBLOGS.STATUS >= 200) AND (WEBLOGS.STATUS < 400))GROUP BY WEBLOGS.USER_AGENT, WEBLOGS.PATH, WEBLOGS.STATUSEMIT CHANGES;
    
    For query topology and execution plan please run: EXPLAIN <QueryId>
    
    Local runtime statistics
    ------------------------
    
    
    (Statistics of the local KSQL server interaction with the Kafka topic OK_WEBLOGS)
    

    Tumbling windows are a series of fixed-size, non-overlapping, contiguous time intervals. As you might have noticed, an interesting thing is that the create table ok_weblogs as select... statement created a query named CTAS_OK_WEBLOGS_0. Use show queries to see the details of the query.

    ksql> show queries;
     Query ID           | Status  | Sink Name  | Sink Kafka Topic | Query String
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     CTAS_OK_WEBLOGS_0 | RUNNING | OK_WEBLOGS | OK_WEBLOGS       | CREATE TABLE OK_WEBLOGS WITH (KAFKA_TOPIC='OK_WEBLOGS', PARTITIONS=1, REPLICAS=1) AS SELECT  WEBLOGS.USER_AGENT USER_AGENT,  WEBLOGS.PATH PATH,  WEBLOGS.STATUS STATUS,  COUNT(*) COUNTFROM WEBLOGS WEBLOGSWINDOW TUMBLING ( SIZE 3 MINUTES ) WHERE ((WEBLOGS.STATUS >= 200) AND (WEBLOGS.STATUS < 400))GROUP BY WEBLOGS.USER_AGENT, WEBLOGS.PATH, WEBLOGS.STATUSEMIT CHANGES;
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    For detailed information on a Query run: EXPLAIN <Query ID>;
    

    As this output shows, a create table statement that uses a ksqlDB stream as its data source actually creates a persistent query along with an underlying Kafka topic. If you are interested in more detail, run explain CTAS_OK_WEBLOGS_0.

    Next, I'm going to insert records into the stream. Instead of using insert into statements, this time I push messages directly to the underlying Kafka topic via kafka-console-producer.sh, which is included in the Kafka distribution.

    $ ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh \
      --broker-list localhost:29092 \
      --topic weblogs \
      --property parse.key=true \
      --property key.separator=:
    >key_a-1:{"time":1585699200000,"user_agent":"chrome","path":"/index","status":200} # 2020-04-01 09:00:00
    >key_a-1:{"time":1585699260000,"user_agent":"chrome","path":"/index","status":404} # 2020-04-01 09:01:00
    >key_a-1:{"time":1585699320000,"user_agent":"chrome","path":"/index","status":200} # 2020-04-01 09:02:00
    >key_b-1:{"time":1585699500000,"user_agent":"chrome","path":"/index","status":200} # 2020-04-01 09:05:00
    >key_b-1:{"time":1585699560000,"user_agent":"chrome","path":"/index","status":404} # 2020-04-01 09:06:00
    >key_b-1:{"time":1585699620000,"user_agent":"chrome","path":"/index","status":200} # 2020-04-01 09:07:00
    >
    >key_a-2:{"time":1585699380000,"user_agent":"firefox","path":"/users","status":200} # 2020-04-01 09:03:00
    >key_a-2:{"time":1585699440000,"user_agent":"firefox","path":"/users","status":200} # 2020-04-01 09:04:00
    >key_a-2:{"time":1585699500000,"user_agent":"firefox","path":"/hoge!","status":200} # 2020-04-01 09:05:00
    >key_b-2:{"time":1585699680000,"user_agent":"firefox","path":"/users","status":200} # 2020-04-01 09:08:00
    >key_b-2:{"time":1585699740000,"user_agent":"firefox","path":"/users","status":200} # 2020-04-01 09:09:00
    >key_b-2:{"time":1585699800000,"user_agent":"firefox","path":"/hoge!","status":200} # 2020-04-01 09:10:00
    >
    >key_a-3:{"time":1585699560000,"user_agent":"vivaldi","path":"/index","status":200} # 2020-04-01 09:06:00
    >key_b-3:{"time":1585699860000,"user_agent":"vivaldi","path":"/index","status":200} # 2020-04-01 09:11:00
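
    Typing these lines by hand is error-prone, so here is a small sketch that builds one key:json line from a local time. The function name producer_line is hypothetical, and the UTC+9 offset is my assumption based on the # annotations above.

```python
import json
from datetime import datetime, timedelta, timezone

# Assumption: the "#" annotations in the producer session are in UTC+9.
JST = timezone(timedelta(hours=9))

def producer_line(key: str, local_time: datetime, user_agent: str, path: str, status: int) -> str:
    """Build one `key:json` line for a console producer using key.separator=':'."""
    millis = int(local_time.timestamp() * 1000)  # epoch milliseconds
    value = {"time": millis, "user_agent": user_agent, "path": path, "status": status}
    # Compact separators keep the JSON free of extra spaces.
    return key + ":" + json.dumps(value, separators=(",", ":"))

line = producer_line("key_a-1", datetime(2020, 4, 1, 9, 0, tzinfo=JST), "chrome", "/index", 200)
print(line)
# key_a-1:{"time":1585699200000,"user_agent":"chrome","path":"/index","status":200}
```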
    

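    These events are ingested into the Kafka topic 'weblogs', flow into the 'weblogs' stream in ksqlDB, and finally end up in the 'ok_weblogs' table. The trailing # comments only annotate each event time in UTC+9 for the reader; they are not part of the messages. Read records from the table using a select statement with emit changes, just as when reading the stream.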

    ksql> select rowkey, 
                 TIMESTAMPTOSTRING(windowstart, 'yyyy-MM-dd HH:mm:ss') as window_start, 
                 TIMESTAMPTOSTRING(windowend, 'yyyy-MM-dd HH:mm:ss') as window_end, count 
          from ok_weblogs 
          emit changes;
    +-----------------------+---------------------+---------------------+-------+
    |ROWKEY                 |WINDOW_START         |WINDOW_END           |COUNT  |
    +-----------------------+---------------------+---------------------+-------+
    |chrome|+|/index|+|200  |2020-04-01 00:00:00  |2020-04-01 00:03:00  |2      |
    |chrome|+|/index|+|200  |2020-04-01 00:03:00  |2020-04-01 00:06:00  |1      |
    |chrome|+|/index|+|200  |2020-04-01 00:06:00  |2020-04-01 00:09:00  |1      |
    |firefox|+|/users|+|200  |2020-04-01 00:03:00  |2020-04-01 00:06:00  |2      |
    |firefox|+|/hoge!|+|200  |2020-04-01 00:03:00  |2020-04-01 00:06:00  |1      |
    |firefox|+|/users|+|200  |2020-04-01 00:06:00  |2020-04-01 00:09:00  |1      |
    |firefox|+|/users|+|200  |2020-04-01 00:09:00  |2020-04-01 00:12:00  |1      |
    |firefox|+|/hoge!|+|200  |2020-04-01 00:09:00  |2020-04-01 00:12:00  |1      |
    |vivaldi|+|/index|+|200 |2020-04-01 00:06:00  |2020-04-01 00:09:00  |1      |
    |vivaldi|+|/index|+|200 |2020-04-01 00:09:00  |2020-04-01 00:12:00  |1      |
    

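    Records are counted per window based on each record's timestamp, using the table's window definition, tumbling (size 3 minutes) in this example. The window bounds are printed in UTC here, so 09:00 (UTC+9) in the producer annotations corresponds to 00:00.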
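
    To convince myself of the counts, here is a minimal sketch that reproduces the chrome rows in plain Python. It assumes tumbling windows are aligned to the Unix epoch, which matches the window bounds in the output above; window_start and fmt are helper names I made up.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_MS = 3 * 60 * 1000  # window tumbling (size 3 minutes)

def window_start(ts_ms: int) -> int:
    """Start of the tumbling window containing ts_ms (assumed epoch-aligned)."""
    return ts_ms - ts_ms % WINDOW_MS

def fmt(ts_ms: int) -> str:
    """Format epoch milliseconds as HH:MM in UTC."""
    return datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc).strftime("%H:%M")

# (time, user_agent, path, status) of the chrome events sent to the topic
events = [
    (1585699200000, "chrome", "/index", 200),
    (1585699260000, "chrome", "/index", 404),
    (1585699320000, "chrome", "/index", 200),
    (1585699500000, "chrome", "/index", 200),
    (1585699560000, "chrome", "/index", 404),
    (1585699620000, "chrome", "/index", 200),
]

counts = Counter()
for ts, user_agent, path, status in events:
    if 200 <= status < 400:  # where status >= 200 and status < 400
        counts[(user_agent, path, status, window_start(ts))] += 1

for (user_agent, path, status, start), n in sorted(counts.items(), key=lambda kv: kv[0][3]):
    print(user_agent, path, status, fmt(start), fmt(start + WINDOW_MS), n)
# chrome /index 200 00:00 00:03 2
# chrome /index 200 00:03 00:06 1
# chrome /index 200 00:06 00:09 1
```

    The 404 events are dropped by the filter, and each remaining event falls into exactly one window.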

    Query Push vs Pull

    A query with emit changes is called a push query in ksqlDB. ksqlDB also offers pull queries, but they come with some limitations. If we submit a query without emit changes on the weblogs stream, we get the following result:

    ksql> select * from weblogs;
    'WEBLOGS' is not materialized. Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause
     KSQL currently only supports pull queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT <fields>, <aggregate_functions> FROM <sources> GROUP BY <key>' style statement.
    Query syntax in KSQL has changed. There are now two broad categories of queries:
    - Pull queries: query the current state of the system, return a result, and terminate.
    - Push queries: query the state of the system in motion and continue to output results until they meet a LIMIT condition or are terminated by the user.
    
    'EMIT CHANGES' is used to to indicate a query is a push query. To convert a pull query into a push query, which was the default behavior in older versions of KSQL, add `EMIT CHANGES` to the end of the statement before any LIMIT clause.
    
    For example, the following are pull queries:
            'SELECT * FROM X WHERE ROWKEY=Y;' (non-windowed table)
            'SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>=Z;' (windowed table)
    
    The following is a push query:
            'SELECT * FROM X EMIT CHANGES;'
    
    Note: Persistent queries, e.g. `CREATE TABLE AS ...`, have an implicit `EMIT CHANGES`, but we recommend adding `EMIT CHANGES` to these statements for clarify.
    

    As the error message shows, a pull query can only be submitted against a materialized collection, that is, a table created with an aggregating create table as select statement. Submitting a pull query on a table also requires filtering by the key column.

    ksql> select * from ok_weblogs;
    Missing WHERE clause. Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause
    Pull queries require a WHERE clause that:
     - limits the query to a single key, e.g. `SELECT * FROM X WHERE <key-column>>=Y;`.
     - limits the time bounds of the windowed table. This can be:
        + a single window lower bound, e.g. `WHERE WINDOWSTART = z`, or
        + a range, e.g. `WHERE a <= WINDOWSTART AND WINDOWSTART < b
    WINDOWSTART currently supports operators: [EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL]
    WINDOWSTART currently comparison with epoch milliseconds or a datetime string in the form: yyyy-MM-dd'T'HH:mm:ss.SSS with an optional numeric 4-digit timezone, e.g. '+0100'
    

    So, add a where clause that satisfies these constraints.

    ksql> select rowkey, count from ok_weblogs where rowkey = 'firefox|+|/hoge!|+|200';
    +-----------------------+------+
    |ROWKEY                 |COUNT |
    +-----------------------+------+
    |firefox|+|/hoge!|+|200  |1     |
    |firefox|+|/hoge!|+|200  |1     |
    Query terminated
    

    The rowkey is made of user_agent, path and status, the columns given in the group by clause of the create table statement, joined with |+|. Two rows are returned because the key exists in two windows.
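
    The following sketch models this in plain Python: a composite key joined with |+|, and a key lookup that returns one row per window and then terminates, like a pull query. The delimiter is taken from the output above and is presumably an internal detail of this ksqlDB version; compose_rowkey, split_rowkey and pull are hypothetical names.

```python
# Assumption: "|+|" is copied from the ROWKEY values in the query output;
# it is not something I found documented as a stable contract.
DELIMITER = "|+|"

def compose_rowkey(*group_by_values) -> str:
    """Join group-by column values the way the ROWKEY values appear."""
    return DELIMITER.join(str(v) for v in group_by_values)

def split_rowkey(rowkey: str) -> list:
    """Split a composite ROWKEY back into its (string) parts."""
    return rowkey.split(DELIMITER)

# A materialized windowed table modeled as {(rowkey, window_start_ms): count}.
ok_weblogs = {
    (compose_rowkey("firefox", "/hoge!", 200), 1585699380000): 1,  # window 00:03
    (compose_rowkey("firefox", "/hoge!", 200), 1585699740000): 1,  # window 00:09
    (compose_rowkey("firefox", "/users", 200), 1585699380000): 2,  # window 00:03
}

def pull(table: dict, rowkey: str) -> list:
    """Look up the current rows for one key and return, like a pull query."""
    return [(key, start, count) for (key, start), count in table.items() if key == rowkey]

rows = pull(ok_weblogs, "firefox|+|/hoge!|+|200")
print(len(rows))                 # 2
print(split_rowkey(rows[0][0]))  # ['firefox', '/hoge!', '200']
```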

    Thoughts

    ksqlDB offers two ways to manage event streams, Streams and Tables, on top of Kafka, which is a well-known durable distributed queue. It sounds really nice to me that we can use SQL to extract data from Kafka via ksqlDB regardless of collection type.
    This would be useful for building a CQRS architecture, which has two storage systems, one for Command and one for Read. The former needs to be durable and offer high throughput for event ingestion; the latter needs to source the events ingested into the Command storage and should be optimized for reads. Kafka is well suited to the Command side, and ksqlDB's Streams use it as their backend. ksqlDB also offers Tables to represent mutable data sourced from Streams, which we can see as designed for the Read side of CQRS.