#!/usr/bin/env python3
"""
This module contains the classes used for communicating with other consumers
as well as the producers themselves.
"""
import os
import logging

import requests
import requests.exceptions

from redis_super_storage import RedisSuperStorage

# Timeout (in seconds) applied to every outgoing HTTP request of this module.
_REQUEST_TIMEOUT = 5


class ProducerCommunicator:
    """
    This class is used to communicate with producers.
    The addresses of producers are fetched from `RedisSuperStorage`.
    """

    def __init__(self, redis_store: RedisSuperStorage):
        """
        Upon creating this object, a requests session is created in order to
        take advantage of keep-alive connections.

        :param redis_store: A `RedisSuperStorage` instance.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    def push_ip_update(self, newip: str) -> None:
        """
        Push an ip update to all known producers.

        The list of producers is read from the `RedisSuperStorage` instance.
        (The list of producers is maintained by the api endpoint.)

        The uuid of this consumer is acquired directly from the `LOCAL_UUID`
        envvar (a missing variable raises `KeyError`).

        A timeout of 5 seconds is hardcoded for each producer individually.
        Connection errors and timeouts are logged as warnings and do not
        prevent the remaining producers from being notified.

        Called URL::

            http://<producer address>/ip

        Body::

            {
                "uuid": "str: LOCAL_UUID",
                "ip": "str: provided by param"
            }

        :param newip: New ip address to be announced.
        """
        for key, ip in self._redis_store.get_producer_list().items():
            try:
                response = self._session.post(
                    f"http://{ip}/ip",
                    json={'uuid': os.environ['LOCAL_UUID'], 'ip': newip},
                    timeout=_REQUEST_TIMEOUT,
                )
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                logging.warning("Could not push update to %s: %s", key, e)
            else:
                logging.debug(
                    "Pushed update to %s at %s. Response: %s",
                    key, ip, response.status_code,
                )


class ConsumerCommunicator:
    """
    This class is used to communicate with consumers.
    The addresses of consumers are fetched from the `RedisSuperStorage`.
    """

    def __init__(self, redis_store: RedisSuperStorage):
        """
        Upon creating this object, a requests session is created in order to
        take advantage of keep-alive connections.

        :param redis_store: A `RedisSuperStorage` instance.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    # NOTE: the misspelled method name is kept on purpose, it is part of the
    # public interface of this class.
    def targeted_snyc(self, ip: str) -> None:
        """
        Synchronize with a single consumer identified by its address.

        This method works similarly to `sync_all`, however the target is not
        fetched from the `RedisSuperStorage` instance. The result is processed
        the same way: on a 200 response the uuid reported by the remote is
        saved to redis together with the address.

        A timeout of 5 seconds is hardcoded for this function. Connection
        errors and timeouts are logged as warnings. A 200 response whose body
        is not a JSON object containing a `uuid` key is logged as a warning
        as well and otherwise ignored. Responses with any other status code
        are silently ignored.

        This method is preferred when the remote is unknown (uuid is unknown).
        Mostly when the application just started up, and an initial
        synchronization to all consumers is required.

        See `sync_all` for more information.

        Called URL::

            http://<consumer address>/sync

        Body::

            {
                "uuid": "str: LOCAL_UUID"
            }

        :param ip: The ip address of the consumer to be synced to.
        """
        try:
            # request synchronization
            response = self._session.post(
                f"http://{ip}/sync",
                json={'uuid': os.environ['LOCAL_UUID']},
                timeout=_REQUEST_TIMEOUT,
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            logging.warning("Error while syncing to %s: %s", ip, e)
            return

        if response.status_code != 200:
            return

        try:
            remote_uuid = response.json()['uuid']
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: body is not valid JSON; KeyError / TypeError: the
            # decoded body has no usable 'uuid' entry. A misbehaving remote
            # must not abort the synchronization of the other consumers.
            logging.warning("Malformed sync response from %s: %r", ip, e)
            return

        self._redis_store.update_consumer(remote_uuid, ip)

    def sync_all(self) -> None:
        """
        Synchronize with each known consumer.

        The list of consumers is acquired from the `RedisSuperStorage`
        instance, and every consumer is synced to individually through
        `targeted_snyc`, using the `ip` entry stored for it.

        This synchronization run causes the following:
            * Each consumer known by this consumer is checked for availability.
            * Each consumer this consumer communicated with updates the
              availability of this consumer.
            * Each consumer which had no information of this consumer now has.

        A timeout of 5 seconds is hardcoded for each consumer individually.
        Failures (timeouts, connection errors, malformed responses) are logged
        as warnings and do not stop the synchronization of the remaining
        consumers.

        Called URL::

            http://<consumer address>/sync

        Body::

            {
                "uuid": "str: LOCAL_UUID"
            }
        """
        for info in self._redis_store.get_consumer_list().values():
            self.targeted_snyc(info['ip'])