import logging
from collections import Counter
from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union
from urllib.parse import urljoin

import requests
from redis import Redis

from config import Config
from k8s_buzerator import ensure_running_pod_on_site

# How many times an increasing queue must be observed at a site before an
# action is taken. (The misspelled name is kept on purpose: it is a
# module-level name that other code may import.)
RESCHEDULE_TIRGGER_LEVEL = 5
# Seconds after which a site's observation counter clears itself when no new
# observation refreshes it.
RESCHEDULE_TRIGGER_COUNTER_TTL = 60
# Seconds a "recent trouble" mark lives on a site that reported trouble.
RECENT_TROUBLE_TTL = 120

# Prefix of the Redis keys that map a workload (device) to the site it is
# scheduled on: SCHEDULED:<device> -> <site name>.
_SCHEDULED_PREFIX = "SCHEDULED:"


def _to_text(value: Union[bytes, str, None]) -> Optional[str]:
    """Return *value* as ``str``, decoding UTF-8 ``bytes``; other values pass through."""
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


def _iter_site_stats(weather_report) -> Iterable:
    """Return (site_name, site_stat) pairs from a report that is a mapping or a sequence of pairs."""
    if isinstance(weather_report, dict):
        return weather_report.items()
    return weather_report


def _detect_increasing_queues(redis_client: Redis, weather_report) -> List[str]:
    """Update the per-site observation counters and return the sites that passed the trigger level."""
    troubled_sites = []
    for site_name, site_stat in _iter_site_stats(weather_report):
        queue = site_stat['queue']
        if not (queue['derivative'] > 0.001 and queue['mean'] > 2):
            continue

        counter_key = f"INCRAISINGQUEUE:{site_name}"
        times_detected = redis_client.incr(counter_key)
        if times_detected > RESCHEDULE_TIRGGER_LEVEL:
            logging.debug(f"Tirgger level reached at {site_name}")
            # Start counting from scratch after an action has been triggered.
            redis_client.delete(counter_key)
            troubled_sites.append(site_name)
        else:
            logging.debug(
                f"Suspicious queue size change at {site_name} ({times_detected}/{RESCHEDULE_TIRGGER_LEVEL})")
            # Every new observation extends the lifetime of the counter.
            redis_client.expire(counter_key, RESCHEDULE_TRIGGER_COUNTER_TTL)
    return troubled_sites


def _iter_scheduled(redis_client: Redis) -> Iterator[Tuple[str, str]]:
    """Yield (device, site) for every ``SCHEDULED:*`` key that currently holds a value."""
    for raw_key in redis_client.keys(_SCHEDULED_PREFIX + "*"):
        site = _to_text(redis_client.get(raw_key))
        if site:
            # Keys come back as bytes unless the client decodes responses;
            # normalise so the device name can be used to build keys again.
            yield _to_text(raw_key)[len(_SCHEDULED_PREFIX):], site


def _lower_rank_site_names(site_name: str) -> List[str]:
    """Return the names of the sites listed after *site_name* in ``Config.SITE_LIST``."""
    # NOTE(review): assumes Config.SITE_LIST is ordered from highest to lowest
    # rank, so "lower rank" means "later in the list" - confirm.
    names = [site.name for site in Config.SITE_LIST]
    try:
        position = names.index(site_name)
    except ValueError:
        return []
    return names[position + 1:]


def _pick_candidate(redis_client: Redis, lower_rank_sites: List[str], troubled_sites: List[str]) -> Optional[str]:
    """Return the first lower rank site that is neither recently troubled nor troubled right now."""
    for site_name in lower_rank_sites:
        logging.debug(f"Evaluating {site_name}")

        if redis_client.get(f"RECENTTROUBLE:{site_name}"):
            logging.debug(f"Site {site_name} had troubles recently, better not scheduling here...")
            continue

        if site_name in troubled_sites:
            logging.debug(f"Site {site_name} is not a good candidate: It's currently reporting in trouble...")
            continue

        logging.debug(f"{site_name} seems to be an ideal candidate to move a single workload to")
        return site_name
    return None


