#!/usr/bin/env python3
"""Keep N copies of a device executable running, restarting any that exit.

Usage: script.py EXECUTABLE N_PROC

Device ``i`` is launched as ``EXECUTABLE i`` (EXECUTABLE is split with
shell-like quoting rules, so it may carry its own arguments), with stdout and
stderr appended to ``ned_logs/<i>.log``.  The terminal shows the ids of the
devices in each state, refreshed once a second.  SIGINT/SIGTERM start a
graceful shutdown: live devices get SIGTERM, and any still alive after the
stop timeout (15 s by default) get SIGKILL.
"""
import subprocess
import threading
from typing import Optional
from dataclasses import dataclass
import enum
import sys
import os
import io
import time
import signal
import random
import shlex


class DeviceState(enum.Enum):
    """Lifecycle state of a supervised device process."""
    CREATED = 0  # never started yet
    BOOTING = 1  # spawned, not yet seen alive on a poll
    RUNNING = 2  # seen alive on at least one poll after spawning
    CRASHED = 3  # exited/failed to spawn (restart pending), or non-zero/killed at shutdown
    STOPPED = 4  # exited with status 0 during shutdown


@dataclass
class DeviceInfo:
    """Book-keeping for one device; mutated in place by the supervisor thread."""
    id: int  # appended to the command line as the last argument; names the log file
    executable: str  # command line, split with shlex.split
    state: DeviceState = DeviceState.CREATED
    log_fd: Optional[io.TextIOWrapper] = None  # supervisor's handle on ned_logs/<id>.log
    proc: Optional[subprocess.Popen] = None  # most recently spawned process, if any


# States in which a device has a process that may still be alive.
_ALIVE_STATES = (DeviceState.BOOTING, DeviceState.RUNNING)
# States from which the supervisor (re)starts a device.
_RESTARTABLE_STATES = (DeviceState.CREATED, DeviceState.CRASHED)

# Start each device in its own process group so a terminal Ctrl+C reaches only
# the supervisor, which then shuts the devices down itself.  ``preexec_fn`` is
# documented as unsafe when threads are running (Popen is called from the
# supervisor thread here), so prefer ``process_group`` where it exists.
if sys.version_info >= (3, 11):
    _NEW_PROCESS_GROUP = {"process_group": 0}
else:
    _NEW_PROCESS_GROUP = {"preexec_fn": os.setpgrp}


