This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
consumer-scheduler/communicators.py

61 lines
2.1 KiB
Python
Raw Normal View History

2020-05-08 21:48:32 +02:00
#!/usr/bin/env python3
import os
import logging
import requests
2020-05-08 22:05:49 +02:00
import requests.exceptions
2020-05-08 21:48:32 +02:00
from redis_super_storage import RedisSuperStorage
class ProducerCommunicator:
    """Announce changes of this node's IP address to the known producers.

    The producer list is fetched from the store on every push, and all
    requests go through a single ``requests.Session`` owned by the instance.
    """

    def __init__(self, redis_store: RedisSuperStorage):
        """Store the backing store and create the shared HTTP session.

        :param redis_store: store whose ``get_producer_list()`` yields the
            producers to notify.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    def push_ip_update(self, newip: str):
        """POST ``newip`` to the ``/ip`` endpoint of every listed producer.

        Each request carries the ``LOCAL_UUID`` environment variable as
        ``uuid`` and uses a timeout of 5. Connection errors and timeouts are
        logged as warnings and the loop moves on to the next producer; the
        HTTP status code is only logged, never acted upon.

        :param newip: the address to announce.
        :raises KeyError: if ``LOCAL_UUID`` is not set in the environment
            (only when there is at least one producer to notify).
        """
        for key, ip in self._redis_store.get_producer_list().items():
            # Built outside the ``try`` so that only the network call is guarded.
            payload = {'uuid': os.environ['LOCAL_UUID'], 'ip': newip}
            try:
                response = self._session.post(
                    f"http://{ip}/ip",
                    json=payload,
                    timeout=5,
                )
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                # Best effort: an unreachable producer must not block the rest.
                logging.warning(f"Could not push update to {key}: {str(e)}")
            else:
                logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}")
class ConsumerCommunicator:
    """Request synchronization from other consumers and record their replies.

    All requests go through a single ``requests.Session`` owned by the
    instance; successful replies are written back to the store.
    """

    def __init__(self, redis_store: RedisSuperStorage):
        """Store the backing store and create the shared HTTP session.

        :param redis_store: store providing ``get_consumer_list()`` and
            ``update_consumer()``.
        """
        self._redis_store = redis_store
        self._session = requests.Session()

    def targeted_snyc(self, ip: str):
        """Request synchronization from the consumer at ``ip``.

        POSTs this node's ``LOCAL_UUID`` to ``http://{ip}/sync`` with a
        timeout of 5. When the peer answers with status 200, the ``uuid``
        from its JSON body is passed to ``update_consumer`` together with
        ``ip``. Connection errors, timeouts, non-200 answers and malformed
        200 answers leave the store untouched; failures are logged.

        The misspelled name is kept because callers use it; ``targeted_sync``
        is an alias.

        :param ip: host part used to build the peer's ``/sync`` URL.
        :raises KeyError: if ``LOCAL_UUID`` is not set in the environment.
        """
        try:
            # request synchronization
            response = self._session.post(
                f"http://{ip}/sync",
                json={'uuid': os.environ['LOCAL_UUID']},
                timeout=5,
            )
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            logging.error(f"Error while syncing to {ip}: {str(e)}")
            return

        if response.status_code != 200:
            return

        try:
            peer_uuid = response.json()['uuid']
        except (ValueError, KeyError, TypeError) as e:
            # A 200 answer without a usable JSON ``uuid`` is treated like any
            # other failed sync, so one broken peer cannot abort ``sync_all``.
            logging.error(f"Malformed sync response from {ip}: {e!r}")
            return

        self._redis_store.update_consumer(peer_uuid, ip)

    # Correctly spelled alias for new callers; the original name stays valid.
    targeted_sync = targeted_snyc

    def sync_all(self):
        """Request synchronization from every consumer listed in the store.

        Reads the ``ip`` entry of each consumer record and delegates to
        ``targeted_snyc``, so every peer is contacted through the shared
        session and a failing peer does not stop the remaining ones.
        """
        for _uuid, info in self._redis_store.get_consumer_list().items():
            self.targeted_snyc(info['ip'])