diff --git a/app.py b/app.py index 3d6793c..7747a52 100644 --- a/app.py +++ b/app.py @@ -1,14 +1,11 @@ #!/usr/bin/env python3 import sentry_sdk import time -import requests -import requests.exceptions import os -import json import logging -import socket from redis_super_storage import RedisSuperStorage +from communicators import ConsumerCommunicator, ProducerCommunicator """ Scheduler @@ -22,74 +19,27 @@ __version__text__ = "1" sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10") - -class Scheduler: - - def __init__(self): - pass - - def request_first_sync(self, ip_list, consumer_list_redis): - temp_dict = {} - for ip in ip_list: - try: - # request synchronization - response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: - logging.error(f"Error while syncing to {ip}: {str(e)}") - continue - - if response.status_code == 200: - temp_dict[response.json()['uuid']] = {'ip': ip} - consumer_list_redis.update(temp_dict) - self.r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) - logging.debug('Update redis consumers ip list from first answers: Done') - - def request_sync(self, consumer_list_redis): - - - def test_ip_change(self, host_name): - - old_ip = self.r.get('current_ip') - - if old_ip: - old_ip = old_ip.decode('utf-8') - - current_ip = socket.gethostbyname(host_name) - - if not old_ip: # Not set yet. I this case no update required - self.r.set('current_ip', current_ip.encode('utf-8')) - logging.debug(f"Previous info about the ip address could not be found! Current: {current_ip}") - - elif old_ip != current_ip: - logging.info(f'IP changed: {old_ip} -> {current_ip} Pushing updates...') - - # pushing updates... - keys = self.r.keys('producer_*') - - logging.debug(f'Pushing update to the following producers: ' + ', '.join(k.decode('utf-8') for k in keys)) - - for key in keys: - ip = self.r.get(key) - if ip: - ip = ip.decode('utf-8') - else: - continue - - - else: - logging.debug(f'IP unchanged: {current_ip}') - - logging.debug('Waiting for next turn') - time.sleep(os.environ.get("RUN_INTERVAL", 30)) +def get_initial_ip_list(): + ip_list = os.environ['INITIAL_SERVERS'].split(',') + logging.debug('Initial ip list' + ", ".join(ip_list)) + return ip_list def main(): # set logging preferences logging.basicConfig(filename='', level=logging.DEBUG) - redis_storage = RedisSuperStorage(os.environ.get('REDIS_URL', "redis://localhost:6379/0")) - scheduler = Scheduler() + redis_storage = RedisSuperStorage(os.environ.get('REDIS_URL', "redis://localhost:6379/0"), 5) + consumer_communicator = ConsumerCommunicator(redis_storage) + producer_communicator = ProducerCommunicator(redis_storage) + for ip in get_initial_ip_list(): + consumer_communicator.targeted_snyc(ip) + + while True: + logging.debug("Doing a sync") + consumer_communicator.sync_all() + time.sleep(os.environ.get("RUN_INTERVAL", 30)) if __name__ == "__main__": diff --git a/communicators.py b/communicators.py index c3850ea..74d25ac 100644 --- a/communicators.py +++ b/communicators.py @@ -2,6 +2,7 @@ import os import logging import requests +import requests.exceptions from redis_super_storage import RedisSuperStorage @@ -16,7 +17,7 @@ class ProducerCommunicator: for key, ip in self._redis_store.get_producer_list().items(): try: - response = requests.post( + response = self._session.post( f"http://{ip}/ip", json={'uuid': os.environ['LOCAL_UUID'], 'ip': newip}, timeout=5 @@ -35,7 +36,7 @@ class ConsumerCommunicator: def targeted_snyc(self, ip: str): try: # request synchronization - response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) + response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: logging.error(f"Error while syncing to {ip}: {str(e)}") diff --git a/redis_super_storage.py b/redis_super_storage.py index 68bd0c3..4f31a8f 100644 --- a/redis_super_storage.py +++ b/redis_super_storage.py @@ -4,6 +4,7 @@ import os import json import socket import time +import logging class RedisSuperStorage: @@ -47,6 +48,7 @@ class RedisSuperStorage: current_ip = socket.gethostbyname(host_name) if current_ip != old_ip: + logging.info(f'IP changed: {old_ip} -> {current_ip}') self.r.set('current_ip', current_ip.encode('utf-8')) return current_ip != old_ip