class RealSlimShadyThread(threading.Thread):
    """Supervisor thread: starts devices, detects exits, restarts them.

    Every 0.5 s it marks live devices RUNNING (process still up) or CRASHED
    (process exited), then (re)starts at most ``max_starts_per_tick`` randomly
    chosen CREATED/CRASHED devices.  Once :meth:`stop` is called, or if the
    loop raises, it terminates every live device before the thread exits.

    Args:
        device_list: the ``DeviceInfo`` entries to supervise (mutated in place).
        max_starts_per_tick: cap on spawns per 0.5 s tick, to limit load.
        stop_timeout: seconds to wait after SIGTERM before sending SIGKILL.
    """

    def __init__(self, device_list: list, *, max_starts_per_tick: int = 2,
                 stop_timeout: float = 15.0):
        super().__init__()
        self._device_list = device_list
        # Held while this thread mutates device entries and while copying the list.
        self._device_list_lock = threading.Lock()
        self._active = True
        self._max_starts_per_tick = max_starts_per_tick
        self._stop_timeout = stop_timeout

    def run(self):
        try:
            while self._active:
                time.sleep(0.5)
                with self._device_list_lock:
                    self._check_alive()
                    self._start_some()
        finally:
            # Runs on a normal stop *and* if the loop raised: the children live
            # in their own process groups, so nothing else would stop them.
            self._shutdown()

    def _devices_in(self, states) -> list:
        """Return the devices whose state is one of *states* (caller holds the lock)."""
        return [dev for dev in self._device_list if dev.state in states]

    def _check_alive(self) -> None:
        """Mark each live device RUNNING if its process is up, else CRASHED."""
        for device in self._devices_in(_ALIVE_STATES):
            if device.proc.poll() is None:
                device.state = DeviceState.RUNNING
            else:
                device.state = DeviceState.CRASHED

    def _start_some(self) -> None:
        """(Re)start a random subset of the CREATED/CRASHED devices."""
        candidates = self._devices_in(_RESTARTABLE_STATES)
        random.shuffle(candidates)
        # start only a few at a time to save on resources
        for device in candidates[:self._max_starts_per_tick]:
            self._spawn(device)

    @staticmethod
    def _close_log(device: DeviceInfo) -> None:
        """Close the device's log handle, if any, and forget it."""
        if device.log_fd is not None:
            try:
                device.log_fd.close()
            except OSError:
                pass  # best effort: a failed flush-on-close must not stop the supervisor
            device.log_fd = None

    def _spawn(self, device: DeviceInfo) -> None:
        """Start one device, appending its output to ned_logs/<id>.log.

        On success the device becomes BOOTING.  If the log cannot be opened or
        the process cannot be spawned, the device is marked CRASHED (so it is
        retried on a later tick) and the error is written to its log when open.
        """
        self._close_log(device)
        try:
            device.log_fd = open(f"ned_logs/{device.id}.log", "at", encoding="utf-8")
            device.proc = subprocess.Popen(
                shlex.split(device.executable) + [str(device.id)],
                stdout=device.log_fd,
                stderr=device.log_fd,
                **_NEW_PROCESS_GROUP,
            )
        except (OSError, ValueError) as err:
            # A missing binary or bad quoting must not kill the supervisor and
            # orphan the devices that are already running.
            device.proc = None
            device.state = DeviceState.CRASHED
            if device.log_fd is not None:
                device.log_fd.write(f"supervisor: failed to start device {device.id}: {err}\n")
                device.log_fd.flush()
            return
        device.state = DeviceState.BOOTING

    def _shutdown(self) -> None:
        """SIGTERM all live devices, SIGKILL stragglers after the timeout, close logs."""
        with self._device_list_lock:
            deadline = time.monotonic() + self._stop_timeout
            for dev in self._devices_in(_ALIVE_STATES):
                dev.proc.terminate()
        still_running = True
        while still_running:
            time.sleep(0.2)
            with self._device_list_lock:
                for dev in self._devices_in(_ALIVE_STATES):
                    ret = dev.proc.poll()
                    if ret is not None:
                        dev.state = DeviceState.STOPPED if ret == 0 else DeviceState.CRASHED
                alive = self._devices_in(_ALIVE_STATES)
                if alive and time.monotonic() > deadline:
                    for dev in alive:
                        dev.proc.kill()
                        dev.state = DeviceState.CRASHED
                still_running = bool(self._devices_in(_ALIVE_STATES))
        with self._device_list_lock:
            for dev in self._device_list:
                self._close_log(dev)

    def get_device_list(self) -> list:
        """Return a shallow copy of the device list.

        The ``DeviceInfo`` objects themselves are shared and keep being mutated
        by this thread; copy any fields you need a consistent view of.
        """
        with self._device_list_lock:
            return self._device_list.copy()

    def stop(self):
        """Ask the supervisor to shut all devices down and exit."""
        self._active = False

    @property
    def stopping(self) -> bool:
        """True once :meth:`stop` has been called."""
        return not self._active


def main():
    """Parse ``EXECUTABLE N_PROC`` from argv, run the supervisor, draw status."""
    if len(sys.argv) < 3:
        print(f"usage: {sys.argv[0]} EXECUTABLE N_PROC", file=sys.stderr)
        sys.exit(2)
    executable = sys.argv[1]
    n_proc = int(sys.argv[2])
    os.makedirs("ned_logs", exist_ok=True)
    slim_shady = RealSlimShadyThread(
        [DeviceInfo(id=i, executable=executable) for i in range(n_proc)]
    )

    def shutdown_signal(signum, frame):
        slim_shady.stop()

    signal.signal(signal.SIGINT, shutdown_signal)
    signal.signal(signal.SIGTERM, shutdown_signal)
    slim_shady.start()
    while slim_shady.is_alive():
        # Snapshot (id, state) once so every device appears exactly once per frame.
        snapshot = [(dev.id, dev.state) for dev in slim_shady.get_device_list()]
        print("\033[2J\033[0;0f", end="", flush=False)  # clear screen, cursor home
        for state in DeviceState:
            print(state.name, flush=False)
            print(", ".join(str(dev_id) for dev_id, dev_state in snapshot if dev_state == state), flush=False)
            print(flush=False)
        if slim_shady.stopping:
            print("STOPPING...", flush=False)
        sys.stdout.flush()
        time.sleep(1)


if __name__ == '__main__':
    main()