benchmarks/benchmark2.py

430 lines
14 KiB
Python
Raw Normal View History

2020-10-21 02:20:24 +02:00
import os
import signal
2020-10-20 00:45:08 +02:00
import time
import sys
import json
import requests
2020-10-23 00:35:58 +02:00
import pycurl
2020-10-21 02:20:24 +02:00
import textwrap
2020-10-20 00:45:08 +02:00
from datetime import datetime
from dataclasses import dataclass
import argparse
import paho.mqtt.client
2020-10-23 00:09:07 +02:00
import queue
import multiprocessing
import threading
2020-10-20 00:45:08 +02:00
@dataclass(frozen=False, init=False)
class UploadRequest:
    """One benchmark request: its id plus timing/status data filled in as it progresses.

    Only ``id`` is set by the constructor; every other field reads the class-level
    default below until the benchmark assigns it.
    """

    id: int
    upload_started: float = None  # time.time() just before the HTTP request; None until sent
    upload_finished: float = None  # time.time() right after the HTTP request; None until sent
    upload_status_code: int = 0  # HTTP status code of the upload; 0 until one is recorded
    alert_arrived: float = None  # time.time() when the matching MQTT alert arrived; None if none did

    def __init__(self, _id: int):
        self.id = _id
@dataclass
class ReturnedRequest:
    """An MQTT alert received for a previously uploaded request."""

    id: int  # request id taken from the MQTT topic
    arrived: float  # time.time() at which the alert was received
class KebabRemover(threading.Thread):
    """Background thread that moves finished jobs from a queue into ``results``.

    ``results`` maps each job's ``id`` to the job object. Call :meth:`stop`, then
    :meth:`get_results`; ``get_results`` waits for the thread to exit, so the
    returned dict contains every job that was in the queue when ``stop`` was called.
    """

    def __init__(self, output_queue: queue.Queue):
        # daemon=True: if the owner dies before calling stop(), this thread must not
        # keep the interpreter alive forever.
        super().__init__(daemon=True)
        self.results = {}
        self._output_queue = output_queue
        self._active = True
        # Held for the whole of run(), so readers never see a half-updated dict.
        self._lock = threading.Lock()

    def run(self):
        with self._lock:
            while self._active:
                try:
                    # Time out periodically so a stop() request is noticed.
                    job = self._output_queue.get(timeout=2)
                except queue.Empty:
                    continue
                self.results[job.id] = job
            # stop() may be called while jobs are still queued; drain them so none are lost.
            while True:
                try:
                    job = self._output_queue.get_nowait()
                except queue.Empty:
                    break
                self.results[job.id] = job

    def get_results(self) -> dict:
        """Return the collected jobs, waiting for the thread to finish if it was started.

        Like before, this blocks until :meth:`stop` has been called on a running thread.
        """
        if self.ident is not None and self is not threading.current_thread():
            # Joining (instead of relying on the lock alone) also covers the window
            # between start() returning and run() acquiring the lock.
            self.join()
        with self._lock:
            return self.results

    def stop(self):
        """Ask run() to finish: it drains what is left in the queue and exits."""
        self._active = False
def print_progress_meme(char: str):
    """Emit a single progress character on stdout, with no newline, and flush at once."""
    print(char, end='', flush=True)
def _make_curl(target_url: str, request_timeout: float):
    """Create a reusable curl handle for *target_url*.

    TLS peer/host verification is disabled, HTTP/2 is requested, the response body
    is discarded and every transfer is limited to *request_timeout* seconds
    (libcurl treats 0 as "no limit").
    """
    curl = pycurl.Curl()
    curl.setopt(pycurl.URL, target_url)
    curl.setopt(pycurl.SSL_VERIFYPEER, 0)
    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0)
    curl.setopt(pycurl.WRITEFUNCTION, lambda data: None)  # only the status code matters
    # request_timeout used to be accepted but never applied.
    curl.setopt(pycurl.TIMEOUT_MS, int(request_timeout * 1000))
    return curl


