From 10d305689d316f62d238fcfda229044f594bfe4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Thu, 21 May 2020 22:27:27 +0200 Subject: [PATCH] complete input service logic --- src/Application.kt | 18 ----- src/api/model/SampleObject.kt | 9 +++ src/api/route/InputServiceServer.kt | 105 ++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 18 deletions(-) create mode 100644 src/api/model/SampleObject.kt diff --git a/src/Application.kt b/src/Application.kt index 9c1e403..dcf4f0a 100644 --- a/src/Application.kt +++ b/src/Application.kt @@ -11,9 +11,6 @@ import org.apache.http.HttpException import com.kmalbz.database.DatabaseFactory import com.kmalbz.database.dao.InputObjects import io.ktor.util.KtorExperimentalAPI -import com.rabbitmq.client.* -import com.typesafe.config.ConfigFactory -import io.ktor.config.HoconApplicationConfig import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.transactions.transaction import org.koin.ktor.ext.Koin @@ -38,21 +35,6 @@ fun Application.module() { SchemaUtils.create(InputObjects) } - val appConfig = HoconApplicationConfig(ConfigFactory.load()) - val factory = ConnectionFactory() - factory.host = appConfig.property("ktor.mq.host").getString() - factory.username = appConfig.property("ktor.mq.username").getString() - factory.password = appConfig.property("ktor.mq.password").getString() - - val connection = factory.newConnection() - val channel = connection.createChannel() - - val rabbitExchangeName = appConfig.property("ktor.mq.exchange").getString() - - channel.exchangeDeclare(rabbitExchangeName, BuiltinExchangeType.FANOUT) - val queueName = channel.queueDeclare().queue - channel.queueBind(queueName, rabbitExchangeName, "") - routing { install(StatusPages) { exception { diff --git a/src/api/model/SampleObject.kt b/src/api/model/SampleObject.kt new file mode 100644 index 0000000..bbe78a1 --- /dev/null +++ b/src/api/model/SampleObject.kt @@ -0,0 +1,9 @@ +package com.kmalbz.api.model + +import com.google.gson.annotations.SerializedName +import java.time.LocalDate + +data class SampleObject( + @SerializedName("date") val date: LocalDate, + @SerializedName("device_id") val device_id: String +) \ No newline at end of file diff --git a/src/api/route/InputServiceServer.kt b/src/api/route/InputServiceServer.kt index 4831620..6a64da1 100644 --- a/src/api/route/InputServiceServer.kt +++ b/src/api/route/InputServiceServer.kt @@ -1,13 +1,40 @@ package com.kmalbz.api.route +import com.google.gson.Gson +import com.kmalbz.api.model.ApiObject +import com.kmalbz.api.model.SampleObject import com.kmalbz.database.service.IInputObjectService +import com.rabbitmq.client.BuiltinExchangeType +import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.MessageProperties +import com.typesafe.config.ConfigFactory +import com.viartemev.thewhiterabbit.channel.confirmChannel +import com.viartemev.thewhiterabbit.channel.publish +import com.viartemev.thewhiterabbit.publisher.OutboundMessage import io.ktor.application.call +import io.ktor.config.HoconApplicationConfig import io.ktor.http.HttpStatusCode +import io.ktor.http.content.PartData +import io.ktor.http.content.forEachPart +import io.ktor.http.content.streamProvider +import io.ktor.request.receiveMultipart import io.ktor.response.respond import io.ktor.routing.Routing import io.ktor.routing.get import io.ktor.routing.post +import io.ktor.util.KtorExperimentalAPI +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import kotlinx.coroutines.yield +import org.jetbrains.exposed.sql.transactions.transaction import org.koin.ktor.ext.inject +import java.io.File +import java.io.InputStream +import java.io.OutputStream +import java.util.* +import javax.activation.MimetypesFileTypeMap + /** * Input Service @@ -18,13 +45,68 @@ class InputServiceServer { /** * sample */ + @KtorExperimentalAPI fun Routing.registerOutput() { val resultObjectService by inject() + + val appConfig = HoconApplicationConfig(ConfigFactory.load()) + val factory = ConnectionFactory() + factory.host = appConfig.property("ktor.mq.host").getString() + factory.username = appConfig.property("ktor.mq.username").getString() + factory.password = appConfig.property("ktor.mq.password").getString() + + val connection = factory.newConnection() + val channel = connection.createChannel() + + val rabbitExchangeName = appConfig.property("ktor.mq.exchange").getString() + + channel.exchangeDeclare(rabbitExchangeName, BuiltinExchangeType.FANOUT) + val queueName = channel.queueDeclare().queue + channel.queueBind(queueName, rabbitExchangeName, "") + get("/sample"){ call.respond(resultObjectService.getAllInputObjects()) } post("/sample"){ + var description: SampleObject? = null + var soundFile: File? = null + val multipart = call.receiveMultipart() + multipart.forEachPart { part -> + when (part) { + is PartData.FormItem -> { + if (part.name == "description") { + val gson = Gson() + description = gson.fromJson(part.value, SampleObject::class.java) + } + } + is PartData.FileItem -> { + val file = File(System.getProperty( "java.io.tmpdir"), "upload-${System.currentTimeMillis()}") + part.streamProvider().use { input -> file.outputStream().buffered().use { output -> input.copyToSuspend(output) } } + val fileType = MimetypesFileTypeMap().getContentType(file) + if (fileType != "audio/wave") + call.respond(HttpStatusCode.BadRequest) + soundFile = file + } + } + part.dispose() + } + + if (description == null || soundFile == null) + call.respond(HttpStatusCode.ExpectationFailed) + + val tag = UUID.randomUUID().toString() + + transaction { + resultObjectService.addOne(ApiObject(tag = tag, date = description!!.date, device_id = description!!.device_id)) + } + + connection.confirmChannel { + publish { + publishWithConfirm(OutboundMessage(rabbitExchangeName, "", MessageProperties.PERSISTENT_BASIC, tag)) + } + } + call.respond(resultObjectService.getAllInputObjects()) } @@ -35,4 +117,27 @@ class InputServiceServer { call.respond(resultObject) } } + private suspend fun InputStream.copyToSuspend( + out: OutputStream, + bufferSize: Int = DEFAULT_BUFFER_SIZE, + yieldSize: Int = 4 * 1024 * 1024, + dispatcher: CoroutineDispatcher = Dispatchers.IO + ): Long { + return withContext(dispatcher) { + val buffer = ByteArray(bufferSize) + var bytesCopied = 0L + var bytesAfterYield = 0L + while (true) { + val bytes = read(buffer).takeIf { it >= 0 } ?: break + out.write(buffer, 0, bytes) + if (bytesAfterYield >= yieldSize) { + yield() + bytesAfterYield %= yieldSize + } + bytesCopied += bytes + bytesAfterYield += bytes + } + return@withContext bytesCopied + } + } }