diff --git a/javitas.py b/javitas.py new file mode 100644 index 0000000..54179e4 --- /dev/null +++ b/javitas.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +import sentry_sdk +import time +import requests +import requests.exceptions +import os +import redis +import json +import logging +import socket + +""" +Scheduler +""" + +__author__ = "@kocsisr" +__copyright__ = "Copyright 2020, GoldenPogácsa Team" +__module_name__ = "app" +__version__text__ = "1" + +sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10") + + + +class redis_super_storage: + def __init__(self,r): + self.r = r + + def get_consumer_dictionary(self,r): + consumer_list_redis = json.loads((self.r.get('consumer_list') or b'{}').decode('utf-8')) + + return consumer_list_redis + + def get_ip_list(self): + # set initial consumer addresses + ip_list = os.environ['INITIAL_SERVERS'].split(',') + logging.debug('Get consumer list from environ at first: Done') + return ip_list + + + +class scheduler: + def __init__(self,r): + self.r = r + + def request_first_sync(self,ip_list,consumer_list_redis) : + temp_dict = {} + for ip in ip_list: + try: + # request synchronization + response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + logging.error(f"Error while syncing to {ip}: {str(e)}") + continue + + if response.status_code == 200: + temp_dict[response.json()['uuid']] = {'ip': ip} + consumer_list_redis.update(temp_dict) + r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) + logging.debug('Update redis consumers ip list from first answers: Done') + + + def request_sync(self,consumer_list_redis): + temp_dict = {} + for uuid, info in consumer_list_redis.items(): + ip = info['ip'] + try: + # request synchronization + response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + logging.error(f"Error while syncing to {ip}: {str(e)}") + continue + + if response.status_code == 200: + temp_dict[response.json()['uuid']] = {'ip': ip} + + # update the dictionary of the currently available consumers + consumer_list_redis.update(temp_dict) + r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8')) + logging.debug('Update redis consumer ip list from answers: Done') + + + def test_ip_change(self,host_name): + + old_ip = self.r.get('current_ip') + + if old_ip: + old_ip = old_ip.decode('utf-8') + + current_ip = socket.gethostbyname(host_name) + + + if not old_ip: # Not set yet. I this case no update required + self.r.set('current_ip', current_ip.encode('utf-8')) + logging.debug(f"Previous info about the ip address could not be found! Current: {current_ip}") + + elif old_ip != current_ip: + logging.info(f'IP changed: {old_ip} -> {current_ip} Pushing updates...') + self.r.set('current_ip', current_ip.encode('utf-8')) + + # pushing updates... + keys = self.r.keys('producer_*') + + logging.debug(f'Pushing update to the following producers: ' + ', '.join(k.decode('utf-8') for k in keys)) + + for key in keys: + ip = self.r.get(key) + if ip: + ip = ip.decode('utf-8') + else: + continue + + try: + response = requests.post( + f"http://{ip}/ip", + json={'uuid': os.environ['LOCAL_UUID'], 'ip': current_ip}, + timeout=5 + ) + logging.debug(f"Pushed update to {key.decode('utf-8')} at {ip}. Response: {response.status_code}") + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + logging.warning(f"Could not push update to {key.decode('utf-8')}: {str(e)}") + continue + else: + logging.debug(f'IP unchanged: {current_ip}') + + logging.debug('Waiting for next turn') + time.sleep(os.environ.get("RUN_INTERVAL", 30)) + + + + +def main(): + # set logging preferences + logging.basicConfig(filename='', level=logging.DEBUG) + + #Connect redis + r = redis.from_url(os.environ.get('REDIS_URL', "redis://localhost:6379/0")) + + redis_storage=redis_super_storage(r) + scheduler=scheduler(r) + + # get the dictionary of the currently available consumers + + consumer_list_redis = redis_storage.get_consumer_dictionary() + logging.debug('Get consumer list from redis at first: Done') + + + + + host_name = socket.gethostname() + ip_list=redis.get_ip_list() + + scheduler.request_list(ip_list) + scheduler.request_first_sync(ip_list,consumer_list_redis) + + while True: + logging.debug('Infinite Cycle start : Done') + # get the dictionary of the currently available consumers + consumer_list_redis=redis.get_consumer_dictionary() + logging.debug('Get consumer list from redis: Done') + + temp_dict = {} + + scheduler.request_sync(consumer_list_redis) + # Test ip change stuff + scheduler.test_ip_change(host_name) + + + + +if __name__ == "__main__": + try: + + main() + except KeyboardInterrupt: + pass \ No newline at end of file