def _send_job(curl, job, sent_queue) -> None:
    """Perform one request for *job*, record its timing and status, and put it on *sent_queue*.

    Prints '.' for HTTP 200, '!' for any other status or transport error, and '?'
    when the transfer hit the request timeout. Transport errors are recorded as
    status 0 so the job counts as a failed upload instead of killing the worker.
    """
    job.upload_started = time.time()
    try:
        curl.perform()
    except pycurl.error as err:
        job.upload_finished = time.time()
        job.upload_status_code = 0
        timed_out = bool(err.args) and err.args[0] == pycurl.E_OPERATION_TIMEDOUT
        print_progress_meme('?' if timed_out else '!')
    else:
        job.upload_finished = time.time()
        status_code = curl.getinfo(pycurl.HTTP_CODE)
        print_progress_meme('.' if status_code == 200 else '!')
        job.upload_status_code = status_code
    sent_queue.put(job)


def _sender_loop(input_queue, sent_queue, target_url: str, request_timeout: float, timeout: float = None) -> None:
    """Send queued jobs until *input_queue* is empty or *timeout* seconds have elapsed.

    ``timeout=None`` means no time limit; when the limit is hit '#' is printed.
    KeyboardInterrupt ends the loop quietly, and the curl handle is always closed.
    """
    curl = None
    try:
        curl = _make_curl(target_url, request_timeout)
        deadline = None if timeout is None else time.time() + timeout
        while True:
            # Checked before every request (previously only every 50), so the worker
            # stops promptly instead of overshooting by up to 50 request durations.
            if deadline is not None and time.time() >= deadline:
                print_progress_meme('#')
                return
            try:
                job = input_queue.get_nowait()
            except queue.Empty:
                return
            _send_job(curl, job, sent_queue)
    except KeyboardInterrupt:
        return
    finally:
        if curl is not None:
            curl.close()


def allout_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float):
    """Worker entry point: send a request for every queued job until the input queue runs dry.

    Each finished job, with timing and status filled in, is put on *sent_queue*.

    NOTE(review): *file_contents* is accepted but never attached to the request, so
    each transfer carries no body (libcurl's default method is GET). The upload
    format the target expects is not visible here -- TODO wire it up (e.g. pycurl.HTTPPOST).
    """
    _sender_loop(input_queue, sent_queue, target_url, request_timeout)


def timeouting_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float,
                             timeout: float):
    """Worker entry point: like :func:`allout_sender_worker`, but stop after *timeout* seconds.

    NOTE(review): *file_contents* is not sent either -- see allout_sender_worker.
    """
    _sender_loop(input_queue, sent_queue, target_url, request_timeout, timeout)
def mqtt_on_connect(client, userdata, flags, rc):
    """paho ``on_connect`` callback: subscribe to every topic below ``command/``."""
    topic_filter = "command/#"
    client.subscribe(topic_filter)
def mqtt_on_command(client, userdata, message):
    """paho ``on_message`` callback: record the arrival of an alert for a request.

    The request id is everything after the ``command/`` prefix of the topic. A
    ReturnedRequest with that id and the current time is put on *userdata* (the
    results queue), and ',' is printed.

    Instead of raising inside paho's network thread, the message is ignored when:
    * *userdata* is still the subscription Event, i.e. run_benchmark has not yet
      swapped in the results queue and has not sent any request;
    * the topic suffix is not an integer.
    """
    if isinstance(userdata, threading.Event):
        # No request has been sent yet, so this alert cannot belong to this run.
        return
    prefix = "command/"
    raw_id = message.topic[len(prefix):]
    try:
        _id = int(raw_id)
    except ValueError:
        print(f"Ignoring MQTT message with unexpected topic: {message.topic!r}")
        return
    userdata.put(ReturnedRequest(id=_id, arrived=time.time()))
    print_progress_meme(',')
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """paho ``on_subscribe`` callback: signal that the ``command/#`` subscription is active.

    run_benchmark passes a threading.Event as userdata while it waits for the
    subscription, then replaces userdata with the results queue. mqtt_on_connect
    subscribes again on every (re)connect, so this callback can fire later with the
    queue as userdata. That case is ignored instead of raising AttributeError inside
    paho's network thread.
    """
    if isinstance(userdata, threading.Event):
        userdata.set()
