# benchmarks/benchmark3.py
#
# Listing metadata from the page this file was copied from: 313 lines, 11 KiB, Python.
import argparse
import csv
import itertools
import multiprocessing
import queue
import random
import sys
import textwrap
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import paho.mqtt.client
import pycurl
from blist import blist
@dataclass(frozen=False, init=True)
class UploadRequest:
    """Bookkeeping record for one uploaded sample.

    The record is created by a worker after the HTTP upload and is later
    completed with the arrival time of the matching MQTT message, which is
    why the class is mutable.
    """

    # Randomly generated job id; also sent to the server as "device_id".
    id: int
    # time.time() value taken right before the HTTP request was performed.
    upload_started: Optional[float] = None
    # time.time() value taken right after the HTTP request returned.
    upload_finished: Optional[float] = None
    # HTTP status code reported by curl (0 while no code is known).
    upload_status_code: int = 0
    # time.time() value of the matching MQTT message; None if none arrived.
    alert_arrived: Optional[float] = None
@dataclass(frozen=False, init=True)
class ReturnedRequest:
    """An MQTT message received in response to an upload."""

    # Id parsed from the MQTT topic the message arrived on.
    id: int
    # time.time() value taken when the message was handled.
    arrived: float
def independent_worker(result_queue, filename: str, timeout: float, target_url: str):
    """Upload ``filename`` to ``target_url`` in a loop and report the timings.

    Meant to run in its own process. Requests are sent back to back over one
    prepared curl handle until at least ``timeout`` seconds have passed since
    the worker started; the check happens after each request, so at least one
    request is always sent.

    Exactly one tuple ``(runtime, completed_job_count, completed_jobs)`` is
    put on ``result_queue`` before the function returns, even when it fails
    half way, so that the parent waiting on the queue is never left hanging.

    :param result_queue: queue the final result tuple is put on
    :param filename: path of the file sent as the ``file`` form field
    :param timeout: minimum run time of the worker in seconds
    :param target_url: URL the multipart POST requests are sent to
    """
    # A plain list is enough here: the records are only ever appended
    # (amortized O(1)) and iterated once by the parent.
    completed_jobs_list = []
    worker_completed_job_count = 0
    worker_start_time = time.time()
    worker_stop_time = worker_start_time

    prepared_curl = pycurl.Curl()
    try:
        prepared_curl.setopt(pycurl.URL, target_url)
        prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        prepared_curl.setopt(pycurl.SSL_VERIFYSTATUS, 0)
        # prepared_curl.setopt(pycurl.FORBID_REUSE, 0)
        prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0)
        # The response body is not needed, so it is thrown away.
        prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
        prepared_curl.setopt(pycurl.NOSIGNAL, 1)
        prepared_curl.setopt(pycurl.TCP_FASTOPEN, 1)
        # this does not change much, as we use only one connection per worker
        prepared_curl.setopt(pycurl.MAXCONNECTS, 500)

        # Start working!!
        worker_start_time = time.time()
        worker_stop_time = worker_start_time

        while True:
            jobid = random.randint(0, 2147483647)

            prepared_curl.setopt(pycurl.HTTPPOST, [
                ('file', (
                    pycurl.FORM_CONTENTTYPE, 'audio/wave',
                    # Copying buffers from Python memory would be even slower... trust me
                    pycurl.FORM_FILE, filename
                )),
                ('description', (
                    pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}',
                    pycurl.FORM_CONTENTTYPE, 'application/json',
                ))
            ])

            job_start_time = time.time()
            try:
                prepared_curl.perform()
                transfer_failed = False
            except pycurl.error:
                # A failed transfer (e.g. connection refused) is recorded as
                # an unsuccessful request instead of killing the worker.
                transfer_failed = True
            job_stop_time = time.time()

            status_code = 0 if transfer_failed else prepared_curl.getinfo(pycurl.HTTP_CODE)

            worker_completed_job_count += 1
            completed_jobs_list.append(UploadRequest(
                id=jobid,
                upload_started=job_start_time,
                upload_finished=job_stop_time,
                upload_status_code=status_code
            ))

            if (job_stop_time - worker_start_time) >= timeout:
                worker_stop_time = job_stop_time
                break
    finally:
        # Always release the handle and always report, otherwise the parent
        # would block forever waiting for this worker's result.
        prepared_curl.close()
        runtime = worker_stop_time - worker_start_time
        result_queue.put(
            (runtime, worker_completed_job_count, completed_jobs_list)
        )
