Basic algorithm implemented
This commit is contained in:
parent
2e8a4d25b4
commit
ee28ef4637
6
birb_scheduler/k8s_buzerator.py
Normal file
6
birb_scheduler/k8s_buzerator.py
Normal file
@ -0,0 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
import kubernetes
|
||||
|
||||
|
||||
def ensure_running_pod_on_site(site:str):
    """Ensure a worker pod is running on the given site.

    Currently a stub — no Kubernetes interaction happens yet (the module
    imports `kubernetes`, presumably for the eventual implementation).
    NOTE(review): the call site treats this as blocking (see the
    `# blocking` comment where it is invoked) — the real implementation
    should not return until the pod is actually running.

    :param site: name of the site (cluster/tier) to bring a pod up on.
    """
    pass
|
@ -4,23 +4,163 @@ from redis import Redis
|
||||
import requests
|
||||
from config import Config
|
||||
|
||||
from k8s_buzerator import ensure_running_pod_on_site
|
||||
|
||||
from urllib.parse import urljoin
|
||||
|
||||
# Tuning knobs for the rescheduling decision logic.
RESCHEDULE_TIRGGER_LEVEL = 5  # how many times an increasing queue must be observed for an action to be taken
RESCHEDULE_TRIGGER_COUNTER_TTL = 60  # the observation counter clears itself after this many seconds without re-detection
RECENT_TROUBLE_TTL = 120  # how long (seconds) a mark should live on a site which had trouble recently
|
||||
|
||||
|
||||
def _as_text(value):
    """Decode a redis bytes response to str; pass str/None through unchanged.

    redis-py returns bytes unless the client was created with
    decode_responses=True — handle both so the scheduler works either way.
    """
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


def _fetch_weather_report():
    """GET the per-site performance ("weather") report from the collector.

    Returns the decoded JSON body.
    Raises requests.HTTPError on a non-2xx response, requests exceptions on
    timeout/connection failure (timeout: 30s).
    """
    logging.debug("Getting performance reports...")
    # BUGFIX: there were two consecutive GETs here (timeout=10 then
    # timeout=30); the first response was discarded. Keep only one request.
    r = requests.get(urljoin(Config.COLLECTOR_URL, "site"), timeout=30)
    r.raise_for_status()
    return r.json()


def _detect_rising_queues(redis_client, weather_report):
    """Return the names of sites whose queue has been growing for a while.

    A site only qualifies after more than RESCHEDULE_TIRGGER_LEVEL
    observations of a growing queue (derivative > 0.001 and mean > 2),
    tracked via a TTL'd redis counter so isolated spikes are ignored.
    """
    # The collector may serve either a {site: stats} mapping or a list of
    # (site, stats) pairs — iterating a bare dict would yield keys only.
    if isinstance(weather_report, dict):
        pairs = weather_report.items()
    else:
        pairs = weather_report

    rising = []
    for site_name, site_stat in pairs:
        if site_stat['queue']['derivative'] > 0.001 and site_stat['queue']['mean'] > 2:
            key = f"INCRAISINGQUEUE:{site_name}"
            incraising_queue_detected_times = redis_client.incr(key)

            if incraising_queue_detected_times > RESCHEDULE_TIRGGER_LEVEL:
                logging.debug(f"Tirgger level reached at {site_name}")
                redis_client.delete(key)
                rising.append(site_name)
            else:
                logging.debug(
                    f"Suspicious queue size change at {site_name} ({incraising_queue_detected_times}/{RESCHEDULE_TIRGGER_LEVEL})")
                redis_client.expire(key, RESCHEDULE_TRIGGER_COUNTER_TTL)  # Probably extend lifetime
    return rising


def _current_scheduling(redis_client):
    """Snapshot the current scheduling state from redis.

    Returns (table, counters): table maps device name -> site name,
    counters maps site name -> number of devices scheduled there.
    """
    table = {}
    counters = {}
    logging.debug("State of scheduling right now:")
    for key in redis_client.keys("SCHEDULED:*"):
        target = _as_text(redis_client.get(key))
        if target:
            # BUGFIX: keys come back as bytes by default; slicing bytes and
            # interpolating into f-strings produced keys like "SCHEDULED:b'x'".
            name = _as_text(key)[len("SCHEDULED:"):]
            table[name] = target
            logging.debug(f"{name} -> {target}")
            counters[target] = counters.get(target, 0) + 1
    return table, counters


def _pick_candidate(redis_client, lower_rank_sites, incraising_queue_at):
    """Pick the first lower-rank site that is healthy enough to receive work.

    Skips sites marked as recently troubled (RECENTTROUBLE:* key present)
    and sites currently reporting a rising queue. Returns the chosen site
    name, or None when no site qualifies.
    """
    for lower_rank_site in lower_rank_sites:
        logging.debug(f"Evaluating {lower_rank_site}")
        recently_troubled = redis_client.get(f"RECENTTROUBLE:{lower_rank_site}")

        if recently_troubled:
            logging.debug(f"Site {lower_rank_site} had troubles recently, better not scheduling here...")
            continue

        if lower_rank_site in incraising_queue_at:
            logging.debug(f"Site {lower_rank_site} is not a good candidate: It's currently reporting in trouble...")
            continue

        logging.debug(f"{lower_rank_site} seems to be an ideal candidate to move a single workload to")
        return lower_rank_site
    return None


