From 79b68af64c133fb495c4f2424b0a8df76e8a8f34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Tue, 17 Aug 2021 17:38:09 +0200 Subject: [PATCH] manual ack --- src/main/kotlin/mq/ConsumerWrapper.kt | 2 +- src/main/kotlin/mq/DatabaseConsumer.kt | 20 ++++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/mq/ConsumerWrapper.kt b/src/main/kotlin/mq/ConsumerWrapper.kt index efc3989..a3ca196 100644 --- a/src/main/kotlin/mq/ConsumerWrapper.kt +++ b/src/main/kotlin/mq/ConsumerWrapper.kt @@ -20,6 +20,6 @@ class ConsumerWrapper { val inputQueueName = inputChannel.queueDeclare().queue inputChannel.queueBind(inputQueueName, envConfig.mqExchange, "") - inputChannel.basicConsume(inputQueueName, true, DatabaseConsumer()) + inputChannel.basicConsume(inputQueueName, false, DatabaseConsumer(inputChannel)) } } \ No newline at end of file diff --git a/src/main/kotlin/mq/DatabaseConsumer.kt b/src/main/kotlin/mq/DatabaseConsumer.kt index b53b4bb..6243a6b 100644 --- a/src/main/kotlin/mq/DatabaseConsumer.kt +++ b/src/main/kotlin/mq/DatabaseConsumer.kt @@ -2,17 +2,15 @@ package mq import api.ApiObject import com.google.gson.Gson -import com.rabbitmq.client.AMQP -import com.rabbitmq.client.Consumer -import com.rabbitmq.client.Envelope -import com.rabbitmq.client.ShutdownSignalException +import com.rabbitmq.client.* import database.service.IResultObjectService import org.koin.core.component.KoinComponent import org.koin.core.component.inject -class DatabaseConsumer: Consumer, KoinComponent { +class DatabaseConsumer(channel: Channel): Consumer, KoinComponent { private val resultObjectService : IResultObjectService by inject() private val gson = Gson() + private val basicChannel = channel override fun handleConsumeOk(consumerTag : String?) { } override fun handleCancelOk(p0 : String?) { @@ -30,8 +28,14 @@ class DatabaseConsumer: Consumer, KoinComponent { } override fun handleDelivery(consumerTag : String?, envelope : Envelope?, basicProperties : AMQP.BasicProperties?, body : ByteArray?) { - val rawJson = body!!.toString(Charsets.UTF_8) - val apiObject = gson.fromJson(rawJson, ApiObject::class.java) - resultObjectService.addOne(apiObject) + try { + val rawJson = body!!.toString(Charsets.UTF_8) + val apiObject = gson.fromJson(rawJson, ApiObject::class.java) + resultObjectService.addOne(apiObject) + basicChannel.basicAck(envelope!!.deliveryTag, false) + } catch (e: Exception) { + println(e.message) + basicChannel.basicNack(envelope!!.deliveryTag, false, true) + } } } \ No newline at end of file