54 lines
2.2 KiB
Kotlin
54 lines
2.2 KiB
Kotlin
package worker
|
|
|
|
import com.google.gson.Gson
|
|
import com.rabbitmq.client.BuiltinExchangeType
|
|
import com.rabbitmq.client.ConnectionFactory
|
|
import model.OutputModel
|
|
import com.viartemev.thewhiterabbit.channel.channel
|
|
import com.viartemev.thewhiterabbit.channel.consume
|
|
import kotlinx.coroutines.Dispatchers
|
|
import kotlinx.coroutines.GlobalScope
|
|
import kotlinx.coroutines.launch
|
|
import kotlin.random.Random
|
|
|
|
class Glue {
|
|
var mqHost: String = System.getenv("MQ_HOST") ?: "localhost"
|
|
var mqUserName: String = System.getenv("MQ_USERNAME") ?: "rabbitmq"
|
|
var mqPassWord: String = System.getenv("MQ_PASSWORD") ?: "rabbitmq"
|
|
var mqInputExchange: String = System.getenv("MQ_IN_EXCHANGE") ?: "input"
|
|
var mqOutputExchange: String = System.getenv("MQ_OUT_EXCHANGE") ?: "output"
|
|
val gson = Gson()
|
|
fun recieve(){
|
|
val factory = ConnectionFactory()
|
|
factory.host = mqHost
|
|
factory.username = mqUserName
|
|
factory.password = mqPassWord
|
|
|
|
val inputConnection = factory.newConnection()
|
|
val inputChannel = inputConnection.createChannel()
|
|
|
|
inputChannel.exchangeDeclare(mqInputExchange, BuiltinExchangeType.FANOUT)
|
|
val inputQueueName = inputChannel.queueDeclare().queue
|
|
inputChannel.queueBind(inputQueueName, mqInputExchange, "")
|
|
|
|
val outputConnection = factory.newConnection()
|
|
val outputChannel = outputConnection.createChannel()
|
|
|
|
outputChannel.exchangeDeclare(mqOutputExchange, BuiltinExchangeType.FANOUT)
|
|
val outputQueueName = outputChannel.queueDeclare().queue
|
|
outputChannel.queueBind(outputQueueName, mqOutputExchange, "")
|
|
|
|
GlobalScope.launch(Dispatchers.Default) {
|
|
inputConnection.channel {
|
|
consume(inputQueueName) {
|
|
consumeMessageWithConfirm({
|
|
val rawJson = String(it.body)
|
|
val probability = Random.nextDouble(0.0,1.0)
|
|
val outputObject = OutputModel(tag=rawJson, probability=probability)
|
|
outputChannel.basicPublish("",outputQueueName, null, gson.toJson(outputObject).toByteArray())
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} |