import org.eclipse.paho.client.mqttv3.* import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence import java.time.Instant class MqttSubscriber : MqttCallbackExtended { private val broker = "tcp://mqtt.k8s.kmlabz.com:1883" private val clientId = MqttClient.generateClientId() private val persistence = MemoryPersistence() private val mqttClient = MqttAsyncClient(broker, clientId, persistence) private val benchValues = HashMap() override fun connectionLost(arg0: Throwable) { System.err.println("connection lost") mqttClient.reconnect() } override fun deliveryComplete(arg0: IMqttDeliveryToken) { } @Throws(Exception::class) override fun messageArrived(topic: String, message: MqttMessage) { println("topic: $topic") benchValues.put(topic.split("/")[1].toInt(),Instant.now()) println("message: " + String(message.payload)) } override fun connectComplete(reconnect: Boolean, serverURI: String?) { println("Subscribing to all topics") mqttClient.subscribe("#", 0) } fun connect() { println("Connecting to broker") val connOpts = MqttConnectOptions() connOpts.isCleanSession = true connOpts.userName = "birbnetes" connOpts.password = "de4d2182".toCharArray() mqttClient.setCallback(MqttSubscriber()) mqttClient.connect(connOpts) } fun disconnect(): HashMap { println("Disconnecting from broker") mqttClient.disconnect() mqttClient.close() return benchValues } }