import org.eclipse.paho.client.mqttv3.* import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence class MqttSubscriber : MqttCallbackExtended { private val broker = "tcp://mqtt.k8s.kmlabz.com:1883" private val clientId = MqttClient.generateClientId() private val persistence = MemoryPersistence() private val mqttClient = MqttAsyncClient(broker, clientId, persistence) override fun connectionLost(arg0: Throwable) { System.err.println("connection lost") } override fun deliveryComplete(arg0: IMqttDeliveryToken) { } @Throws(Exception::class) override fun messageArrived(topic: String, message: MqttMessage) { println("topic: $topic") println("message: " + String(message.payload)) } override fun connectComplete(reconnect: Boolean, serverURI: String?) { mqttClient.subscribe("#", 0) } fun connect() { val connOpts = MqttConnectOptions() connOpts.isCleanSession = true mqttClient.setCallback(MqttSubscriber()) mqttClient.connect(connOpts) } }