#!/usr/bin/env python3
"""Launch N copies of an executable and restart any copy that exits.

Usage: <script> EXECUTABLE N_PROC

Each copy ("device") is started as ``EXECUTABLE <id>`` with its stdout
appended to ``ned_logs/<id>.log``.  A background thread polls the child
processes and restarts those that have exited, while the main thread
redraws a per-state overview of the device ids once per second.
"""
import enum
import io
import os
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from typing import Optional


class DeviceState(enum.Enum):
    """Lifecycle state of a supervised device process."""

    CREATED = 0  # never started yet
    BOOTING = 1  # Popen succeeded, not yet confirmed alive by a poll
    RUNNING = 2  # last poll found the process alive
    CRASHED = 3  # process exited (any exit code) or failed to start


@dataclass
class DeviceInfo:
    """Bookkeeping record for one supervised device process."""

    # Numeric id; passed to the executable as its only argument and used
    # as the log file name.
    id: int
    # Path of the program to launch.
    executable: str
    state: DeviceState = DeviceState.CREATED
    # Log file receiving the child's stdout; None until the first start.
    log_fd: Optional[io.TextIOWrapper] = None
    # Handle of the most recently started child; None until the first start.
    proc: Optional[subprocess.Popen] = None


class RealSlimShadyThread(threading.Thread):
    """Supervisor thread that starts devices and restarts exited ones.

    Every ``poll_interval`` seconds the thread, while holding its lock,
    first polls all BOOTING/RUNNING devices and then (re)starts at most
    ``max_starts_per_cycle`` devices that are CREATED or CRASHED.
    """

    def __init__(self, device_list: list, *, poll_interval: float = 0.5,
                 max_starts_per_cycle: int = 2):
        """Create the supervisor.

        Args:
            device_list: ``DeviceInfo`` objects to supervise.  The list and
                its elements are used in place, not copied.
            poll_interval: Seconds to sleep between supervision cycles.
            max_starts_per_cycle: Upper bound on processes launched in a
                single cycle, so that start-up is staggered.
        """
        super().__init__()
        self._device_list = device_list
        self._device_list_lock = threading.Lock()
        self._poll_interval = poll_interval
        self._max_starts_per_cycle = max_starts_per_cycle

    def run(self):
        """Supervise the devices forever; never returns."""
        while True:
            time.sleep(self._poll_interval)
            with self._device_list_lock:
                self._poll_devices()
                for device in self._pending_devices():
                    self._start_device(device)

    def _poll_devices(self):
        """Update the state of every BOOTING/RUNNING device from its process."""
        active_states = (DeviceState.BOOTING, DeviceState.RUNNING)
        for device in self._device_list:
            if device.state not in active_states:
                continue
            proc = device.proc
            # poll() returns None while the child is still alive.  Any exit,
            # including exit code 0, is treated as a crash and triggers a
            # restart.  A missing handle is treated the same way so that the
            # device gets relaunched instead of raising AttributeError.
            if proc is not None and proc.poll() is None:
                device.state = DeviceState.RUNNING
            else:
                device.state = DeviceState.CRASHED

    def _pending_devices(self) -> list:
        """Return the devices to (re)start in this cycle, in list order."""
        startable_states = (DeviceState.CREATED, DeviceState.CRASHED)
        pending = [dev for dev in self._device_list
                   if dev.state in startable_states]
        return pending[:self._max_starts_per_cycle]

    @staticmethod
    def _close_log(device):
        """Close the device's log file, if any, ignoring I/O errors."""
        if device.log_fd is None:
            return
        try:
            device.log_fd.close()
        except OSError:
            pass  # best effort: a failed close must not stop supervision
        device.log_fd = None

    def _start_device(self, device):
        """Launch the device's process with stdout appended to its log file.

        On success the device becomes BOOTING.  If the log cannot be opened
        or the process cannot be spawned, the device becomes CRASHED (so it
        is retried in a later cycle) instead of killing the supervisor.
        """
        self._close_log(device)
        try:
            device.log_fd = open(f"ned_logs/{device.id}.log", "at")
            device.proc = subprocess.Popen(
                [device.executable, str(device.id)],
                stdout=device.log_fd,
                # Own process group for the child.  NOTE: the subprocess
                # documentation warns that preexec_fn is not safe in
                # threaded programs; on Python 3.11+ `process_group=0` is
                # the equivalent without that caveat.
                preexec_fn=os.setpgrp,
            )
        except (OSError, subprocess.SubprocessError) as err:
            if device.log_fd is not None:
                device.log_fd.write(
                    f"failed to start {device.executable}: {err}\n")
            self._close_log(device)
            device.proc = None
            device.state = DeviceState.CRASHED
            return
        device.state = DeviceState.BOOTING

    def get_device_list(self) -> list:
        """Return a shallow copy of the device list.

        The returned list is new, but its ``DeviceInfo`` elements are the
        same objects the supervisor keeps updating.
        """
        with self._device_list_lock:
            return self._device_list.copy()


def main():
    """Parse the command line, start the supervisor and show the status."""
    if len(sys.argv) < 3:
        print(f"usage: {sys.argv[0]} EXECUTABLE N_PROC", file=sys.stderr)
        sys.exit(2)
    executable = sys.argv[1]
    n_proc = int(sys.argv[2])

    os.makedirs("ned_logs", exist_ok=True)
    device_list = [DeviceInfo(id=i, executable=executable)
                   for i in range(n_proc)]

    # Without a running supervisor no process is ever launched.  Daemon mode
    # lets Ctrl-C end the program without waiting on the endless run() loop.
    supervisor = RealSlimShadyThread(device_list)
    supervisor.daemon = True
    supervisor.start()

    while True:
        devices = supervisor.get_device_list()
        # Clear the screen and move the cursor to the top-left corner.
        print("\033[2J\033[0;0f", end="", flush=False)
        for state in DeviceState:
            print(state.name, flush=False)
            print(", ".join(str(dev.id) for dev in devices
                            if dev.state == state), flush=False)
            print(flush=False)
        # One flush per frame instead of one per print call.
        sys.stdout.flush()
        time.sleep(1)


if __name__ == '__main__':
    main()