def infinite_job_generator(input_queue, target_ready_jobs: int):
    """Keep *input_queue* topped up with fresh UploadRequests until told to stop.

    Every 20 ms the queue is refilled up to a target size, which starts at
    *target_ready_jobs*. If the queue has dropped below half the target since the
    last refill, the target is doubled before refilling. This suggests consumers
    drain the queue faster than it is refilled. Ids are consecutive, starting at 1.
    The loop ends once the running thread's ``active`` attribute is falsy;
    run_benchmark sets that attribute on the Thread object.

    NOTE(review): multiprocessing.Queue.qsize() is not implemented on every
    platform (e.g. macOS) -- confirm before using the 'proc' strategy there.
    """
    next_id = 0
    target = target_ready_jobs
    is_first_round = True
    while True:
        ready = input_queue.qsize()
        if ready < target:
            if is_first_round:
                # The queue starts out empty, so a low fill level in the first round
                # says nothing about consumer speed; don't grow the target yet.
                is_first_round = False
            elif ready < target / 2:
                target *= 2
            for _ in range(target - ready):
                next_id += 1
                input_queue.put(UploadRequest(next_id))
        time.sleep(0.02)
        if not threading.current_thread().active:
            return
def _create_queues(strategy: str):
    """Create (input_queue, sent_queue, returned_queue) for *strategy*.

    'thr' uses queue.Queue and 'proc' uses multiprocessing.Queue. Any other value
    prints "Wrong strategy" and returns None.
    """
    if strategy == 'thr':
        queues = (queue.Queue(), queue.Queue(), queue.Queue())
        print("Using threading strategy")
        return queues
    if strategy == 'proc':
        queues = (multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue())
        print("Using multiprocessing strategy")
        return queues
    print("Wrong strategy")
    return None


def _join_after_interrupt(sender_workers: list, strategy: str) -> None:
    """Wait for the sender workers after Ctrl+C, terminating stuck processes in 'proc' mode."""
    # '|' means: sending was stopped by the user, now only late replies are awaited.
    print_progress_meme('|')
    if strategy == 'thr':
        # You should not press ctrl-c in thr: the threads keep running until the
        # input queue is empty, so just wait for them.
        for worker in sender_workers:
            worker.join()
    elif strategy == 'proc':
        # The interrupt is expected to reach the worker processes too, so they should
        # be quitting on their own; give each a moment, then terminate stragglers.
        for worker in sender_workers:
            worker.join(timeout=2)
            # Process.join() always returns None, so liveness must be checked explicitly.
            if worker.is_alive():
                worker.terminate()  # Not the nicest solution


def _collect_alerts(results: dict, returned_queue) -> int:
    """Drain *returned_queue*, copying each alert's arrival time onto its result.

    Returns the number of alerts drained, including alerts for unknown ids.
    """
    total_answered = 0
    while True:
        try:
            returned_job = returned_queue.get_nowait()
        except queue.Empty:
            break
        try:
            results[returned_job.id].alert_arrived = returned_job.arrived
        except KeyError:
            print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
        total_answered += 1
    return total_answered


def _print_runtime(results: dict) -> None:
    """Print the wall-clock span of all uploads and the resulting request rate.

    The span runs from the earliest upload start to the latest upload finish. With
    several workers, the lowest/highest ids are not necessarily first/last, so all
    results are scanned.
    """
    if not results:
        print("No requests were completed, so there is no runtime to report")
        return
    first_start = min(req.upload_started for req in results.values())
    last_finish = max(req.upload_finished for req in results.values())
    total_runtime = last_finish - first_start
    print(f"Test total runtime was {total_runtime} seconds")
    if total_runtime > 0:
        print("HTTP Request/sec:", len(results) / total_runtime)


