import org.eclipse.paho.client.mqttv3.*
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import java.time.Instant

/**
 * MQTT client that subscribes to every topic on the broker and records, per
 * numeric id found in the topic, the time the latest message arrived.
 *
 * The id is taken from the second `/`-separated segment of the topic
 * (e.g. `prefix/42/...` -> `42`). Topics without such a numeric segment are
 * logged and skipped.
 *
 * NOTE(review): the broker URL and credentials are hard-coded below; consider
 * moving them to configuration so secrets do not live in source control.
 */
class MqttSubscriber() : MqttCallbackExtended {
    private val broker = "tcp://mqtt.k8s.kmlabz.com:1883"
    private val clientId = MqttClient.generateClientId()
    private val persistence = MemoryPersistence()
    private val mqttClient = MqttClient(broker, clientId, persistence)

    /** Arrival time of the most recent message, keyed by the numeric id parsed from the topic. */
    private val benchValues = HashMap<Int, Instant>()
    private val connOpts = MqttConnectOptions()

    init {
        connOpts.isCleanSession = true
        connOpts.userName = "birbnetes"
        connOpts.password = "de4d2182".toCharArray()
        connOpts.isAutomaticReconnect = true
        mqttClient.setCallback(this)
    }

    /** Reports a lost connection on stderr. */
    override fun connectionLost(arg0: Throwable) {
        System.err.println("connection lost")
    }

    /** No-op: this client only consumes messages. */
    override fun deliveryComplete(arg0: IMqttDeliveryToken) {
    }

    /**
     * Logs the incoming message and stores its arrival time under the numeric
     * id taken from the second topic segment.
     *
     * The client subscribes to `#`, so arbitrary topics can arrive here.
     * Parsing is therefore done defensively: an exception escaping this
     * callback would cause the Paho client to shut down.
     *
     * @param topic topic the message was published on
     * @param message the received MQTT message
     */
    @Throws(Exception::class)
    override fun messageArrived(topic: String, message: MqttMessage) {
        println("topic: $topic")
        val id = topic.split("/").getOrNull(1)?.toIntOrNull()
        if (id != null) {
            benchValues[id] = Instant.now()
        } else {
            System.err.println("ignoring topic without numeric id segment: $topic")
        }
        println("message: ${String(message.payload)}")
    }

    /**
     * Subscribes to all topics (`#`) at QoS 0 whenever a connection completes.
     *
     * @param reconnect whether this completion is the result of a reconnect (unused)
     * @param serverURI URI of the server the connection was made to (unused)
     */
    override fun connectComplete(reconnect: Boolean, serverURI: String?) {
        println("Subscribing to all topics")
        try {
            mqttClient.subscribe("#", 0)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    /** Connects to the broker using the configured options; failures are printed, not thrown. */
    fun connect() {
        println("Connecting to broker")
        try {
            mqttClient.connect(connOpts)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    /**
     * Disconnects from the broker, closes the client and returns the recorded
     * arrival times.
     *
     * Failures while disconnecting or closing are printed rather than thrown,
     * so the collected values are always returned.
     *
     * @return map from topic id to the arrival time of its latest message
     */
    fun disconnect(): HashMap<Int, Instant> {
        println("Disconnecting from broker")
        try {
            mqttClient.disconnect()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
        // close() can itself throw (e.g. when the disconnect above failed);
        // guard it separately so the results are not lost.
        try {
            mqttClient.close()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
        return benchValues
    }
}