complete input service logic
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Torma Kristóf 2020-05-21 22:27:27 +02:00
parent e86987749b
commit 10d305689d
Signed by: tormakris
GPG Key ID: DC83C4F2C41B1047
3 changed files with 114 additions and 18 deletions

View File

@ -11,9 +11,6 @@ import org.apache.http.HttpException
import com.kmalbz.database.DatabaseFactory import com.kmalbz.database.DatabaseFactory
import com.kmalbz.database.dao.InputObjects import com.kmalbz.database.dao.InputObjects
import io.ktor.util.KtorExperimentalAPI import io.ktor.util.KtorExperimentalAPI
import com.rabbitmq.client.*
import com.typesafe.config.ConfigFactory
import io.ktor.config.HoconApplicationConfig
import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.transactions.transaction
import org.koin.ktor.ext.Koin import org.koin.ktor.ext.Koin
@ -38,21 +35,6 @@ fun Application.module() {
SchemaUtils.create(InputObjects) SchemaUtils.create(InputObjects)
} }
val appConfig = HoconApplicationConfig(ConfigFactory.load())
val factory = ConnectionFactory()
factory.host = appConfig.property("ktor.mq.host").getString()
factory.username = appConfig.property("ktor.mq.username").getString()
factory.password = appConfig.property("ktor.mq.password").getString()
val connection = factory.newConnection()
val channel = connection.createChannel()
val rabbitExchangeName = appConfig.property("ktor.mq.exchange").getString()
channel.exchangeDeclare(rabbitExchangeName, BuiltinExchangeType.FANOUT)
val queueName = channel.queueDeclare().queue
channel.queueBind(queueName, rabbitExchangeName, "")
routing { routing {
install(StatusPages) { install(StatusPages) {
exception<HttpException> { exception<HttpException> {

View File

@ -0,0 +1,9 @@
package com.kmalbz.api.model
import com.google.gson.annotations.SerializedName
import java.time.LocalDate
data class SampleObject(
@SerializedName("date") val date: LocalDate,
@SerializedName("device_id") val device_id: String
)

View File

@ -1,13 +1,40 @@
package com.kmalbz.api.route package com.kmalbz.api.route
import com.google.gson.Gson
import com.kmalbz.api.model.ApiObject
import com.kmalbz.api.model.SampleObject
import com.kmalbz.database.service.IInputObjectService import com.kmalbz.database.service.IInputObjectService
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.MessageProperties
import com.typesafe.config.ConfigFactory
import com.viartemev.thewhiterabbit.channel.confirmChannel
import com.viartemev.thewhiterabbit.channel.publish
import com.viartemev.thewhiterabbit.publisher.OutboundMessage
import io.ktor.application.call import io.ktor.application.call
import io.ktor.config.HoconApplicationConfig
import io.ktor.http.HttpStatusCode import io.ktor.http.HttpStatusCode
import io.ktor.http.content.PartData
import io.ktor.http.content.forEachPart
import io.ktor.http.content.streamProvider
import io.ktor.request.receiveMultipart
import io.ktor.response.respond import io.ktor.response.respond
import io.ktor.routing.Routing import io.ktor.routing.Routing
import io.ktor.routing.get import io.ktor.routing.get
import io.ktor.routing.post import io.ktor.routing.post
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield
import org.jetbrains.exposed.sql.transactions.transaction
import org.koin.ktor.ext.inject import org.koin.ktor.ext.inject
import java.io.File
import java.io.InputStream
import java.io.OutputStream
import java.util.*
import javax.activation.MimetypesFileTypeMap
/** /**
* Input Service * Input Service
@ -18,13 +45,68 @@ class InputServiceServer {
/** /**
* sample * sample
*/ */
@KtorExperimentalAPI
fun Routing.registerOutput() { fun Routing.registerOutput() {
val resultObjectService by inject<IInputObjectService>() val resultObjectService by inject<IInputObjectService>()
val appConfig = HoconApplicationConfig(ConfigFactory.load())
val factory = ConnectionFactory()
factory.host = appConfig.property("ktor.mq.host").getString()
factory.username = appConfig.property("ktor.mq.username").getString()
factory.password = appConfig.property("ktor.mq.password").getString()
val connection = factory.newConnection()
val channel = connection.createChannel()
val rabbitExchangeName = appConfig.property("ktor.mq.exchange").getString()
channel.exchangeDeclare(rabbitExchangeName, BuiltinExchangeType.FANOUT)
val queueName = channel.queueDeclare().queue
channel.queueBind(queueName, rabbitExchangeName, "")
get("/sample"){ get("/sample"){
call.respond(resultObjectService.getAllInputObjects()) call.respond(resultObjectService.getAllInputObjects())
} }
post("/sample"){ post("/sample"){
var description: SampleObject? = null
var soundFile: File? = null
val multipart = call.receiveMultipart()
multipart.forEachPart { part ->
when (part) {
is PartData.FormItem -> {
if (part.name == "description") {
val gson = Gson()
description = gson.fromJson(part.value, SampleObject::class.java)
}
}
is PartData.FileItem -> {
val file = File(System.getProperty( "java.io.tmpdir"), "upload-${System.currentTimeMillis()}")
part.streamProvider().use { input -> file.outputStream().buffered().use { output -> input.copyToSuspend(output) } }
val fileType = MimetypesFileTypeMap().getContentType(file)
if (fileType != "audio/wave")
call.respond(HttpStatusCode.BadRequest)
soundFile = file
}
}
part.dispose()
}
if (description == null || soundFile == null)
call.respond(HttpStatusCode.ExpectationFailed)
val tag = UUID.randomUUID().toString()
transaction {
resultObjectService.addOne(ApiObject(tag = tag, date = description!!.date, device_id = description!!.device_id))
}
connection.confirmChannel {
publish {
publishWithConfirm(OutboundMessage(rabbitExchangeName, "", MessageProperties.PERSISTENT_BASIC, tag))
}
}
call.respond(resultObjectService.getAllInputObjects()) call.respond(resultObjectService.getAllInputObjects())
} }
@ -35,4 +117,27 @@ class InputServiceServer {
call.respond(resultObject) call.respond(resultObject)
} }
} }
private suspend fun InputStream.copyToSuspend(
out: OutputStream,
bufferSize: Int = DEFAULT_BUFFER_SIZE,
yieldSize: Int = 4 * 1024 * 1024,
dispatcher: CoroutineDispatcher = Dispatchers.IO
): Long {
return withContext(dispatcher) {
val buffer = ByteArray(bufferSize)
var bytesCopied = 0L
var bytesAfterYield = 0L
while (true) {
val bytes = read(buffer).takeIf { it >= 0 } ?: break
out.write(buffer, 0, bytes)
if (bytesAfterYield >= yieldSize) {
yield()
bytesAfterYield %= yieldSize
}
bytesCopied += bytes
bytesAfterYield += bytes
}
return@withContext bytesCopied
}
}
} }