import org.eclipse.paho.client.mqttv3.*
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import java.time.Instant

/**
 * MQTT client that subscribes to every topic (`#`) and records, per numeric
 * topic id, the instant at which a message for that id arrived.
 *
 * Configuration is read from environment variables:
 * - `BROKER`   – broker URI; falls back to `tcp://mqtt.k8s.kmlabz.com:1883`.
 * - `USERNAME` – user name; falls back to `birbnetes`.
 * - `PASSWORD` – password; when unset, no password is configured.
 *
 * NOTE(review): [benchValues] is written from [messageArrived] and handed out by
 * [disconnect] without synchronization — presumably callers only read the map
 * after [disconnect] has returned; confirm.
 */
class MqttSubscriber() : MqttCallbackExtended {
    private val broker = System.getenv("BROKER") ?: "tcp://mqtt.k8s.kmlabz.com:1883"
    private val clientId = MqttClient.generateClientId()
    private val persistence = MemoryPersistence()
    private val mqttClient = MqttClient(broker, clientId, persistence)

    /** Arrival time keyed by the integer found in the second topic segment. */
    private val benchValues = LinkedHashMap<Int, Instant>()

    private val connOpts = MqttConnectOptions().apply {
        isCleanSession = true
        userName = System.getenv("USERNAME") ?: "birbnetes"
        // PASSWORD may be unset; dereferencing it unconditionally would crash construction.
        System.getenv("PASSWORD")?.let { password = it.toCharArray() }
        isAutomaticReconnect = true
    }

    init {
        mqttClient.setCallback(this)
    }

    /** Logs that the broker connection was lost. */
    override fun connectionLost(arg0: Throwable) {
        System.err.println("connection lost")
    }

    /** No-op: nothing in this class publishes messages. */
    override fun deliveryComplete(arg0: IMqttDeliveryToken) {
    }

    /**
     * Records the arrival time of [message] under the integer id taken from the
     * second `/`-separated segment of [topic].
     *
     * Topics without a second segment, or whose second segment is not an integer,
     * are ignored: the subscription is `#`, so unrelated topics are expected, and
     * per the Paho `MqttCallback` contract an exception thrown from this method
     * shuts the client down.
     */
    @Throws(Exception::class)
    override fun messageArrived(topic: String, message: MqttMessage) {
        // Take the timestamp first so parsing cost is not included in the measurement.
        val arrivedAt = Instant.now()
        val id = topic.split("/").getOrNull(1)?.toIntOrNull() ?: return
        benchValues[id] = arrivedAt
    }

    /**
     * Subscribes to all topics with QoS 0 once the connection is established.
     * Subscription failures are printed and otherwise ignored.
     */
    override fun connectComplete(reconnect: Boolean, serverURI: String?) {
        println("Subscribing to all topics")
        try {
            mqttClient.subscribe("#", 0)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    /** Connects to the broker; connection failures are printed and otherwise ignored. */
    fun connect() {
        println("Connecting to broker")
        try {
            mqttClient.connect(connOpts)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    /**
     * Disconnects from the broker, closes the client and returns the collected
     * arrival times.
     *
     * Failures while disconnecting or closing are printed; the collected values
     * are returned regardless so a failed shutdown does not discard the results.
     *
     * @return the map of topic id to message arrival time, in insertion order.
     */
    fun disconnect(): LinkedHashMap<Int, Instant> {
        println("Disconnecting from broker")
        try {
            mqttClient.disconnect()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
        try {
            mqttClient.close()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
        return benchValues
    }
}