benchmarks/benchmark2.py

472 lines
16 KiB
Python

import argparse
import json
import multiprocessing
import os
import queue
import signal
import sys
import textwrap
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import paho.mqtt.client
import pycurl
import requests
@dataclass(frozen=False, init=False)
class UploadRequest:
    """One upload job together with the timestamps collected for it.

    Only ``id`` is supplied at construction time; every other field keeps its
    class-level default until a sender worker (upload fields) or the result
    post-processing (``alert_arrived``) fills it in.
    """

    # Job number; also used in the uploaded file name and as ``device_id``.
    id: int
    # ``time.time()`` taken right before the HTTP POST is issued.
    upload_started: Optional[float] = None
    # ``time.time()`` taken right after the HTTP POST returned.
    upload_finished: Optional[float] = None
    # HTTP status code of the upload (an int, not a flag); 0 means "no response yet".
    upload_status_code: int = 0
    # ``time.time()`` at which the matching MQTT message arrived, if it did.
    alert_arrived: Optional[float] = None

    def __init__(self, _id: int):
        self.id = _id
@dataclass
class ReturnedRequest:
    """Record of one received MQTT message: which job it names and when it came in."""

    # Job id parsed from the MQTT topic.
    id: int
    # ``time.time()`` at the moment the message was handled.
    arrived: float
class KebabRemover(threading.Thread):
    """Background thread that moves finished jobs from a queue into a dict.

    Jobs are stored in ``results`` keyed by their ``id`` attribute.  The
    internal lock is held for the whole lifetime of ``run()``, so
    ``get_results()`` blocks until the thread has been stopped and has
    finished collecting.
    """

    def __init__(self, output_queue: queue.Queue):
        """Prepare the collector for *output_queue*; call ``start()`` to run it."""
        super().__init__()
        self.results = {}
        self._output_queue = output_queue
        self._active = True
        self._lock = threading.Lock()

    def run(self):
        """Collect jobs until ``stop()`` is called, then drain what is left."""
        with self._lock:
            while self._active:
                try:
                    # The timeout bounds how long a stop() request can go unnoticed.
                    job = self._output_queue.get(timeout=2)
                except queue.Empty:
                    continue
                self.results[job.id] = job
            # stop() may arrive while jobs are still queued; pick those up
            # too instead of silently dropping them.
            while True:
                try:
                    job = self._output_queue.get_nowait()
                except queue.Empty:
                    break
                self.results[job.id] = job

    def get_results(self) -> dict:
        """Return the ``id -> job`` dict (the live object, not a copy).

        Blocks while ``run()`` is still holding the lock.
        """
        with self._lock:
            return self.results

    def stop(self):
        """Ask ``run()`` to finish; it drains the queue and then returns."""
        self._active = False
def print_progress_meme(char: str):
    """Emit one progress marker on stdout without a newline and flush right away."""
    print(char, end='', flush=True)
def allout_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float):
    """Upload jobs from *input_queue* until it is empty.

    Each job is sent as a multipart POST to *target_url* carrying the sound
    file and a JSON description.  Jobs that got an HTTP response are stamped
    with their timings and status code and pushed to *sent_queue*.  Jobs whose
    request timed out ('?') or hit a connection error ('!') are dropped.

    :param input_queue: queue of ``UploadRequest`` jobs to send
    :param sent_queue: queue receiving the jobs that got a response
    :param target_url: endpoint the file is POSTed to
    :param file_contents: raw bytes of the file uploaded with every request
    :param request_timeout: timeout in seconds handed to ``requests``
    """
    payload_size = len(file_contents)
    http = requests.Session()
    try:
        while True:
            try:
                job = input_queue.get_nowait()
            except queue.Empty:
                # Nothing left to send: this worker is done.
                return

            description = json.dumps({'date': datetime.now().isoformat(), 'device_id': str(job.id)})
            multipart = {
                "file": (
                    f"birbbenchmark2_testfile_{job.id}.wav",
                    file_contents,
                    'audio/wave',
                    {'Content-length': payload_size},
                ),
                "description": (None, description, "application/json"),
            }

            # Keep the timed section as small as possible.
            try:
                job.upload_started = time.time()
                response = http.post(target_url, files=multipart, timeout=request_timeout)
                job.upload_finished = time.time()
                status_code = response.status_code
            except requests.exceptions.Timeout:
                print_progress_meme('?')
                continue
            except requests.exceptions.ConnectionError:
                print_progress_meme('!')
                continue

            print_progress_meme('.' if status_code == 200 else '!')
            job.upload_status_code = status_code
            sent_queue.put(job)
    except KeyboardInterrupt:
        # Ctrl+C simply ends the worker.
        return