def run(redis_client: Redis, site_url_map: Dict[str, str]):
    """Run a single scheduling round.

    The round fetches the performance report from the collector, counts (in
    Redis) how many times each site was seen with a growing queue, and for
    every site that passed ``RESCHEDULE_TIRGGER_LEVEL`` moves a single
    workload to a lower rank site that is not in trouble. It also selects
    the default site once enough rounds have run, and makes the receiving
    site the default when the troubled site was the default.

    :param redis_client: Redis connection holding the scheduler state
        (``RUNCOUNT``, ``INCRAISINGQUEUE:*``, ``SCHEDULED:*``,
        ``RECENTTROUBLE:*`` and ``DEFAULT:SCHEDULED`` keys).
    :param site_url_map: Not used by this function; kept for interface
        compatibility.
    :raises requests.HTTPError: if the collector answers with an error status.
    """
    run_count = redis_client.incr("RUNCOUNT")
    logging.debug(f"Beginning of round {run_count}!")

    # Get the weather report; the "site" endpoint is resolved relative to
    # Config.COLLECTOR_URL.
    logging.debug("Getting performance reports...")
    response = requests.get(urljoin(Config.COLLECTOR_URL, "site"), timeout=30)
    response.raise_for_status()
    weather_report = response.json()

    # Figure out if any of the sites need attention
    incraising_queue_at = _detect_increasing_queues(redis_client, weather_report)

    # Decide on the default site for the first time, but only after enough
    # rounds have passed for the queue counters to be meaningful.
    if run_count > RESCHEDULE_TIRGGER_LEVEL * 2:
        if not redis_client.get("DEFAULT:SCHEDULED"):
            logging.debug("Default site is not set. Selecting one...")
            for site in Config.SITE_LIST:
                if site.name not in incraising_queue_at:
                    ensure_running_pod_on_site(site.name)  # blocking
                    redis_client.set("DEFAULT:SCHEDULED", site.name)
                    logging.info(f"Default site set to {site.name}")
                    break
    else:
        logging.debug("Not enough data points to decide on default site yet")

    # No sites need attention
    if not incraising_queue_at:
        logging.info("Everything seems to be in order. No incraising queues. Nothing to do")
        return

    # Count how many clients are scheduled to each site. A Counter reports 0
    # for sites without any workload instead of raising KeyError.
    scheduled_per_site = Counter()
    logging.debug("State of scheduling right now:")
    for device, site_name in _iter_scheduled(redis_client):
        logging.debug(f"{device} -> {site_name}")
        scheduled_per_site[site_name] += 1

    logging.debug("Summary of scheduled clients:")
    for site_name, counter in scheduled_per_site.items():
        logging.debug(f"{site_name}: {counter}")

    if len(incraising_queue_at) == len(Config.SITE_LIST):
        logging.warning("Out of resources! All sites reporting trouble, nowhere to reschedule!")
        return

    # If attention is required, move a single workload to a lower rank site
    for site_seeking_attention in incraising_queue_at:
        redis_client.set(f"RECENTTROUBLE:{site_seeking_attention}", b"\x01", ex=RECENT_TROUBLE_TTL)

        if scheduled_per_site[site_seeking_attention] == 0:
            logging.warning("Wtf? Site reporting trouble, but there are no workload scheduled to it... nothing to do")
            continue

        logging.debug(f"Attempting to reschedule a single workload from {site_seeking_attention}...")

        # Compile a list of sites with lower rank than the current one
        lower_rank_sites = _lower_rank_site_names(site_seeking_attention)
        if not lower_rank_sites:
            logging.warning("Out of lower rank sites! Nowhere to schedule! Giving up on this site.")
            continue

        # Select a fitting candidate from the lower rank sites
        logging.debug(f"Selecting candidate out of these sites: {','.join(lower_rank_sites)}")
        candidate = _pick_candidate(redis_client, lower_rank_sites, incraising_queue_at)
        if not candidate:
            logging.warning("Could not find a good candidate to move a single workload to! Giving up!")
            continue

        # Do the actual re-scheduling
        logging.debug(f"Re-scheduling a single workload from {site_seeking_attention} to {candidate}...")
        ensure_running_pod_on_site(candidate)  # blocking

        # Choose a workload "randomly": the first one found on the troubled
        # site. Redis is queried again because the blocking call above may
        # have taken a while.
        device_to_reschedule = next(
            (device for device, site_name in _iter_scheduled(redis_client)
             if site_name == site_seeking_attention),
            None,
        )
        if not device_to_reschedule:
            logging.warning(f"No workload scheduled to {site_seeking_attention}... wtf??")
            # Only this site is affected; keep processing the other ones.
            continue

        logging.info(f"Re-scheduling {device_to_reschedule} from {site_seeking_attention} to {candidate}")
        redis_client.set(f"{_SCHEDULED_PREFIX}{device_to_reschedule}", candidate)

        # Set the new site as default if there was none, or if the default
        # was the site that is in trouble.
        current_default = _to_text(redis_client.get("DEFAULT:SCHEDULED"))
        if not current_default or current_default == site_seeking_attention:
            logging.info(f"Marking {candidate} as the new default site!")
            redis_client.set("DEFAULT:SCHEDULED", candidate)