package mq

import api.ApiObject
import com.rabbitmq.client.*
import database.service.IResultObjectService
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import org.koin.core.component.KoinComponent
import org.koin.core.component.inject

/**
 * RabbitMQ [Consumer] that decodes each delivery body as a JSON [ApiObject] and
 * hands it to the injected [IResultObjectService].
 *
 * Acknowledgement policy:
 * - stored successfully: the delivery is acked;
 * - body missing or not decodable: the delivery is nacked **without** requeue,
 *   since redelivering the same bytes can never succeed;
 * - any other failure while storing or acking: the delivery is nacked **with**
 *   requeue so it can be retried.
 *
 * @param channel channel used to ack/nack the deliveries handled by this consumer.
 */
class DatabaseConsumer(channel: Channel) : Consumer, KoinComponent {

    private val resultObjectService: IResultObjectService by inject()
    private val basicChannel = channel

    /** No action is taken when the consumer is registered. */
    override fun handleConsumeOk(consumerTag: String?) {
    }

    /**
     * Not supported by this consumer.
     *
     * @throws UnsupportedOperationException always.
     */
    override fun handleCancelOk(p0: String?) {
        throw UnsupportedOperationException()
    }

    /**
     * Not supported by this consumer.
     *
     * @throws UnsupportedOperationException always.
     */
    override fun handleRecoverOk(p0: String?) {
        throw UnsupportedOperationException()
    }

    /**
     * Not supported by this consumer.
     *
     * @throws UnsupportedOperationException always.
     */
    override fun handleCancel(p0: String?) {
        throw UnsupportedOperationException()
    }

    /** Prints a notice to standard output; no other clean-up is performed. */
    override fun handleShutdownSignal(consumerTag: String?, sig: ShutdownSignalException?) {
        println("got shutdown signal")
    }

    /**
     * Decodes [body] as UTF-8 JSON into an [ApiObject], stores it through the
     * result-object service and acknowledges the delivery.
     *
     * @param consumerTag tag of the consumer the delivery belongs to (unused).
     * @param envelope delivery metadata; its delivery tag is used to ack/nack.
     * @param basicProperties AMQP properties of the message (unused).
     * @param body raw message payload, expected to be a UTF-8 encoded JSON document.
     * @throws IllegalArgumentException if [envelope] is null, because the delivery
     *   can then be neither acked nor nacked.
     */
    override fun handleDelivery(
        consumerTag: String?,
        envelope: Envelope?,
        basicProperties: AMQP.BasicProperties?,
        body: ByteArray?
    ) {
        // Resolve the tag once up front so the failure paths below never need `!!`.
        val deliveryTag = requireNotNull(envelope) {
            "Delivery has no envelope; it cannot be acknowledged"
        }.deliveryTag

        val apiObject = try {
            val rawJson = requireNotNull(body) { "Delivery has no body" }.toString(Charsets.UTF_8)
            // The explicit type argument is required: nothing else lets the compiler infer T.
            Json.decodeFromString<ApiObject>(rawJson)
        } catch (e: IllegalArgumentException) {
            // SerializationException is an IllegalArgumentException, so this covers a
            // missing body, malformed JSON and JSON that does not map onto ApiObject.
            // Such a message is permanently broken: requeueing it would only produce
            // an endless redelivery loop, so it is rejected without requeue.
            println(e.message)
            basicChannel.basicNack(deliveryTag, false, false)
            return
        }

        try {
            resultObjectService.addOne(apiObject)
            basicChannel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            // Storing or acking failed for a reason unrelated to the payload itself;
            // put the message back on the queue so it can be retried.
            println(e.message)
            basicChannel.basicNack(deliveryTag, false, true)
        }
    }
}