def run_benchmark(
        num_requests: int,
        num_workers: int,
        sound_file_path: str,
        target_url: str,
        timeout: float,
        inflight_timeout: float,
        request_timeout: float,
        strategy: str,
        mqtt_client: paho.mqtt.client.Client):
    """Run one benchmark and return the per-request results.

    Args:
        num_requests: number of requests to send; 0 generates requests without limit.
        num_workers: number of sender threads ('thr') or processes ('proc').
        sound_file_path: file whose bytes are read and handed to the workers.
        target_url: HTTP endpoint the workers send to.
        timeout: seconds after which workers stop sending; 0 for no limit.
        inflight_timeout: seconds (whole seconds are used) to wait for MQTT alerts after sending.
        request_timeout: per-request timeout passed to the workers.
        strategy: 'thr' or 'proc'.
        mqtt_client: connected paho client whose callbacks are this module's mqtt_on_* functions.

    Returns:
        dict mapping request id -> UploadRequest for every request that was sent,
        with ``alert_arrived`` set where an alert came back; None if *strategy* is invalid.
    """
    queues = _create_queues(strategy)
    if queues is None:
        return
    input_queue, sent_queue, returned_queue = queues

    input_generator_thread = None
    if num_requests == 0:
        print("Unlimited amount of requests will be generated...")
        input_generator_thread = threading.Thread(target=infinite_job_generator,
                                                  args=(input_queue, num_workers * 100))
        # infinite_job_generator polls this attribute to know when to stop.
        input_generator_thread.active = True
        input_generator_thread.start()
    else:
        print(f"Preparing {num_requests} requests...")
        for i in range(num_requests):
            input_queue.put(UploadRequest(i + 1))

    print("Waiting for MQTT subscription to complete")
    subscribe_complete_event = threading.Event()
    # mqtt_on_subscribe sets this event; afterwards userdata becomes the queue
    # that mqtt_on_command fills with returned alerts.
    mqtt_client.user_data_set(subscribe_complete_event)
    mqtt_client.loop_start()  # Puts mqtt stuff to a background thread
    subscribe_complete_event.wait()
    mqtt_client.user_data_set(returned_queue)

    print(f"Preparing {num_workers} workers...")
    with open(sound_file_path, "rb") as f:
        file_contents = f.read()
    finished_task_remover = KebabRemover(sent_queue)

    worker_args = (input_queue, sent_queue, target_url, file_contents, request_timeout)
    if timeout == 0:
        worker_target = allout_sender_worker
    else:
        worker_target = timeouting_sender_worker
        worker_args += (timeout,)
    # strategy is 'thr' or 'proc' here (anything else returned above).
    worker_class = threading.Thread if strategy == 'thr' else multiprocessing.Process
    sender_workers = [worker_class(target=worker_target, args=worker_args) for _ in range(num_workers)]

    print("Baszatás!")  # start marker printed right before the workers launch

    finished_task_remover.start()
    for worker in sender_workers:
        worker.start()

    try:
        for worker in sender_workers:
            worker.join()
    except KeyboardInterrupt:
        # Stop refilling the input queue first: otherwise threaded workers in
        # unlimited mode never run out of jobs and the join would hang forever.
        if input_generator_thread:
            input_generator_thread.active = False
        _join_after_interrupt(sender_workers, strategy)

    # Stop the input generator
    if input_generator_thread:
        input_generator_thread.active = False

    # Collect every sent request (get_results waits for the remover to drain its queue).
    finished_task_remover.stop()
    results = finished_task_remover.get_results()
    total_successful_uploads = sum(1 for req in results.values() if req.upload_status_code == 200)

    # Give late MQTT alerts a chance to arrive, checking once per second.
    for _ in range(int(inflight_timeout)):
        time.sleep(1)
        if returned_queue.qsize() >= total_successful_uploads:
            break

    mqtt_client.disconnect()
    mqtt_client.loop_stop()  # This stops further receiving of MQTT messages
    print()  # Nothing else prints from other threads any more, so finish the progress line

    if int(inflight_timeout) != 0 and returned_queue.qsize() < total_successful_uploads:
        print("Waiting for inflight MQTT messages timed out!")

    total_answered = _collect_alerts(results, returned_queue)
    print(
        f"{len(results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered")
    _print_runtime(results)
    return results
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the benchmark."""
    help_epilog = textwrap.dedent('''\
        Progress chars meaning:
            HTTP related
                . -> Upload succesfully completed (return code was 200)
                ! -> Upload failed (return code was not 200)
                ? -> Upload timed out (see --request-timeout)
            MQTT related
                , -> Alert arrived (Trough MQTT)
            Process related
                | -> User interrupted (CTRL+C pressed)
                # -> Timeout expired (see --timeout)
        Examples:
            One worker send exactly 100 requests (wait 10 secs for all alerts to arrive)
                benchmark2.py --workers 1 --requests 100 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10
            Ten worker run for exactly one minute (wait 10 secs for all alerts to arrive)
                benchmark2.py --workers 10 --requests 0 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10
            Send either 100 requests or run for one minute (whichever comes first)
                benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav
        Note:
            Threading strategy does not support Ctrl+C
        ''')
    parser = argparse.ArgumentParser(
        description='Birbnetes Benchmarker 2',
        epilog=help_epilog,
        formatter_class=argparse.RawTextHelpFormatter
    )
    # choices: any other value made run_benchmark return None and main() crash.
    parser.add_argument('--strategy', type=str, required=False, default='thr', choices=['thr', 'proc'],
                        help='Forking strategy (thr or proc)')
    parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)')
    parser.add_argument('--requests', type=int, required=True, help='Number of requests (0 for infinity)')
    parser.add_argument('--timeout', type=int, required=False, default=0,
                        help='Maximum time for request sending (0 for unlimited)')
    parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
                        help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
    parser.add_argument('--request-timeout', type=float, required=False, default=5,
                        help='Timeout for HTTP requests')
    parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload')
    parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample",
                        help='The target endpoint')
    parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
                        help="Username for the MQTT server")
    parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server")
    parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
                        help="Hostname for the MQTT server")
    parser.add_argument('--mqtt-password', type=str, required=False, default=None,
                        help="Password for the MQTT server")
    return parser


def _print_report(results: dict) -> None:
    """Print one line per request: id, upload duration, alert latency, total duration.

    '???' marks a missing alert; an extra '!' column marks a failed (non-200) upload.
    """
    print("Eposzi kiértékelés idő...")
    print("id Upload Dur Return Dur Total Dur")
    for id_, req in results.items():
        upload_duration = req.upload_finished - req.upload_started
        if req.alert_arrived:
            print(id_, upload_duration, req.alert_arrived - req.upload_finished,
                  req.alert_arrived - req.upload_started)
        elif req.upload_status_code == 200:
            print(id_, upload_duration, '???', '???')
        else:
            print(id_, upload_duration, '!', '???', '???')


def main():
    """Parse the CLI, connect to MQTT, run the benchmark and print per-request timings."""
    args = _build_arg_parser().parse_args()

    print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
    client = paho.mqtt.client.Client(client_id="10")
    client.on_connect = mqtt_on_connect
    client.on_message = mqtt_on_command
    client.on_subscribe = mqtt_on_subscribe
    if args.mqtt_password:
        client.username_pw_set(args.mqtt_username, args.mqtt_password)
    # Blocks while opening the connection and sending CONNECT; the broker's reply
    # (and on_connect) is handled later by the network loop started in run_benchmark.
    client.connect(args.mqtt_host, args.mqtt_port, 60)

    results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout,
                            args.inflight_timeout, args.request_timeout, args.strategy, client)
    if results is None:  # run_benchmark rejected the strategy
        return
    _print_report(results)
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()