170 lines
5.7 KiB
Python
170 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
This module contains the classes used for communicating with other consumers as well as the producers themselves.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import requests
|
|
import requests.exceptions
|
|
from redis_super_storage import RedisSuperStorage
|
|
|
|
|
|
class ProducerCommunicator:
    """
    This class is used to communicate with producers.
    The addresses of producers are fetched from `RedisSuperStorage`.
    """

    def __init__(self, redis_store: "RedisSuperStorage"):
        """
        Upon creating this object, a requests session is created in order to take advantage of keep-alive connections.

        :param redis_store: A `RedisSuperStorage` instance.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    def push_ip_update(self, newip: str):
        """
        This method is used to push an ip update to all known producers.
        The list of producers is read from the `RedisSuperStorage` instance.
        (The list of producers are maintained by the api endpoint.)

        The uuid of this consumer is acquired directly from the `LOCAL_UUID` envvar.

        A timeout of 5 seconds is hardcoded for each producer individually.
        Any request failure (timeout, connection error or another `requests` error) is logged as warning
        and the remaining producers are still notified.

        Called URL::

            http://<producer ip>/ip

        Body::

            {
              "uuid" : "str: LOCAL_UUID",
              "ip": "str: provided by param"
            }

        :param newip: New ipaddress to be announced.
        :raises KeyError: If there is at least one producer and the `LOCAL_UUID` envvar is not set.
        """
        producers = self._redis_store.get_producer_list()
        if not producers:
            # Nothing to notify; the environment is intentionally not touched in this case.
            return

        # The body is the same for every producer, so it is built only once.
        payload = {
            'uuid': os.environ['LOCAL_UUID'],
            'ip': newip,
        }

        for key, ip in producers.items():
            try:
                response = self._session.post(f"http://{ip}/ip", json=payload, timeout=5)
            except requests.exceptions.RequestException as e:
                # RequestException is the base of ConnectionError and Timeout; catching it keeps
                # one misbehaving producer from aborting the update of the others.
                logging.warning("Could not push update to %s: %s", key, e)
                continue

            logging.debug("Pushed update to %s at %s. Response: %s", key, ip, response.status_code)
|
|
|
|
|
|
class ConsumerCommunicator:
    """
    This class is used to communicate with consumers.
    The addresses of consumers are fetched from the `RedisSuperStorage`.
    """

    def __init__(self, redis_store: "RedisSuperStorage", force_ip_override: bool = False):
        """
        Upon creating this object, a requests session is created in order to take advantage of keep-alive connections.

        :param redis_store: A `RedisSuperStorage` instance.
        :param force_ip_override: Include the ip address stored in redis to the sync message (Disable the receiver ip discovery in the other consumer)
        """
        self._redis_store = redis_store
        self._session = requests.Session()
        self._force_ip_override = force_ip_override

    def _build_sync_message(self) -> dict:
        """
        Assemble the JSON body sent to the `/sync` endpoint of another consumer.

        :return: A dict with the local `uuid` and, when ip override is forced, the `ip` stored in redis.
        :raises KeyError: If the `LOCAL_UUID` envvar is not set.
        """
        message = {'uuid': os.environ['LOCAL_UUID']}

        if self._force_ip_override:
            message['ip'] = self._redis_store.current_ip

        return message

    def _sync_with(self, ip: str, message: dict) -> None:
        """
        Send a single sync request to `ip` and save the uuid it answers with to redis.

        Request failures and unusable responses are logged as warning and otherwise ignored,
        so a single bad peer never interrupts the caller.

        :param ip: The ip address of the consumer to be synced to.
        :param message: The JSON body to send.
        """
        try:
            # request synchronization
            response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
        except requests.exceptions.RequestException as e:
            # RequestException is the base of ConnectionError and Timeout.
            logging.warning("Error while syncing to %s: %s", ip, e)
            return

        if response.status_code != 200:
            return

        try:
            remote_uuid = response.json()['uuid']
        except (ValueError, KeyError, TypeError) as e:
            # ValueError: body is not JSON; KeyError: no 'uuid' field; TypeError: body is not an object.
            logging.warning("Invalid sync response from %s: %r", ip, e)
            return

        self._redis_store.update_consumer(remote_uuid, ip)

    def targeted_snyc(self, ip: str):
        """
        This method works similarly to `sync_all` however the target is not fetched from the `RedisSuperStorage` instance.
        The results are processed the same way (saved to redis).

        A timeout of 5 seconds is hardcoded for this function. Request failures are logged as warning.

        This method is preferred when the remote is unknown (uuid is unknown). Mostly when the application just started up,
        and an initial synchronization to all consumers is required.

        See `sync_all` for more information.

        Called URL::

            http://<consumer ip>/sync

        Body::

            {
              "uuid" : "str: LOCAL_UUID",
              "ip" : "str: optional: IP override"
            }

        :param ip: The ip address of the consumer to be synced to.
        :raises KeyError: If the `LOCAL_UUID` envvar is not set.
        """
        # NOTE: the misspelled method name is part of the public interface and is kept unchanged.
        self._sync_with(ip, self._build_sync_message())

    def sync_all(self):
        """
        This method is used to synchronize with each known consumer.
        The list of consumers are acquired from the `RedisSuperStorage` instance.

        This synchronization run causes the followings:
         * Each consumer known by this consumer is checked for availability
         * Each consumer this consumer communicated with updated the availability of this consumer.
         * Each consumer which had no information of this consumer, now have.

        A timeout of 5 seconds is hardcoded for each consumer individually.
        Request failures and unusable responses are logged as warning and the remaining consumers are still synced.

        Called URL::

            http://<consumer ip>/sync

        Body::

            {
              "uuid" : "str: LOCAL_UUID",
              "ip" : "str: optional: IP override"
            }

        :raises KeyError: If the `LOCAL_UUID` envvar is not set.
        """
        # The body is identical for every consumer, so it is built only once.
        message = self._build_sync_message()

        for info in self._redis_store.get_consumer_list().values():
            self._sync_with(info['ip'], message)
|