This commit is contained in:
parent
a719addee4
commit
79b68af64c
@ -20,6 +20,6 @@ class ConsumerWrapper {
|
||||
val inputQueueName = inputChannel.queueDeclare().queue
|
||||
inputChannel.queueBind(inputQueueName, envConfig.mqExchange, "")
|
||||
|
||||
inputChannel.basicConsume(inputQueueName, true, DatabaseConsumer())
|
||||
inputChannel.basicConsume(inputQueueName, false, DatabaseConsumer(inputChannel))
|
||||
}
|
||||
}
|
@ -2,17 +2,15 @@ package mq
|
||||
|
||||
import api.ApiObject
|
||||
import com.google.gson.Gson
|
||||
import com.rabbitmq.client.AMQP
|
||||
import com.rabbitmq.client.Consumer
|
||||
import com.rabbitmq.client.Envelope
|
||||
import com.rabbitmq.client.ShutdownSignalException
|
||||
import com.rabbitmq.client.*
|
||||
import database.service.IResultObjectService
|
||||
import org.koin.core.component.KoinComponent
|
||||
import org.koin.core.component.inject
|
||||
|
||||
class DatabaseConsumer: Consumer, KoinComponent {
|
||||
class DatabaseConsumer(channel: Channel): Consumer, KoinComponent {
|
||||
private val resultObjectService : IResultObjectService by inject()
|
||||
private val gson = Gson()
|
||||
private val basicChannel = channel
|
||||
override fun handleConsumeOk(consumerTag : String?) {
|
||||
}
|
||||
override fun handleCancelOk(p0 : String?) {
|
||||
@ -30,8 +28,14 @@ class DatabaseConsumer: Consumer, KoinComponent {
|
||||
}
|
||||
|
||||
override fun handleDelivery(consumerTag : String?, envelope : Envelope?, basicProperties : AMQP.BasicProperties?, body : ByteArray?) {
|
||||
try {
|
||||
val rawJson = body!!.toString(Charsets.UTF_8)
|
||||
val apiObject = gson.fromJson(rawJson, ApiObject::class.java)
|
||||
resultObjectService.addOne(apiObject)
|
||||
basicChannel.basicAck(envelope!!.deliveryTag, false)
|
||||
} catch (e: Exception) {
|
||||
println(e.message)
|
||||
basicChannel.basicNack(envelope!!.deliveryTag, false, true)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user