From 5be7150939a837a1ed0b7ce915e5b04b31572d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Fri, 13 Aug 2021 12:58:06 +0200 Subject: [PATCH] finish consumer --- src/main/kotlin/Main.kt | 4 ++- src/main/kotlin/config/EnvConfig.kt | 1 + src/main/kotlin/di/DatabaseModule.kt | 2 +- src/main/kotlin/mq/ConsumerWrapper.kt | 25 +++++++++++++++++ src/main/kotlin/mq/DatabaseConsumer.kt | 37 ++++++++++++++++++++++++++ 5 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 src/main/kotlin/mq/ConsumerWrapper.kt create mode 100644 src/main/kotlin/mq/DatabaseConsumer.kt diff --git a/src/main/kotlin/Main.kt b/src/main/kotlin/Main.kt index 2e6c32f..5d36e59 100644 --- a/src/main/kotlin/Main.kt +++ b/src/main/kotlin/Main.kt @@ -1,4 +1,5 @@ import di.databasemodule +import mq.ConsumerWrapper import org.koin.core.context.GlobalContext.startKoin fun main(vararg args: String) { @@ -8,5 +9,6 @@ fun main(vararg args: String) { modules(databasemodule) } - + val consumerWrapper = ConsumerWrapper(); + consumerWrapper.recieve(); } \ No newline at end of file diff --git a/src/main/kotlin/config/EnvConfig.kt b/src/main/kotlin/config/EnvConfig.kt index cfb8b9b..86b2673 100644 --- a/src/main/kotlin/config/EnvConfig.kt +++ b/src/main/kotlin/config/EnvConfig.kt @@ -4,6 +4,7 @@ data class EnvConfig ( var mqHost: String = System.getenv("MQ_HOST") ?: "localhost", var mqUserName: String = System.getenv("MQ_USERNAME") ?: "rabbitmq", var mqPassWord: String = System.getenv("MQ_PASSWORD") ?: "rabbitmq", + var mqExchange: String = System.getenv("MQ_EXCHANGE") ?: "rabbitmq", var dbJdbc: String = System.getenv("DB_JDBC") ?: "input", var dbUsername: String = System.getenv("DB_USERNAME") ?: "output", var dbPassowrd: String = System.getenv("DB_PASSOWRD") ?: "output" diff --git a/src/main/kotlin/di/DatabaseModule.kt b/src/main/kotlin/di/DatabaseModule.kt index 40b768f..edfe8ca 100644 --- a/src/main/kotlin/di/DatabaseModule.kt +++ b/src/main/kotlin/di/DatabaseModule.kt @@ -5,5 +5,5 @@ import database.service.ResultObjectService import org.koin.dsl.module val databasemodule = module(createdAtStart = true) { - single { ResultObjectService() as IResultObjectService } + single { ResultObjectService() } } \ No newline at end of file diff --git a/src/main/kotlin/mq/ConsumerWrapper.kt b/src/main/kotlin/mq/ConsumerWrapper.kt new file mode 100644 index 0000000..efc3989 --- /dev/null +++ b/src/main/kotlin/mq/ConsumerWrapper.kt @@ -0,0 +1,25 @@ +package mq + +import com.rabbitmq.client.BuiltinExchangeType +import com.rabbitmq.client.ConnectionFactory +import config.EnvConfig + +class ConsumerWrapper { + private val envConfig = EnvConfig() + + fun recieve(){ + val factory = ConnectionFactory() + factory.host = envConfig.mqHost + factory.username = envConfig.mqUserName + factory.password = envConfig.mqPassWord + + val inputConnection = factory.newConnection() + val inputChannel = inputConnection.createChannel() + + inputChannel.exchangeDeclare(envConfig.mqExchange, BuiltinExchangeType.FANOUT) + val inputQueueName = inputChannel.queueDeclare().queue + inputChannel.queueBind(inputQueueName, envConfig.mqExchange, "") + + inputChannel.basicConsume(inputQueueName, true, DatabaseConsumer()) + } +} \ No newline at end of file diff --git a/src/main/kotlin/mq/DatabaseConsumer.kt b/src/main/kotlin/mq/DatabaseConsumer.kt new file mode 100644 index 0000000..b53b4bb --- /dev/null +++ b/src/main/kotlin/mq/DatabaseConsumer.kt @@ -0,0 +1,37 @@ +package mq + +import api.ApiObject +import com.google.gson.Gson +import com.rabbitmq.client.AMQP +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.Envelope +import com.rabbitmq.client.ShutdownSignalException +import database.service.IResultObjectService +import org.koin.core.component.KoinComponent +import org.koin.core.component.inject + +class DatabaseConsumer: Consumer, KoinComponent { + private val resultObjectService : IResultObjectService by inject() + private val gson = Gson() + override fun handleConsumeOk(consumerTag : String?) { + } + override fun handleCancelOk(p0 : String?) { + throw UnsupportedOperationException() + } + override fun handleRecoverOk(p0 : String?) { + throw UnsupportedOperationException() + } + override fun handleCancel(p0 : String?) { + throw UnsupportedOperationException() + } + + override fun handleShutdownSignal(consumerTag: String?, sig: ShutdownSignalException?) { + println("got shutdown signal") + } + + override fun handleDelivery(consumerTag : String?, envelope : Envelope?, basicProperties : AMQP.BasicProperties?, body : ByteArray?) { + val rawJson = body!!.toString(Charsets.UTF_8) + val apiObject = gson.fromJson(rawJson, ApiObject::class.java) + resultObjectService.addOne(apiObject) + } +} \ No newline at end of file