Skip to content

Instantly share code, notes, and snippets.

@rmoff
Last active February 10, 2020 23:41
Show Gist options
  • Select an option

  • Save rmoff/7efa882dfd808dbab4eb7b8e6f9eda16 to your computer and use it in GitHub Desktop.

Select an option

Save rmoff/7efa882dfd808dbab4eb7b8e6f9eda16 to your computer and use it in GitHub Desktop.

Revisions

  1. rmoff revised this gist Feb 28, 2018. No changes.
  2. rmoff revised this gist Feb 28, 2018. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -8,6 +8,8 @@ v1.00, 28 Feb 2018

    == Introduction

    image::https://www.dropbox.com/s/asqkfbtexpnu8na/live-ksql-01.png?dl=0&raw=1[Diagram of what's being built with the KSQL demo]

    This code file accompanies the slides and recording available online at : https://www.confluent.io/online-talks/

    Don't forget to check out the #ksql channel on our https://slackpass.io/confluentcommunity[Community Slack group]
  3. rmoff revised this gist Feb 28, 2018. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -20,6 +20,8 @@ Built using https://www.confluent.io/download/[Confluent Platform] 4.0 and https

    === Start a `screen` session

    https://en.wikipedia.org/wiki/GNU_Screen[`screen`] is not obligatory, but very useful :)

    `screen -S KSQL`

    <<<
  4. rmoff revised this gist Feb 28, 2018. No changes.
  5. rmoff revised this gist Feb 28, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -12,7 +12,7 @@ This code file accompanies the slides and recording available online at : https:

    Don't forget to check out the #ksql channel on our https://slackpass.io/confluentcommunity[Community Slack group]

    ——_Robin Moffatt https://twitter.com/rmoff/[@rmoff]_, 28 Feb 2018
    ——_Robin Moffatt https://twitter.com/rmoff/[@rmoff], 28 Feb 2018_

    == Pre-demo

  6. rmoff revised this gist Feb 28, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -12,7 +12,7 @@ This code file accompanies the slides and recording available online at : https:

    Don't forget to check out the #ksql channel on our https://slackpass.io/confluentcommunity[Community Slack group]

    _Robin Moffatt https://twitter.com/rmoff/[@rmoff]_
    ——_Robin Moffatt https://twitter.com/rmoff/[@rmoff]_, 28 Feb 2018

    == Pre-demo

  7. rmoff revised this gist Feb 28, 2018. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -12,6 +12,8 @@ This code file accompanies the slides and recording available online at : https:

    Don't forget to check out the #ksql channel on our https://slackpass.io/confluentcommunity[Community Slack group]

    _Robin Moffatt https://twitter.com/rmoff/[@rmoff]_

    == Pre-demo

    Built using https://www.confluent.io/download/[Confluent Platform] 4.0 and https://github.com/confluentinc/ksql/[KSQL 0.5].
  8. rmoff revised this gist Feb 28, 2018. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -6,6 +6,12 @@ v1.00, 28 Feb 2018

    :toc:

    == Introduction

    This code file accompanies the slides and recording available online at : https://www.confluent.io/online-talks/

    Don't forget to check out the #ksql channel on our https://slackpass.io/confluentcommunity[Community Slack group]

    == Pre-demo

    Built using https://www.confluent.io/download/[Confluent Platform] 4.0 and https://github.com/confluentinc/ksql/[KSQL 0.5].
  9. rmoff revised this gist Feb 28, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -8,7 +8,7 @@ v1.00, 28 Feb 2018

    == Pre-demo

    Built using Confluent Platform 4.0 and KSQL 0.5.
    Built using https://www.confluent.io/download/[Confluent Platform] 4.0 and https://github.com/confluentinc/ksql/[KSQL 0.5].

    === Start a `screen` session

  10. rmoff created this gist Feb 28, 2018.
    1,034 changes: 1,034 additions & 0 deletions live-coding-ksql.adoc
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,1034 @@
    = Live Coding a KSQL Application
    :source-highlighter: pygments
    :doctype: book
    Robin Moffatt <@rmoff>
    v1.00, 28 Feb 2018

    :toc:

    == Pre-demo

    Built using Confluent Platform 4.0 and KSQL 0.5.

    === Start a `screen` session

    `screen -S KSQL`

    <<<
    === Start Confluent Platform

    [source,bash]
    ----
    confluent destroy
    confluent start
    ----

    [source,bash]
    ----
    Robin@asgard02 ~/c/confluent-4.0.0> confluent start
    Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]
    Robin@asgard02 ~/c/confluent-4.0.0>
    ----

    <<<
    === Populate Users topic


    `kafkacat -P -b localhost:9092 -t mysql_users -K ":"`

    Paste the data to `stdin`:

    ----
    1:{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
    2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
    3:{"uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
    4:{"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
    5:{"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
    6:{"uid":6,"name":"Hojjat","locale":"en_US","address_city":"Palo Alto","elite":"S"}
    7:{"uid":7,"name":"Damian","locale":"en_US","address_city":"London","elite":"S"}
    8:{"uid":8,"name":"Apurva","locale":"en_US","address_city":"Palo Alto","elite":"S"}
    9:{"uid":9,"name":"Neha","locale":"en_US","address_city":"Palo Alto","elite":"P"}
    10:{"uid":10,"name":"Mattias","locale":"DE","address_city":"Palo Alto","elite":"S"}
    11:{"uid":11,"name":"Gwen","locale":"en_US","address_city":"Palo Alto","elite":"P"}
    12:{"uid":12,"name":"Guozhang","locale":"en_US","address_city":"Palo Alto","elite":"S"}
    13:{"uid":13,"name":"Lu","locale":"en_US","address_city":"Palo Alto","elite":"P"}
    14:{"uid":14,"name":"Robin","locale":"en_US","address_city":"Ilkley","elite":"G"}
    2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"San Francisco","elite":"G"}
    2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"P"}
    ----

    Exit: `Ctrl-D`

    <<<
    === Inspect Users topic data

    `kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users`

    [source,bash]
    ----
    kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users
    [...]
    Key: 4
    Value: {"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
    Key: 5
    Value: {"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
    [...]
    ----

    <<<

    == Demo

    [NOTE]
    ====
    Create blank window in `screen`
    ====

    === Check Confluent is running

    `confluent status`

    [source,bash]
    ----
    Robin@asgard02 ~/c/confluent-4.0.0> confluent status
    Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
    connect is [UP]
    kafka-rest is [UP]
    schema-registry is [UP]
    kafka is [UP]
    zookeeper is [UP]
    ----

    <<<
    === Run datagen - ratings

    `~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500`

    [source,bash]
    ----
    Robin@asgard02 ~/c/confluent-4.0.0> ~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
    1 --> ([ 1 | 10 | 3 | 9297 | 1519302621843 | 'iOS' | 'your team here rocks!' ])
    2 --> ([ 2 | 10 | 1 | 3415 | 1519302622767 | 'android' | 'Surprisingly good, maybe you are getting your mojo back at long last!' ])
    3 --> ([ 3 | 17 | 2 | 5197 | 1519302623090 | 'web' | 'meh' ])
    4 --> ([ 4 | 9 | 3 | 9389 | 1519302623414 | 'android' | 'is this as good as it gets? really ?' ])
    ----

    [NOTE]
    ====
    Name `screen` window `ratings data`
    ====


    <<<
    === Start KSQL Server

    [source,bash]
    ----
    ~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
    ----

    [source,bash]
    ----
    $ ~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
    ===========================================
    = _ __ _____ ____ _ =
    = | |/ // ____|/ __ \| | =
    = | ' /| (___ | | | | | =
    = | < \___ \| | | | | =
    = | . \ ____) | |__| | |____ =
    = |_|\_\_____/ \___\_\______| =
    = =
    = Streaming SQL Engine for Apache Kafka® =
    ===========================================
    Copyright 2018 Confluent Inc.
    Server 0.5 listening on http://localhost:8090
    To access the KSQL CLI, run:
    ksql-cli remote http://localhost:8090
    To access the UI, point your browser at:
    http://localhost:8090/index.html
    ----

    [NOTE]
    ====
    Name `screen` window `KSQL Server`
    ====

    <<<
    === Launch KSQL CLI

    [NOTE]
    ====
    New `screen` window
    ====

    Connect to KSQL server running locally

    `~/git/ksql/bin/ksql-cli remote http://localhost:8090/`

    [NOTE]
    ====
    Name `screen` window `KSQL CLI`
    ====

    [source,bash]
    ----
    Robin@asgard02 ~> ~/git/ksql/bin/ksql-cli remote http://localhost:8090/
    ===========================================
    = _ __ _____ ____ _ =
    = | |/ // ____|/ __ \| | =
    = | ' /| (___ | | | | | =
    = | < \___ \| | | | | =
    = | . \ ____) | |__| | |____ =
    = |_|\_\_____/ \___\_\______| =
    = =
    = Streaming SQL Engine for Apache Kafka® =
    ===========================================
    Copyright 2018 Confluent Inc.
    CLI v0.5, Server v0.5 located at http://localhost:8090/
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    ksql>
    ----

    <<<
    === See available Kafka topics

    `show topics;`

    [source,sql]
    ----
    ksql> show topics;
    Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
    -----------------------------------------------------------------------------------------------
    _schemas | false | 1 | 1 | 0 | 0
    connect-configs | false | 1 | 1 | 0 | 0
    connect-offsets | false | 25 | 1 | 0 | 0
    connect-statuses | false | 5 | 1 | 0 | 0
    ksql__commands | true | 1 | 1 | 0 | 0
    ratings | false | 1 | 1 | 0 | 0
    users | false | 1 | 1 | 0 | 0
    -----------------------------------------------------------------------------------------------
    ----

    <<<
    === Inspect a topic contents - Ratings

    [TIP]
    ====
    Don't need to know the format of the data. Can see column names and values.
    ====

    `PRINT 'ratings';`

    Explain TS/Key/Message concept

    [source,sql]
    ----
    ksql> PRINT 'ratings';
    Format:AVRO
    22/02/18 12:55:04 GMT, 5312, {"rating_id": 5312, "user_id": 4, "stars": 4, "route_id": 2440, "rating_time": 1519304104965, "channel": "web", "message": "Surprisingly good, maybe you are getting your mojo back at long last!"}
    22/02/18 12:55:05 GMT, 5313, {"rating_id": 5313, "user_id": 3, "stars": 4, "route_id": 6975, "rating_time": 1519304105213, "channel": "web", "message": "why is it so difficult to keep the bathrooms clean ?"}
    ----

    <<<
    === Inspect a topic contents - Users

    Don't need to know the format of the data. Can see column names and values.

    `PRINT 'mysql_users' FROM BEGINNING;`

    <<<
    === Tell KSQL to process from beginning of topic

    Process from beginning of topic

    `SET 'auto.offset.reset' = 'earliest';`

    [source,sql]
    ----
    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
    ----

    <<<
    === Register Ratings topic for querying

    `CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');`

    Why's it a stream? Because it's a continuous stream of *events*

    [source,sql]
    ----
    ksql> CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');
    Message
    ---------------
    Table created
    ---------------
    ----

    <<<
    === Describe ratings stream

    `DESCRIBE ratings;`

    Note :

    1. System columns for timestamp and key
    2. All the other columns have been picked up automagically - have not had to specify them

    [source,sql]
    ----
    ksql> DESCRIBE ratings;
    Field | Type
    -----------------------------------------
    ROWTIME | BIGINT (system)
    ROWKEY | VARCHAR(STRING) (system)
    RATING_ID | BIGINT
    USER_ID | INTEGER
    STARS | INTEGER
    ROUTE_ID | INTEGER
    RATING_TIME | BIGINT
    CHANNEL | VARCHAR(STRING)
    MESSAGE | VARCHAR(STRING)
    -----------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    ----

    <<<
    === Query ratings stream

    `SELECT * FROM ratings;`

    This is a continuous query!

    [source,sql]
    ----
    ksql> SELECT * FROM ratings;
    1519402268942 | 1 | 1 | 13 | 1 | 3700 | 1519402267832 | ios | airport refurb looks great, will fly outta here more!
    1519402269200 | 2 | 2 | 12 | 2 | 9907 | 1519402269200 | android | (expletive deleted)
    1519402269694 | 3 | 3 | 2 | 1 | 5421 | 1519402269694 | android | is this as good as it gets? really ?
    1519402269857 | 4 | 4 | 18 | 2 | 1462 | 1519402269856 | android | your team here rocks!
    ----

    Cancel the datagen task - note that the query stops.

    Restart the datagen task - query now continues to return data

    <<<
    === Filter the ratings stream

    `SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;`

    Note the use of `LIMIT` so that we just see a sample of the stream of data

    [source,sql]
    ----
    ksql> SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;
    1519402272247 | 13 | 13 | 9 | 1 | 2545 | 1519402272247 | iOS | more peanuts please
    1519402272750 | 14 | 14 | 0 | 2 | 4419 | 1519402272749 | iOS | airport refurb looks great, will fly outta here more!
    1519402273755 | 18 | 18 | 15 | 1 | 5306 | 1519402273755 | iOS | Surprisingly good, maybe you are getting your mojo back at long last!
    1519402278686 | 37 | 37 | 17 | 1 | 725 | 1519402278686 | iOS | meh
    1519402279186 | 39 | 39 | 10 | 1 | 6304 | 1519402279186 | iOS | (expletive deleted)
    LIMIT reached for the partition.
    Query terminated
    ksql>
    ----

    <<<
    === Persist a filtered stream

    ==== Create the stream

    Let's take the poor ratings from people with iOS devices, and create a new stream from them!

    [source,sql]
    ----
    CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
    ----

    [source,sql]
    ----
    ksql> CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    ----

    <<<
    ==== Inspect the stream

    [source,sql]
    ----
    DESCRIBE POOR_RATINGS;
    ----

    [source,sql]
    ----
    ksql> DESCRIBE POOR_RATINGS;
    Field | Type
    -----------------------------------------
    ROWTIME | BIGINT (system)
    ROWKEY | VARCHAR(STRING) (system)
    RATING_ID | BIGINT
    USER_ID | INTEGER
    STARS | INTEGER
    ROUTE_ID | INTEGER
    RATING_TIME | BIGINT
    CHANNEL | VARCHAR(STRING)
    MESSAGE | VARCHAR(STRING)
    -----------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    ksql>
    ----

    <<<
    ==== Inspect the stream further

    [source,sql]
    ----
    DESCRIBE EXTENDED POOR_RATINGS;
    ----

    [source,sql]
    ----
    ksql> DESCRIBE EXTENDED POOR_RATINGS;
    Type : STREAM
    Key field :
    Timestamp field : Not set - using <ROWTIME>
    Key format : STRING
    Value format : AVRO
    Kafka output topic : POOR_RATINGS (partitions: 4, replication: 1)
    Field | Type
    -----------------------------------------
    ROWTIME | BIGINT (system)
    ROWKEY | VARCHAR(STRING) (system)
    RATING_ID | BIGINT
    USER_ID | INTEGER
    STARS | INTEGER
    ROUTE_ID | INTEGER
    RATING_TIME | BIGINT
    CHANNEL | VARCHAR(STRING)
    MESSAGE | VARCHAR(STRING)
    -----------------------------------------
    Queries that write into this STREAM
    -----------------------------------
    id:CSAS_POOR_RATINGS - CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

    For query topology and execution plan please run: EXPLAIN <QueryId>

    Local runtime statistics
    ------------------------
    messages-per-sec: 0.33 total-messages: 33 last-message: 27/02/18 11:28:58 GMT
    failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
    (Statistics of the local KSQL server interaction with the Kafka topic POOR_RATINGS)
    ksql>
    ----

    <<<
    ==== Query the stream

    [source,sql]
    ----
    SELECT * FROM POOR_RATINGS;
    ----

    [source,sql]
    ----
    ksql> SELECT * FROM POOR_RATINGS;
    1519730883770 | 1608 | 1608 | 18 | 2 | 829 | 1519730883770 | iOS | your team here rocks!
    1519730881132 | 1596 | 1596 | 6 | 2 | 3185 | 1519730881132 | iOS | thank you for the most friendly, helpful experience today at your new lounge
    1519730886192 | 1615 | 1615 | 19 | 1 | 9409 | 1519730886192 | iOS | more peanuts please
    1519730880186 | 1591 | 1591 | 7 | 1 | 9830 | 1519730880186 | iOS | is this as good as it gets? really ?
    1519730894709 | 1655 | 1655 | 7 | 2 | 6036 | 1519730894709 | iOS | meh
    ----

    <<<
    ==== See the Kafka Topic

    From the command line, use the standard Kafka tools to interact with the new stream -- it's just a Kafka topic!

    [source,bash]
    ----
    kafka-topics --zookeeper localhost:2181 --list
    ----

    [source,bash]
    ----
    Robin@asgard02 ~> kafka-topics --zookeeper localhost:2181 --list
    POOR_RATINGS
    __consumer_offsets
    _schemas
    connect-configs
    connect-offsets
    connect-statuses
    ksql__commands
    mysql_users
    ratings
    ----

    <<<
    ==== Inspect the Kafka topic's data

    [source,bash]
    ----
    kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic POOR_RATINGS --from-beginning | jq '.'
    ----

    [source,bash]
    ----
    Robin@asgard02 ~> kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic POOR_RATINGS --from-beginning | jq '.'
    {
    "RATING_ID": {
    "long": 1615
    },
    "USER_ID": {
    "int": 19
    },
    "STARS": {
    "int": 1
    },
    "ROUTE_ID": {
    "int": 9409
    },
    "RATING_TIME": {
    "long": 1519730886192
    },
    "CHANNEL": {
    "string": "iOS"
    },
    "MESSAGE": {
    "string": "more peanuts please"
    }
    }
    ----


    <<<
    === Joining Data in KSQL

    Remember our Users data? Let's bring that into play, and use it to enrich the inbound stream of ratings data.

    <<<
    ==== Inspect Users Data

    Let's check the data first, using the very handy `PRINT` command:

    `PRINT 'mysql_users' FROM BEGINNING;`

    [source,sql]
    ----
    ksql> PRINT 'mysql_users' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1519640382207,"ROWKEY":"1","uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
    {"ROWTIME":1519640382207,"ROWKEY":"2","uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
    {"ROWTIME":1519640382207,"ROWKEY":"3","uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
    ----

    <<<
    ==== Create Users Table

    Now, create a `TABLE` over the Kafka topic. Why's it a table? Because for each key (user id), we want to know its value (name, address, etc)

    `CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');`

    Note that we've specified the `KEY` here, which must match the key of the Kafka message too.
    ----
    ksql> CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');
    Message
    ---------------
    Table created
    ---------------
    ksql>
    ----

    <<<
    ==== Stream-Table join (1)

    Now let's join our ratings data, which includes user ID, to our user information:

    Basics to start with -- rating message plus the user's name.

    Couple of things to note:
    * We're aliasing the table and stream names to make column names unambiguous
    * I'm using the backspace line continuation character

    [source,sql]
    ----
    SELECT R.MESSAGE, U.NAME \
    FROM RATINGS R LEFT JOIN USERS U \
    ON R.USER_ID = U.UID \
    LIMIT 5;
    ----

    [source,sql]
    ----
    ksql> SELECT R.MESSAGE, U.NAME \
    > FROM RATINGS R LEFT JOIN USERS U \
    > ON R.USER_ID = U.UID \
    > LIMIT 5;
    airport refurb looks great, will fly outta here more! | Damian
    Exceeded all my expectations. Thank you ! | Neil
    (expletive deleted) | Lu
    why is it so difficult to keep the bathrooms clean ? | Apurva
    thank you for the most friendly, helpful experience today at your new lounge | Cliff
    LIMIT reached for the partition.
    Query terminated
    ksql>
    ----

    <<<
    ==== Stream-Table join (2)

    Now let's pull the full set of data, including a reformat of the timestamp into something human readable:

    Note the `IS NOT NULL` clause to filter out any ratings with no corresponding user data

    [source,sql]
    ----
    SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
    R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
    FROM RATINGS R LEFT JOIN USERS U \
    ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
    ----

    [source,sql]
    ----
    ksql> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
    > R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
    > FROM RATINGS R LEFT JOIN USERS U \
    > ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
    18196 | 1 | 1790 | 2018-02-26 12:42:38 | iOS-test | Surprisingly good, maybe you are getting your mojo back at long last! | Robin | Ilkley | G
    18197 | 4 | 2862 | 2018-02-26 12:42:38 | iOS-test | your team here rocks! | Jeremy | Austin | P
    18198 | 1 | 6954 | 2018-02-26 12:42:39 | ios | Exceeded all my expectations. Thank you ! | Nick | Palo Alto | P
    18199 | 1 | 2092 | 2018-02-26 12:42:39 | ios | (expletive deleted) | Neil | London | G
    18200 | 2 | 6042 | 2018-02-26 12:42:39 | ios | more peanuts please | Neil | London | G
    18202 | 2 | 1133 | 2018-02-26 12:42:39 | iOS-test | thank you for the most friendly, helpful experience today at your new lounge | Neha | Palo Alto | P
    18205 | 3 | 4086 | 2018-02-26 12:42:40 | ios | meh | Neil | London | G
    18206 | 1 | 9217 | 2018-02-26 12:42:41 | web | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
    18207 | 3 | 2655 | 2018-02-26 12:42:41 | android | more peanuts please | Neha | Palo Alto | P
    18209 | 3 | 2086 | 2018-02-26 12:42:42 | iOS-test | meh | Lu | Palo Alto | P
    18210 | 2 | 1629 | 2018-02-26 12:42:42 | web | airport refurb looks great, will fly outta here more! | Neha | Palo Alto | P
    ^CQuery terminated
    ----

    <<<
    ==== Stream-Table join (3)

    Let's persist this as an enriched stream:

    [source,sql]
    ----
    CREATE STREAM RATINGS_FULL AS \
    SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
    R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
    FROM RATINGS R LEFT JOIN USERS U \
    ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
    ----

    [source,sql]
    ----
    ksql> CREATE STREAM RATINGS_FULL AS \
    > SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
    > R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
    > FROM RATINGS R LEFT JOIN USERS U \
    > ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    ksql>
    ----

    <<<
    === Filtering an enriched stream

    Which of our Premier customers are not happy?

    `SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;`

    [source,sql]
    ----
    ksql> SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
    1519649077925 | 2 | 18684 | 1 | 3895 | 2018-02-26 12:44:37 | ios | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
    1519649078280 | 13 | 18685 | 1 | 4088 | 2018-02-26 12:44:38 | iOS | more peanuts please | Lu | Palo Alto | P
    1519649079664 | 1 | 18691 | 2 | 8933 | 2018-02-26 12:44:39 | iOS-test | more peanuts please | Cliff | St Louis | P
    1519649084065 | 13 | 18709 | 1 | 4366 | 2018-02-26 12:44:44 | iOS-test | airport refurb looks great, will fly outta here more! | Lu | Palo Alto | P
    1519649084603 | 3 | 18712 | 2 | 744 | 2018-02-26 12:44:44 | ios | worst. flight. ever. #neveragain | Jeremy | Austin | P
    ^CQuery terminated
    ----

    <<<
    ==== Persist the filtered & enriched stream

    `CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;`

    [source,sql]
    ----
    ksql> CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    ksql>
    ----

    <<<
    ==== Query the new stream

    `SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;`

    [source,sql]
    ----
    ksql> SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
    1 | why is it so difficult to keep the bathrooms clean ? | Nick
    1 | is this as good as it gets? really ? | Jeremy
    1 | thank you for the most friendly, helpful experience today at your new lounge | Jeremy
    1 | your team here rocks! | Lu
    1 | thank you for the most friendly, helpful experience today at your new lounge | Lu
    1 | why is it so difficult to keep the bathrooms clean ? | Jeremy
    ----

    <<<
    ==== View the underlying topic data

    [source,bash]
    ----
    kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic UNHAPPY_VIPS --from-beginning
    ----

    [source,bash]
    ----
    Robin@asgard02 ~> kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic UNHAPPY_VIPS --from-beginning
    {"RATING_ID":{"long":25},"STARS":{"int":1},"ROUTE_ID":{"int":1548},"KSQL_COL_3":{"string":"2018-02-27 11:21:21"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"thank you for the most friendly, helpful experience today at your new lounge"},"NAME":{"string":"Neha"},"ADDRESS_CITY":{"string":"Palo Alto"},"ELITE":{"string":"P"}}
    {"RATING_ID":{"long":31},"STARS":{"int":2},"ROUTE_ID":{"int":451},"KSQL_COL_3":{"string":"2018-02-27 11:21:22"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"Exceeded all my expectations. Thank you !"},"NAME":{"string":"Cliff"},"ADDRESS_CITY":{"string":"St Louis"},"ELITE":{"string":"P"}}
    ----

    <<<
    === Streaming Aggregates

    Explain windowing

    * Tumbling (e.g. every 5 minutes : 00:00, 00:05, 00:10)
    * Hopping (e.g. every 5 minutes, advancing 1 minute: 00:00-00:05, 00:01-00:06)
    * Session (Sets a timeout for the given key, after which any new data is treated as a new session)

    <<<
    ==== Running Count per Minute

    Show count of ratings per City, per minute

    [source,sql]
    ----
    SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
    FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
    GROUP BY ADDRESS_CITY;
    ----

    [source,sql]
    ----
    ksql> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
    > FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
    > GROUP BY ADDRESS_CITY;
    Geneva | 11
    Geneva | 11
    Geneva | 14
    Geneva | 10
    Geneva | 19
    ----

    <<<
    ==== CREATE TABLE

    Let's persist that into a TABLE. So far we've only worked with a STREAM. A table gives the state of a given key at a given point in time. So here, for each city, at each minute window, what's the total count

    [source,sql]
    ----
    CREATE TABLE RATINGS_BY_CITY AS \
    SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
    FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
    GROUP BY ADDRESS_CITY;
    ----

    [source,sql]
    ----
    ksql> CREATE TABLE RATINGS_BY_CITY AS \
    > SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
    > FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
    > GROUP BY ADDRESS_CITY;
    Message
    ---------------------------
    Table created and running
    ---------------------------
    ksql>
    ----

    <<<
    ==== Examine the created table's columns

    `DESCRIBE RATINGS_BY_CITY;`

    Note that the city is denoted as `(key)`

    [source,sql]
    ----
    ksql> DESCRIBE RATINGS_BY_CITY;
    Field | Type
    ------------------------------------------
    ROWTIME | BIGINT (system)
    ROWKEY | VARCHAR(STRING) (system)
    ADDRESS_CITY | VARCHAR(STRING) (key)
    RATING_COUNT | BIGINT
    ------------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    ksql>
    ----

    Point out the system columns - `ROWTIME` and `ROWKEY`.

    <<<
    ==== Examine the contents of the new table's columns


    `SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;`

    [source,sql]
    ----
    ksql> SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;
    1519730460000 | Geneva : Window{start=1519730460000 end=-} | Geneva | 11
    1519730460000 | Ilkley : Window{start=1519730460000 end=-} | Ilkley | 11
    1519730520000 | Geneva : Window{start=1519730520000 end=-} | Geneva | 11
    1519730460000 | St Louis : Window{start=1519730460000 end=-} | St Louis | 9
    1519730580000 | Geneva : Window{start=1519730580000 end=-} | Geneva | 14
    ----

    <<<
    ==== Using Functions like `TIMESTAMPTOSTRING`

    KSQL comes with a bunch of functions, both scalar and aggregate (like `COUNT` which we saw previously).

    Let's convert the `ROWTIME` epoch value to a more readable one:

    `SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;`

    [source,sql]
    ----
    ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
    2018-02-27 11:21:00 | London | 25
    2018-02-27 11:21:00 | Palo Alto | 71
    2018-02-27 11:22:00 | Palo Alto | 89
    2018-02-27 11:22:00 | London | 24
    2018-02-27 11:23:00 | London | 25
    ----

    <<<
    ==== Filtering the aggregate table

    This table is just a first class object in KSQL, that we can query and filter as any other:

    `SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';`

    [source,sql]
    ----
    ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';
    2018-02-27 11:21:00 | Ilkley | 11
    2018-02-27 11:22:00 | Ilkley | 6
    2018-02-27 11:23:00 | Ilkley | 13
    2018-02-27 11:24:00 | Ilkley | 14
    2018-02-27 11:25:00 | Ilkley | 15
    ----

    Note how aggregates update within the current window.

    <<<
    ==== Check out the Kafka topic

    Let's step out of KSQL for a moment. When you _create_ a `STREAM or `TABLE` in KSQL, it is backed by a Kafka topic. Let's check this out:

    `kafka-topics --zookeeper localhost:2181 --list|grep RATINGS`

    [source,bash]
    ----
    Robin@asgard02 ~/c/confluent-4.0.0> kafka-topics --zookeeper localhost:2181 --list|grep RATINGS
    RATINGS_BY_CITY
    ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-changelog
    ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-repartition
    [...]
    ----

    And it's just a Kafka topic, that we can consume from just as any other:

    [source,bash]
    ----
    kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic RATINGS_BY_CITY --from-beginning -max-messages 5| jq '.'
    ----

    [source,bash]
    ----
    Robin@asgard02 ~> kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic RATINGS_BY_CITY2 --from-beginning -max-messages 5| jq '.'
    {
    "ADDRESS_CITY": {
    "string": "London"
    },
    "RATING_COUNT": {
    "long": 25
    }
    }
    {
    "ADDRESS_CITY": {
    "string": "Palo Alto"
    },
    "RATING_COUNT": {
    "long": 71
    }
    }
    ----

    Where's the window? Well that's a system column that's part of the message key. If we wanted to expose it further, we could do a `CREATE TABLE` based on that `TIMESTAMPTOSTRING` function which exposes the system column

    <<<
    ==== Exposing Aggregate Window Keys

    `CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;`

    [source,sql]
    ----
    ksql> CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
    Message
    ---------------------------
    Table created and running
    ---------------------------
    ksql>
    ----

    <<<
    ==== Examine table with Timestamp exposed

    `DESCRIBE RATINGS_BY_CITY_TS;`

    [source,sql]
    ----
    ksql> describe RATINGS_BY_CITY_TS;
    Field | Type
    ---------------------------------------------
    ROWTIME | BIGINT (system)
    ROWKEY | VARCHAR(STRING) (system)
    WINDOW_START_TS | VARCHAR(STRING)
    ADDRESS_CITY | VARCHAR(STRING) (key)
    RATING_COUNT | BIGINT
    ---------------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    ----

    <<<
    ==== Look at the data

    `SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;`

    [source,sql]
    ----
    ksql> SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;
    1519730460000 | Geneva : Window{start=1519730460000 end=-} | 2018-02-27 11:21:00 | Geneva | 11
    1519730520000 | Geneva : Window{start=1519730520000 end=-} | 2018-02-27 11:22:00 | Geneva | 11
    1519730580000 | Geneva : Window{start=1519730580000 end=-} | 2018-02-27 11:23:00 | Geneva | 14
    1519730640000 | Geneva : Window{start=1519730640000 end=-} | 2018-02-27 11:24:00 | Geneva | 10
    1519730700000 | Geneva : Window{start=1519730700000 end=-} | 2018-02-27 11:25:00 | Geneva | 19
    LIMIT reached for the partition.
    Query terminated
    ksql>
    ----

    <<<
    ==== Check the Kafka Topic

    [source,bash]
    ----
    kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
    ----

    [source,bash]
    ----
    Robin@asgard02 ~> kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
    {
    "WINDOW_START_TS": {
    "string": "2018-02-27 11:21:00"
    },
    "ADDRESS_CITY": {
    "string": "Geneva"
    },
    "RATING_COUNT": {
    "long": 11
    }
    }
    ----

    <<<
    == Show KSQL Experimental UI

    http://localhost:8090/index.html

    [source,sql]
    ----
    SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
    ----