This commit is contained in:
parent
b65be4e913
commit
701f51a334
@ -3,17 +3,17 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
|
|||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
class MqttSubscriber() : MqttCallbackExtended {
|
class MqttSubscriber() : MqttCallbackExtended {
|
||||||
private val broker = "tcp://mqtt.k8s.kmlabz.com:1883"
|
private val broker = System.getenv("BROKER") ?: "tcp://mqtt.k8s.kmlabz.com:1883"
|
||||||
private val clientId = MqttClient.generateClientId()
|
private val clientId = MqttClient.generateClientId()
|
||||||
private val persistence = MemoryPersistence()
|
private val persistence = MemoryPersistence()
|
||||||
private val mqttClient = MqttClient(broker, clientId, persistence)
|
private val mqttClient = MqttClient(broker, clientId, persistence)
|
||||||
private val benchValues = HashMap<Int, Instant>()
|
private val benchValues = LinkedHashMap<Int, Instant>()
|
||||||
private val connOpts = MqttConnectOptions()
|
private val connOpts = MqttConnectOptions()
|
||||||
|
|
||||||
init {
|
init {
|
||||||
connOpts.isCleanSession = true
|
connOpts.isCleanSession = true
|
||||||
connOpts.userName = "birbnetes"
|
connOpts.userName = System.getenv("USERNAME") ?: "birbnetes"
|
||||||
connOpts.password = "de4d2182".toCharArray()
|
connOpts.password = System.getenv("PASSWORD").toCharArray()
|
||||||
connOpts.isAutomaticReconnect = true
|
connOpts.isAutomaticReconnect = true
|
||||||
mqttClient.setCallback(this)
|
mqttClient.setCallback(this)
|
||||||
}
|
}
|
||||||
@ -27,9 +27,7 @@ class MqttSubscriber() : MqttCallbackExtended {
|
|||||||
|
|
||||||
@Throws(Exception::class)
|
@Throws(Exception::class)
|
||||||
override fun messageArrived(topic: String, message: MqttMessage) {
|
override fun messageArrived(topic: String, message: MqttMessage) {
|
||||||
println("topic: $topic")
|
benchValues[topic.split("/")[1].toInt()] = Instant.now()
|
||||||
benchValues.put(topic.split("/")[1].toInt(),Instant.now())
|
|
||||||
println("message: " + String(message.payload))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
|
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
|
||||||
@ -50,7 +48,7 @@ class MqttSubscriber() : MqttCallbackExtended {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun disconnect(): HashMap<Int, Instant> {
|
fun disconnect(): LinkedHashMap<Int, Instant> {
|
||||||
println("Disconnecting from broker")
|
println("Disconnecting from broker")
|
||||||
try {
|
try {
|
||||||
mqttClient.disconnect()
|
mqttClient.disconnect()
|
||||||
|
@ -7,8 +7,12 @@ fun main() = runBlocking{
|
|||||||
GlobalScope.launch {
|
GlobalScope.launch {
|
||||||
val mqtt = MqttSubscriber()
|
val mqtt = MqttSubscriber()
|
||||||
mqtt.connect()
|
mqtt.connect()
|
||||||
delay(10000L)
|
val waitTime = System.getenv("WAITTIME").toLong() ?: 10000L
|
||||||
mqtt.disconnect()
|
delay(waitTime)
|
||||||
|
val resultMap = mqtt.disconnect()
|
||||||
|
val resultProcessor = ProcessResults()
|
||||||
|
resultProcessor.mapToProcess=resultMap
|
||||||
|
resultProcessor.process()
|
||||||
}
|
}
|
||||||
delay(20000L)
|
delay(20000L)
|
||||||
}
|
}
|
16
src/main/kotlin/ProcessResults.kt
Normal file
16
src/main/kotlin/ProcessResults.kt
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import java.text.SimpleDateFormat
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessResults{
|
||||||
|
var mapToProcess = LinkedHashMap<Int, Instant>()
|
||||||
|
private var formatter = SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
|
||||||
|
|
||||||
|
|
||||||
|
fun process(){
|
||||||
|
for((id, instant) in this.mapToProcess){
|
||||||
|
val currInstantString = formatter.format(instant)
|
||||||
|
println("$id, $currInstantString")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user