All checks were successful
continuous-integration/drone/push Build is passing
145 lines
5.3 KiB
Kotlin
145 lines
5.3 KiB
Kotlin
package com.kmalbz.api.route
|
|
|
|
import com.google.gson.GsonBuilder
|
|
import com.kmalbz.api.model.ApiObject
|
|
import com.kmalbz.api.model.SampleObject
|
|
import com.kmalbz.database.service.IInputObjectService
|
|
import com.rabbitmq.client.BuiltinExchangeType
|
|
import com.rabbitmq.client.ConnectionFactory
|
|
import com.rabbitmq.client.MessageProperties
|
|
import com.typesafe.config.ConfigFactory
|
|
import com.viartemev.thewhiterabbit.channel.confirmChannel
|
|
import com.viartemev.thewhiterabbit.channel.publish
|
|
import com.viartemev.thewhiterabbit.publisher.OutboundMessage
|
|
import io.ktor.application.call
|
|
import io.ktor.config.HoconApplicationConfig
|
|
import io.ktor.http.HttpStatusCode
|
|
import io.ktor.http.content.PartData
|
|
import io.ktor.http.content.forEachPart
|
|
import io.ktor.http.content.streamProvider
|
|
import io.ktor.request.receiveMultipart
|
|
import io.ktor.response.respond
|
|
import io.ktor.routing.Routing
|
|
import io.ktor.routing.get
|
|
import io.ktor.routing.post
|
|
import io.ktor.util.KtorExperimentalAPI
|
|
import kotlinx.coroutines.CoroutineDispatcher
|
|
import kotlinx.coroutines.Dispatchers
|
|
import kotlinx.coroutines.withContext
|
|
import kotlinx.coroutines.yield
|
|
import org.jetbrains.exposed.sql.transactions.transaction
|
|
import org.koin.ktor.ext.inject
|
|
import java.io.File
|
|
import java.io.InputStream
|
|
import java.io.OutputStream
|
|
import java.time.LocalDate
|
|
import java.util.*
|
|
import javax.activation.MimetypesFileTypeMap
|
|
|
|
|
|
/**
|
|
* Input Service
|
|
*
|
|
* This is the input interface of the Birbnetes system.
|
|
*/
|
|
class InputServiceServer {
|
|
/**
|
|
* sample
|
|
*/
|
|
@KtorExperimentalAPI
|
|
fun Routing.registerOutput() {
|
|
val resultObjectService by inject<IInputObjectService>()
|
|
|
|
val appConfig = HoconApplicationConfig(ConfigFactory.load())
|
|
val factory = ConnectionFactory()
|
|
factory.host = appConfig.property("ktor.mq.host").getString()
|
|
factory.username = appConfig.property("ktor.mq.username").getString()
|
|
factory.password = appConfig.property("ktor.mq.password").getString()
|
|
|
|
val connection = factory.newConnection()
|
|
val channel = connection.createChannel()
|
|
|
|
val rabbitExchangeName = appConfig.property("ktor.mq.exchange").getString()
|
|
|
|
channel.exchangeDeclare(rabbitExchangeName, BuiltinExchangeType.FANOUT)
|
|
val queueName = channel.queueDeclare().queue
|
|
channel.queueBind(queueName, rabbitExchangeName, "")
|
|
|
|
val gson = GsonBuilder().setPrettyPrinting().create()
|
|
|
|
get("/sample"){
|
|
call.respond(resultObjectService.getAllInputObjects())
|
|
}
|
|
|
|
post("/sample"){
|
|
var description: SampleObject? = null
|
|
var soundFile: File? = null
|
|
val multipart = call.receiveMultipart()
|
|
multipart.forEachPart { part ->
|
|
when (part) {
|
|
is PartData.FormItem -> {
|
|
if (part.name == "description") {
|
|
print(part.value)
|
|
description = gson.fromJson(part.value, SampleObject::class.java)
|
|
}
|
|
}
|
|
is PartData.FileItem -> {
|
|
val file = File(System.getProperty( "java.io.tmpdir"), "upload-${System.currentTimeMillis()}")
|
|
part.streamProvider().use { input -> file.outputStream().buffered().use { output -> input.copyToSuspend(output) } }
|
|
val fileType = MimetypesFileTypeMap().getContentType(file)
|
|
if (fileType != "audio/wave")
|
|
call.respond(HttpStatusCode.BadRequest)
|
|
soundFile = file
|
|
}
|
|
}
|
|
part.dispose()
|
|
}
|
|
|
|
if (description == null || soundFile == null)
|
|
call.respond(HttpStatusCode.ExpectationFailed)
|
|
|
|
val tag = UUID.randomUUID()
|
|
|
|
val currentApiObject = ApiObject(tag = tag, date = LocalDate.now(), device_id = description!!.device_id)
|
|
|
|
transaction {
|
|
resultObjectService.addOne(currentApiObject)
|
|
}
|
|
|
|
channel.basicPublish("",queueName, null, tag.toString().toByteArray())
|
|
|
|
call.respond(currentApiObject)
|
|
}
|
|
|
|
get("/sample/{tagID}") {
|
|
val tagID = call.parameters["tagID"] ?: error(HttpStatusCode.NotAcceptable)
|
|
val resultObject = resultObjectService.getInputObjectbyTag(UUID.fromString(tagID)) ?: call.respond(HttpStatusCode.NotFound)
|
|
|
|
call.respond(resultObject)
|
|
}
|
|
}
|
|
private suspend fun InputStream.copyToSuspend(
|
|
out: OutputStream,
|
|
bufferSize: Int = DEFAULT_BUFFER_SIZE,
|
|
yieldSize: Int = 4 * 1024 * 1024,
|
|
dispatcher: CoroutineDispatcher = Dispatchers.IO
|
|
): Long {
|
|
return withContext(dispatcher) {
|
|
val buffer = ByteArray(bufferSize)
|
|
var bytesCopied = 0L
|
|
var bytesAfterYield = 0L
|
|
while (true) {
|
|
val bytes = read(buffer).takeIf { it >= 0 } ?: break
|
|
out.write(buffer, 0, bytes)
|
|
if (bytesAfterYield >= yieldSize) {
|
|
yield()
|
|
bytesAfterYield %= yieldSize
|
|
}
|
|
bytesCopied += bytes
|
|
bytesAfterYield += bytes
|
|
}
|
|
return@withContext bytesCopied
|
|
}
|
|
}
|
|
}
|