Skip to content

Instantly share code, notes, and snippets.

@jpzk
Created July 11, 2019 06:26
Show Gist options
  • Select an option

  • Save jpzk/a02a21ae7f80c5fdaae9ea2e5fe5b85a to your computer and use it in GitHub Desktop.

Select an option

Save jpzk/a02a21ae7f80c5fdaae9ea2e5fe5b85a to your computer and use it in GitHub Desktop.

Revisions

  1. jpzk renamed this gist Jul 11, 2019. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. jpzk created this gist Jul 11, 2019.
    68 changes: 68 additions & 0 deletions kafka streams example
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,68 @@
    package com.github.simplesteph.udemy.kafka.streams

    import java.lang
    import java.util.Properties

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
    import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}

    /**
     * Kafka Streams application that counts favourite colours.
     *
     * Reads `user,colour` text lines from the `favourite-colour-input` topic, re-keys them by
     * user, keeps only the allowed colours, and writes them to an intermediary topic. That
     * topic is then read back as a KTable, grouped by colour and counted; the counts are
     * written to `favourite-colour-output-scala`.
     */
    object FavouriteColourAppScala {

      // Colours that are kept for counting; records with any other colour are dropped.
      private val AllowedColours: Set[String] = Set("green", "blue", "red")

      /**
       * Returns true when `line` splits on "," into at least a user field and a colour field.
       *
       * A plain `contains(",")` check is not sufficient: `String.split` discards trailing
       * empty strings, so "alice," yields a single field and "," yields none, and indexing
       * field 0 or 1 afterwards would throw. Null values (e.g. records without a payload)
       * are rejected as well.
       */
      private def hasUserAndColour(line: String): Boolean =
        line != null && line.split(",").length >= 2

      /**
       * Builds the topology, starts the streams application and prints its description.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {

        val config: Properties = new Properties
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-colour-scala")
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)

        // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")

        val builder: KStreamBuilder = new KStreamBuilder

        // Step 1: read the raw "user,colour" lines from the input topic
        val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")

        val usersAndColours: KStream[String, String] = textLines
          // 1 - keep only lines that really split into a user and a colour
          .filter((_: String, value: String) => hasUserAndColour(value))
          // 2 - the key becomes the user id (lowercase for safety)
          .selectKey[String]((_: String, value: String) => value.split(",")(0).toLowerCase)
          // 3 - the value becomes the colour (lowercase for safety)
          .mapValues[String]((value: String) => value.split(",")(1).toLowerCase)
          // 4 - drop undesired colours (could be a data sanitization step)
          .filter((_: String, colour: String) => AllowedColours.contains(colour))

        val intermediaryTopic = "user-keys-and-colours-scala"
        usersAndColours.to(intermediaryTopic)

        // Step 2: read that topic back as a KTable so that updates per user are read correctly
        val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic)

        // Step 3: count the occurrences of each colour
        val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
          // 5 - group by colour within the KTable
          .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
          .count("CountsByColours")

        // 6 - output the results to a Kafka topic - the value is a Long, so pass the serdes explicitly
        favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")

        val streams: KafkaStreams = new KafkaStreams(builder, config)

        // Register the shutdown hook before starting, so a termination signal that arrives
        // right after start() still closes the streams application.
        Runtime.getRuntime.addShutdownHook(new Thread {
          override def run(): Unit = {
            streams.close()
          }
        })

        streams.cleanUp()
        streams.start()

        // print the topology
        System.out.println(streams.toString)
      }
    }