Implemented MQTT reception

This commit is contained in:
Pünkösd Marcell 2020-10-23 03:44:42 +02:00
parent 8dd3f1cb3c
commit 0dea6c0f51

View File

@ -1,14 +1,14 @@
import io
import sys
import time import time
import pycurl import pycurl
import itertools import itertools
import argparse
import multiprocessing import multiprocessing
import threading import paho.mqtt.client
import queue
from datetime import datetime from datetime import datetime
from blist import blist from blist import blist
from dataclasses import dataclass from dataclasses import dataclass
import threading
import queue
@dataclass(frozen=False, init=True) @dataclass(frozen=False, init=True)
@ -46,7 +46,7 @@ def independent_worker(number_generator, result_queue, filename: str, timeout: f
jobid = next(number_generator) jobid = next(number_generator)
prepared_curl.setopt(pycurl.HTTPPOST, [ prepared_curl.setopt(pycurl.HTTPPOST, [
('file', ( ('file', (
pycurl.FORM_FILE, filename pycurl.FORM_FILE, filename # Copying buffers from Python memory would be even slower... trust me
)), )),
('description', ( ('description', (
pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}', pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}',
@ -81,11 +81,22 @@ def independent_worker(number_generator, result_queue, filename: str, timeout: f
) )
def main(): def mqtt_on_connect(client, userdata, flags, rc):
num_workers = 50 client.subscribe(f"command/#")
timeout = 10.0
filename = '/home/marcsello/XauddzikzfKhogEcCYmufFMUrjYTwhwf.wav'
def mqtt_on_command(client, userdata, message):
# msg = json.loads(message.payload.decode())
# if msg['command'] == 'doAlert':
_id = int(message.topic[8:]) # len('command/') = 8
userdata.put(ReturnedRequest(id=_id, arrived=time.time()))
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
userdata.set()
def run_benchmark(num_workers: int, timeout: float, filename: str):
result_queue = multiprocessing.Queue() result_queue = multiprocessing.Queue()
workers = [] workers = []
@ -93,23 +104,122 @@ def main():
for _ in range(num_workers): for _ in range(num_workers):
workers.append(multiprocessing.Process( workers.append(multiprocessing.Process(
target=independent_worker, target=independent_worker,
args=(number_gen, result_queue, filename, timeout)) args=(number_gen, result_queue, filename, timeout)
) ))
for w in workers: for w in workers:
w.start() w.start()
completed_workers = 0 completed_workers = 0
total_numer_of_completed_requests = 0 all_requests_completed = {}
while completed_workers < num_workers: while completed_workers < num_workers:
result = result_queue.get() results = result_queue.get()
total_numer_of_completed_requests += result[1] for result in results[2]:
all_requests_completed[result.id] = result
completed_workers += 1 completed_workers += 1
for w in workers: for w in workers:
w.join() w.join()
print(total_numer_of_completed_requests / timeout) return all_requests_completed
def main():
parser = argparse.ArgumentParser(
description='Birbnetes Benchmarker 3'
)
parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)')
parser.add_argument('--timeout', type=int, required=False, default=10,
help='Maximum time for request sending (0 for unlimited)')
parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload')
parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample",
help='The target endpoint')
parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
help="Username for the MQTT server")
parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server")
parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
help="Hostname for the MQTT server")
parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Username for the MQTT server")
args = parser.parse_args()
print("Workers:", args.workers)
print("Timeout:", args.timeout, "sec")
alerts_arrived_queue = queue.Queue()
print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
subscription_waiter = threading.Event()
# Preparing connection
mqtt_client = paho.mqtt.client.Client(client_id="10")
mqtt_client.on_connect = mqtt_on_connect
mqtt_client.on_subscribe = mqtt_on_subscribe
mqtt_client.user_data_set(subscription_waiter)
if args.mqtt_password:
mqtt_client.username_pw_set(args.mqtt_username, args.mqtt_password)
# this is a blocking call, will wait until connecion is complete
mqtt_client.connect(args.mqtt_host, args.mqtt_port, 60)
# Start loop in background
mqtt_client.loop_start()
# Wait for subscription to be made
subscription_waiter.wait()
# Set final command reciever
mqtt_client.user_data_set(alerts_arrived_queue)
mqtt_client.on_message = mqtt_on_command
print("MQTT Complete!")
print("Running benchmark...")
benchmark_results = run_benchmark(args.workers, args.timeout, args.file)
print("Waiting for inflight MQTT messages to arrive...")
# Wait for inflight messages for a little
total_successful_uploads = len([req for req in benchmark_results.values() if req.upload_status_code == 200])
all_arrived = False
for _ in range(int(args.inflight_timeout)):
time.sleep(1)
if alerts_arrived_queue.qsize() >= total_successful_uploads:
all_arrived = True
break
if not all_arrived:
print("WARNING: Not all MQTT Messages arrived!")
# Disconnectiong MQTT
mqtt_client.disconnect()
mqtt_client.loop_stop() # This stops further recieving MQTT messages
# Aggregate results
total_answered = 0
while True:
try:
returned_job = alerts_arrived_queue.get_nowait()
except queue.Empty:
break
try:
benchmark_results[returned_job.id].alert_arrived = returned_job.arrived
except KeyError:
print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
total_answered += 1
# print some mini statistics
total_runtime = \
benchmark_results[max(benchmark_results.keys())].upload_finished - \
benchmark_results[min(benchmark_results.keys())].upload_started
print(
f"{len(benchmark_results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered"
)
print(f"Test total runtime was {total_runtime} seconds")
print("HTTP Request/sec:", len(benchmark_results) / total_runtime)
if __name__ == '__main__': if __name__ == '__main__':