diff --git a/src/Application.kt b/src/Application.kt index 832b98a..06293d2 100644 --- a/src/Application.kt +++ b/src/Application.kt @@ -1,8 +1,7 @@ package com.kmalbz -import com.google.gson.Gson -import com.kmalbz.api.model.ApiObject import com.kmalbz.api.route.OutputServiceRDBServer +import com.kmalbz.consumer.DatabaseConsumer import io.ktor.application.* import io.ktor.response.* import io.ktor.routing.* @@ -12,15 +11,10 @@ import io.ktor.features.* import org.apache.http.HttpException import com.kmalbz.database.DatabaseFactory import com.kmalbz.database.dao.ResultObjects -import com.kmalbz.database.service.ResultObjectService import io.ktor.util.KtorExperimentalAPI import com.rabbitmq.client.* import com.typesafe.config.ConfigFactory import io.ktor.config.HoconApplicationConfig -import com.viartemev.thewhiterabbit.channel.* -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.transactions.transaction import org.koin.ktor.ext.Koin @@ -60,20 +54,7 @@ fun Application.module() { val queueName = channel.queueDeclare().queue channel.queueBind(queueName, rabbitExchangeName, "") - GlobalScope.launch(Dispatchers.Default) { - connection.channel { - consume(queueName) { - consumeMessageWithConfirm({ - val resultObjectService = ResultObjectService() - val rawJson = String(it.body) - val gson = Gson() - val apiObject = gson.fromJson(rawJson,ApiObject::class.java) - - resultObjectService.addOne(apiObject) - }) - } - } - } + channel.basicConsume(rabbitExchangeName, true, DatabaseConsumer()) routing { install(StatusPages) { diff --git a/src/consumer/DatabaseConsumer.kt b/src/consumer/DatabaseConsumer.kt new file mode 100644 index 0000000..7ac76da --- /dev/null +++ b/src/consumer/DatabaseConsumer.kt @@ -0,0 +1,35 @@ +package com.kmalbz.consumer + +import com.google.gson.Gson +import com.kmalbz.api.model.ApiObject +import com.kmalbz.database.service.ResultObjectService +import com.rabbitmq.client.AMQP.BasicProperties +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.Envelope +import com.rabbitmq.client.ShutdownSignalException + +class DatabaseConsumer : Consumer { + val resultObjectService = ResultObjectService() + + val gson = Gson() + override fun handleConsumeOk(consumerTag : String?) { + } + override fun handleCancelOk(p0 : String?) { + throw UnsupportedOperationException() + } + override fun handleRecoverOk(p0 : String?) { + throw UnsupportedOperationException() + } + override fun handleCancel(p0 : String?) { + throw UnsupportedOperationException() + } + override fun handleDelivery(consumerTag : String?, envelope : Envelope?, basicProperties : BasicProperties?, body : ByteArray?) { + val rawJson = body.toString() + val apiObject = gson.fromJson(rawJson, ApiObject::class.java) + + resultObjectService.addOne(apiObject) + } + override fun handleShutdownSignal(p0 : String?, p1 : ShutdownSignalException?) { + println("got shutdown signal") + } +} \ No newline at end of file