This commit is contained in:
		@@ -1,12 +1,13 @@
 | 
				
			|||||||
import org.eclipse.paho.client.mqttv3.*
 | 
					import org.eclipse.paho.client.mqttv3.*
 | 
				
			||||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 | 
					import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 | 
				
			||||||
 | 
					import java.time.Instant
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class MqttSubscriber : MqttCallbackExtended {
 | 
					class MqttSubscriber : MqttCallbackExtended {
 | 
				
			||||||
    private val broker = "tcp://mqtt.k8s.kmlabz.com:1883"
 | 
					    private val broker = "tcp://mqtt.k8s.kmlabz.com:1883"
 | 
				
			||||||
    private val clientId = MqttClient.generateClientId()
 | 
					    private val clientId = MqttClient.generateClientId()
 | 
				
			||||||
    private val persistence = MemoryPersistence()
 | 
					    private val persistence = MemoryPersistence()
 | 
				
			||||||
    private val mqttClient = MqttAsyncClient(broker, clientId, persistence)
 | 
					    private val mqttClient = MqttAsyncClient(broker, clientId, persistence)
 | 
				
			||||||
 | 
					    private val benchValues = HashMap<Int, Instant>()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    override fun connectionLost(arg0: Throwable) {
 | 
					    override fun connectionLost(arg0: Throwable) {
 | 
				
			||||||
        System.err.println("connection lost")
 | 
					        System.err.println("connection lost")
 | 
				
			||||||
@@ -18,7 +19,9 @@ class MqttSubscriber : MqttCallbackExtended {
 | 
				
			|||||||
    @Throws(Exception::class)
 | 
					    @Throws(Exception::class)
 | 
				
			||||||
    override fun messageArrived(topic: String, message: MqttMessage) {
 | 
					    override fun messageArrived(topic: String, message: MqttMessage) {
 | 
				
			||||||
        println("topic: $topic")
 | 
					        println("topic: $topic")
 | 
				
			||||||
 | 
					        benchValues.put(topic.split("/")[1].toInt(),Instant.now())
 | 
				
			||||||
        println("message: " + String(message.payload))
 | 
					        println("message: " + String(message.payload))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    override fun connectComplete(reconnect: Boolean, serverURI: String?) {
 | 
					    override fun connectComplete(reconnect: Boolean, serverURI: String?) {
 | 
				
			||||||
@@ -33,4 +36,10 @@ class MqttSubscriber : MqttCallbackExtended {
 | 
				
			|||||||
        mqttClient.setCallback(MqttSubscriber())
 | 
					        mqttClient.setCallback(MqttSubscriber())
 | 
				
			||||||
        mqttClient.connect(connOpts)
 | 
					        mqttClient.connect(connOpts)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fun disconnect(): HashMap<Int, Instant> {
 | 
				
			||||||
 | 
					        mqttClient.disconnect()
 | 
				
			||||||
 | 
					        mqttClient.close()
 | 
				
			||||||
 | 
					        return benchValues
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -1,5 +1,6 @@
 | 
				
			|||||||
fun main(){
 | 
					fun main(){
 | 
				
			||||||
    val mqtt = MqttSubscriber()
 | 
					    val mqtt = MqttSubscriber()
 | 
				
			||||||
    mqtt.connect()
 | 
					    mqtt.connect()
 | 
				
			||||||
    println("Hello World!")
 | 
					    Thread.sleep(10000)
 | 
				
			||||||
 | 
					    mqtt.disconnect()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user