diff --git a/birb_scheduler/k8s_buzerator.py b/birb_scheduler/k8s_buzerator.py
new file mode 100644
index 0000000..c588c7f
--- /dev/null
+++ b/birb_scheduler/k8s_buzerator.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python3
+import kubernetes
+
+
+def ensure_running_pod_on_site(site: str):
+    pass
diff --git a/birb_scheduler/run_scheduler.py b/birb_scheduler/run_scheduler.py
index 6a46cb3..978acdf 100644
--- a/birb_scheduler/run_scheduler.py
+++ b/birb_scheduler/run_scheduler.py
@@ -4,23 +4,163 @@
 from redis import Redis
 import requests
 from config import Config
+from k8s_buzerator import ensure_running_pod_on_site
+
 from urllib.parse import urljoin
 
+RESCHEDULE_TRIGGER_LEVEL = 5  # how many times an increasing queue must be observed before action is taken
+RESCHEDULE_TRIGGER_COUNTER_TTL = 60  # the counter clears itself after some time
+RECENT_TROUBLE_TTL = 120  # how long a mark should live on a site which had trouble recently
+
 
 def run(redis_client: Redis, site_url_map: Dict[str, str]):
+    run_count = redis_client.incr("RUNCOUNT")
+    logging.debug(f"Beginning of round {run_count}!")
     # Get weather report
     # get {http://example.com/report/}{site} The first part supplied
     logging.debug("Getting performance reports...")
-    r = requests.get(urljoin(Config.COLLECTOR_URL, "site"), timeout=10)
+    r = requests.get(urljoin(Config.COLLECTOR_URL, "site"), timeout=30)
     r.raise_for_status()
-    # TODO:
-    # - set the default scheduling site
-    # - Check if scheduling is required: any derivatives are above 0 for an extended period
-    # - if yes, then decide where to schedule
-    #   - any sites with 0 derivative?
-    #   - Check the k8s api before scheduling to see if the pod is running
-    # - check if there are any pod running where 0 units scheduled for extended time
-    #   - delete that pod
-    # - write some log, so we can draw nice graphs
-    # Optional: check if a higher priority pod have free capacity, and move lower ones up
+    weather_report = r.json()
+
+    # Figure out whether any of the sites need attention
+    increasing_queue_at = []
+    for site_name, site_stat in weather_report.items():
+        if site_stat['queue']['derivative'] > 0.001 and site_stat['queue']['mean'] > 2:
+            key = f"INCREASINGQUEUE:{site_name}"
+            increasing_queue_detected_times = redis_client.incr(key)
+
+            if increasing_queue_detected_times > RESCHEDULE_TRIGGER_LEVEL:
+                logging.debug(f"Trigger level reached at {site_name}")
+                redis_client.delete(key)
+                increasing_queue_at.append(site_name)
+            else:
+                logging.debug(
+                    f"Suspicious queue size change at {site_name} ({increasing_queue_detected_times}/{RESCHEDULE_TRIGGER_LEVEL})")
+                redis_client.expire(key, RESCHEDULE_TRIGGER_COUNTER_TTL)  # reset the TTL so stale counters eventually expire
+
+    # Decide on the default site for the first time
+    if run_count > RESCHEDULE_TRIGGER_LEVEL * 2:
+        default_site = redis_client.get("DEFAULT:SCHEDULED")
+        if not default_site:
+            logging.debug("Default site is not set. Selecting one...")
+            for site in Config.SITE_LIST:
+                if site.name not in increasing_queue_at:
+                    redis_client.set("DEFAULT:SCHEDULED", site.name)
+                    logging.info(f"Default site set to {site.name}")
+                    break
+    else:
+        logging.debug("Not enough data points to decide on the default site yet")
+
+    # No sites need attention
+    if not increasing_queue_at:
+        logging.info("Everything seems to be in order. No increasing queues. Nothing to do")
+        return
Nothing to do") + return + + # compile a list of where each client is scheduled + current_scheduling_table = {} + current_scheduling_table_counters = {} + logging.debug("State of scheduling right now:") + for key in redis_client.keys("SCHEDULED:*"): + target = redis_client.get(key) + + if target: + target = target.decode('utf-8') + + if target: + name = key[10:] + current_scheduling_table[name] = target + logging.debug(f"{name} -> {target}") + + if target in current_scheduling_table_counters: + current_scheduling_table_counters[target] += 1 + else: + current_scheduling_table_counters[target] = 1 + + logging.debug("Summary of scheduled clients:") + for site, counter in current_scheduling_table_counters.items(): + logging.debug(f"{site}: {counter}") + + if len(incraising_queue_at) == len(Config.SITE_LIST): + logging.warning("Out of resources! All sites reporting trouble, nowhere to reschedule!") + return + + # If attention required, schedule a single workload to one tier lower + for site_seeking_attention in incraising_queue_at: + redis_client.set(f"RECENTTROUBLE:{site_seeking_attention}", b"\x01", ex=RECENT_TROUBLE_TTL) + if current_scheduling_table_counters[site_seeking_attention] == 0: + logging.warning("Wtf? Site reporting trouble, but there are no workload scheduled to it... nothing to do") + continue + + logging.debug(f"Attempting to reschedule a single workload from {site_seeking_attention}...") + + # Compile a list of sites with lower rank than the current one + lower_rank_sites = [] + next_lower = True + for site in Config.SITE_LIST: + if next_lower: + lower_rank_sites.append(site) + else: + if site.name == site_seeking_attention: + next_lower = True + + if not lower_rank_sites: + logging.warning("Out of lower rank sites! Nowhere to schedule! Giving up on this site.") + continue + + # Select a fitting candidate from the lower rank sites + logging.debug(f"Selecting candidate out of these sites: {','.join(lower_rank_sites)}") + candidate = None + for lower_rank_site in lower_rank_sites: + logging.debug(f"Evaluating {lower_rank_site}") + recently_troubled = redis_client.get(f"RECENTTROUBLE:{lower_rank_site}") + + if recently_troubled: + recently_troubled = recently_troubled.decode('utf-8') + + if recently_troubled: + logging.debug(f"Site {lower_rank_site} had troubles recently, better not scheduling here...") + continue + + if lower_rank_site in incraising_queue_at: + logging.debug(f"Site {lower_rank_site} is not a good candidate: It's currently reporting in trouble...") + continue + + logging.debug(f"{lower_rank_site} seems to be an ideal candidate to move a single workload to") + candidate = lower_rank_site + break + + if not candidate: + logging.warning("Could not find a good candidate to move a single workload to! Giving up!") + continue + + # Do the actual re-scheduling + logging.debug(f"Re-scheduling a single workload from {site_seeking_attention} to {candidate}...") + ensure_running_pod_on_site(candidate) # blocking + + # Choose a workload "randomly": + device_to_reschedule = None + for key in redis_client.keys("SCHEDULED:*"): + target = redis_client.get(key) + + if target: + target = target.decode('utf-8') + + if target == site_seeking_attention: + device_to_reschedule = key[10:] + break + + if not device_to_reschedule: + logging.warning(f"No workload scheduled to {site_seeking_attention}... 
wtf??") + break + + logging.info(f"Re-scheduling {device_to_reschedule} from {site_seeking_attention} to {candidate}") + + redis_client.set(f"SCHEDULED:{device_to_reschedule}", candidate) + + # Set the new site as default + current_default = redis_client.get("DEFAULT:SCHEDULED") + if not current_default or (current_default.decode('utf-8') == site_seeking_attention): + logging.info(f"Marking {candidate} as the new default site!") + redis_client.set("DEFAULT:SCHEDULED", candidate)