Created
          August 22, 2023 18:16 
        
      - 
      
- 
        Save PShchahelski/a454cffdf9f11799d1b5572a89230c2c to your computer and use it in GitHub Desktop. 
    TaskBusinessEventConsumer
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | package com.training.accounting.events | |
| import com.training.accounting.task.domain.TaskService | |
| import com.training.accounting.task.domain.model.toTask | |
| import com.training.accounting.transaction.domain.TransferService | |
| import com.training.scheme.registry.business.task.v1.TaskAddedBusinessEvent | |
| import com.training.scheme.registry.business.task.v1.TaskAddedPayload | |
| import com.training.scheme.registry.business.task.v1.TaskCompletedBusinessEvent | |
| import com.training.scheme.registry.business.task.v1.TaskCompletedPayload | |
| import org.apache.avro.specific.SpecificRecord | |
| import org.apache.kafka.clients.consumer.ConsumerRecord | |
| import org.springframework.kafka.annotation.KafkaListener | |
| import org.springframework.stereotype.Component | |
| import com.training.scheme.registry.business.task.v2.TaskAddedBusinessEvent as TaskAddedBusinessEventV2 | |
| import com.training.scheme.registry.business.task.v2.TaskAddedPayload as TaskAddedPayloadV2 | |
// Name of the Kafka topic that TaskBusinessEventConsumer's @KafkaListener subscribes to.
| private const val TASK_TOPIC_NAME = "task-lifecycle" | |
/**
 * Kafka consumer for task business events published on [TASK_TOPIC_NAME].
 *
 * Each delivered record is dispatched on the runtime type of its Avro value:
 * - [TaskAddedBusinessEvent] and [TaskAddedBusinessEventV2] store the task through
 *   [TaskService] and then call [TransferService.performWithdraw];
 * - [TaskCompletedBusinessEvent] calls [TransferService.performEnrollment].
 *
 * Records carrying any other value (including `null`) are reported and skipped.
 */
@Component
class TaskBusinessEventConsumer(
    private val transferService: TransferService,
    private val taskService: TaskService,
) {

    /**
     * Listener entry point: prints the delivered record and routes its value to the
     * matching handler.
     *
     * @param message the consumed record; its value is expected to be one of the task
     *   business events imported by this file.
     */
    @KafkaListener(topics = [TASK_TOPIC_NAME], groupId = "group_id_2")
    fun taskBusinessEvent(message: ConsumerRecord<String, SpecificRecord>) {
        println("Task business message delivered: $message")

        when (val event = message.value()) {
            is TaskAddedBusinessEvent -> taskAdded(event.payload)
            is TaskAddedBusinessEventV2 -> taskAdded(event.payload)
            is TaskCompletedBusinessEvent -> taskCompleted(event.payload)
            // Previously an unmatched value fell through without any trace. Report it
            // instead of throwing, so an unexpected record type does not fail the listener.
            else -> println(
                "Task business message skipped: unsupported event type ${event?.javaClass?.name}"
            )
        }
    }

    /**
     * Handles a v1 "task added" payload: stores the converted task, then requests a
     * withdrawal of [TaskAddedPayload.assignCost] for the payload's assignee and task.
     */
    private fun taskAdded(payload: TaskAddedPayload) {
        taskService.addTask(payload.toTask())
        transferService.performWithdraw(
            userPublicId = payload.assigneePublicId,
            taskPublicId = payload.publicId,
            amount = payload.assignCost,
        )
    }

    /**
     * Handles a v2 "task added" payload with the same steps as the v1 overload.
     */
    private fun taskAdded(payload: TaskAddedPayloadV2) {
        // NOTE(review): looks like leftover debug output (the v1 overload has no equivalent) —
        // confirm whether it can be removed.
        println("PASH# taskAdded# ${Thread.currentThread().name}")
        taskService.addTask(payload.toTask())
        transferService.performWithdraw(
            userPublicId = payload.assigneePublicId,
            taskPublicId = payload.publicId,
            amount = payload.assignCost,
        )
    }

    /**
     * Handles a "task completed" payload by requesting an enrollment of
     * [TaskCompletedPayload.reward] for the payload's assignee and task.
     */
    private fun taskCompleted(payload: TaskCompletedPayload) {
        transferService.performEnrollment(
            userPublicId = payload.assigneePublicId,
            taskPublicId = payload.publicId,
            amount = payload.reward,
        )
    }
}
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment