import io import sys import time import pycurl import itertools import multiprocessing import threading import queue from datetime import datetime from blist import blist from dataclasses import dataclass @dataclass(frozen=False, init=True) class UploadRequest: id: int upload_started: float = None upload_finished: float = None upload_status_code: bool = False alert_arrived: float = None @dataclass(frozen=False, init=True) class ReturnedRequest: id: int arrived: float def independent_worker(number_generator, result_queue, filename: str, timeout: float): prepared_curl = pycurl.Curl() prepared_curl.setopt(pycurl.URL, "https://birb.k8s.kmlabz.com/benchmark") prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0) prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0) # prepared_curl.setopt(pycurl.FORBID_REUSE, 0) prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0) prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda _: None) completed_jobs_list = blist() # O(log n) insert instead of python's O(n) worker_stop_time = 0 worker_completed_job_count = 0 # Start working!! worker_start_time = time.time() while True: jobid = next(number_generator) prepared_curl.setopt(pycurl.HTTPPOST, [ ('file', ( pycurl.FORM_FILE, filename )), ('description', ( pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}', pycurl.FORM_CONTENTTYPE, 'application/json', )) ]) job_start_time = time.time() prepared_curl.perform() job_stop_time = time.time() status_code = prepared_curl.getinfo(pycurl.HTTP_CODE) worker_completed_job_count += 1 completed_jobs_list.append(UploadRequest( id=jobid, upload_started=job_start_time, upload_finished=job_stop_time, upload_status_code=status_code )) if (job_stop_time - worker_start_time) >= timeout: worker_stop_time = job_stop_time break # end of loop runtime = worker_stop_time - worker_start_time prepared_curl.close() result_queue.put( (runtime, worker_completed_job_count, completed_jobs_list) ) def main(): num_workers = 50 timeout = 10.0 filename = '/home/marcsello/XauddzikzfKhogEcCYmufFMUrjYTwhwf.wav' result_queue = multiprocessing.Queue() workers = [] number_gen = itertools.count() for _ in range(num_workers): workers.append(multiprocessing.Process( target=independent_worker, args=(number_gen, result_queue, filename, timeout)) ) for w in workers: w.start() completed_workers = 0 total_numer_of_completed_requests = 0 while completed_workers < num_workers: result = result_queue.get() total_numer_of_completed_requests += result[1] completed_workers += 1 for w in workers: w.join() print(total_numer_of_completed_requests / timeout) if __name__ == '__main__': main()