package com.kmalbz.api.route import com.google.gson.Gson import com.kmalbz.api.model.ApiObject import com.kmalbz.api.model.SampleObject import com.kmalbz.database.service.IInputObjectService import com.rabbitmq.client.BuiltinExchangeType import com.rabbitmq.client.ConnectionFactory import com.rabbitmq.client.MessageProperties import com.typesafe.config.ConfigFactory import com.viartemev.thewhiterabbit.channel.confirmChannel import com.viartemev.thewhiterabbit.channel.publish import com.viartemev.thewhiterabbit.publisher.OutboundMessage import io.ktor.application.call import io.ktor.config.HoconApplicationConfig import io.ktor.http.HttpStatusCode import io.ktor.http.content.PartData import io.ktor.http.content.forEachPart import io.ktor.http.content.streamProvider import io.ktor.request.receiveMultipart import io.ktor.response.respond import io.ktor.routing.Routing import io.ktor.routing.get import io.ktor.routing.post import io.ktor.util.KtorExperimentalAPI import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import kotlinx.coroutines.yield import org.jetbrains.exposed.sql.transactions.transaction import org.koin.ktor.ext.inject import java.io.File import java.io.InputStream import java.io.OutputStream import java.util.* import javax.activation.MimetypesFileTypeMap /** * Input Service * * This is the input interface of the Birbnetes system. */ class InputServiceServer { /** * sample */ @KtorExperimentalAPI fun Routing.registerOutput() { val resultObjectService by inject() val appConfig = HoconApplicationConfig(ConfigFactory.load()) val factory = ConnectionFactory() factory.host = appConfig.property("ktor.mq.host").getString() factory.username = appConfig.property("ktor.mq.username").getString() factory.password = appConfig.property("ktor.mq.password").getString() val connection = factory.newConnection() val channel = connection.createChannel() val rabbitExchangeName = appConfig.property("ktor.mq.exchange").getString() channel.exchangeDeclare(rabbitExchangeName, BuiltinExchangeType.FANOUT) val queueName = channel.queueDeclare().queue channel.queueBind(queueName, rabbitExchangeName, "") get("/sample"){ call.respond(resultObjectService.getAllInputObjects()) } post("/sample"){ var description: SampleObject? = null var soundFile: File? = null val multipart = call.receiveMultipart() multipart.forEachPart { part -> when (part) { is PartData.FormItem -> { if (part.name == "description") { val gson = Gson() description = gson.fromJson(part.value, SampleObject::class.java) } } is PartData.FileItem -> { val file = File(System.getProperty( "java.io.tmpdir"), "upload-${System.currentTimeMillis()}") part.streamProvider().use { input -> file.outputStream().buffered().use { output -> input.copyToSuspend(output) } } val fileType = MimetypesFileTypeMap().getContentType(file) if (fileType != "audio/wave") call.respond(HttpStatusCode.BadRequest) soundFile = file } } part.dispose() } if (description == null || soundFile == null) call.respond(HttpStatusCode.ExpectationFailed) val tag = UUID.randomUUID().toString() val currentApiObject = ApiObject(tag = tag, date = description!!.date, device_id = description!!.device_id) transaction { resultObjectService.addOne(currentApiObject) } connection.confirmChannel { publish { publishWithConfirm(OutboundMessage(rabbitExchangeName, "", MessageProperties.PERSISTENT_BASIC, tag)) } } call.respond(currentApiObject) } get("/sample/{tagID}") { val tagID = call.parameters["tagID"] ?: error(HttpStatusCode.NotAcceptable) val resultObject = resultObjectService.getInputObjectbyTag(tagID) ?: call.respond(HttpStatusCode.NotFound) call.respond(resultObject) } } private suspend fun InputStream.copyToSuspend( out: OutputStream, bufferSize: Int = DEFAULT_BUFFER_SIZE, yieldSize: Int = 4 * 1024 * 1024, dispatcher: CoroutineDispatcher = Dispatchers.IO ): Long { return withContext(dispatcher) { val buffer = ByteArray(bufferSize) var bytesCopied = 0L var bytesAfterYield = 0L while (true) { val bytes = read(buffer).takeIf { it >= 0 } ?: break out.write(buffer, 0, bytes) if (bytesAfterYield >= yieldSize) { yield() bytesAfterYield %= yieldSize } bytesCopied += bytes bytesAfterYield += bytes } return@withContext bytesCopied } } }