karafka.rb (gist by @IanVaughan, created March 31, 2023)

# frozen_string_literal: true

module Karafka
  class App
    CONFIG = Rails.application.credentials.kafka
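    # Assumed shape of the Kafka credentials (e.g. in config/credentials.yml.enc),
    # inferred from the fetch/dig calls below -- not part of the original gist,
    # values are placeholders:
    #
    #   kafka:
    #     brokers:
    #       - "broker-1.example.com:9092"
    #     producer:
    #       sasl_username: "username"
    #       sasl_password: "password"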

    setup do |config|
      config.client_id = 'cas_client'

      config.kafka = {
        'bootstrap.servers': CONFIG.fetch(:brokers).first,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': CONFIG.dig(:producer, :sasl_username),
        'sasl.password': CONFIG.dig(:producer, :sasl_password),
        'client.id': 'cas_karafka'
      }

      # Recreate consumers with each batch. This allows Rails code reload to work in
      # development mode. Otherwise the Karafka process would not be aware of code changes.
      config.consumer_persistence = !Rails.env.development?
    end

    # Comment out this part if you are not using instrumentation and/or you are not
    # interested in logging events for certain environments. Since instrumentation
    # notifications add extra boilerplate, if you want to achieve max performance,
    # listen to only what you really need for a given environment.
    Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new)
    # Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new)

    # This logger prints the producer development info using the Karafka logger.
    # It is similar to the consumer logger listener but producer oriented.
    Karafka.producer.monitor.subscribe(
      WaterDrop::Instrumentation::LoggerListener.new(Karafka.logger)
    )
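    # Illustrative only (not part of the original gist): with the producer configured
    # above, messages can be published from anywhere in the app, e.g.
    #
    #   Karafka.producer.produce_async(topic: 'raw-transactions-truelayer', payload: event.to_json)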

    routes.draw do
      # Uncomment this if you use Karafka with ActiveJob.
      # You need to define a topic for each queue name you use.
      # active_job_topic :default
      # topic :example do
      #   # Uncomment this if you want Karafka to manage your topics configuration.
      #   # Managing topics configuration via routing will allow you to ensure config consistency
      #   # across multiple environments.
      #   #
      #   # config(partitions: 2, 'cleanup.policy': 'compact')
      #   consumer ExampleConsumer
      # end

      consumer_group :truelayer_consumer do
        topic 'raw-transactions-truelayer' do
          consumer OpenBanking::TransactionsConsumer
        end
      end
      consumer_group :basiq_consumer do
        topic 'raw-transactions-basiq' do
          consumer OpenBanking::BasiqTransactionsConsumer
        end
      end
      consumer_group :plaid_consumer do
        topic 'raw-transactions-plaid' do
          consumer OpenBanking::PlaidTransactionsConsumer
        end
      end
      # consumer_group :tink_consumer do
      #   topic 'raw-transactions-tink' do
      #     consumer OpenBanking::TinkTransactionsConsumer
      #   end
      # end
      # consumer_group :accountscore_consumer do
      #   topic 'raw-transactions-accountscore' do
      #     consumer OpenBanking::AccountScoreTransactionsConsumer
      #   end
      # end
      # consumer_group :company_consumer do
      #   topic 'company-events' do
      #     consumer CompanyEventsConsumer
      #   end
      # end
    end
  end
end

# Karafka::Web.enable!

# You can tag your processes with any info you want and it is going to be visible via the Web UI
git_hash = `git rev-parse --short HEAD`.strip
Karafka::Process.tags.add(:commit, "##{git_hash}")
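
# A minimal, illustrative sketch (not part of this gist) of what one of the consumers
# referenced in the routes above might look like, assuming the standard Karafka
# consumer API:
#
#   module OpenBanking
#     class TransactionsConsumer < Karafka::BaseConsumer
#       def consume
#         messages.each do |message|
#           # message.payload is the deserialized Kafka payload
#           Rails.logger.info("raw-transactions-truelayer: #{message.payload.inspect}")
#         end
#       end
#     end
#   end
#
# The consuming process itself is then started with `bundle exec karafka server`.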