#!/usr/bin/env python3 import os import logging import requests import requests.exceptions from redis_super_storage import RedisSuperStorage class ProducerCommunicator: def __init__(self, redis_store: RedisSuperStorage): self._redis_store = redis_store self._session = requests.Session() def push_ip_update(self, newip: str): for key, ip in self._redis_store.get_producer_list().items(): try: response = self._session.post( f"http://{ip}/ip", json={'uuid': os.environ['LOCAL_UUID'], 'ip': newip}, timeout=5 ) logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}") except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: logging.warning(f"Could not push update to {key}: {str(e)}") class ConsumerCommunicator: def __init__(self, redis_store: RedisSuperStorage): self._redis_store = redis_store self._session = requests.Session() def targeted_snyc(self, ip: str): try: # request synchronization response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: logging.warning(f"Error while syncing to {ip}: {str(e)}") return if response.status_code == 200: self._redis_store.update_consumer(response.json()['uuid'], ip) def sync_all(self): for uuid, info in self._redis_store.get_consumer_list().items(): ip = info['ip'] try: # request synchronization response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: logging.warning(f"Error while syncing to {ip}: {str(e)}") continue if response.status_code == 200: self._redis_store.update_consumer(response.json()['uuid'], ip)