= Live Coding a KSQL Application
:source-highlighter: pygments
:doctype: book
Robin Moffatt <@rmoff>
v1.00, 28 Feb 2018
:toc:

== Introduction

image::https://www.dropbox.com/s/asqkfbtexpnu8na/live-ksql-01.png?dl=0&raw=1[Diagram of what's being built with the KSQL demo]

This code file accompanies the slides and recording available online at: https://www.confluent.io/online-talks/

Don't forget to check out the #ksql channel on our https://slackpass.io/confluentcommunity[Community Slack group]

-- _Robin Moffatt https://twitter.com/rmoff/[@rmoff], 28 Feb 2018_

== Pre-demo

Built using https://www.confluent.io/download/[Confluent Platform] 4.0 and https://github.com/confluentinc/ksql/[KSQL 0.5].

=== Start a `screen` session

https://en.wikipedia.org/wiki/GNU_Screen[`screen`] is not obligatory, but very useful :)

`screen -S KSQL`

<<<

=== Start Confluent Platform

[source,bash]
----
confluent destroy
confluent start
----

[source,bash]
----
Robin@asgard02 ~/c/confluent-4.0.0> confluent start
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Robin@asgard02 ~/c/confluent-4.0.0>
----

<<<

=== Populate Users topic

`kafkacat -P -b localhost:9092 -t mysql_users -K ":"`

Paste the data to `stdin`:

----
1:{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
3:{"uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
4:{"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
5:{"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
6:{"uid":6,"name":"Hojjat","locale":"en_US","address_city":"Palo Alto","elite":"S"}
7:{"uid":7,"name":"Damian","locale":"en_US","address_city":"London","elite":"S"}
8:{"uid":8,"name":"Apurva","locale":"en_US","address_city":"Palo Alto","elite":"S"}
9:{"uid":9,"name":"Neha","locale":"en_US","address_city":"Palo Alto","elite":"P"}
10:{"uid":10,"name":"Mattias","locale":"DE","address_city":"Palo Alto","elite":"S"}
11:{"uid":11,"name":"Gwen","locale":"en_US","address_city":"Palo Alto","elite":"P"}
12:{"uid":12,"name":"Guozhang","locale":"en_US","address_city":"Palo Alto","elite":"S"}
13:{"uid":13,"name":"Lu","locale":"en_US","address_city":"Palo Alto","elite":"P"}
14:{"uid":14,"name":"Robin","locale":"en_US","address_city":"Ilkley","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"San Francisco","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"P"}
----

Exit: `Ctrl-D`

<<<

=== Inspect Users topic data

`kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users`

[source,bash]
----
kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users
[...]
Key: 4
Value: {"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}

Key: 5
Value: {"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}

[...]
----

<<<

== Demo

[NOTE]
====
Create blank window in `screen`
====

=== Check Confluent is running

`confluent status`

[source,bash]
----
Robin@asgard02 ~/c/confluent-4.0.0> confluent status
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
----

<<<

=== Run datagen - ratings

`~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500`

[source,bash]
----
Robin@asgard02 ~/c/confluent-4.0.0> ~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
1 --> ([ 1 | 10 | 3 | 9297 | 1519302621843 | 'iOS' | 'your team here rocks!'
])
2 --> ([ 2 | 10 | 1 | 3415 | 1519302622767 | 'android' | 'Surprisingly good, maybe you are getting your mojo back at long last!' ])
3 --> ([ 3 | 17 | 2 | 5197 | 1519302623090 | 'web' | 'meh' ])
4 --> ([ 4 | 9 | 3 | 9389 | 1519302623414 | 'android' | 'is this as good as it gets? really ?' ])
----

[NOTE]
====
Name `screen` window `ratings data`
====

<<<

=== Start KSQL Server

[source,bash]
----
~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
----

[source,bash]
----
$ ~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
===========================================
=        _  __ _____  ____  _             =
=       | |/ // ____|/ __ \| |            =
=       | ' /| (___ | |  | | |            =
=       |  <  \___ \| |  | | |            =
=       | . \ ____) | |__| | |____        =
=       |_|\_\_____/ \___\_\______|       =
=                                         =
=  Streaming SQL Engine for Apache Kafka® =
===========================================

Copyright 2018 Confluent Inc.

Server 0.5 listening on http://localhost:8090

To access the KSQL CLI, run:
ksql-cli remote http://localhost:8090

To access the UI, point your browser at:
http://localhost:8090/index.html
----

[NOTE]
====
Name `screen` window `KSQL Server`
====

<<<

=== Launch KSQL CLI

[NOTE]
====
New `screen` window
====

Connect to KSQL server running locally

`~/git/ksql/bin/ksql-cli remote http://localhost:8090/`

[NOTE]
====
Name `screen` window `KSQL CLI`
====

[source,bash]
----
Robin@asgard02 ~> ~/git/ksql/bin/ksql-cli remote http://localhost:8090/
===========================================
=        _  __ _____  ____  _             =
=       | |/ // ____|/ __ \| |            =
=       | ' /| (___ | |  | | |            =
=       |  <  \___ \| |  | | |            =
=       | . \ ____) | |__| | |____        =
=       |_|\_\_____/ \___\_\______|       =
=                                         =
=  Streaming SQL Engine for Apache Kafka® =
===========================================

Copyright 2018 Confluent Inc.

CLI v0.5, Server v0.5 located at http://localhost:8090/

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
----

<<<

=== See available Kafka topics

`show topics;`

[source,sql]
----
ksql> show topics;

 Kafka Topic      | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
-----------------------------------------------------------------------------------------------
 _schemas         | false      | 1          | 1                  | 0         | 0
 connect-configs  | false      | 1          | 1                  | 0         | 0
 connect-offsets  | false      | 25         | 1                  | 0         | 0
 connect-statuses | false      | 5          | 1                  | 0         | 0
 ksql__commands   | true       | 1          | 1                  | 0         | 0
 ratings          | false      | 1          | 1                  | 0         | 0
 users            | false      | 1          | 1                  | 0         | 0
-----------------------------------------------------------------------------------------------
----

<<<

=== Inspect a topic contents - Ratings

[TIP]
====
Don't need to know the format of the data. Can see column names and values.
====

`PRINT 'ratings';`

Explain TS/Key/Message concept

[source,sql]
----
ksql> PRINT 'ratings';
Format:AVRO
22/02/18 12:55:04 GMT, 5312, {"rating_id": 5312, "user_id": 4, "stars": 4, "route_id": 2440, "rating_time": 1519304104965, "channel": "web", "message": "Surprisingly good, maybe you are getting your mojo back at long last!"}
22/02/18 12:55:05 GMT, 5313, {"rating_id": 5313, "user_id": 3, "stars": 4, "route_id": 6975, "rating_time": 1519304105213, "channel": "web", "message": "why is it so difficult to keep the bathrooms clean ?"}
----

<<<

=== Inspect a topic contents - Users

Don't need to know the format of the data. Can see column names and values.

`PRINT 'mysql_users' FROM BEGINNING;`

<<<

=== Tell KSQL to process from beginning of topic

Process from beginning of topic

`SET 'auto.offset.reset' = 'earliest';`

[source,sql]
----
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
----

<<<

=== Register Ratings topic for querying

`CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');`

Why's it a stream?
Because it's a continuous stream of *events*

[source,sql]
----
ksql> CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');

 Message
---------------
 Table created
---------------
----

<<<

=== Describe ratings stream

`DESCRIBE ratings;`

Note:

1. System columns for timestamp and key
2. All the other columns have been picked up automagically - have not had to specify them

[source,sql]
----
ksql> DESCRIBE ratings;

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED ;
----

<<<

=== Query ratings stream

`SELECT * FROM ratings;`

This is a continuous query!

[source,sql]
----
ksql> SELECT * FROM ratings;
1519402268942 | 1 | 1 | 13 | 1 | 3700 | 1519402267832 | ios | airport refurb looks great, will fly outta here more!
1519402269200 | 2 | 2 | 12 | 2 | 9907 | 1519402269200 | android | (expletive deleted)
1519402269694 | 3 | 3 | 2 | 1 | 5421 | 1519402269694 | android | is this as good as it gets? really ?
1519402269857 | 4 | 4 | 18 | 2 | 1462 | 1519402269856 | android | your team here rocks!
----

Cancel the datagen task - note that the query stops.

Restart the datagen task - query now continues to return data

<<<

=== Filter the ratings stream

`SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;`

Note the use of `LIMIT` so that we just see a sample of the stream of data

[source,sql]
----
ksql> SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;
1519402272247 | 13 | 13 | 9 | 1 | 2545 | 1519402272247 | iOS | more peanuts please
1519402272750 | 14 | 14 | 0 | 2 | 4419 | 1519402272749 | iOS | airport refurb looks great, will fly outta here more!
1519402273755 | 18 | 18 | 15 | 1 | 5306 | 1519402273755 | iOS | Surprisingly good, maybe you are getting your mojo back at long last!
1519402278686 | 37 | 37 | 17 | 1 | 725 | 1519402278686 | iOS | meh
1519402279186 | 39 | 39 | 10 | 1 | 6304 | 1519402279186 | iOS | (expletive deleted)
LIMIT reached for the partition.
Query terminated
ksql>
----

<<<

=== Persist a filtered stream

==== Create the stream

Let's take the poor ratings from people with iOS devices, and create a new stream from them!

[source,sql]
----
CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
----

[source,sql]
----
ksql> CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

 Message
----------------------------
 Stream created and running
----------------------------
----

<<<

==== Inspect the stream

[source,sql]
----
DESCRIBE POOR_RATINGS;
----

[source,sql]
----
ksql> DESCRIBE POOR_RATINGS;

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED ;
ksql>
----

<<<

==== Inspect the stream further

[source,sql]
----
DESCRIBE EXTENDED POOR_RATINGS;
----

[source,sql]
----
ksql> DESCRIBE EXTENDED POOR_RATINGS;

Type                 : STREAM
Key field            :
Timestamp field      : Not set - using
Key format           : STRING
Value format         : AVRO
Kafka output topic   : POOR_RATINGS (partitions: 4, replication: 1)

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------

Queries that write into this STREAM
-----------------------------------
id:CSAS_POOR_RATINGS - CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

For query topology and execution plan please run: EXPLAIN

Local runtime statistics
------------------------
messages-per-sec: 0.33 total-messages: 33 last-message: 27/02/18 11:28:58 GMT
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic POOR_RATINGS)
ksql>
----

<<<

==== Query the stream

[source,sql]
----
SELECT * FROM POOR_RATINGS;
----

[source,sql]
----
ksql> SELECT * FROM POOR_RATINGS;
1519730883770 | 1608 | 1608 | 18 | 2 | 829 | 1519730883770 | iOS | your team here rocks!
1519730881132 | 1596 | 1596 | 6 | 2 | 3185 | 1519730881132 | iOS | thank you for the most friendly, helpful experience today at your new lounge
1519730886192 | 1615 | 1615 | 19 | 1 | 9409 | 1519730886192 | iOS | more peanuts please
1519730880186 | 1591 | 1591 | 7 | 1 | 9830 | 1519730880186 | iOS | is this as good as it gets? really ?
1519730894709 | 1655 | 1655 | 7 | 2 | 6036 | 1519730894709 | iOS | meh
----

<<<

==== See the Kafka Topic

From the command line, use the standard Kafka tools to interact with the new stream -- it's just a Kafka topic!

[source,bash]
----
kafka-topics --zookeeper localhost:2181 --list
----

[source,bash]
----
Robin@asgard02 ~> kafka-topics --zookeeper localhost:2181 --list
POOR_RATINGS
__consumer_offsets
_schemas
connect-configs
connect-offsets
connect-statuses
ksql__commands
mysql_users
ratings
----

<<<

==== Inspect the Kafka topic's data

[source,bash]
----
kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic POOR_RATINGS --from-beginning | jq '.'
----

[source,bash]
----
Robin@asgard02 ~> kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic POOR_RATINGS --from-beginning | jq '.'
{
  "RATING_ID": {
    "long": 1615
  },
  "USER_ID": {
    "int": 19
  },
  "STARS": {
    "int": 1
  },
  "ROUTE_ID": {
    "int": 9409
  },
  "RATING_TIME": {
    "long": 1519730886192
  },
  "CHANNEL": {
    "string": "iOS"
  },
  "MESSAGE": {
    "string": "more peanuts please"
  }
}
----

<<<

=== Joining Data in KSQL

Remember our Users data? Let's bring that into play, and use it to enrich the inbound stream of ratings data.

<<<

==== Inspect Users Data

Let's check the data first, using the very handy `PRINT` command:

`PRINT 'mysql_users' FROM BEGINNING;`

[source,sql]
----
ksql> PRINT 'mysql_users' FROM BEGINNING;
Format:JSON
{"ROWTIME":1519640382207,"ROWKEY":"1","uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"ROWTIME":1519640382207,"ROWKEY":"2","uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
{"ROWTIME":1519640382207,"ROWKEY":"3","uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
----

<<<

==== Create Users Table

Now, create a `TABLE` over the Kafka topic. Why's it a table? Because for each key (user id), we want to know its value (name, address, etc)

`CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');`

Note that we've specified the `KEY` here, which must match the key of the Kafka message too.

----
ksql> CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');

 Message
---------------
 Table created
---------------
ksql>
----

<<<

==== Stream-Table join (1)

Now let's join our ratings data, which includes user ID, to our user information:

Basics to start with -- rating message plus the user's name.
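
Before running the join, it can help to picture what a stream-table `LEFT JOIN` does. The following is a minimal Python sketch of the idea, not KSQL's implementation: the table keeps the latest value per key, and each arriving rating is looked up against it. The names `users_table` and `left_join` are hypothetical, and the sample rows are cut-down versions of the users and ratings data shown earlier.

```python
# Conceptual sketch only: how a stream-table LEFT JOIN behaves.
# Names (users_table, left_join) are illustrative, not part of KSQL.

# A table keeps the latest value for each key, so the later message
# for key 2 overwrites the earlier one.
user_messages = [
    (1, {"name": "Cliff", "elite": "P"}),
    (2, {"name": "Nick", "elite": "G"}),
    (2, {"name": "Nick", "elite": "P"}),
]
users_table = {}
for key, value in user_messages:
    users_table[key] = value


def left_join(rating, table):
    """Enrich one rating event with the current table row for its user_id."""
    user = table.get(rating["user_id"])  # None when no user matches
    return {
        "message": rating["message"],
        "name": user["name"] if user else None,
    }


# The stream side: every event is processed as it arrives.
ratings = [
    {"user_id": 2, "message": "meh"},
    {"user_id": 17, "message": "more peanuts please"},  # no such user
]
joined = [left_join(r, users_table) for r in ratings]
print(joined)
```

Because this is a left join, the rating from user 17 still comes through, just with no name attached.
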
Couple of things to note:

* We're aliasing the table and stream names to make column names unambiguous
* I'm using the backslash line continuation character

[source,sql]
----
SELECT R.MESSAGE, U.NAME \
  FROM RATINGS R LEFT JOIN USERS U \
  ON R.USER_ID = U.UID \
  LIMIT 5;
----

[source,sql]
----
ksql> SELECT R.MESSAGE, U.NAME \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID \
> LIMIT 5;
airport refurb looks great, will fly outta here more! | Damian
Exceeded all my expectations. Thank you ! | Neil
(expletive deleted) | Lu
why is it so difficult to keep the bathrooms clean ? | Apurva
thank you for the most friendly, helpful experience today at your new lounge | Cliff
LIMIT reached for the partition.
Query terminated
ksql>
----

<<<

==== Stream-Table join (2)

Now let's pull the full set of data, including a reformat of the timestamp into something human readable:

Note the `IS NOT NULL` clause to filter out any ratings with no corresponding user data

[source,sql]
----
SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
  R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
  FROM RATINGS R LEFT JOIN USERS U \
  ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
----

[source,sql]
----
ksql> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
> R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
18196 | 1 | 1790 | 2018-02-26 12:42:38 | iOS-test | Surprisingly good, maybe you are getting your mojo back at long last! | Robin | Ilkley | G
18197 | 4 | 2862 | 2018-02-26 12:42:38 | iOS-test | your team here rocks! | Jeremy | Austin | P
18198 | 1 | 6954 | 2018-02-26 12:42:39 | ios | Exceeded all my expectations. Thank you !
| Nick | Palo Alto | P
18199 | 1 | 2092 | 2018-02-26 12:42:39 | ios | (expletive deleted) | Neil | London | G
18200 | 2 | 6042 | 2018-02-26 12:42:39 | ios | more peanuts please | Neil | London | G
18202 | 2 | 1133 | 2018-02-26 12:42:39 | iOS-test | thank you for the most friendly, helpful experience today at your new lounge | Neha | Palo Alto | P
18205 | 3 | 4086 | 2018-02-26 12:42:40 | ios | meh | Neil | London | G
18206 | 1 | 9217 | 2018-02-26 12:42:41 | web | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
18207 | 3 | 2655 | 2018-02-26 12:42:41 | android | more peanuts please | Neha | Palo Alto | P
18209 | 3 | 2086 | 2018-02-26 12:42:42 | iOS-test | meh | Lu | Palo Alto | P
18210 | 2 | 1629 | 2018-02-26 12:42:42 | web | airport refurb looks great, will fly outta here more! | Neha | Palo Alto | P
^CQuery terminated
----

<<<

==== Stream-Table join (3)

Let's persist this as an enriched stream:

[source,sql]
----
CREATE STREAM RATINGS_FULL AS \
  SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
  R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
  FROM RATINGS R LEFT JOIN USERS U \
  ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
----

[source,sql]
----
ksql> CREATE STREAM RATINGS_FULL AS \
> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
> R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
----

<<<

=== Filtering an enriched stream

Which of our Premier customers are not happy?
`SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;`

[source,sql]
----
ksql> SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
1519649077925 | 2 | 18684 | 1 | 3895 | 2018-02-26 12:44:37 | ios | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
1519649078280 | 13 | 18685 | 1 | 4088 | 2018-02-26 12:44:38 | iOS | more peanuts please | Lu | Palo Alto | P
1519649079664 | 1 | 18691 | 2 | 8933 | 2018-02-26 12:44:39 | iOS-test | more peanuts please | Cliff | St Louis | P
1519649084065 | 13 | 18709 | 1 | 4366 | 2018-02-26 12:44:44 | iOS-test | airport refurb looks great, will fly outta here more! | Lu | Palo Alto | P
1519649084603 | 3 | 18712 | 2 | 744 | 2018-02-26 12:44:44 | ios | worst. flight. ever. #neveragain | Jeremy | Austin | P
^CQuery terminated
----

<<<

==== Persist the filtered & enriched stream

`CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;`

[source,sql]
----
ksql> CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
----

<<<

==== Query the new stream

`SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;`

[source,sql]
----
ksql> SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
1 | why is it so difficult to keep the bathrooms clean ? | Nick
1 | is this as good as it gets? really ? | Jeremy
1 | thank you for the most friendly, helpful experience today at your new lounge | Jeremy
1 | your team here rocks! | Lu
1 | thank you for the most friendly, helpful experience today at your new lounge | Lu
1 | why is it so difficult to keep the bathrooms clean ?
| Jeremy
----

<<<

==== View the underlying topic data

[source,bash]
----
kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic UNHAPPY_VIPS --from-beginning
----

[source,bash]
----
Robin@asgard02 ~> kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic UNHAPPY_VIPS --from-beginning
{"RATING_ID":{"long":25},"STARS":{"int":1},"ROUTE_ID":{"int":1548},"KSQL_COL_3":{"string":"2018-02-27 11:21:21"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"thank you for the most friendly, helpful experience today at your new lounge"},"NAME":{"string":"Neha"},"ADDRESS_CITY":{"string":"Palo Alto"},"ELITE":{"string":"P"}}
{"RATING_ID":{"long":31},"STARS":{"int":2},"ROUTE_ID":{"int":451},"KSQL_COL_3":{"string":"2018-02-27 11:21:22"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"Exceeded all my expectations. Thank you !"},"NAME":{"string":"Cliff"},"ADDRESS_CITY":{"string":"St Louis"},"ELITE":{"string":"P"}}
----

<<<

=== Streaming Aggregates

Explain windowing

* Tumbling (e.g. every 5 minutes: 00:00, 00:05, 00:10)
* Hopping (e.g. every 5 minutes, advancing 1 minute: 00:00-00:05, 00:01-00:06)
* Session (Sets a timeout for the given key, after which any new data is treated as a new session)

<<<

==== Running Count per Minute

Show count of ratings per City, per minute

[source,sql]
----
SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
  FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
  GROUP BY ADDRESS_CITY;
----

[source,sql]
----
ksql> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;
Geneva | 11
Geneva | 11
Geneva | 14
Geneva | 10
Geneva | 19
----

<<<

==== CREATE TABLE

Let's persist that into a TABLE. So far we've only worked with a STREAM. A table gives the state of a given key at a given point in time.
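
To make the windowing idea concrete, here is a minimal Python sketch of a one-minute tumbling window feeding a per-key count. It is not how KSQL is implemented. It assumes windows are aligned to the epoch, so a window's start is the event timestamp rounded down to the window size. The function name `window_start` and the sample events are made up for illustration.

```python
# Conceptual sketch only: bucketing events into 1-minute tumbling windows
# and keeping a running count per (city, window). Names are illustrative.
from collections import Counter

WINDOW_SIZE_MS = 60_000  # corresponds to SIZE 1 MINUTES


def window_start(timestamp_ms, size_ms=WINDOW_SIZE_MS):
    """Return the start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


# (event time in epoch ms, city) -- made-up sample events
events = [
    (1519730460500, "Geneva"),
    (1519730475000, "Geneva"),
    (1519730519999, "Ilkley"),
    (1519730520000, "Geneva"),  # first event of the next window
]

# The "table": latest count for each (city, window start) key.
counts = Counter()
for ts, city in events:
    counts[(city, window_start(ts))] += 1

for (city, start), n in sorted(counts.items()):
    print(city, start, n)
```

Each new event only updates the count for its own key and window, which is why a table is the natural shape for the result.
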
So here, for each city, at each minute window, what's the total count

[source,sql]
----
CREATE TABLE RATINGS_BY_CITY AS \
  SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
  FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
  GROUP BY ADDRESS_CITY;
----

[source,sql]
----
ksql> CREATE TABLE RATINGS_BY_CITY AS \
> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;

 Message
---------------------------
 Table created and running
---------------------------
ksql>
----

<<<

==== Examine the created table's columns

`DESCRIBE RATINGS_BY_CITY;`

Note that the city is denoted as `(key)`

[source,sql]
----
ksql> DESCRIBE RATINGS_BY_CITY;

 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 ADDRESS_CITY | VARCHAR(STRING)  (key)
 RATING_COUNT | BIGINT
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED ;
ksql>
----

Point out the system columns - `ROWTIME` and `ROWKEY`.

<<<

==== Examine the contents of the new table's columns

`SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;`

[source,sql]
----
ksql> SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | Geneva | 11
1519730460000 | Ilkley : Window{start=1519730460000 end=-} | Ilkley | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | Geneva | 11
1519730460000 | St Louis : Window{start=1519730460000 end=-} | St Louis | 9
1519730580000 | Geneva : Window{start=1519730580000 end=-} | Geneva | 14
----

<<<

==== Using Functions like `TIMESTAMPTOSTRING`

KSQL comes with a bunch of functions, both scalar and aggregate (like `COUNT` which we saw previously).
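
As a point of reference for `TIMESTAMPTOSTRING`, a scalar function of this kind takes an epoch-milliseconds value and renders it with a date pattern. A rough Python equivalent of the `'yyyy-MM-dd HH:mm:ss'` pattern is sketched below. It assumes the output is in UTC, which matches the GMT timestamps shown in this demo but may not hold on a server in another time zone. The helper name `timestamp_to_string` is hypothetical.

```python
# Conceptual sketch only: formatting epoch milliseconds as a readable string.
# Assumption: output is rendered in UTC.
from datetime import datetime, timezone


def timestamp_to_string(epoch_ms):
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss' in UTC."""
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


# A window-start value taken from the RATINGS_BY_CITY output above.
print(timestamp_to_string(1519730460000))
```
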
Let's convert the `ROWTIME` epoch value to a more readable one:

`SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;`

[source,sql]
----
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
2018-02-27 11:21:00 | London | 25
2018-02-27 11:21:00 | Palo Alto | 71
2018-02-27 11:22:00 | Palo Alto | 89
2018-02-27 11:22:00 | London | 24
2018-02-27 11:23:00 | London | 25
----

<<<

==== Filtering the aggregate table

This table is just a first class object in KSQL, that we can query and filter as any other:

`SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';`

[source,sql]
----
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';
2018-02-27 11:21:00 | Ilkley | 11
2018-02-27 11:22:00 | Ilkley | 6
2018-02-27 11:23:00 | Ilkley | 13
2018-02-27 11:24:00 | Ilkley | 14
2018-02-27 11:25:00 | Ilkley | 15
----

Note how aggregates update within the current window.

<<<

==== Check out the Kafka topic

Let's step out of KSQL for a moment. When you _create_ a `STREAM` or `TABLE` in KSQL, it is backed by a Kafka topic. Let's check this out:

`kafka-topics --zookeeper localhost:2181 --list|grep RATINGS`

[source,bash]
----
Robin@asgard02 ~/c/confluent-4.0.0> kafka-topics --zookeeper localhost:2181 --list|grep RATINGS
RATINGS_BY_CITY
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-changelog
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-repartition
[...]
----

And it's just a Kafka topic, that we can consume from just as any other:

[source,bash]
----
kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic RATINGS_BY_CITY --from-beginning --max-messages 5| jq '.'
----

[source,bash]
----
Robin@asgard02 ~> kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic RATINGS_BY_CITY2 --from-beginning -max-messages 5| jq '.'
{
  "ADDRESS_CITY": {
    "string": "London"
  },
  "RATING_COUNT": {
    "long": 25
  }
}
{
  "ADDRESS_CITY": {
    "string": "Palo Alto"
  },
  "RATING_COUNT": {
    "long": 71
  }
}
----

Where's the window? Well that's a system column that's part of the message key. If we wanted to expose it further, we could do a `CREATE TABLE` based on that `TIMESTAMPTOSTRING` function which exposes the system column

<<<

==== Exposing Aggregate Window Keys

`CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;`

[source,sql]
----
ksql> CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;

 Message
---------------------------
 Table created and running
---------------------------
ksql>
----

<<<

==== Examine table with Timestamp exposed

`DESCRIBE RATINGS_BY_CITY_TS;`

[source,sql]
----
ksql> describe RATINGS_BY_CITY_TS;

 Field           | Type
---------------------------------------------
 ROWTIME         | BIGINT           (system)
 ROWKEY          | VARCHAR(STRING)  (system)
 WINDOW_START_TS | VARCHAR(STRING)
 ADDRESS_CITY    | VARCHAR(STRING)  (key)
 RATING_COUNT    | BIGINT
---------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED ;
----

<<<

==== Look at the data

`SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;`

[source,sql]
----
ksql> SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | 2018-02-27 11:21:00 | Geneva | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | 2018-02-27 11:22:00 | Geneva | 11
1519730580000 | Geneva : Window{start=1519730580000 end=-} | 2018-02-27 11:23:00 | Geneva | 14
1519730640000 | Geneva : Window{start=1519730640000 end=-} | 2018-02-27 11:24:00 | Geneva | 10
1519730700000 | Geneva : Window{start=1519730700000 end=-} | 2018-02-27 11:25:00 | Geneva | 19
LIMIT reached for the partition.
Query terminated
ksql>
----

<<<

==== Check the Kafka Topic

[source,bash]
----
kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
----

[source,bash]
----
Robin@asgard02 ~> kafka-avro-console-consumer \
 --bootstrap-server localhost:9092 \
 --property schema.registry.url=http://localhost:8081 \
 --topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
{
  "WINDOW_START_TS": {
    "string": "2018-02-27 11:21:00"
  },
  "ADDRESS_CITY": {
    "string": "Geneva"
  },
  "RATING_COUNT": {
    "long": 11
  }
}
----

<<<

== Show KSQL Experimental UI

http://localhost:8090/index.html

[source,sql]
----
SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
----