Skip to content

Instantly share code, notes, and snippets.

@Gustibimo
Created August 9, 2020 13:58
Show Gist options
  • Select an option

  • Save Gustibimo/1fe0ed482f3b15a33b17d82fb640a7cc to your computer and use it in GitHub Desktop.

Select an option

Save Gustibimo/1fe0ed482f3b15a33b17d82fb640a7cc to your computer and use it in GitHub Desktop.

Revisions

  1. Gustibimo created this gist Aug 9, 2020.
    50 changes: 50 additions & 0 deletions postgres_to_kafka.sh
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    # Run postgres instance
    docker run --name postgres -p 5000:5432 debezium/postgres

    # Run zookeeper instance
    docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

    # Run kafka instance
    docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

    # Run kafka connect
    docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect

    # Open psql console
    psql -h localhost -p 5000 -U postgres
    CREATE DATABASE inventory;
    CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);


    # Create connector using kafka connect
    curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
    {
    "name": "inventory-connector",
    "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "inventory",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
    }
    }' | jq

    # Verify created
    curl -H "Accept:application/json" localhost:8083/connectors/ | jq

    # Verify configuration
    curl -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector | jq


    # Start a console viewer on kafka

    docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka watch-topic -a -k dbserver1.public.dumb_table

    # Verify the existence of replication slot in postgres
    SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;