2021-12-12 02:07:26 +01:00
|
|
|
from typing import Dict
|
2021-12-12 00:59:00 +01:00
|
|
|
import logging
|
2021-12-12 02:07:26 +01:00
|
|
|
from redis import Redis
|
|
|
|
import requests
|
|
|
|
from config import Config
|
2021-12-12 00:59:00 +01:00
|
|
|
|
2021-12-13 02:38:10 +01:00
|
|
|
from k8s_buzerator import ensure_running_pod_on_site
|
|
|
|
|
2021-12-12 02:07:26 +01:00
|
|
|
from urllib.parse import urljoin
|
2021-12-12 00:59:00 +01:00
|
|
|
|
2021-12-13 02:38:10 +01:00
|
|
|
# Threshold for the per-site "queue is growing" counter kept in redis: a site is
# flagged for rescheduling once its counter is strictly greater than this value.
# NOTE(review): the identifier is misspelled ("TIRGGER"); it is left unchanged
# because other modules may reference it by this exact name.
RESCHEDULE_TIRGGER_LEVEL = 5  # how many times an increasing queue must be observed for an action to be taken
|
|
|
|
# Passed to redis `expire()` on the per-site counter key every time the counter is
# incremented without exceeding the trigger level (redis-py takes this in seconds).
RESCHEDULE_TRIGGER_COUNTER_TTL = 60  # The counter clears itself after some time
|
|
|
|
# Expiry of the "RECENTTROUBLE:<site>" marker, passed as `ex=` to redis `set()`
# (seconds). While the marker exists the site is skipped as a reschedule target.
RECENT_TROUBLE_TTL = 120  # how long a mark should live on a site which had trouble recently
|
|
|
|
|
2021-12-12 02:07:26 +01:00
|
|
|
|
|
|
|
def _to_text(value):
    """Return *value* as ``str``: UTF-8 decode ``bytes``, pass anything else (incl. ``None``) through."""
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


def run(redis_client: Redis, site_url_map: Dict[str, str]):
    """Run one scheduling round: detect overloaded sites and move one workload off each.

    Steps performed, in order:

    1. Fetch the per-site performance report from the collector.
    2. Bump a redis counter for every site whose queue is growing; a site whose
       counter exceeds ``RESCHEDULE_TIRGGER_LEVEL`` is marked as needing attention.
    3. Once enough rounds have passed, pick a default site if none is set yet.
    4. For every site needing attention, pick the first usable site that comes
       *after* it in ``Config.SITE_LIST`` and re-point a single workload to it.

    Args:
        redis_client: Redis connection used for all counters, markers and the
            ``SCHEDULED:<device>`` / ``DEFAULT:SCHEDULED`` keys. Both ``bytes``
            and ``str`` responses are handled.
        site_url_map: Currently unused by this function; kept for interface
            compatibility with callers.

    Raises:
        requests.HTTPError: If the collector answers with an error status
            (via ``raise_for_status()``).
    """
    scheduled_prefix = "SCHEDULED:"

    run_count = redis_client.incr("RUNCOUNT")
    logging.debug(f"Beginning of round {run_count}!")

    # Get weather report. The base URL is supplied by Config.COLLECTOR_URL, the
    # per-site report is fetched from the "site" path relative to it.
    logging.debug("Getting performance reports...")
    r = requests.get(urljoin(Config.COLLECTOR_URL, "site"), timeout=30)
    r.raise_for_status()
    weather_report = r.json()

    # A JSON object deserializes to a dict, and iterating a dict directly yields
    # only its keys, which cannot be unpacked into (name, stat). Any iterable of
    # pairs is still accepted unchanged.
    # NOTE(review): the exact response schema of the collector is not visible here.
    if isinstance(weather_report, dict):
        report_items = weather_report.items()
    else:
        report_items = weather_report

    # Figuring out if any of the sites need attention
    incraising_queue_at = []
    for site_name, site_stat in report_items:
        queue_stat = site_stat['queue']
        if not (queue_stat['derivative'] > 0.001 and queue_stat['mean'] > 2):
            continue

        key = f"INCRAISINGQUEUE:{site_name}"
        incraising_queue_detected_times = redis_client.incr(key)

        if incraising_queue_detected_times > RESCHEDULE_TIRGGER_LEVEL:
            logging.debug(f"Tirgger level reached at {site_name}")
            redis_client.delete(key)  # start counting from scratch after acting
            incraising_queue_at.append(site_name)
        else:
            logging.debug(
                f"Suspicious queue size change at {site_name} ({incraising_queue_detected_times}/{RESCHEDULE_TIRGGER_LEVEL})")
            # Refresh the TTL so the counter only survives while observations keep coming
            redis_client.expire(key, RESCHEDULE_TRIGGER_COUNTER_TTL)

    # Decide on a default site for the first time, but only after enough rounds
    # have passed for the counters above to have had a chance to trigger.
    if run_count > RESCHEDULE_TIRGGER_LEVEL * 2:
        default_site = redis_client.get("DEFAULT:SCHEDULED")
        if not default_site:
            logging.debug("Default site is not set. Selecting one...")
            for site in Config.SITE_LIST:
                if site.name not in incraising_queue_at:
                    ensure_running_pod_on_site(site.name)  # blocking
                    redis_client.set("DEFAULT:SCHEDULED", site.name)
                    logging.info(f"Default site set to {site.name}")
                    break
    else:
        logging.debug("Not enough data points to decide on default site yet")

    # No sites need attention
    if not incraising_queue_at:
        logging.info("Everything seems to be in order. No incraising queues. Nothing to do")
        return

    # Count how many clients are currently scheduled to each site
    current_scheduling_table_counters = {}
    logging.debug("State of scheduling right now:")
    for key in redis_client.keys(scheduled_prefix + "*"):
        target = _to_text(redis_client.get(key))
        if not target:
            continue

        # Keys may come back as bytes; decode before stripping the prefix so the
        # device name is a proper str.
        name = _to_text(key)[len(scheduled_prefix):]
        logging.debug(f"{name} -> {target}")
        current_scheduling_table_counters[target] = current_scheduling_table_counters.get(target, 0) + 1

    logging.debug("Summary of scheduled clients:")
    for site, counter in current_scheduling_table_counters.items():
        logging.debug(f"{site}: {counter}")

    if len(incraising_queue_at) == len(Config.SITE_LIST):
        logging.warning("Out of resources! All sites reporting trouble, nowhere to reschedule!")
        return

    # If attention required, schedule a single workload to one tier lower
    for site_seeking_attention in incraising_queue_at:
        redis_client.set(f"RECENTTROUBLE:{site_seeking_attention}", b"\x01", ex=RECENT_TROUBLE_TTL)

        # A site without any scheduled client has no entry in the counters at all,
        # so look it up with a default instead of indexing.
        if current_scheduling_table_counters.get(site_seeking_attention, 0) == 0:
            logging.warning("Wtf? Site reporting trouble, but there are no workload scheduled to it... nothing to do")
            continue

        logging.debug(f"Attempting to reschedule a single workload from {site_seeking_attention}...")

        # Compile a list of sites with lower rank than the current one, i.e. the
        # names of all sites listed after it in Config.SITE_LIST.
        lower_rank_sites = []
        next_lower = False
        for site in Config.SITE_LIST:
            if next_lower:
                lower_rank_sites.append(site.name)
            elif site.name == site_seeking_attention:
                next_lower = True

        if not lower_rank_sites:
            logging.warning("Out of lower rank sites! Nowhere to schedule! Giving up on this site.")
            continue

        # Select a fitting candidate from the lower rank sites
        logging.debug(f"Selecting candidate out of these sites: {','.join(lower_rank_sites)}")
        candidate = None
        for lower_rank_site in lower_rank_sites:
            logging.debug(f"Evaluating {lower_rank_site}")

            if redis_client.get(f"RECENTTROUBLE:{lower_rank_site}"):
                logging.debug(f"Site {lower_rank_site} had troubles recently, better not scheduling here...")
                continue

            if lower_rank_site in incraising_queue_at:
                logging.debug(f"Site {lower_rank_site} is not a good candidate: It's currently reporting in trouble...")
                continue

            logging.debug(f"{lower_rank_site} seems to be an ideal candidate to move a single workload to")
            candidate = lower_rank_site
            break

        if not candidate:
            logging.warning("Could not find a good candidate to move a single workload to! Giving up!")
            continue

        # Do the actual re-scheduling
        logging.debug(f"Re-scheduling a single workload from {site_seeking_attention} to {candidate}...")
        ensure_running_pod_on_site(candidate)  # blocking

        # Choose a workload "randomly": the first one redis reports for this site.
        # The keys are re-read here because the call above blocks and earlier
        # iterations of this loop may already have moved workloads.
        device_to_reschedule = None
        for key in redis_client.keys(scheduled_prefix + "*"):
            if _to_text(redis_client.get(key)) == site_seeking_attention:
                device_to_reschedule = _to_text(key)[len(scheduled_prefix):]
                break

        if not device_to_reschedule:
            logging.warning(f"No workload scheduled to {site_seeking_attention}... wtf??")
            # Only this site is inconsistent; keep handling the remaining ones.
            continue

        logging.info(f"Re-scheduling {device_to_reschedule} from {site_seeking_attention} to {candidate}")
        redis_client.set(f"{scheduled_prefix}{device_to_reschedule}", candidate)

        # Set the new site as default if there is none, or if the default is the troubled site
        current_default = _to_text(redis_client.get("DEFAULT:SCHEDULED"))
        if not current_default or current_default == site_seeking_attention:
            logging.info(f"Marking {candidate} as the new default site!")
            redis_client.set("DEFAULT:SCHEDULED", candidate)
|