def run(redis_client: Redis, site_url_map: Dict[str, str]):
    """Execute one scheduling round.

    Fetches the collector's performance report, detects sites with
    persistently growing queues, ensures a default site is chosen, and —
    for each troubled site — moves a single workload one tier lower in
    Config.SITE_LIST (assumed ordered from highest to lowest priority —
    TODO confirm).

    :param redis_client: connected redis client holding scheduler state
        (RUNCOUNT, SCHEDULED:*, DEFAULT:SCHEDULED, INCRAISINGQUEUE:*,
        RECENTTROUBLE:* keys).
    :param site_url_map: mapping of site name -> URL. Currently unused;
        kept for interface compatibility with the caller.
    """
    run_count = redis_client.incr("RUNCOUNT")
    logging.debug(f"Beginning of round {run_count}!")

    # Get weather report from the collector service.
    weather_report = _fetch_weather_report()

    # Figuring out if any of the sites need attention.
    incraising_queue_at = _detect_rising_queues(redis_client, weather_report)

    # Decide on the default site, once enough rounds of data accumulated.
    if run_count > RESCHEDULE_TIRGGER_LEVEL * 2:
        default_site = redis_client.get("DEFAULT:SCHEDULED")
        if not default_site:
            logging.debug("Default site is not set. Selecting one...")
            for site in Config.SITE_LIST:
                if site.name not in incraising_queue_at:
                    # BUGFIX: store the site *name*; the raw site object was
                    # stored before, but readers compare against name strings.
                    redis_client.set("DEFAULT:SCHEDULED", site.name)
                    logging.info(f"Default site set to {site.name}")
                    break
    else:
        logging.debug("Not enough data points to decide on default site yet")

    # No sites need attention
    if not incraising_queue_at:
        logging.info("Everything seems to be in order. No incraising queues. Nothing to do")
        return

    # Compile a list of where each client is scheduled.
    current_scheduling_table, current_scheduling_table_counters = _current_scheduling(redis_client)

    logging.debug("Summary of scheduled clients:")
    for site, counter in current_scheduling_table_counters.items():
        logging.debug(f"{site}: {counter}")

    if len(incraising_queue_at) == len(Config.SITE_LIST):
        logging.warning("Out of resources! All sites reporting trouble, nowhere to reschedule!")
        return

    # If attention required, schedule a single workload to one tier lower.
    for site_seeking_attention in incraising_queue_at:
        redis_client.set(f"RECENTTROUBLE:{site_seeking_attention}", b"\x01", ex=RECENT_TROUBLE_TTL)

        # BUGFIX: .get(..., 0) — the counters dict only contains sites with
        # at least one workload, so direct indexing raised KeyError exactly
        # in the case this branch is meant to warn about.
        if current_scheduling_table_counters.get(site_seeking_attention, 0) == 0:
            logging.warning("Wtf? Site reporting trouble, but there are no workload scheduled to it... nothing to do")
            continue

        logging.debug(f"Attempting to reschedule a single workload from {site_seeking_attention}...")

        # Compile a list of sites with lower rank than the current one.
        # BUGFIX: next_lower started as True, which collected EVERY site
        # (including the troubled one) and made the empty-list guard below
        # unreachable. It must start False and flip after the troubled site.
        # BUGFIX: collect site *names* — the list is joined into a log
        # message, compared against name strings and used in redis keys.
        lower_rank_sites = []
        next_lower = False
        for site in Config.SITE_LIST:
            if next_lower:
                lower_rank_sites.append(site.name)
            elif site.name == site_seeking_attention:
                next_lower = True

        if not lower_rank_sites:
            logging.warning("Out of lower rank sites! Nowhere to schedule! Giving up on this site.")
            continue

        # Select a fitting candidate from the lower rank sites.
        logging.debug(f"Selecting candidate out of these sites: {','.join(lower_rank_sites)}")
        candidate = _pick_candidate(redis_client, lower_rank_sites, incraising_queue_at)

        if not candidate:
            logging.warning("Could not find a good candidate to move a single workload to! Giving up!")
            continue

        # Do the actual re-scheduling.
        logging.debug(f"Re-scheduling a single workload from {site_seeking_attention} to {candidate}...")
        ensure_running_pod_on_site(candidate)  # blocking

        # Choose a workload "randomly": first device currently pinned to the
        # troubled site (re-read from redis so earlier moves are visible).
        device_to_reschedule = None
        for key in redis_client.keys("SCHEDULED:*"):
            target = _as_text(redis_client.get(key))
            if target == site_seeking_attention:
                device_to_reschedule = _as_text(key)[len("SCHEDULED:"):]
                break

        if not device_to_reschedule:
            logging.warning(f"No workload scheduled to {site_seeking_attention}... wtf??")
            break

        logging.info(f"Re-scheduling {device_to_reschedule} from {site_seeking_attention} to {candidate}")

        redis_client.set(f"SCHEDULED:{device_to_reschedule}", candidate)

        # Promote the candidate to default when the troubled site was the
        # default (or no default was set yet).
        current_default = _as_text(redis_client.get("DEFAULT:SCHEDULED"))
        if not current_default or current_default == site_seeking_attention:
            logging.info(f"Marking {candidate} as the new default site!")
            redis_client.set("DEFAULT:SCHEDULED", candidate)
|
||||
|
Loading…
Reference in New Issue
Block a user