From d77b1508a3b2cd440e25d4d7e5c1f067c17307f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Fri, 22 May 2020 04:24:15 +0200 Subject: [PATCH] refactor --- src/main/kotlin/config/EnvConfig.kt | 9 ++++ src/main/kotlin/consumer/GlueConsumer.kt | 67 ++++++++++++++++++++++++ src/main/kotlin/worker/Glue.kt | 51 +++++------------- 3 files changed, 88 insertions(+), 39 deletions(-) create mode 100644 src/main/kotlin/config/EnvConfig.kt create mode 100644 src/main/kotlin/consumer/GlueConsumer.kt diff --git a/src/main/kotlin/config/EnvConfig.kt b/src/main/kotlin/config/EnvConfig.kt new file mode 100644 index 0000000..6882892 --- /dev/null +++ b/src/main/kotlin/config/EnvConfig.kt @@ -0,0 +1,9 @@ +package config + +data class EnvConfig ( + var mqHost: String = System.getenv("MQ_HOST") ?: "localhost", + var mqUserName: String = System.getenv("MQ_USERNAME") ?: "rabbitmq", + var mqPassWord: String = System.getenv("MQ_PASSWORD") ?: "rabbitmq", + var mqInputExchange: String = System.getenv("MQ_IN_EXCHANGE") ?: "input", + var mqOutputExchange: String = System.getenv("MQ_OUT_EXCHANGE") ?: "output" +) \ No newline at end of file diff --git a/src/main/kotlin/consumer/GlueConsumer.kt b/src/main/kotlin/consumer/GlueConsumer.kt new file mode 100644 index 0000000..f87defd --- /dev/null +++ b/src/main/kotlin/consumer/GlueConsumer.kt @@ -0,0 +1,67 @@ +package consumer + +import config.EnvConfig +import com.google.gson.Gson +import com.rabbitmq.client.* +import com.viartemev.thewhiterabbit.channel.confirmChannel +import com.viartemev.thewhiterabbit.channel.publish +import com.viartemev.thewhiterabbit.publisher.OutboundMessage +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import model.OutputModel +import kotlin.random.Random + +class GlueConsumer() : Consumer { + val envConfig = EnvConfig() + val factory = ConnectionFactory() + + val outputConnection:Connection + val outputChannel:Channel + + val gson = Gson() + override fun handleConsumeOk(consumerTag : String?) { + } + override fun handleCancelOk(p0 : String?) { + throw UnsupportedOperationException() + } + override fun handleRecoverOk(p0 : String?) { + throw UnsupportedOperationException() + } + override fun handleCancel(p0 : String?) { + throw UnsupportedOperationException() + } + + override fun handleDelivery(consumerTag : String?, envelope : Envelope?, basicProperties : AMQP.BasicProperties?, body : ByteArray?) { + val rawJson = body!!.toString() + println(rawJson) + val probability = Random.nextDouble(0.0,1.0) + val outputObject = OutputModel(tag=rawJson, probability=probability) + GlobalScope.launch(Dispatchers.Default) { + outputConnection.confirmChannel { + publish { + publishWithConfirm( + OutboundMessage( + envConfig.mqOutputExchange, + "", + MessageProperties.PERSISTENT_BASIC, + gson.toJson(outputObject).toString() + ) + ) + } + } + } + } + override fun handleShutdownSignal(p0 : String?, p1 : ShutdownSignalException?) { + println("got shutdown signal") + } + + init { + factory.host = envConfig.mqHost + factory.username = envConfig.mqUserName + factory.password = envConfig.mqPassWord + outputConnection=factory.newConnection() + outputChannel=outputConnection.createChannel() + outputChannel.exchangeDeclare(envConfig.mqOutputExchange, BuiltinExchangeType.FANOUT) + } +} \ No newline at end of file diff --git a/src/main/kotlin/worker/Glue.kt b/src/main/kotlin/worker/Glue.kt index 98c50bb..f271b8f 100644 --- a/src/main/kotlin/worker/Glue.kt +++ b/src/main/kotlin/worker/Glue.kt @@ -1,53 +1,26 @@ package worker -import com.google.gson.Gson -import com.rabbitmq.client.BuiltinExchangeType -import com.rabbitmq.client.ConnectionFactory -import model.OutputModel -import com.viartemev.thewhiterabbit.channel.channel -import com.viartemev.thewhiterabbit.channel.consume -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch -import kotlin.random.Random +import config.EnvConfig +import com.rabbitmq.client.* +import consumer.GlueConsumer + class Glue { - var mqHost: String = System.getenv("MQ_HOST") ?: "localhost" - var mqUserName: String = System.getenv("MQ_USERNAME") ?: "rabbitmq" - var mqPassWord: String = System.getenv("MQ_PASSWORD") ?: "rabbitmq" - var mqInputExchange: String = System.getenv("MQ_IN_EXCHANGE") ?: "input" - var mqOutputExchange: String = System.getenv("MQ_OUT_EXCHANGE") ?: "output" - val gson = Gson() + val envConfig = EnvConfig() + fun recieve(){ val factory = ConnectionFactory() - factory.host = mqHost - factory.username = mqUserName - factory.password = mqPassWord + factory.host = envConfig.mqHost + factory.username = envConfig.mqUserName + factory.password = envConfig.mqPassWord val inputConnection = factory.newConnection() val inputChannel = inputConnection.createChannel() - inputChannel.exchangeDeclare(mqInputExchange, BuiltinExchangeType.FANOUT) + inputChannel.exchangeDeclare(envConfig.mqInputExchange, BuiltinExchangeType.FANOUT) val inputQueueName = inputChannel.queueDeclare().queue - inputChannel.queueBind(inputQueueName, mqInputExchange, "") + inputChannel.queueBind(inputQueueName, envConfig.mqInputExchange, "") - val outputConnection = factory.newConnection() - val outputChannel = outputConnection.createChannel() - - outputChannel.exchangeDeclare(mqOutputExchange, BuiltinExchangeType.FANOUT) - - GlobalScope.launch(Dispatchers.Default) { - inputConnection.channel { - consume(inputQueueName) { - consumeMessageWithConfirm({ - val rawJson = String(it.body) - println(rawJson) - val probability = Random.nextDouble(0.0,1.0) - val outputObject = OutputModel(tag=rawJson, probability=probability) - outputChannel.basicPublish(mqOutputExchange,"", null, gson.toJson(outputObject).toByteArray()) - }) - } - } - } + inputChannel.basicConsume(envConfig.mqInputExchange, true, GlueConsumer()) } } \ No newline at end of file