This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
consumer-scheduler/communicators.py

170 lines
5.7 KiB
Python

#!/usr/bin/env python3
"""
This module contains the classes used for communicating with other consumers as well as the producers themselves.
"""
import os
import logging
import requests
import requests.exceptions
from redis_super_storage import RedisSuperStorage
class ProducerCommunicator:
    """
    This class is used to communicate with producers.

    The addresses of producers are fetched from `RedisSuperStorage` on every call.
    """

    def __init__(self, redis_store: "RedisSuperStorage"):
        """
        Store the redis handle and create a requests session.

        The session is reused for every request in order to take advantage of keep-alive connections.

        :param redis_store: A `RedisSuperStorage` instance.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    def push_ip_update(self, newip: str) -> None:
        """
        Push an ip update to all known producers.

        The list of producers is read from the `RedisSuperStorage` instance.
        (The list of producers are maintained by the api endpoint.)

        The uuid of this consumer is acquired directly from the `LOCAL_UUID` envvar.

        A timeout of 5 seconds is hardcoded for each producer individually.
        Any `requests` level failure (connection error, timeout, invalid URL, ...) for a single producer
        is logged as warning and the remaining producers are still notified.

        Called URL::

            http://<producer ip>/ip

        Body::

            {
                "uuid" : "str: LOCAL_UUID",
                "ip": "str: provided by param"
            }

        :param newip: New ipaddress to be announced.
        :raises KeyError: If there is at least one producer and the `LOCAL_UUID` envvar is not set.
        """
        producers = self._redis_store.get_producer_list()
        if not producers:
            # Nothing to notify; the environment is deliberately not touched in this case.
            return

        # The body is identical for every producer, so it is built only once.
        payload = {
            'uuid': os.environ['LOCAL_UUID'],
            'ip': newip,
        }

        for key, ip in producers.items():
            try:
                response = self._session.post(f"http://{ip}/ip", json=payload, timeout=5)
            except requests.exceptions.RequestException as e:
                # One unreachable or misconfigured producer must not block the others.
                logging.warning("Could not push update to %s: %s", key, e)
            else:
                logging.debug("Pushed update to %s at %s. Response: %s", key, ip, response.status_code)
class ConsumerCommunicator:
    """
    This class is used to communicate with consumers.

    The addresses of consumers are fetched from the `RedisSuperStorage`.
    """

    def __init__(self, redis_store: "RedisSuperStorage", force_ip_override: bool = False):
        """
        Store the configuration and create a requests session.

        The session is reused for every request in order to take advantage of keep-alive connections.

        :param redis_store: A `RedisSuperStorage` instance.
        :param force_ip_override: Include the ip address stored in redis to the sync message
            (Disable the receiver ip discovery in the other consumer)
        """
        self._redis_store = redis_store
        self._session = requests.Session()
        self._force_ip_override = force_ip_override

    def _build_sync_message(self) -> dict:
        """
        Build the JSON body sent to the `/sync` endpoint of another consumer.

        :return: A dict holding the `LOCAL_UUID` envvar as `uuid` and, when ip override is enabled,
            the `current_ip` of the redis store as `ip`.
        :raises KeyError: If the `LOCAL_UUID` envvar is not set.
        """
        message = {'uuid': os.environ['LOCAL_UUID']}

        if self._force_ip_override:
            message['ip'] = self._redis_store.current_ip

        return message

    def _sync_with(self, ip: str, message: dict) -> None:
        """
        Send one sync request to `ip` and save the answering consumer to redis.

        Failures are logged as warning and never raised, so a single bad consumer can not interrupt a sync run.
        Responses with a status code other than 200 are ignored.

        :param ip: The ip address of the consumer to be synced to.
        :param message: The JSON body to send (see `_build_sync_message`).
        """
        try:
            # request synchronization
            response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
        except requests.exceptions.RequestException as e:
            logging.warning("Error while syncing to %s: %s", ip, e)
            return

        if response.status_code != 200:
            return

        try:
            remote_uuid = response.json()['uuid']
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: body is not JSON; KeyError/TypeError: JSON without a usable "uuid" field.
            logging.warning("Invalid sync response from %s: %r", ip, e)
            return

        self._redis_store.update_consumer(remote_uuid, ip)

    def targeted_snyc(self, ip: str) -> None:
        """
        This method works similarly to `sync_all` however the target is not fetched from the `RedisSuperStorage` instance.
        The results are processed the same way (saved to redis).

        A timeout of 5 seconds is hardcoded for this function. Request failures are logged as warning.

        This method is preferred when the remote is unknown (uuid is unknown). Mostly when the application just started up,
        and an initial synchronization to all consumers is required.

        See `sync_all` for more information.

        Called URL::

            http://<consumer ip>/sync

        Body::

            {
                "uuid" : "str: LOCAL_UUID",
                "ip" : "str: optional: IP override"
            }

        :param ip: The ip address of the consumer to be synced to.
        :raises KeyError: If the `LOCAL_UUID` envvar is not set.
        """
        self._sync_with(ip, self._build_sync_message())

    # Correctly spelled alias. The misspelled name above is kept because existing callers use it.
    targeted_sync = targeted_snyc

    def sync_all(self) -> None:
        """
        This method is used to synchronize with each known consumer.
        The list of consumers are acquired from the `RedisSuperStorage` instance.

        This synchronization run causes the followings:
         * Each consumer known by this consumer is checked for availability
         * Each consumer this consumer communicated with updated the availability of this consumer.
         * Each consumer which had no information of this consumer, now have.

        A timeout of 5 seconds is hardcoded for each consumer individually. Request failures and malformed
        responses are logged as warning, and the run continues with the next consumer.

        Called URL::

            http://<consumer ip>/sync

        Body::

            {
                "uuid" : "str: LOCAL_UUID",
                "ip" : "str: optional: IP override"
            }

        :raises KeyError: If the `LOCAL_UUID` envvar is not set.
        """
        # The body does not depend on the target, so it is built once for the whole run.
        message = self._build_sync_message()

        for _uuid, info in self._redis_store.get_consumer_list().items():
            self._sync_with(info['ip'], message)