refactor
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Torma Kristóf 2020-05-22 04:24:15 +02:00
parent 03a06728bb
commit d77b1508a3
Signed by: tormakris
GPG Key ID: DC83C4F2C41B1047
3 changed files with 88 additions and 39 deletions

View File

@ -0,0 +1,9 @@
package config
data class EnvConfig (
var mqHost: String = System.getenv("MQ_HOST") ?: "localhost",
var mqUserName: String = System.getenv("MQ_USERNAME") ?: "rabbitmq",
var mqPassWord: String = System.getenv("MQ_PASSWORD") ?: "rabbitmq",
var mqInputExchange: String = System.getenv("MQ_IN_EXCHANGE") ?: "input",
var mqOutputExchange: String = System.getenv("MQ_OUT_EXCHANGE") ?: "output"
)

View File

@ -0,0 +1,67 @@
package consumer
import config.EnvConfig
import com.google.gson.Gson
import com.rabbitmq.client.*
import com.viartemev.thewhiterabbit.channel.confirmChannel
import com.viartemev.thewhiterabbit.channel.publish
import com.viartemev.thewhiterabbit.publisher.OutboundMessage
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import model.OutputModel
import kotlin.random.Random
class GlueConsumer() : Consumer {
val envConfig = EnvConfig()
val factory = ConnectionFactory()
val outputConnection:Connection
val outputChannel:Channel
val gson = Gson()
override fun handleConsumeOk(consumerTag : String?) {
}
override fun handleCancelOk(p0 : String?) {
throw UnsupportedOperationException()
}
override fun handleRecoverOk(p0 : String?) {
throw UnsupportedOperationException()
}
override fun handleCancel(p0 : String?) {
throw UnsupportedOperationException()
}
override fun handleDelivery(consumerTag : String?, envelope : Envelope?, basicProperties : AMQP.BasicProperties?, body : ByteArray?) {
val rawJson = body!!.toString()
println(rawJson)
val probability = Random.nextDouble(0.0,1.0)
val outputObject = OutputModel(tag=rawJson, probability=probability)
GlobalScope.launch(Dispatchers.Default) {
outputConnection.confirmChannel {
publish {
publishWithConfirm(
OutboundMessage(
envConfig.mqOutputExchange,
"",
MessageProperties.PERSISTENT_BASIC,
gson.toJson(outputObject).toString()
)
)
}
}
}
}
override fun handleShutdownSignal(p0 : String?, p1 : ShutdownSignalException?) {
println("got shutdown signal")
}
init {
factory.host = envConfig.mqHost
factory.username = envConfig.mqUserName
factory.password = envConfig.mqPassWord
outputConnection=factory.newConnection()
outputChannel=outputConnection.createChannel()
outputChannel.exchangeDeclare(envConfig.mqOutputExchange, BuiltinExchangeType.FANOUT)
}
}

View File

@ -1,53 +1,26 @@
package worker package worker
import com.google.gson.Gson import config.EnvConfig
import com.rabbitmq.client.BuiltinExchangeType import com.rabbitmq.client.*
import com.rabbitmq.client.ConnectionFactory import consumer.GlueConsumer
import model.OutputModel
import com.viartemev.thewhiterabbit.channel.channel
import com.viartemev.thewhiterabbit.channel.consume
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlin.random.Random
class Glue { class Glue {
var mqHost: String = System.getenv("MQ_HOST") ?: "localhost" val envConfig = EnvConfig()
var mqUserName: String = System.getenv("MQ_USERNAME") ?: "rabbitmq"
var mqPassWord: String = System.getenv("MQ_PASSWORD") ?: "rabbitmq"
var mqInputExchange: String = System.getenv("MQ_IN_EXCHANGE") ?: "input"
var mqOutputExchange: String = System.getenv("MQ_OUT_EXCHANGE") ?: "output"
val gson = Gson()
fun recieve(){ fun recieve(){
val factory = ConnectionFactory() val factory = ConnectionFactory()
factory.host = mqHost factory.host = envConfig.mqHost
factory.username = mqUserName factory.username = envConfig.mqUserName
factory.password = mqPassWord factory.password = envConfig.mqPassWord
val inputConnection = factory.newConnection() val inputConnection = factory.newConnection()
val inputChannel = inputConnection.createChannel() val inputChannel = inputConnection.createChannel()
inputChannel.exchangeDeclare(mqInputExchange, BuiltinExchangeType.FANOUT) inputChannel.exchangeDeclare(envConfig.mqInputExchange, BuiltinExchangeType.FANOUT)
val inputQueueName = inputChannel.queueDeclare().queue val inputQueueName = inputChannel.queueDeclare().queue
inputChannel.queueBind(inputQueueName, mqInputExchange, "") inputChannel.queueBind(inputQueueName, envConfig.mqInputExchange, "")
val outputConnection = factory.newConnection() inputChannel.basicConsume(envConfig.mqInputExchange, true, GlueConsumer())
val outputChannel = outputConnection.createChannel()
outputChannel.exchangeDeclare(mqOutputExchange, BuiltinExchangeType.FANOUT)
GlobalScope.launch(Dispatchers.Default) {
inputConnection.channel {
consume(inputQueueName) {
consumeMessageWithConfirm({
val rawJson = String(it.body)
println(rawJson)
val probability = Random.nextDouble(0.0,1.0)
val outputObject = OutputModel(tag=rawJson, probability=probability)
outputChannel.basicPublish(mqOutputExchange,"", null, gson.toJson(outputObject).toByteArray())
})
}
}
}
} }
} }