package mq

import api.ApiObject
import kotlinx.serialization.json.*
import com.rabbitmq.client.*
import database.service.ISampleObjectService
import kotlinx.serialization.decodeFromString
import org.koin.core.component.KoinComponent
import org.koin.core.component.inject

/**
 * RabbitMQ [Consumer] that decodes every delivery as a JSON [ApiObject] and passes it to
 * [ISampleObjectService.addOne], then acks or nacks the delivery on the channel supplied
 * at construction.
 *
 * @param channel channel used for `basicAck` / `basicNack` of the received deliveries.
 */
class DatabaseConsumer(channel: Channel) : Consumer, KoinComponent {
    // Resolved through Koin on first use.
    private val resultObjectService: ISampleObjectService by inject()
    private val basicChannel = channel

    /** Consumer registration needs no handling here. */
    override fun handleConsumeOk(consumerTag: String?) {
    }

    /** Not supported by this consumer; always throws [UnsupportedOperationException]. */
    override fun handleCancelOk(p0: String?) {
        throw UnsupportedOperationException()
    }

    /** Not supported by this consumer; always throws [UnsupportedOperationException]. */
    override fun handleRecoverOk(p0: String?) {
        throw UnsupportedOperationException()
    }

    /** Not supported by this consumer; always throws [UnsupportedOperationException]. */
    override fun handleCancel(p0: String?) {
        throw UnsupportedOperationException()
    }

    /** Prints a notice when the channel or connection shuts down; no other action is taken. */
    override fun handleShutdownSignal(consumerTag: String?, sig: ShutdownSignalException?) {
        println("got shutdown signal")
    }

    /**
     * Processes one delivery.
     *
     * - Payload decoded and stored: the delivery is acked.
     * - Payload missing or not decodable as [ApiObject]: the delivery is nacked WITHOUT
     *   requeue, because redelivering the same bytes would fail the same way and loop forever.
     * - Storing or acking fails: the delivery is nacked WITH requeue so it can be retried.
     * - No envelope: there is no delivery tag to ack or nack, so the delivery is only reported.
     *
     * @param consumerTag tag of the consumer the delivery is addressed to (unused).
     * @param envelope carries the delivery tag used for ack / nack.
     * @param basicProperties message properties (unused).
     * @param body UTF-8 encoded JSON payload.
     */
    override fun handleDelivery(
        consumerTag: String?,
        envelope: Envelope?,
        basicProperties: AMQP.BasicProperties?,
        body: ByteArray?
    ) {
        val deliveryTag = envelope?.deliveryTag
        if (deliveryTag == null) {
            println("delivery without envelope cannot be acknowledged")
            return
        }

        val apiObject = decodeOrNull(body)
        if (apiObject == null) {
            // requeue = false: a payload that cannot be decoded must not be redelivered.
            basicChannel.basicNack(deliveryTag, false, false)
            return
        }

        try {
            resultObjectService.addOne(apiObject)
            basicChannel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            println(e.message)
            // requeue = true: the payload itself is valid, so a later attempt may succeed.
            basicChannel.basicNack(deliveryTag, false, true)
        }
    }

    /** Decodes [body] as UTF-8 JSON into an [ApiObject]; returns `null` (after printing the reason) when that fails. */
    private fun decodeOrNull(body: ByteArray?): ApiObject? {
        if (body == null) {
            println("delivery without body")
            return null
        }
        return try {
            Json.decodeFromString<ApiObject>(body.toString(Charsets.UTF_8))
        } catch (e: Exception) {
            println(e.message)
            null
        }
    }
}