Added communicators
This commit is contained in:
59
communicators.py
Normal file
59
communicators.py
Normal file
@@ -0,0 +1,59 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import logging
|
||||
import requests
|
||||
from redis_super_storage import RedisSuperStorage
|
||||
|
||||
|
||||
class ProducerCommunicator:
|
||||
|
||||
def __init__(self, redis_store: RedisSuperStorage):
|
||||
self._redis_store = redis_store
|
||||
self._session = requests.Session()
|
||||
|
||||
def push_ip_update(self, newip: str):
|
||||
|
||||
for key, ip in self._redis_store.get_producer_list().items():
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
f"http://{ip}/ip",
|
||||
json={'uuid': os.environ['LOCAL_UUID'], 'ip': newip},
|
||||
timeout=5
|
||||
)
|
||||
logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}")
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.warning(f"Could not push update to {key}: {str(e)}")
|
||||
|
||||
|
||||
class ConsumerCommunicator:
|
||||
|
||||
def __init__(self, redis_store: RedisSuperStorage):
|
||||
self._redis_store = redis_store
|
||||
self._session = requests.Session()
|
||||
|
||||
def targeted_snyc(self, ip: str):
|
||||
try:
|
||||
# request synchronization
|
||||
response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.error(f"Error while syncing to {ip}: {str(e)}")
|
||||
return
|
||||
|
||||
if response.status_code == 200:
|
||||
self._redis_store.update_consumer(response.json()['uuid'], ip)
|
||||
|
||||
def sync_all(self):
|
||||
for uuid, info in self._redis_store.get_consumer_list().items():
|
||||
ip = info['ip']
|
||||
try:
|
||||
# request synchronization
|
||||
response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.error(f"Error while syncing to {ip}: {str(e)}")
|
||||
continue
|
||||
|
||||
if response.status_code == 200:
|
||||
self._redis_store.update_consumer(response.json()['uuid'], ip)
|
||||
Reference in New Issue
Block a user