Skip to content

Instantly share code, notes, and snippets.

@PShchahelski
Created August 22, 2023 18:16
Show Gist options
  • Save PShchahelski/a454cffdf9f11799d1b5572a89230c2c to your computer and use it in GitHub Desktop.
Save PShchahelski/a454cffdf9f11799d1b5572a89230c2c to your computer and use it in GitHub Desktop.
TaskBusinessEventConsumer
package com.training.accounting.events
import com.training.accounting.task.domain.TaskService
import com.training.accounting.task.domain.model.toTask
import com.training.accounting.transaction.domain.TransferService
import com.training.scheme.registry.business.task.v1.TaskAddedBusinessEvent
import com.training.scheme.registry.business.task.v1.TaskAddedPayload
import com.training.scheme.registry.business.task.v1.TaskCompletedBusinessEvent
import com.training.scheme.registry.business.task.v1.TaskCompletedPayload
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import com.training.scheme.registry.business.task.v2.TaskAddedBusinessEvent as TaskAddedBusinessEventV2
import com.training.scheme.registry.business.task.v2.TaskAddedPayload as TaskAddedPayloadV2
/** Name of the Kafka topic the task business event listener in this file subscribes to. */
private const val TASK_TOPIC_NAME = "task-lifecycle"
/**
 * Kafka consumer that turns task business events into accounting operations.
 *
 * Each record received from the [TASK_TOPIC_NAME] topic is dispatched on the runtime type of its value:
 * - task added (schema v1 or v2): the payload is converted to a task and passed to [TaskService.addTask],
 *   then [TransferService.performWithdraw] is called with the payload's assignee, task id and assign cost;
 * - task completed: [TransferService.performEnrollment] is called with the payload's assignee, task id
 *   and reward.
 *
 * A record whose value is of any other type (or is `null`) is reported and skipped.
 */
@Component
class TaskBusinessEventConsumer(
    private val transferService: TransferService,
    private val taskService: TaskService,
) {

    /**
     * Entry point invoked for every record of the [TASK_TOPIC_NAME] topic.
     *
     * @param message the consumed record; its value decides which handler runs.
     */
    @KafkaListener(topics = [TASK_TOPIC_NAME], groupId = "group_id_2")
    fun taskBusinessEvent(message: ConsumerRecord<String, SpecificRecord>) {
        println("Task business message delivered: $message")
        // The value comes from Java, so it is treated as nullable here: a null value
        // simply falls through to the `else` branch instead of being silently ignored.
        when (val event: SpecificRecord? = message.value()) {
            is TaskAddedBusinessEvent -> taskAdded(event.payload)
            is TaskAddedBusinessEventV2 -> taskAdded(event.payload)
            is TaskCompletedBusinessEvent -> taskCompleted(event.payload)
            // Unsupported events are skipped on purpose (no exception, so the listener
            // does not fail), but the skip is made visible.
            else -> println(
                "Task business message skipped, unsupported event type: ${event?.javaClass?.name}"
            )
        }
    }

    /** Handles a v1 "task added" payload: stores the task, then performs the withdraw for its assign cost. */
    private fun taskAdded(payload: TaskAddedPayload) {
        taskService.addTask(payload.toTask())
        transferService.performWithdraw(
            userPublicId = payload.assigneePublicId,
            taskPublicId = payload.publicId,
            amount = payload.assignCost,
        )
    }

    /** Handles a v2 "task added" payload: stores the task, then performs the withdraw for its assign cost. */
    private fun taskAdded(payload: TaskAddedPayloadV2) {
        taskService.addTask(payload.toTask())
        transferService.performWithdraw(
            userPublicId = payload.assigneePublicId,
            taskPublicId = payload.publicId,
            amount = payload.assignCost,
        )
    }

    /** Handles a "task completed" payload: performs the enrollment of the task reward. */
    private fun taskCompleted(payload: TaskCompletedPayload) {
        transferService.performEnrollment(
            userPublicId = payload.assigneePublicId,
            taskPublicId = payload.publicId,
            amount = payload.reward,
        )
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment