benchmarks/benchmark3.py
2020-10-23 04:08:54 +02:00

258 lines
8.8 KiB
Python

import argparse
import csv
import itertools
import multiprocessing
import queue
import sys
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import paho.mqtt.client
import pycurl
from blist import blist
@dataclass
class UploadRequest:
    """Book-keeping record for one uploaded sample.

    The upload fields are filled in when the record is created after the
    HTTP request; ``alert_arrived`` is assigned later, once the matching
    MQTT message has been seen.
    """

    # Job id; also sent to the server as the "device_id" of the upload.
    id: int
    # time.time() stamp taken right before the HTTP request was performed.
    upload_started: Optional[float] = None
    # time.time() stamp taken right after the HTTP request returned.
    upload_finished: Optional[float] = None
    # HTTP status code of the upload (an int such as 200); 0 means that no
    # response code has been recorded.
    upload_status_code: int = 0
    # time.time() stamp of the matching MQTT message, None if none arrived.
    alert_arrived: Optional[float] = None
@dataclass
class ReturnedRequest:
    """An MQTT message that came back for one of the uploaded jobs."""

    # Numeric job id the message refers to.
    id: int
    # time.time() stamp recorded for the arrival of the message.
    arrived: float
def independent_worker(number_generator, result_queue, filename: str, timeout: float):
    """Upload ``filename`` in a loop and report the timings on ``result_queue``.

    The worker keeps sending requests until at least ``timeout`` seconds have
    passed since it started working.  The elapsed time is checked after each
    request, so at least one upload is always attempted.

    A transfer that fails on the curl level is recorded as a request with
    status code 0 instead of aborting the worker, and the collected results
    are always put on ``result_queue`` (even when the worker is interrupted by
    an unexpected exception) so that the consumer of the queue is not left
    waiting for a report that never comes.

    Args:
        number_generator: Iterator yielding the job ids; each id is also sent
            as the ``device_id`` field of the upload description.
        result_queue: Queue that receives exactly one
            ``(runtime, completed_job_count, completed_jobs)`` tuple.
        filename: Path of the file that is attached to every upload.
        timeout: Number of seconds to keep sending requests.
    """
    prepared_curl = pycurl.Curl()
    # Per-request records, handed back through result_queue at the end.
    completed_jobs_list = blist()
    worker_completed_job_count = 0
    # Initialised up front so the report in ``finally`` is always computable.
    worker_start_time = worker_stop_time = time.time()
    try:
        prepared_curl.setopt(pycurl.URL, "https://birb.k8s.kmlabz.com/benchmark")
        # TLS certificate and host name verification are switched off.
        prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0)
        # Response bodies are not needed, so they are thrown away.
        prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)

        # Start working!!
        worker_start_time = worker_stop_time = time.time()
        while True:
            jobid = next(number_generator)
            prepared_curl.setopt(pycurl.HTTPPOST, [
                ('file', (
                    # Let curl read the file itself; the original author notes
                    # that copying buffers from Python memory would be slower.
                    pycurl.FORM_FILE, filename
                )),
                ('description', (
                    pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}',
                    pycurl.FORM_CONTENTTYPE, 'application/json',
                ))
            ])

            transfer_failed = False
            job_start_time = time.time()
            try:
                prepared_curl.perform()
            except pycurl.error:
                # Connection level failure: keep the benchmark running and
                # record the request as unsuccessful.
                transfer_failed = True
            job_stop_time = time.time()

            status_code = 0 if transfer_failed else prepared_curl.getinfo(pycurl.HTTP_CODE)
            worker_completed_job_count += 1
            completed_jobs_list.append(UploadRequest(
                id=jobid,
                upload_started=job_start_time,
                upload_finished=job_stop_time,
                upload_status_code=status_code
            ))

            # Tracked on every iteration so that the reported runtime is
            # meaningful even if the loop is left through an exception.
            worker_stop_time = job_stop_time
            if (job_stop_time - worker_start_time) >= timeout:
                break
    finally:
        prepared_curl.close()
        result_queue.put(
            (worker_stop_time - worker_start_time, worker_completed_job_count, completed_jobs_list)
        )
def mqtt_on_connect(client, userdata, flags, rc):
    """MQTT ``on_connect`` callback: subscribe to every ``command/`` topic.

    Args:
        client: The connected MQTT client; the subscription is made on it.
        userdata: Unused.
        flags: Unused.
        rc: Unused.
    """
    client.subscribe("command/#")
def mqtt_on_command(client, userdata, message):
    """MQTT ``on_message`` callback: record the arrival of a command message.

    The job id is the part of the topic that follows the ``command/`` prefix.
    The payload of the message is not inspected.  Messages whose topic does
    not end in an integer are ignored rather than raising from inside the
    callback.

    Args:
        client: Unused.
        userdata: Queue that receives one ``ReturnedRequest`` per message.
        message: The received message; only its ``topic`` is read.
    """
    # Take the time stamp first so that parsing does not add to the latency.
    arrived = time.time()
    try:
        _id = int(message.topic[len('command/'):])
    except ValueError:
        # Not a topic carrying one of our numeric ids; nothing to record.
        return
    userdata.put(ReturnedRequest(id=_id, arrived=arrived))
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """MQTT ``on_subscribe`` callback: signal that the subscription was made.

    ``userdata`` must provide a ``set()`` method (an event object); calling it
    is the only thing this callback does.  The other arguments are unused.
    """
    subscribed_event = userdata
    subscribed_event.set()
def run_benchmark(num_workers: int, timeout: float, filename: str):
    """Run ``num_workers`` upload workers in parallel and merge their results.

    Args:
        num_workers: Number of worker processes to start.
        timeout: Number of seconds each worker keeps sending requests.
        filename: Path of the file every request uploads.

    Returns:
        A dict mapping each job id to the request record reported for it.
    """
    result_queue = multiprocessing.Queue()

    # Every process works on its own copy of the iterator it is given, so one
    # shared counter would produce the same ids (0, 1, 2, ...) in each worker
    # and their results would overwrite each other below.  Worker ``i``
    # therefore counts i, i + num_workers, i + 2 * num_workers, ... which
    # keeps the ids unique across all workers.
    workers = [
        multiprocessing.Process(
            target=independent_worker,
            args=(itertools.count(worker_index, num_workers), result_queue, filename, timeout)
        )
        for worker_index in range(num_workers)
    ]

    for w in workers:
        w.start()

    # Each worker reports exactly one (runtime, job_count, jobs) tuple.
    all_requests_completed = {}
    for _ in range(num_workers):
        _runtime, _job_count, completed_jobs = result_queue.get()
        for request in completed_jobs:
            all_requests_completed[request.id] = request

    # Join only after the queue has been drained: a process that still has
    # queued data to flush does not terminate until that data is consumed.
    for w in workers:
        w.join()

    return all_requests_completed
def write_results(results, file_handle):
    """Write the benchmark results as CSV to ``file_handle``.

    One header row is written, followed by one row per request with the
    columns:

    * ``id`` - id of the request
    * ``fire_time`` - time the upload completed (local time, ISO 8601)
    * ``response_arrive_time`` - time the alert arrived (local time, ISO 8601)
    * ``latency`` - milliseconds between the start of the upload and the
      arrival of the alert, truncated to an integer

    Requests for which no alert arrived (``alert_arrived`` is ``None``) keep
    their row, with ``response_arrive_time`` and ``latency`` left empty.

    Args:
        results: Mapping whose values have ``id``, ``upload_started``,
            ``upload_finished`` and ``alert_arrived`` attributes.
        file_handle: Writable text file object.
    """
    writer = csv.writer(file_handle)
    writer.writerow(['id', 'fire_time', 'response_arrive_time', 'latency'])

    for result in results.values():
        fire_time = datetime.fromtimestamp(result.upload_finished).isoformat()
        if result.alert_arrived is None:
            # No answer was received for this request.
            response_arrive_time = ''
            latency = ''
        else:
            response_arrive_time = datetime.fromtimestamp(result.alert_arrived).isoformat()
            latency = int((result.alert_arrived - result.upload_started) * 1000)
        writer.writerow([result.id, fire_time, response_arrive_time, latency])
def _build_arg_parser():
    """Create the command line parser of the benchmark."""
    parser = argparse.ArgumentParser(
        description='Birbnetes Benchmarker 3'
    )
    parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)')
    # The workers check the elapsed time after each request, so 0 does not
    # mean "unlimited": it stops every worker after its first request.
    parser.add_argument('--timeout', type=int, required=False, default=10,
                        help='Number of seconds to keep sending requests (each worker sends at least one)')
    parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload')
    # NOTE: accepted to keep the command line stable, but the value is not
    # passed on to the benchmark by this function.
    parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample",
                        help='The target endpoint')
    parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
                        help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
    parser.add_argument('--output', type=str, required=True, help='name of the file to write results to (- for stdout)')
    parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
                        help="Username for the MQTT server")
    parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server")
    parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
                        help="Hostname for the MQTT server")
    parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Password for the MQTT server")
    return parser


def _connect_mqtt(args):
    """Connect to the MQTT server and return once the subscription is made."""
    subscription_waiter = threading.Event()

    # Preparing connection
    mqtt_client = paho.mqtt.client.Client(client_id="10")
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_subscribe = mqtt_on_subscribe
    mqtt_client.user_data_set(subscription_waiter)

    # Credentials are only sent when a password was given.
    if args.mqtt_password:
        mqtt_client.username_pw_set(args.mqtt_username, args.mqtt_password)

    # This is a blocking call, will wait until connection is complete
    mqtt_client.connect(args.mqtt_host, args.mqtt_port, 60)

    # Start loop in background
    mqtt_client.loop_start()

    # Wait for subscription to be made
    subscription_waiter.wait()
    return mqtt_client


def _wait_for_inflight(alerts_arrived_queue, expected_count, timeout):
    """Poll until ``expected_count`` alerts are queued or ``timeout`` seconds pass.

    The queue is checked at least once, so a zero timeout still reports
    success when everything has already arrived.

    Returns:
        True if at least ``expected_count`` alerts were queued in time.
    """
    deadline = time.time() + timeout
    while alerts_arrived_queue.qsize() < expected_count:
        if time.time() >= deadline:
            return False
        time.sleep(0.001)
    return True


def _collect_alerts(alerts_arrived_queue, benchmark_results):
    """Store the arrival time of every queued alert on its request.

    Returns:
        The number of alerts that belonged to a known request.
    """
    total_answered = 0
    while True:
        try:
            returned_job = alerts_arrived_queue.get_nowait()
        except queue.Empty:
            return total_answered

        try:
            benchmark_results[returned_job.id].alert_arrived = returned_job.arrived
        except KeyError:
            print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
        else:
            # Only alerts that match a sent request count as answered.
            total_answered += 1


def main():
    """Run the benchmark from the command line.

    Subscribes to the MQTT command topics, runs the upload benchmark, waits
    for the alerts that are still in flight, prints a short summary and
    writes the per-request results as CSV to the requested output.
    """
    args = _build_arg_parser().parse_args()

    print("Workers:", args.workers)
    print("Timeout:", args.timeout, "sec")

    alerts_arrived_queue = queue.Queue()

    print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
    mqtt_client = _connect_mqtt(args)

    # Set final command receiver
    mqtt_client.user_data_set(alerts_arrived_queue)
    mqtt_client.on_message = mqtt_on_command
    print("MQTT Complete!")

    print("Running benchmark...")
    benchmark_results = run_benchmark(args.workers, args.timeout, args.file)

    print("Waiting for inflight MQTT messages to arrive...")
    # Wait for inflight messages for a little
    total_successful_uploads = sum(
        1 for req in benchmark_results.values() if req.upload_status_code == 200
    )
    waiting_started = time.time()
    all_arrived = _wait_for_inflight(alerts_arrived_queue, total_successful_uploads, args.inflight_timeout)
    waited_for_inflight = time.time() - waiting_started
    print(f"Waited a total {waited_for_inflight} seconds")
    if not all_arrived:
        print("WARNING: Not all MQTT Messages arrived!")

    # Disconnecting MQTT
    mqtt_client.disconnect()
    mqtt_client.loop_stop()  # This stops further receiving MQTT messages

    # Aggregate results
    total_answered = _collect_alerts(alerts_arrived_queue, benchmark_results)

    # Print some mini statistics.  The runtime spans from the earliest upload
    # start to the latest upload finish, independent of the order of the ids.
    if benchmark_results:
        total_runtime = \
            max(req.upload_finished for req in benchmark_results.values()) - \
            min(req.upload_started for req in benchmark_results.values())
    else:
        total_runtime = 0.0

    print(
        f"{len(benchmark_results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered"
    )
    print(f"Test total runtime was {total_runtime} seconds")
    if total_runtime > 0:
        print("HTTP Request/sec:", len(benchmark_results) / total_runtime)
    else:
        # Nothing measurable ran; avoid dividing by zero.
        print("HTTP Request/sec:", "n/a")

    if args.output == '-':
        print("Writing results to STDOUT")
        print("\n---------- CUT HERE ----------\n")
        write_results(benchmark_results, sys.stdout)
    else:
        print("Writing results to", args.output)
        # newline='' leaves the line endings to the csv writer.
        with open(args.output, 'w', newline='') as f:
            write_results(benchmark_results, f)
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()