diff --git a/src/main/kotlin/mq/ConsumerWrapper.kt b/src/main/kotlin/mq/ConsumerWrapper.kt index efc3989..a3ca196 100644 --- a/src/main/kotlin/mq/ConsumerWrapper.kt +++ b/src/main/kotlin/mq/ConsumerWrapper.kt @@ -20,6 +20,6 @@ class ConsumerWrapper { val inputQueueName = inputChannel.queueDeclare().queue inputChannel.queueBind(inputQueueName, envConfig.mqExchange, "") - inputChannel.basicConsume(inputQueueName, true, DatabaseConsumer()) + inputChannel.basicConsume(inputQueueName, false, DatabaseConsumer(inputChannel)) } } \ No newline at end of file diff --git a/src/main/kotlin/mq/DatabaseConsumer.kt b/src/main/kotlin/mq/DatabaseConsumer.kt index 6dbf64a..4bb25a6 100644 --- a/src/main/kotlin/mq/DatabaseConsumer.kt +++ b/src/main/kotlin/mq/DatabaseConsumer.kt @@ -2,17 +2,15 @@ package mq import api.ApiObject import com.google.gson.Gson -import com.rabbitmq.client.AMQP -import com.rabbitmq.client.Consumer -import com.rabbitmq.client.Envelope -import com.rabbitmq.client.ShutdownSignalException +import com.rabbitmq.client.* import database.service.ISampleObjectService import org.koin.core.component.KoinComponent import org.koin.core.component.inject -class DatabaseConsumer : Consumer, KoinComponent { +class DatabaseConsumer (channel: Channel) : Consumer, KoinComponent { private val resultObjectService: ISampleObjectService by inject() private val gson = Gson() + private val basicChannel = channel override fun handleConsumeOk(consumerTag: String?) { } @@ -42,8 +40,10 @@ class DatabaseConsumer : Consumer, KoinComponent { val rawJson = body!!.toString(Charsets.UTF_8) val apiObject = gson.fromJson(rawJson, ApiObject::class.java) resultObjectService.addOne(apiObject) + basicChannel.basicAck(envelope!!.deliveryTag, false) } catch (e: Exception) { println(e.message) + basicChannel.basicNack(envelope!!.deliveryTag, false, true) } } } \ No newline at end of file