diff --git a/app.py b/app.py index 9a6848b..0a51413 100644 --- a/app.py +++ b/app.py @@ -21,43 +21,43 @@ __version__text__ = "1" sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10") -def main(): - # set logging preferences - logging.basicConfig(filename='', level=logging.DEBUG) +class RedisSuperStorage: + def __init__(self, r): + self.r = r - # connect to redis - r = redis.from_url(os.environ.get('REDIS_URL', "redis://localhost:6379/0")) + def get_consumer_dictionary(self): + consumer_list_redis = json.loads((self.r.get('consumer_list') or b'{}').decode('utf-8')) - # set initial consumer addresses - ip_list = os.environ['INITIAL_SERVERS'].split(',') - logging.debug('Get consumer list from environ at first: Done') - # get the dictionary of the currently available consumers - consumer_list_redis = json.loads((r.get('consumer_list') or b'{}').decode('utf-8')) - logging.debug('Get consumer list from redis at first: Done') - temp_dict = {} + return consumer_list_redis - host_name = socket.gethostname() + def get_ip_list(self): + # set initial consumer addresses + ip_list = os.environ['INITIAL_SERVERS'].split(',') + logging.debug('Get consumer list from environ at first: Done') + return ip_list - for ip in ip_list: - try: - # request synchronization - response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: - logging.error(f"Error while syncing to {ip}: {str(e)}") - continue - if response.status_code == 200: - temp_dict[response.json()['uuid']] = {'ip': ip} +class Scheduler: + def __init__(self, r): + self.r = r - consumer_list_redis.update(temp_dict) - r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) - logging.debug('Update redis consumers ip list from first answers: Done') - while True: - logging.debug('Infinite Cycle start : Done') - # get the dictionary of the currently available consumers - consumer_list_redis = json.loads((r.get('consumer_list') or b'{}').decode('utf-8')) - logging.debug('Get consumer list from redis: Done') + def request_first_sync(self, ip_list, consumer_list_redis): + temp_dict = {} + for ip in ip_list: + try: + # request synchronization + response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + logging.error(f"Error while syncing to {ip}: {str(e)}") + continue + if response.status_code == 200: + temp_dict[response.json()['uuid']] = {'ip': ip} + consumer_list_redis.update(temp_dict) + self.r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) + logging.debug('Update redis consumers ip list from first answers: Done') + + def request_sync(self, consumer_list_redis): temp_dict = {} for uuid, info in consumer_list_redis.items(): ip = info['ip'] @@ -73,34 +73,33 @@ def main(): # update the dictionary of the currently available consumers consumer_list_redis.update(temp_dict) - r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) + self.r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) logging.debug('Update redis consumer ip list from answers: Done') - # Test ip change stuff + def test_ip_change(self, host_name): - old_ip = r.get('current_ip') + old_ip = self.r.get('current_ip') if old_ip: old_ip = old_ip.decode('utf-8') current_ip = socket.gethostbyname(host_name) - if not old_ip: # Not set yet. I this case no update required - r.set('current_ip', current_ip.encode('utf-8')) + self.r.set('current_ip', current_ip.encode('utf-8')) logging.debug(f"Previous info about the ip address could not be found! Current: {current_ip}") elif old_ip != current_ip: logging.info(f'IP changed: {old_ip} -> {current_ip} Pushing updates...') - r.set('current_ip', current_ip.encode('utf-8')) + self.r.set('current_ip', current_ip.encode('utf-8')) # pushing updates... - keys = r.keys('producer_*') + keys = self.r.keys('producer_*') logging.debug(f'Pushing update to the following producers: ' + ', '.join(k.decode('utf-8') for k in keys)) for key in keys: - ip = r.get(key) + ip = self.r.get(key) if ip: ip = ip.decode('utf-8') else: @@ -123,8 +122,43 @@ def main(): time.sleep(os.environ.get("RUN_INTERVAL", 30)) +def main(): + # set logging preferences + logging.basicConfig(filename='', level=logging.DEBUG) + + # Connect redis + r = redis.from_url(os.environ.get('REDIS_URL', "redis://localhost:6379/0")) + + redis_storage = RedisSuperStorage(r) + scheduler = Scheduler(r) + + # get the dictionary of the currently available consumers + + consumer_list_redis = redis_storage.get_consumer_dictionary() + logging.debug('Get consumer list from redis at first: Done') + + host_name = socket.gethostname() + ip_list = redis.get_ip_list() + + scheduler.request_list(ip_list) + scheduler.request_first_sync(ip_list, consumer_list_redis) + + while True: + logging.debug('Infinite Cycle start : Done') + # get the dictionary of the currently available consumers + consumer_list_redis = redis.get_consumer_dictionary() + logging.debug('Get consumer list from redis: Done') + + temp_dict = {} + + scheduler.request_sync(consumer_list_redis) + # Test ip change stuff + scheduler.test_ip_change(host_name) + + if __name__ == "__main__": try: + main() except KeyboardInterrupt: pass diff --git a/javitas.py b/javitas.py deleted file mode 100644 index 54179e4..0000000 --- a/javitas.py +++ /dev/null @@ -1,176 +0,0 @@ -#!/usr/bin/env python -import sentry_sdk -import time -import requests -import requests.exceptions -import os -import redis -import json -import logging -import socket - -""" -Scheduler -""" - -__author__ = "@kocsisr" -__copyright__ = "Copyright 2020, GoldenPogácsa Team" -__module_name__ = "app" -__version__text__ = "1" - -sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10") - - - -class redis_super_storage: - def __init__(self,r): - self.r = r - - def get_consumer_dictionary(self,r): - consumer_list_redis = json.loads((self.r.get('consumer_list') or b'{}').decode('utf-8')) - - return consumer_list_redis - - def get_ip_list(self): - # set initial consumer addresses - ip_list = os.environ['INITIAL_SERVERS'].split(',') - logging.debug('Get consumer list from environ at first: Done') - return ip_list - - - -class scheduler: - def __init__(self,r): - self.r = r - - def request_first_sync(self,ip_list,consumer_list_redis) : - temp_dict = {} - for ip in ip_list: - try: - # request synchronization - response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: - logging.error(f"Error while syncing to {ip}: {str(e)}") - continue - - if response.status_code == 200: - temp_dict[response.json()['uuid']] = {'ip': ip} - consumer_list_redis.update(temp_dict) - r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) - logging.debug('Update redis consumers ip list from first answers: Done') - - - def request_sync(self,consumer_list_redis): - temp_dict = {} - for uuid, info in consumer_list_redis.items(): - ip = info['ip'] - try: - # request synchronization - response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: - logging.error(f"Error while syncing to {ip}: {str(e)}") - continue - - if response.status_code == 200: - temp_dict[response.json()['uuid']] = {'ip': ip} - - # update the dictionary of the currently available consumers - consumer_list_redis.update(temp_dict) - r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) - logging.debug('Update redis consumer ip list from answers: Done') - - - def test_ip_change(self,host_name): - - old_ip = self.r.get('current_ip') - - if old_ip: - old_ip = old_ip.decode('utf-8') - - current_ip = socket.gethostbyname(host_name) - - - if not old_ip: # Not set yet. I this case no update required - self.r.set('current_ip', current_ip.encode('utf-8')) - logging.debug(f"Previous info about the ip address could not be found! Current: {current_ip}") - - elif old_ip != current_ip: - logging.info(f'IP changed: {old_ip} -> {current_ip} Pushing updates...') - self.r.set('current_ip', current_ip.encode('utf-8')) - - # pushing updates... - keys = self.r.keys('producer_*') - - logging.debug(f'Pushing update to the following producers: ' + ', '.join(k.decode('utf-8') for k in keys)) - - for key in keys: - ip = self.r.get(key) - if ip: - ip = ip.decode('utf-8') - else: - continue - - try: - response = requests.post( - f"http://{ip}/ip", - json={'uuid': os.environ['LOCAL_UUID'], 'ip': current_ip}, - timeout=5 - ) - logging.debug(f"Pushed update to {key.decode('utf-8')} at {ip}. Response: {response.status_code}") - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: - logging.warning(f"Could not push update to {key.decode('utf-8')}: {str(e)}") - continue - else: - logging.debug(f'IP unchanged: {current_ip}') - - logging.debug('Waiting for next turn') - time.sleep(os.environ.get("RUN_INTERVAL", 30)) - - - - -def main(): - # set logging preferences - logging.basicConfig(filename='', level=logging.DEBUG) - - #Connect redis - r = redis.from_url(os.environ.get('REDIS_URL', "redis://localhost:6379/0")) - - redis_storage=redis_super_storage(r) - scheduler=scheduler(r) - - # get the dictionary of the currently available consumers - - consumer_list_redis = redis_storage.get_consumer_dictionary() - logging.debug('Get consumer list from redis at first: Done') - - - - - host_name = socket.gethostname() - ip_list=redis.get_ip_list() - - scheduler.request_list(ip_list) - scheduler.request_first_sync(ip_list,consumer_list_redis) - - while True: - logging.debug('Infinite Cycle start : Done') - # get the dictionary of the currently available consumers - consumer_list_redis=redis.get_consumer_dictionary() - logging.debug('Get consumer list from redis: Done') - - temp_dict = {} - - scheduler.request_sync(consumer_list_redis) - # Test ip change stuff - scheduler.test_ip_change(host_name) - - - - -if __name__ == "__main__": - try: - - main() - except KeyboardInterrupt: - pass \ No newline at end of file