benchmarks/benchmark3.py

313 lines
11 KiB
Python

from typing import Optional
import time
import pycurl
import itertools
import argparse
import multiprocessing
import paho.mqtt.client
from datetime import datetime
from blist import blist
from dataclasses import dataclass
import threading
import queue
import sys
import csv
import random
import textwrap
@dataclass(frozen=False, init=True)
class UploadRequest:
id: int
upload_started: float = None
upload_finished: float = None
upload_status_code: bool = False
alert_arrived: float = None
@dataclass(frozen=False, init=True)
class ReturnedRequest:
id: int
arrived: float
def independent_worker(result_queue, filename: str, timeout: float, target_url: str):
prepared_curl = pycurl.Curl()
prepared_curl.setopt(pycurl.URL, target_url)
prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0)
prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0)
prepared_curl.setopt(pycurl.SSL_VERIFYSTATUS, 0)
# prepared_curl.setopt(pycurl.FORBID_REUSE, 0)
prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0)
prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
prepared_curl.setopt(pycurl.NOSIGNAL, 1)
prepared_curl.setopt(pycurl.TCP_FASTOPEN, 1)
prepared_curl.setopt(pycurl.MAXCONNECTS, 500) # this does not change much, as we use only one connection per worker
completed_jobs_list = blist() # O(log n) insert instead of python's O(n)
worker_stop_time = 0
worker_completed_job_count = 0
# Start working!!
worker_start_time = time.time()
while True:
jobid = random.randint(0, 2147483647)
prepared_curl.setopt(pycurl.HTTPPOST, [
('file', (
pycurl.FORM_CONTENTTYPE, 'audio/wave',
pycurl.FORM_FILE, filename # Copying buffers from Python memory would be even slower... trust me
)),
('description', (
pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}',
pycurl.FORM_CONTENTTYPE, 'application/json',
))
])
job_start_time = time.time()
prepared_curl.perform()
job_stop_time = time.time()
status_code = prepared_curl.getinfo(pycurl.HTTP_CODE)
worker_completed_job_count += 1
completed_jobs_list.append(UploadRequest(
id=jobid,
upload_started=job_start_time,
upload_finished=job_stop_time,
upload_status_code=status_code
))
if (job_stop_time - worker_start_time) >= timeout:
worker_stop_time = job_stop_time
break
# end of loop
runtime = worker_stop_time - worker_start_time
prepared_curl.close()
result_queue.put(
(runtime, worker_completed_job_count, completed_jobs_list)
)
def mqtt_on_connect(client, userdata, flags, rc):
client.subscribe(f"command/#")
def mqtt_on_command(client, userdata, message):
# msg = json.loads(message.payload.decode())
# if msg['command'] == 'doAlert':
_id = int(message.topic[8:]) # len('command/') = 8
userdata.put(ReturnedRequest(id=_id, arrived=time.time()))
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
userdata.set()
def run_benchmark(num_workers: int, timeout: float, filename: str, target_url: str):
result_queue = multiprocessing.Queue()
workers = []
for _ in range(num_workers):
workers.append(multiprocessing.Process(
target=independent_worker,
args=(result_queue, filename, timeout, target_url)
))
for w in workers:
w.start()
completed_workers = 0
all_requests_completed = {}
count_requests_completed = 0
while completed_workers < num_workers:
results = result_queue.get()
count_requests_completed += results[1]
for result in results[2]:
all_requests_completed[result.id] = result
completed_workers += 1
for w in workers:
w.join()
assert count_requests_completed == len(all_requests_completed)
final_sorted_results = list(all_requests_completed.values())
final_sorted_results.sort(key=lambda a: a.upload_started) # sort by upload start time
return final_sorted_results
def csv_time_format(timestamp: Optional[float]) -> Optional[str]:
if timestamp:
return datetime.fromtimestamp(timestamp).isoformat()
else:
return None
def write_results(results, file_handle):
writer = csv.writer(file_handle)
# Latency and rtt are in ms
writer.writerow(
['id', 'http_start_time', 'http_complete_time', 'http_time', 'status_code', 'mqtt_arrive_time', 'latency',
'rtt']
)
for result in results:
http_time = (result.upload_finished - result.upload_started) * 1000 if result.upload_finished else None
latency = (result.alert_arrived - result.upload_finished) * 1000 if result.alert_arrived else None
rtt = (result.alert_arrived - result.upload_started) * 1000 if result.alert_arrived else None
row = [
result.id,
csv_time_format(result.upload_started),
csv_time_format(result.upload_finished),
http_time,
result.upload_status_code,
csv_time_format(result.alert_arrived),
latency,
rtt
]
writer.writerow(row)
def main():
help_epilog = textwrap.dedent('''\
CSV Columns meaning:
id - Unique ID of a request (not sequential but at least unique)
http_start_time - Timestamp of when the HTTP request is started
http_complete_time - Timestamp of when the HTTP request is completed
http_time - Time it took to perform the HTTP request (i.e. Uploading) in milliseconds
status_code - Status code of the HTTP request
mqtt_arrive_time - Timestamp of when the corresponding MQTT message arrived
latency - Latency of the system (http_complete_time - mqtt_arrive_time) in milliseconds
rtt - Total round trip time (http_start_time - mqtt_arrive_time) in milliseconds
''')
parser = argparse.ArgumentParser(
description='Birbnetes Benchmarker 3',
epilog=help_epilog,
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)')
parser.add_argument('--timeout', type=int, required=False, default=10,
help='Maximum time for request sending (0 for unlimited)')
parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload')
parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/benchmark",
help='The target endpoint')
parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
parser.add_argument('--output', type=str, required=True, help='name of the file to write results to (- for stdout)')
parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
help="Username for the MQTT server")
parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server")
parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
help="Hostname for the MQTT server")
parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Username for the MQTT server")
args = parser.parse_args()
print("Workers:", args.workers)
print("Timeout:", args.timeout, "sec")
alerts_arrived_queue = queue.Queue()
print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
subscription_waiter = threading.Event()
# Preparing connection
mqtt_client = paho.mqtt.client.Client(client_id="10")
mqtt_client.on_connect = mqtt_on_connect
mqtt_client.on_subscribe = mqtt_on_subscribe
mqtt_client.user_data_set(subscription_waiter)
if args.mqtt_password:
mqtt_client.username_pw_set(args.mqtt_username, args.mqtt_password)
# this is a blocking call, will wait until connecion is complete
mqtt_client.connect(args.mqtt_host, args.mqtt_port, 60)
# Start loop in background
mqtt_client.loop_start()
# Wait for subscription to be made
subscription_waiter.wait()
# Set final command reciever
mqtt_client.user_data_set(alerts_arrived_queue)
mqtt_client.on_message = mqtt_on_command
print("MQTT Complete!")
print("Running benchmark...")
benchmark_results = run_benchmark(args.workers, args.timeout, args.file, args.target_url)
# Wait for inflight messages for a little
total_successful_uploads = len([req for req in benchmark_results if req.upload_status_code == 200])
all_arrived = alerts_arrived_queue.qsize() >= total_successful_uploads
if (not all_arrived) and (args.inflight_timeout > 0):
print("Waiting for inflight MQTT messages to arrive...")
waiting_started = time.time()
for _ in range(int(args.inflight_timeout * 1000)):
if alerts_arrived_queue.qsize() >= total_successful_uploads:
all_arrived = True
break
time.sleep(0.001)
waited_for_inflight = time.time() - waiting_started
print(f"Waited a total {waited_for_inflight} seconds")
if not all_arrived:
print("WARNING: Not all MQTT Messages arrived!")
# Disconnectiong MQTT
mqtt_client.disconnect()
mqtt_client.loop_stop() # This stops further recieving MQTT messages
# Aggregate results
total_answered = 0
while True:
try:
returned_job = alerts_arrived_queue.get_nowait()
except queue.Empty:
break
# super ghetto pairing
paired = False
for benchmark_result in benchmark_results:
if benchmark_result.id == returned_job.id:
benchmark_result.alert_arrived = returned_job.arrived
paired = True
break
if not paired:
print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
total_answered += 1
# print some mini statistics
total_runtime = max(benchmark_results, key=lambda a: a.upload_finished).upload_finished - \
min(benchmark_results, key=lambda a: a.upload_started).upload_started
print(
f"{len(benchmark_results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered"
)
print(f"Test total runtime was {total_runtime} seconds")
print("HTTP Request/sec:", len(benchmark_results) / total_runtime)
if args.output == '-':
print("Writing results to STDOUT")
print("\n---------- CUT HERE ----------\n")
write_results(benchmark_results, sys.stdout)
else:
print("Writing results to", args.output)
with open(args.output, 'w') as f:
write_results(benchmark_results, f)
if __name__ == '__main__':
main()