This commit is contained in:
parent
4c0d87d1ea
commit
2180416004
@ -20,6 +20,6 @@ class ConsumerWrapper {
|
||||
val inputQueueName = inputChannel.queueDeclare().queue
|
||||
inputChannel.queueBind(inputQueueName, envConfig.mqExchange, "")
|
||||
|
||||
inputChannel.basicConsume(inputQueueName, true, DatabaseConsumer())
|
||||
inputChannel.basicConsume(inputQueueName, false, DatabaseConsumer(inputChannel))
|
||||
}
|
||||
}
|
@ -2,17 +2,15 @@ package mq
|
||||
|
||||
import api.ApiObject
|
||||
import com.google.gson.Gson
|
||||
import com.rabbitmq.client.AMQP
|
||||
import com.rabbitmq.client.Consumer
|
||||
import com.rabbitmq.client.Envelope
|
||||
import com.rabbitmq.client.ShutdownSignalException
|
||||
import com.rabbitmq.client.*
|
||||
import database.service.ISampleObjectService
|
||||
import org.koin.core.component.KoinComponent
|
||||
import org.koin.core.component.inject
|
||||
|
||||
class DatabaseConsumer : Consumer, KoinComponent {
|
||||
class DatabaseConsumer (channel: Channel) : Consumer, KoinComponent {
|
||||
private val resultObjectService: ISampleObjectService by inject()
|
||||
private val gson = Gson()
|
||||
private val basicChannel = channel
|
||||
override fun handleConsumeOk(consumerTag: String?) {
|
||||
}
|
||||
|
||||
@ -42,8 +40,10 @@ class DatabaseConsumer : Consumer, KoinComponent {
|
||||
val rawJson = body!!.toString(Charsets.UTF_8)
|
||||
val apiObject = gson.fromJson(rawJson, ApiObject::class.java)
|
||||
resultObjectService.addOne(apiObject)
|
||||
basicChannel.basicAck(envelope!!.deliveryTag, false)
|
||||
} catch (e: Exception) {
|
||||
println(e.message)
|
||||
basicChannel.basicNack(envelope!!.deliveryTag, false, true)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user