From 0dea6c0f51522b9909823b719e29549ff66a1790 Mon Sep 17 00:00:00 2001 From: marcsello Date: Fri, 23 Oct 2020 03:44:42 +0200 Subject: [PATCH] Implemented MQTT reception --- benchmark3.py | 140 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 125 insertions(+), 15 deletions(-) diff --git a/benchmark3.py b/benchmark3.py index 79c4d4c..af3f745 100644 --- a/benchmark3.py +++ b/benchmark3.py @@ -1,14 +1,14 @@ -import io -import sys import time import pycurl import itertools +import argparse import multiprocessing -import threading -import queue +import paho.mqtt.client from datetime import datetime from blist import blist from dataclasses import dataclass +import threading +import queue @dataclass(frozen=False, init=True) @@ -46,7 +46,7 @@ def independent_worker(number_generator, result_queue, filename: str, timeout: f jobid = next(number_generator) prepared_curl.setopt(pycurl.HTTPPOST, [ ('file', ( - pycurl.FORM_FILE, filename + pycurl.FORM_FILE, filename # Copying buffers from Python memory would be even slower... trust me )), ('description', ( pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}', @@ -81,11 +81,22 @@ def independent_worker(number_generator, result_queue, filename: str, timeout: f ) -def main(): - num_workers = 50 - timeout = 10.0 - filename = '/home/marcsello/XauddzikzfKhogEcCYmufFMUrjYTwhwf.wav' +def mqtt_on_connect(client, userdata, flags, rc): + client.subscribe(f"command/#") + +def mqtt_on_command(client, userdata, message): + # msg = json.loads(message.payload.decode()) + # if msg['command'] == 'doAlert': + _id = int(message.topic[8:]) # len('command/') = 8 + userdata.put(ReturnedRequest(id=_id, arrived=time.time())) + + +def mqtt_on_subscribe(client, userdata, mid, granted_qos): + userdata.set() + + +def run_benchmark(num_workers: int, timeout: float, filename: str): result_queue = multiprocessing.Queue() workers = [] @@ -93,23 +104,122 @@ def main(): for _ in range(num_workers): workers.append(multiprocessing.Process( target=independent_worker, - args=(number_gen, result_queue, filename, timeout)) - ) + args=(number_gen, result_queue, filename, timeout) + )) for w in workers: w.start() completed_workers = 0 - total_numer_of_completed_requests = 0 + all_requests_completed = {} while completed_workers < num_workers: - result = result_queue.get() - total_numer_of_completed_requests += result[1] + results = result_queue.get() + for result in results[2]: + all_requests_completed[result.id] = result + completed_workers += 1 for w in workers: w.join() - print(total_numer_of_completed_requests / timeout) + return all_requests_completed + + +def main(): + parser = argparse.ArgumentParser( + description='Birbnetes Benchmarker 3' + ) + + parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)') + parser.add_argument('--timeout', type=int, required=False, default=10, + help='Maximum time for request sending (0 for unlimited)') + + parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload') + parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample", + help='The target endpoint') + + parser.add_argument('--inflight-timeout', type=int, required=False, default=30, + help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)') + + parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes", + help="Username for the MQTT server") + parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server") + parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com", + help="Hostname for the MQTT server") + parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Username for the MQTT server") + + args = parser.parse_args() + + print("Workers:", args.workers) + print("Timeout:", args.timeout, "sec") + + alerts_arrived_queue = queue.Queue() + + print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})") + subscription_waiter = threading.Event() + # Preparing connection + mqtt_client = paho.mqtt.client.Client(client_id="10") + mqtt_client.on_connect = mqtt_on_connect + mqtt_client.on_subscribe = mqtt_on_subscribe + mqtt_client.user_data_set(subscription_waiter) + if args.mqtt_password: + mqtt_client.username_pw_set(args.mqtt_username, args.mqtt_password) + # this is a blocking call, will wait until connecion is complete + mqtt_client.connect(args.mqtt_host, args.mqtt_port, 60) + # Start loop in background + mqtt_client.loop_start() + # Wait for subscription to be made + subscription_waiter.wait() + # Set final command reciever + mqtt_client.user_data_set(alerts_arrived_queue) + mqtt_client.on_message = mqtt_on_command + print("MQTT Complete!") + + print("Running benchmark...") + benchmark_results = run_benchmark(args.workers, args.timeout, args.file) + print("Waiting for inflight MQTT messages to arrive...") + # Wait for inflight messages for a little + + total_successful_uploads = len([req for req in benchmark_results.values() if req.upload_status_code == 200]) + all_arrived = False + for _ in range(int(args.inflight_timeout)): + time.sleep(1) + if alerts_arrived_queue.qsize() >= total_successful_uploads: + all_arrived = True + break + + if not all_arrived: + print("WARNING: Not all MQTT Messages arrived!") + + # Disconnectiong MQTT + mqtt_client.disconnect() + mqtt_client.loop_stop() # This stops further recieving MQTT messages + + # Aggregate results + total_answered = 0 + while True: + try: + returned_job = alerts_arrived_queue.get_nowait() + except queue.Empty: + break + + try: + benchmark_results[returned_job.id].alert_arrived = returned_job.arrived + except KeyError: + print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id) + + total_answered += 1 + + # print some mini statistics + total_runtime = \ + benchmark_results[max(benchmark_results.keys())].upload_finished - \ + benchmark_results[min(benchmark_results.keys())].upload_started + + print( + f"{len(benchmark_results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered" + ) + print(f"Test total runtime was {total_runtime} seconds") + print("HTTP Request/sec:", len(benchmark_results) / total_runtime) if __name__ == '__main__':