def mqtt_on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to every topic below ``command/``.

    ``userdata``, ``flags`` and ``rc`` are accepted because the callback
    signature requires them; they are not inspected.
    """
    client.subscribe("command/#")
def mqtt_on_command(client, userdata, message):
    """MQTT message callback: record the arrival of a command message.

    The request id is taken from the part of the topic that follows
    ``command/``. Messages whose topic suffix is not an integer are ignored,
    because the wildcard subscription may also deliver unrelated topics and an
    exception must not escape into the MQTT network loop.

    :param client: MQTT client instance (unused)
    :param userdata: queue-like object; a ``ReturnedRequest`` is ``put`` on it
    :param message: received message; only its ``topic`` is read
    """
    arrived = time.time()  # take the timestamp before doing any parsing
    # NOTE: the payload is not inspected; every message on a numeric
    # ``command/<id>`` topic counts as the answer for that id.
    topic_prefix = 'command/'
    try:
        _id = int(message.topic[len(topic_prefix):])
    except ValueError:
        return
    userdata.put(ReturnedRequest(id=_id, arrived=arrived))
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """MQTT subscribe callback: signal that the subscription is in place.

    :param userdata: event-like object whose ``set()`` is called
    """
    userdata.set()
def run_benchmark(num_workers: int, timeout: float, filename: str, target_url: str):
    """Run ``num_workers`` upload workers in parallel and collect their results.

    Every worker is a separate process running ``independent_worker``. Each
    one reports a tuple whose third element is its list of request records.

    :param num_workers: number of worker processes to start
    :param timeout: run time in seconds, handed to every worker
    :param filename: path of the file every worker uploads
    :param target_url: URL every worker uploads to
    :return: list of all request records, sorted by upload start time
    """
    result_queue = multiprocessing.Queue()

    workers = [
        multiprocessing.Process(
            target=independent_worker,
            args=(result_queue, filename, timeout, target_url)
        )
        for _ in range(num_workers)
    ]

    for w in workers:
        w.start()

    all_requests_completed = []
    completed_workers = 0
    while completed_workers < num_workers:
        # Sample liveness BEFORE waiting: if nobody was running and still
        # nothing shows up, no further report can ever arrive.
        workers_running = any(w.is_alive() for w in workers)
        try:
            results = result_queue.get(timeout=1.0)
        except queue.Empty:
            if workers_running:
                continue
            print(
                f"WARNING: only {completed_workers} of {num_workers} workers reported results",
                file=sys.stderr
            )
            break

        all_requests_completed.extend(results[2])
        completed_workers += 1

    for w in workers:
        w.join()

    # Ids are random, so collisions are unlikely but possible. Losing a whole
    # finished run over one would be worse than reporting it.
    unique_ids = {request.id for request in all_requests_completed}
    if len(unique_ids) != len(all_requests_completed):
        print(
            f"WARNING: {len(all_requests_completed) - len(unique_ids)} request id collision(s) detected",
            file=sys.stderr
        )

    # sort by upload start time
    return sorted(all_requests_completed, key=lambda a: a.upload_started)
def csv_time_format(timestamp: Optional[float]) -> Optional[str]:
    """Format a POSIX timestamp as a local-time ISO 8601 string.

    :param timestamp: seconds since the epoch, or ``None``
    :return: the ISO formatted time, or ``None`` when no timestamp was given
    """
    # Compare against None explicitly: 0.0 is a valid timestamp.
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp).isoformat()
def _elapsed_ms(start: Optional[float], end: Optional[float]) -> Optional[float]:
    """Return ``end - start`` in milliseconds, or ``None`` if either is missing."""
    if start is None or end is None:
        return None
    return (end - start) * 1000


def write_results(results, file_handle):
    """Write the benchmark results as CSV.

    A header row is written first, then one row per request. Values that are
    not available (for example because no MQTT message arrived for a request)
    are written as empty fields.

    :param results: iterable of request records
    :param file_handle: writable text file object the CSV is written to
    """
    writer = csv.writer(file_handle)
    # Latency and rtt are in ms
    writer.writerow(
        ['id', 'http_start_time', 'http_complete_time', 'http_time', 'status_code', 'mqtt_arrive_time', 'latency',
         'rtt']
    )

    for result in results:
        http_time = _elapsed_ms(result.upload_started, result.upload_finished)
        latency = _elapsed_ms(result.upload_finished, result.alert_arrived)
        rtt = _elapsed_ms(result.upload_started, result.alert_arrived)

        writer.writerow([
            result.id,
            csv_time_format(result.upload_started),
            csv_time_format(result.upload_finished),
            http_time,
            result.upload_status_code,
            csv_time_format(result.alert_arrived),
            latency,
            rtt
        ])
def main():
    """Command line entry point of the benchmark.

    Steps: parse the arguments, connect to MQTT and wait until the
    subscription is active, run the upload benchmark, wait for outstanding
    MQTT messages, pair the messages with the uploads by id, print a short
    summary and write the per-request results as CSV.
    """
    help_epilog = textwrap.dedent('''\
        CSV Columns meaning:
          id - Unique ID of a request (not sequential but at least unique)
          http_start_time - Timestamp of when the HTTP request is started
          http_complete_time - Timestamp of when the HTTP request is completed
          http_time - Time it took to perform the HTTP request (i.e. Uploading) in milliseconds
          status_code - Status code of the HTTP request
          mqtt_arrive_time - Timestamp of when the corresponding MQTT message arrived
          latency - Latency of the system (mqtt_arrive_time - http_complete_time) in milliseconds
          rtt - Total round trip time (mqtt_arrive_time - http_start_time) in milliseconds
        ''')

    parser = argparse.ArgumentParser(
        description='Birbnetes Benchmarker 3',
        epilog=help_epilog,
        formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)')
    # The workers check the time limit after each request, so every worker
    # sends at least one request, even with a timeout of 0.
    parser.add_argument('--timeout', type=int, required=False, default=10,
                        help='Maximum time in seconds for request sending (every worker sends at least one request)')
    parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload')
    parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/benchmark",
                        help='The target endpoint')
    parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
                        help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
    parser.add_argument('--output', type=str, required=True, help='name of the file to write results to (- for stdout)')
    parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
                        help="Username for the MQTT server")
    parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server")
    parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
                        help="Hostname for the MQTT server")
    parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Password for the MQTT server")

    args = parser.parse_args()

    print("Workers:", args.workers)
    print("Timeout:", args.timeout, "sec")

    alerts_arrived_queue = queue.Queue()

    print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
    subscription_waiter = threading.Event()

    # Preparing connection
    mqtt_client = paho.mqtt.client.Client(client_id="10")
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_subscribe = mqtt_on_subscribe
    # During setup the user data is the event that signals the subscription
    mqtt_client.user_data_set(subscription_waiter)

    if args.mqtt_password:
        mqtt_client.username_pw_set(args.mqtt_username, args.mqtt_password)

    # this is a blocking call, will wait until connection is complete
    mqtt_client.connect(args.mqtt_host, args.mqtt_port, 60)

    # Start loop in background
    mqtt_client.loop_start()

    # Wait for subscription to be made
    subscription_waiter.wait()

    # Set final command receiver: from now on the user data is the queue
    mqtt_client.user_data_set(alerts_arrived_queue)
    mqtt_client.on_message = mqtt_on_command

    print("MQTT Complete!")
    print("Running benchmark...")
    benchmark_results = run_benchmark(args.workers, args.timeout, args.file, args.target_url)

    # Wait for inflight messages for a little
    total_successful_uploads = sum(1 for req in benchmark_results if req.upload_status_code == 200)

    all_arrived = alerts_arrived_queue.qsize() >= total_successful_uploads
    if (not all_arrived) and (args.inflight_timeout > 0):
        print("Waiting for inflight MQTT messages to arrive...")
        waiting_started = time.time()
        # A deadline keeps the wait close to the requested number of seconds
        # regardless of how long each individual sleep really takes.
        deadline = time.monotonic() + args.inflight_timeout
        while (not all_arrived) and (time.monotonic() < deadline):
            time.sleep(0.001)
            all_arrived = alerts_arrived_queue.qsize() >= total_successful_uploads

        waited_for_inflight = time.time() - waiting_started
        print(f"Waited a total {waited_for_inflight} seconds")

    if not all_arrived:
        print("WARNING: Not all MQTT Messages arrived!")

    # Disconnecting MQTT
    mqtt_client.disconnect()
    mqtt_client.loop_stop()  # This stops further receiving MQTT messages

    # Aggregate results: index the requests by id once so every alert is
    # paired with a dictionary lookup. setdefault keeps the first request of
    # an id, which is the one a linear search would find.
    results_by_id = {}
    for benchmark_result in benchmark_results:
        results_by_id.setdefault(benchmark_result.id, benchmark_result)

    total_answered = 0
    while True:
        try:
            returned_job = alerts_arrived_queue.get_nowait()
        except queue.Empty:
            break

        paired_result = results_by_id.get(returned_job.id)
        if paired_result is None:
            print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
        else:
            paired_result.alert_arrived = returned_job.arrived

        # Every received alert counts as an answer, paired or not
        total_answered += 1

    # print some mini statistics
    if benchmark_results:
        total_runtime = max(req.upload_finished for req in benchmark_results) - \
                        min(req.upload_started for req in benchmark_results)
    else:
        total_runtime = 0.0

    print(
        f"{len(benchmark_results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered"
    )

    print(f"Test total runtime was {total_runtime} seconds")
    if total_runtime > 0:
        print("HTTP Request/sec:", len(benchmark_results) / total_runtime)
    else:
        print("HTTP Request/sec:", "n/a")

    if args.output == '-':
        print("Writing results to STDOUT")
        print("\n---------- CUT HERE ----------\n")
        write_results(benchmark_results, sys.stdout)
    else:
        print("Writing results to", args.output)
        # newline='' lets the csv module control the line endings itself
        with open(args.output, 'w', newline='') as f:
            write_results(benchmark_results, f)
# Run the benchmark only when executed as a script, not when imported
# (worker processes may import this module).
if __name__ == '__main__':
    main()