# distributed-kebab/kebab_worker/main.py
#
# 107 lines
# 2.8 KiB
# Python

import multiprocessing
import os
import requests
def worker(outqueue, id):
    """Search forever for node sets whose removal breaks up the graph.

    Loads the graph from ``/dkebab/GBA1000.txt`` (first line: node count,
    second line: edge count, then one ``u v`` pair per line) and then runs
    randomized attempts in an endless loop; this function never returns.

    Each attempt starts from the full graph with the ``int(id / 3)``
    highest-degree nodes already removed (one at a time, re-evaluating
    degrees after each removal), then removes uniformly random nodes until
    no connected component is larger than half of the original node count.
    The removed nodes, in removal order, are put on ``outqueue`` after every
    attempt.  Randomly chosen entries are numpy integers.

    Args:
        outqueue: Queue-like object; only its ``put`` method is used.
        id: Numeric worker index.  Used in log messages and to derive how
            many highest-degree nodes are removed before the random phase.
    """
    print(f"Worker {id} is preparing...")

    # Function-scope imports: these only run in a process that calls worker().
    import numpy as np
    import networkx as nx

    with open("/dkebab/GBA1000.txt") as f:
        n = int(f.readline())  # number of nodes
        m = int(f.readline())  # number of edges
        # Adjacency matrix: 1 where an edge exists, 0 everywhere else.
        A = np.zeros((n, n), dtype=int)
        for _ in range(m):
            u, v = map(int, f.readline().split())
            A[u, v] = 1
            A[v, u] = 1

    input_graph = nx.Graph(A)

    def max_comp_size(G):
        # default=0 keeps this well-defined once the graph has no nodes left.
        return max((len(c) for c in nx.connected_components(G)), default=0)

    predelete_count = int(id / 3)
    target = len(input_graph.nodes) / 2

    if predelete_count:
        print(f"Worker {id} always deletes the {predelete_count} highest degree nodes")

    # The pre-deletion is deterministic, so it is done once here instead of
    # being repeated at the start of every attempt.
    base_graph = input_graph.copy()
    base_deleted = []
    for _ in range(predelete_count):
        if base_graph.number_of_nodes() == 0:
            break
        # max() returns the first node with the largest degree and, unlike a
        # "strictly greater than 0" scan, still yields a node when every
        # remaining degree is 0.
        highest_node, _ = max(base_graph.degree, key=lambda pair: pair[1])
        base_deleted.append(highest_node)
        base_graph.remove_node(highest_node)

    print(f"Worker {id} is working...")
    while True:
        # A fresh new start
        del_list = list(base_deleted)
        G = base_graph.copy()

        while max_comp_size(G) > target:
            # Hand numpy a plain list rather than the NodeView itself.
            u = np.random.choice(list(G.nodes))
            # u is still in G, so it cannot already be in del_list.
            del_list.append(u)
            G.remove_node(u)

        outqueue.put(del_list)
def worker_watchdog(outqueue, id):
    """Run ``worker`` in a loop, restarting it whenever it fails.

    A ``KeyboardInterrupt`` ends the loop.  Any other ``Exception`` is
    reported together with its traceback and ``worker`` is started again.
    ``SystemExit`` and other non-``Exception`` signals are not intercepted,
    so they still terminate the process.

    Args:
        outqueue: Passed through unchanged to ``worker``.
        id: Worker index; passed to ``worker`` and used in log messages.
    """
    import traceback

    while True:
        try:
            worker(outqueue, id)
        except KeyboardInterrupt:
            break
        except Exception:
            print(f"Worker {id} died! Restarting...")
            # Show why the worker died instead of discarding the error.
            traceback.print_exc()
    print(f"Worker {id} exited!")
def denumpy_result(numpy_type) -> list:
    """Return the items of ``numpy_type`` as a list of built-in ``int`` values.

    Args:
        numpy_type: Any iterable whose items can be passed to ``int()``.

    Returns:
        A new list with ``int(item)`` for every item, in iteration order.
    """
    return [int(item) for item in numpy_type]
def main():
    """Start one worker process per CPU and report improving results.

    Reads the destination URL from the ``TARGET_URL`` environment variable
    (a missing variable raises ``KeyError``), starts the worker processes
    with the ``spawn`` start method, and then consumes results from the
    shared queue forever.  Whenever a result is shorter than the best one
    seen so far by this process, it is POSTed to the target URL as a JSON
    list of integers.  Failed posts are logged and do not stop the loop.
    """
    target_url = os.environ['TARGET_URL']

    multiprocessing.set_start_method('spawn')
    outqueue = multiprocessing.Queue()

    # os.cpu_count() returns None when the count cannot be determined.
    worker_count = os.cpu_count() or 1

    processes = []
    for worker_id in range(worker_count):
        p = multiprocessing.Process(target=worker_watchdog, args=(outqueue, worker_id))
        p.start()
        processes.append(p)

    # Only results with fewer than 1000 entries are ever reported.
    local_best = 1000
    while True:
        result = outqueue.get()
        result_score = len(result)
        if result_score >= local_best:
            continue

        print(f"New local best found: {local_best} -> {result_score}")
        local_best = result_score
        try:
            # The timeout keeps a stalled server from blocking this loop forever.
            r = requests.post(target_url, json=denumpy_result(result), timeout=30)
            r.raise_for_status()
        except (ConnectionError, requests.RequestException) as e:
            # requests' own connection/timeout/HTTP errors derive from
            # RequestException, not from the builtin ConnectionError.
            print(f"Error while posting result: {e}")
# Entry-point guard: run main() only when this file is executed as a script.
# main() selects the 'spawn' start method, under which child processes import
# this module again; the guard keeps them from running main() themselves.
if __name__ == '__main__':
    main()