def timeouting_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float,
                             timeout: float):
    """Upload jobs from *input_queue* until it is empty or *timeout* expires.

    Each job is sent as a multipart POST to *target_url* carrying the sound
    file and a JSON description.  Jobs that got an HTTP response are stamped
    with their timings and status code and pushed to *sent_queue*.  Jobs whose
    request timed out ('?') or hit a connection error ('!') are dropped.

    The deadline is checked before every job, including after failed
    requests, so an unreachable target cannot keep the worker alive past
    *timeout*.  When the deadline is hit the worker prints '#', the marker
    the ``--help`` epilog documents for an expired timeout.

    :param input_queue: queue of ``UploadRequest`` jobs to send
    :param sent_queue: queue receiving the jobs that got a response
    :param target_url: endpoint the file is POSTed to
    :param file_contents: raw bytes of the file uploaded with every request
    :param request_timeout: timeout in seconds handed to ``requests``
    :param timeout: seconds after which no new request is started
    """
    content_len = len(file_contents)
    session = requests.Session()
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    try:
        while True:
            if time.monotonic() >= deadline:
                print_progress_meme('#')
                return

            try:
                job = input_queue.get_nowait()
            except queue.Empty:
                return

            files = {
                "file": (
                    f"birbbenchmark2_testfile_{job.id}.wav",
                    file_contents,
                    'audio/wave',
                    {'Content-length': content_len}
                ),
                "description": (
                    None,
                    json.dumps({'date': datetime.now().isoformat(), 'device_id': str(job.id)}),
                    "application/json"
                )
            }

            # Timed section: keep it limited to the request itself.  The job
            # timestamps stay on time.time() because they are later compared
            # with MQTT arrival times taken with time.time() as well.
            try:
                job.upload_started = time.time()
                r = session.post(target_url, files=files, timeout=request_timeout)
                job.upload_finished = time.time()
            except requests.exceptions.Timeout:
                print_progress_meme('?')
                continue
            except requests.exceptions.ConnectionError:
                print_progress_meme('!')
                continue

            status_code = r.status_code
            print_progress_meme('.' if status_code == 200 else '!')
            job.upload_status_code = status_code
            sent_queue.put(job)
    except KeyboardInterrupt:
        # Ctrl+C simply ends the worker.
        return
