#!/usr/bin/env python3
"""
This module contains the classes used for communicating with other consumers
as well as the producers themselves.
"""

import logging
import os

import requests
import requests.exceptions

from redis_super_storage import RedisSuperStorage

# Timeout (in seconds) passed to every outgoing HTTP request.
_REQUEST_TIMEOUT = 5


class ProducerCommunicator:
    """Pushes address updates to every producer listed in the Redis store."""

    def __init__(self, redis_store: RedisSuperStorage):
        """Keep a reference to the store and open a reusable HTTP session.

        Args:
            redis_store: Storage object queried for the producer list.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    def push_ip_update(self, newip: str) -> None:
        """POST the new address to the ``/ip`` endpoint of each producer.

        The request body carries the ``LOCAL_UUID`` environment variable and
        ``newip``. Delivery is best effort: a request failure for one producer
        is logged as a warning and the remaining producers are still
        contacted.

        Args:
            newip: The address to announce.

        Raises:
            KeyError: If ``LOCAL_UUID`` is not set in the environment.
        """
        for key, ip in self._redis_store.get_producer_list().items():
            try:
                response = self._session.post(
                    f"http://{ip}/ip",
                    json={'uuid': os.environ['LOCAL_UUID'], 'ip': newip},
                    timeout=_REQUEST_TIMEOUT,
                )
            except requests.exceptions.RequestException as e:
                # RequestException is the base of ConnectionError and Timeout;
                # catching it keeps any single failing producer (including
                # e.g. an unusable stored address) from aborting the loop.
                logging.warning("Could not push update to %s: %s", key, e)
                continue

            logging.debug(
                "Pushed update to %s at %s. Response: %s",
                key, ip, response.status_code,
            )


class ConsumerCommunicator:
    """Requests synchronization from other consumers and records replies."""

    def __init__(self, redis_store: RedisSuperStorage):
        """Keep a reference to the store and open a reusable HTTP session.

        Args:
            redis_store: Storage object queried for the consumer list and
                updated with the uuid each consumer reports.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    def _sync_with(self, ip: str) -> None:
        """POST a sync request to ``ip`` and store the uuid it answers with.

        Failures are logged as warnings and otherwise ignored. Responses with
        a status code other than 200 are ignored silently.
        """
        try:
            # request synchronization
            response = self._session.post(
                f"http://{ip}/sync",
                json={'uuid': os.environ['LOCAL_UUID']},
                timeout=_REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException as e:
            logging.warning("Error while syncing to %s: %s", ip, e)
            return

        if response.status_code != 200:
            return

        try:
            remote_uuid = response.json()['uuid']
        except (ValueError, KeyError, TypeError) as e:
            # A 200 reply whose body is not JSON, or lacks 'uuid', must not
            # stop synchronization with the remaining consumers.
            logging.warning("Invalid sync response from %s: %s", ip, e)
            return

        self._redis_store.update_consumer(remote_uuid, ip)

    def targeted_snyc(self, ip: str) -> None:
        """Synchronize with the single consumer reachable at ``ip``.

        On a 200 response the uuid from the response body is stored together
        with ``ip`` via ``update_consumer``.

        Args:
            ip: Address of the consumer to contact.

        Raises:
            KeyError: If ``LOCAL_UUID`` is not set in the environment.
        """
        self._sync_with(ip)

    # Correctly spelled alias; the original (misspelled) name stays available
    # so existing callers keep working.
    targeted_sync = targeted_snyc

    def sync_all(self) -> None:
        """Synchronize with every consumer currently listed in the store.

        Each entry's ``'ip'`` value is contacted in turn; a failure for one
        consumer is logged and does not prevent contacting the others.

        Raises:
            KeyError: If ``LOCAL_UUID`` is not set in the environment, or an
                entry has no ``'ip'`` key.
        """
        for info in self._redis_store.get_consumer_list().values():
            self._sync_with(info['ip'])