input-service/src/api/route/InputServiceServer.kt

138 lines
4.8 KiB
Kotlin

package com.kmalbz.api.route
import com.google.gson.GsonBuilder
import com.kmalbz.api.model.ApiObject
import com.kmalbz.api.model.SampleObject
import com.kmalbz.database.service.IInputObjectService
import com.rabbitmq.client.BuiltinExchangeType
import com.rabbitmq.client.ConnectionFactory
import com.typesafe.config.ConfigFactory
import io.ktor.application.call
import io.ktor.config.HoconApplicationConfig
import io.ktor.http.HttpStatusCode
import io.ktor.http.content.PartData
import io.ktor.http.content.forEachPart
import io.ktor.http.content.streamProvider
import io.ktor.request.receiveMultipart
import io.ktor.response.respond
import io.ktor.routing.Routing
import io.ktor.routing.get
import io.ktor.routing.post
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield
import org.jetbrains.exposed.sql.transactions.transaction
import org.koin.ktor.ext.inject
import java.io.File
import java.io.InputStream
import java.io.OutputStream
import java.time.LocalDate
import java.util.*
import javax.activation.MimetypesFileTypeMap
/**
 * Input Service
 *
 * This is the input interface of the Birbnetes system.
 */
class InputServiceServer {
    /**
     * Registers the `/sample` routes on this [Routing] and opens the RabbitMQ channel they publish to.
     *
     * The message-queue settings are read from the application config keys `ktor.mq.host`,
     * `ktor.mq.username`, `ktor.mq.password` and `ktor.mq.exchange`.
     *
     * Routes:
     * - `GET /sample` responds with the result of [IInputObjectService.getAllInputObjects].
     * - `POST /sample` reads a multipart body. A form item named `description` is parsed as JSON into a
     *   [SampleObject]; a file part is copied to a temporary file. If either is missing the response is
     *   `417 Expectation Failed`. Otherwise a new [ApiObject] with a random tag is handed to
     *   [IInputObjectService.addOne] inside a transaction, the tag is published to the message queue and
     *   the object is sent back as the response.
     * - `GET /sample/{tagID}` responds with the object stored under the given UUID, `400 Bad Request` when
     *   the tag is not a valid UUID, or `404 Not Found` when no object matches.
     */
    @KtorExperimentalAPI
    fun Routing.registerOutput() {
        val resultObjectService by inject<IInputObjectService>()
        val appConfig = HoconApplicationConfig(ConfigFactory.load())

        val factory = ConnectionFactory().apply {
            host = appConfig.property("ktor.mq.host").getString()
            username = appConfig.property("ktor.mq.username").getString()
            password = appConfig.property("ktor.mq.password").getString()
        }

        // NOTE(review): the connection and channel are opened here and never closed in this function;
        // presumably they are meant to live as long as the application - confirm.
        val connection = factory.newConnection()
        val channel = connection.createChannel()

        val rabbitExchangeName = appConfig.property("ktor.mq.exchange").getString()
        channel.exchangeDeclare(rabbitExchangeName, BuiltinExchangeType.FANOUT)
        val queueName = channel.queueDeclare().queue
        channel.queueBind(queueName, rabbitExchangeName, "")

        val gson = GsonBuilder().setPrettyPrinting().create()

        get("/sample") {
            call.respond(resultObjectService.getAllInputObjects())
        }

        post("/sample") {
            var description: SampleObject? = null
            var soundFile: File? = null

            val multipart = call.receiveMultipart()
            multipart.forEachPart { part ->
                when (part) {
                    is PartData.FormItem -> {
                        if (part.name == "description") {
                            description = gson.fromJson(part.value, SampleObject::class.java)
                        }
                    }
                    is PartData.FileItem -> {
                        // The random suffix keeps two uploads that arrive within the same millisecond
                        // from writing to the same temporary file.
                        val file = File(
                            System.getProperty("java.io.tmpdir"),
                            "upload-${System.currentTimeMillis()}-${UUID.randomUUID()}"
                        )
                        part.streamProvider().use { input ->
                            file.outputStream().buffered().use { output -> input.copyToSuspend(output) }
                        }
                        soundFile = file
                    }
                }
                part.dispose()
            }

            // Snapshot the captured variable so the null check carries over to the usage below.
            val sample = description
            if (sample == null || soundFile == null) {
                call.respond(HttpStatusCode.ExpectationFailed)
                return@post
            }
            // NOTE(review): the uploaded file is only checked for presence; it is neither read nor
            // deleted afterwards in this handler - confirm whether it should be forwarded or cleaned up.

            val tag = UUID.randomUUID()
            val currentApiObject = ApiObject(tag = tag, date = LocalDate.now(), device_id = sample.device_id)
            transaction {
                resultObjectService.addOne(currentApiObject)
            }

            // NOTE(review): this publishes to the default exchange ("") with the queue name as routing key,
            // not to the fanout exchange declared above - confirm this is the intended target.
            channel.basicPublish("", queueName, null, tag.toString().toByteArray())
            call.respond(currentApiObject)
        }

        get("/sample/{tagID}") {
            val rawTag = call.parameters["tagID"]
            if (rawTag == null) {
                call.respond(HttpStatusCode.NotAcceptable)
                return@get
            }

            val tag = parseUuidOrNull(rawTag)
            if (tag == null) {
                call.respond(HttpStatusCode.BadRequest)
                return@get
            }

            val resultObject = resultObjectService.getInputObjectbyTag(tag)
            if (resultObject == null) {
                // Return right after answering so the call is not responded to a second time.
                call.respond(HttpStatusCode.NotFound)
                return@get
            }
            call.respond(resultObject)
        }
    }

    /**
     * Parses [raw] as a UUID.
     *
     * @return the parsed [UUID], or `null` when [raw] is not a valid UUID string.
     */
    private fun parseUuidOrNull(raw: String): UUID? =
        try {
            UUID.fromString(raw)
        } catch (e: IllegalArgumentException) {
            null
        }

    /**
     * Copies this stream into [out] on [dispatcher], yielding to other coroutines from time to time.
     *
     * Neither stream is closed by this function.
     *
     * @param out destination stream.
     * @param bufferSize size in bytes of the intermediate copy buffer.
     * @param yieldSize number of bytes copied since the last yield after which [yield] is called again.
     * @param dispatcher coroutine dispatcher the blocking reads and writes run on.
     * @return the total number of bytes copied.
     */
    private suspend fun InputStream.copyToSuspend(
        out: OutputStream,
        bufferSize: Int = DEFAULT_BUFFER_SIZE,
        yieldSize: Int = 4 * 1024 * 1024,
        dispatcher: CoroutineDispatcher = Dispatchers.IO
    ): Long = withContext(dispatcher) {
        val buffer = ByteArray(bufferSize)
        var bytesCopied = 0L
        var bytesAfterYield = 0L
        while (true) {
            // read() returns a negative value at end of stream, which ends the loop.
            val bytes = read(buffer).takeIf { it >= 0 } ?: break
            out.write(buffer, 0, bytes)
            if (bytesAfterYield >= yieldSize) {
                yield()
                bytesAfterYield %= yieldSize
            }
            bytesCopied += bytes
            bytesAfterYield += bytes
        }
        bytesCopied
    }
}