def mqtt_on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to every topic below ``command/``."""
    client.subscribe("command/#")
def mqtt_on_command(client, userdata, message):
    """MQTT message callback: record the arrival of an alert for one job.

    The job id is everything in the topic after the ``command/`` prefix; the
    payload is not inspected.  A ``ReturnedRequest`` with the id and the
    arrival time is put on *userdata* (the queue of returned requests) and a
    ',' progress marker is printed.

    Topics whose suffix is not an integer are ignored, so a stray message
    cannot raise inside the MQTT callback.
    """
    arrived = time.time()  # taken first so parsing does not skew the measurement
    try:
        _id = int(message.topic[len('command/'):])
    except ValueError:
        # Not one of the benchmark's numeric ids: nothing to record.
        return
    userdata.put(ReturnedRequest(id=_id, arrived=arrived))
    print_progress_meme(',')
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """MQTT subscribe callback: signal that the subscription is in place.

    While the benchmark waits for the subscription, *userdata* is a
    ``threading.Event``; afterwards it is replaced by the queue of returned
    requests.  Only an event is signalled, so a later subscription
    acknowledgement (e.g. after a reconnect re-runs the connect callback)
    does not try to call ``set()`` on the queue.
    """
    if isinstance(userdata, threading.Event):
        userdata.set()
def infinite_job_generator(input_queue, target_ready_jobs: int):
    """Keep *input_queue* stocked with fresh ``UploadRequest`` jobs.

    Every 20 ms the queue is topped up to the current target level.  If a
    refill (other than the very first one) finds the queue less than half
    full, the target level is doubled.  Job ids are handed out sequentially
    starting at 1.

    The loop ends once the thread running it has a falsy ``active``
    attribute; the attribute is expected to be set on the thread object by
    whoever started it.

    :param input_queue: queue to fill; must support ``qsize()`` and ``put()``
    :param target_ready_jobs: initial number of jobs to keep ready
    """
    last_id = 0
    buffer_target = target_ready_jobs
    warmed_up = False
    while True:
        backlog = input_queue.qsize()
        if backlog < buffer_target:
            # The very first refill always starts from an empty queue, which
            # says nothing about consumption speed, so it never grows the target.
            if warmed_up and backlog < (buffer_target / 2):
                buffer_target *= 2
            warmed_up = True

            first_new_id = last_id + 1
            last_id += buffer_target - backlog
            for job_id in range(first_new_id, last_id + 1):
                input_queue.put(UploadRequest(job_id))

        time.sleep(0.02)
        if not threading.current_thread().active:
            return
def _drain_queue(source_queue):
    """Return every item that can be taken from *source_queue* without blocking."""
    items = []
    while True:
        try:
            items.append(source_queue.get_nowait())
        except queue.Empty:
            return items


def run_benchmark(
        num_requests: int,
        num_workers: int,
        sound_file_path: str,
        target_url: str,
        timeout: float,
        inflight_timeout: float,
        request_timeout: float,
        strategy: str,
        mqtt_client: paho.mqtt.client.Client):
    """Run one benchmark: upload the file repeatedly and match MQTT answers.

    :param num_requests: number of jobs queued up front; 0 starts a generator
        thread that keeps refilling the queue instead
    :param num_workers: number of sender threads / processes
    :param sound_file_path: file read in binary mode and uploaded by each job
    :param target_url: HTTP endpoint the file is POSTed to
    :param timeout: 0 selects ``allout_sender_worker``; any other value
        selects ``timeouting_sender_worker`` with this sending time limit
    :param inflight_timeout: seconds to wait (polled once per second) for
        outstanding MQTT messages after sending has finished
    :param request_timeout: timeout of a single HTTP request
    :param strategy: ``'thr'`` for threads, ``'proc'`` for processes
    :param mqtt_client: connected client; its network loop is started here
        and the client is disconnected before returning
    :return: dict mapping job id to its ``UploadRequest``, or ``None`` when
        *strategy* is unknown
    """
    if strategy == 'thr':
        queue_factory, worker_factory = queue.Queue, threading.Thread
        print("Using threading strategy")
    elif strategy == 'proc':
        queue_factory, worker_factory = multiprocessing.Queue, multiprocessing.Process
        print("Using multiprocessing strategy")
    else:
        print("Wrong strategy")
        return

    input_queue = queue_factory()
    sent_queue = queue_factory()
    returned_queue = queue_factory()

    input_generator_thread = None
    if num_requests == 0:
        print(f"Unlimited amount of requests will be generated...")
        input_generator_thread = threading.Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100))
        # The generator polls this ad-hoc attribute to learn when to stop.
        input_generator_thread.active = True
        input_generator_thread.start()
    else:
        print(f"Preparing {num_requests} requests...")
        for i in range(num_requests):
            input_queue.put(UploadRequest(i + 1))

    print("Waiting for MQTT subscription to complete")
    subscribe_complete_event = threading.Event()
    mqtt_client.user_data_set(subscribe_complete_event)
    mqtt_client.loop_start()  # Puts mqtt stuff to a background thread
    subscribe_complete_event.wait()
    # From now on the callbacks get the queue of returned requests.
    mqtt_client.user_data_set(returned_queue)

    print(f"Preparing {num_workers} workers...")
    with open(sound_file_path, "rb") as f:
        file_contents = f.read()

    finished_task_remover = KebabRemover(sent_queue)

    if timeout == 0:
        worker_target = allout_sender_worker
        worker_args = (input_queue, sent_queue, target_url, file_contents, request_timeout)
    else:
        worker_target = timeouting_sender_worker
        worker_args = (input_queue, sent_queue, target_url, file_contents, request_timeout, timeout)
    sender_workers = [worker_factory(target=worker_target, args=worker_args) for _ in range(num_workers)]

    print("Baszatás!")

    # Starting all workers
    finished_task_remover.start()
    for worker in sender_workers:
        worker.start()

    # Waiting for workers to finish
    try:
        for worker in sender_workers:
            worker.join()
    except KeyboardInterrupt:
        # Means: sending has been stopped, from here on only late answers are awaited
        print_progress_meme('|')
        if strategy == 'thr':
            # You should not press ctrl-c in thr
            for worker in sender_workers:
                worker.join()
        elif strategy == 'proc':
            # Interrupt forwarded for workers in mul mode, so we just wait for them to quit
            for worker in sender_workers:
                worker.join(timeout=2)
                # join() always returns None, so liveness is the only way to
                # tell whether the worker actually exited within the grace period.
                if worker.is_alive():
                    worker.terminate()  # Not the nicest solution

    # Stop the input generator
    if input_generator_thread:
        input_generator_thread.active = False

    # Read out all sent requests for handling later
    finished_task_remover.stop()
    results = finished_task_remover.get_results()
    # Pick up anything the collector thread left behind when it was stopped.
    for job in _drain_queue(sent_queue):
        results[job.id] = job
    total_successful_uploads = sum(1 for req in results.values() if req.upload_status_code == 200)

    # Wait for inflight messages for a little
    try:
        for _ in range(int(inflight_timeout)):
            time.sleep(1)
            if returned_queue.qsize() >= total_successful_uploads:
                break
    except KeyboardInterrupt:
        print("jovan akkor nem várom meg...")

    mqtt_client.disconnect()
    mqtt_client.loop_stop()  # This stops further receiving of MQTT messages
    print()  # Everything that might print from other threads is now stopped, so we put a newline

    if int(inflight_timeout) != 0:  # print only if there was a timeout
        if returned_queue.qsize() < total_successful_uploads:
            print("Waiting for inflight MQTT messages timed out!")

    # Fill out the missing fields in the results
    total_answered = 0
    for returned_job in _drain_queue(returned_queue):
        try:
            results[returned_job.id].alert_arrived = returned_job.arrived
        except KeyError:
            print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
        total_answered += 1

    # mini statistics
    print(
        f"{len(results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered")
    if not results:
        # Without a single completed request there is no runtime to compute.
        print("No request was completed, runtime statistics are not available")
        return results

    # With several workers the highest id is not necessarily the last one to
    # finish (nor the lowest the first to start), so use the real extremes.
    first_start = min(req.upload_started for req in results.values())
    last_finish = max(req.upload_finished for req in results.values())
    total_runtime = last_finish - first_start
    print(f"Test total runtime was {total_runtime} seconds")
    if total_runtime > 0:
        print("HTTP Request/sec:", len(results) / total_runtime)
    return results
def main():
    """Command-line entry point: parse arguments, run the benchmark, print a table.

    Connects the MQTT client (with credentials only if a password is given),
    hands everything to ``run_benchmark`` and prints one line per completed
    request with its upload, return and total durations.  Requests without an
    MQTT answer show '???' instead of the missing durations; failed uploads
    are additionally marked with '!'.
    """
    # All epilog lines share the same indentation, which dedent() strips.
    help_epilog = textwrap.dedent('''\
        Progress chars meaning:
        HTTP related
        . -> Upload succesfully completed (return code was 200)
        ! -> Upload failed (return code was not 200)
        ? -> Upload timed out (see --request-timeout)
        MQTT related
        , -> Alert arrived (Trough MQTT)
        Process related
        | -> User interrupted (CTRL+C pressed)
        # -> Timeout expired (see --timeout)
        Examples:
        One worker send exactly 100 requests (wait 10 secs for all alerts to arrive)
        benchmark2.py --workers 1 --requests 100 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10
        Ten worker run for exactly one minute (wait 10 secs for all alerts to arrive)
        benchmark2.py --workers 10 --requests 0 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10
        Send either 100 requests or run for one minute (whichever comes first)
        benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav
        Note:
        Threading strategy does not support Ctrl+C
    ''')

    parser = argparse.ArgumentParser(
        description='Birbnetes Benchmarker 2',
        epilog=help_epilog,
        formatter_class=argparse.RawTextHelpFormatter
    )
    # choices= rejects an unknown strategy up front; otherwise run_benchmark
    # would return None and the result table below would crash.
    parser.add_argument('--strategy', type=str, required=False, default='thr', choices=('thr', 'proc'),
                        help='Forking strategy (thr or proc)')
    parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)')
    parser.add_argument('--requests', type=int, required=True, help='Number of requests (0 for infinity)')
    parser.add_argument('--timeout', type=int, required=False, default=0,
                        help='Maximum time for request sending (0 for unlimited)')
    parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
                        help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
    parser.add_argument('--request-timeout', type=float, required=False, default=5,
                        help='Timeout for HTTP requests')
    parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload')
    parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample",
                        help='The target endpoint')
    parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
                        help="Username for the MQTT server")
    parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server")
    parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
                        help="Hostname for the MQTT server")
    parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Password for the MQTT server")
    args = parser.parse_args()

    print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
    client = paho.mqtt.client.Client(client_id="10")
    client.on_connect = mqtt_on_connect
    client.on_message = mqtt_on_command
    client.on_subscribe = mqtt_on_subscribe
    # Credentials are only sent when a password was supplied.
    if args.mqtt_password:
        client.username_pw_set(args.mqtt_username, args.mqtt_password)
    client.connect(args.mqtt_host, args.mqtt_port, 60)  # this is a blocking call, will wait until connection is complete

    results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout,
                            args.inflight_timeout, args.request_timeout, args.strategy, client)

    print("Eposzi kiértékelés idő...")
    print("id Upload Dur Return Dur Total Dur")
    # One line per completed request
    for id_, req in results.items():
        upload_duration = req.upload_finished - req.upload_started
        if not req.alert_arrived:
            # No MQTT answer was matched to this request.
            if req.upload_status_code == 200:
                print(id_, upload_duration, '???', '???')
            else:
                print(id_, upload_duration, '!', '???', '???')
        else:
            print(id_, upload_duration, req.alert_arrived - req.upload_finished,
                  req.alert_arrived - req.upload_started)
# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()