initial commit

This commit is contained in:
2020-05-22 02:40:51 +02:00
commit 1733c32c41
15 changed files with 570 additions and 0 deletions

View File

@ -0,0 +1,6 @@
import worker.Glue
fun main(args: Array<String>) {
val glue = Glue()
glue.recieve()
}

View File

@ -0,0 +1,8 @@
package model
import com.google.gson.annotations.SerializedName
data class OutputModel (
@SerializedName("tag") val tag: String,
@SerializedName("probability") val probability: Double
)

View File

@ -0,0 +1,54 @@
package worker
import com.google.gson.Gson
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.ConnectionFactory
import model.OutputModel
import com.viartemev.thewhiterabbit.channel.channel
import com.viartemev.thewhiterabbit.channel.consume
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlin.random.Random
class Glue {
var mqHost: String = System.getenv("MQ_HOST") ?: "localhost"
var mqUserName: String = System.getenv("MQ_USERNAME") ?: "rabbitmq"
var mqPassWord: String = System.getenv("MQ_PASSWORD") ?: "rabbitmq"
var mqInputExchange: String = System.getenv("MQ_IN_EXCHANGE") ?: "input"
var mqOutputExchange: String = System.getenv("MQ_OUT_EXCHANGE") ?: "output"
val gson = Gson()
fun recieve(){
val factory = ConnectionFactory()
factory.host = mqHost
factory.username = mqUserName
factory.password = mqPassWord
val inputConnection = factory.newConnection()
val inputChannel = inputConnection.createChannel()
inputChannel.exchangeDeclare(mqInputExchange, BuiltinExchangeType.FANOUT)
val inputQueueName = inputChannel.queueDeclare().queue
inputChannel.queueBind(inputQueueName, mqInputExchange, "")
val outputConnection = factory.newConnection()
val outputChannel = outputConnection.createChannel()
outputChannel.exchangeDeclare(mqOutputExchange, BuiltinExchangeType.FANOUT)
val outputQueueName = outputChannel.queueDeclare().queue
outputChannel.queueBind(outputQueueName, mqOutputExchange, "")
GlobalScope.launch(Dispatchers.Default) {
inputConnection.channel {
consume(inputQueueName) {
consumeMessageWithConfirm({
val rawJson = String(it.body)
val probability = Random.nextDouble(0.0,1.0)
val outputObject = OutputModel(tag=rawJson, probability=probability)
outputChannel.basicPublish("",outputQueueName, null, gson.toJson(outputObject).toByteArray())
})
}
}
}
}
}