This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
consumer-scheduler/app.py

81 lines
2.7 KiB
Python
Raw Permalink Normal View History

2020-05-08 21:29:08 +02:00
#!/usr/bin/env python3
2020-05-14 18:43:05 +02:00
"""
This is the top-level module containing the main application. Running this file starts the scheduler part of the consumer application.
"""
2020-03-29 16:48:34 +02:00
import sentry_sdk
2020-03-29 19:13:42 +02:00
import time
import os
2020-03-30 15:42:48 +02:00
import logging
2020-03-29 16:48:34 +02:00
2020-05-08 21:29:08 +02:00
from redis_super_storage import RedisSuperStorage
2020-05-08 22:05:49 +02:00
from communicators import ConsumerCommunicator, ProducerCommunicator
2020-05-08 22:21:15 +02:00
from ip_watchdog import IPWatchdog
2020-05-08 21:29:08 +02:00
2020-03-29 17:16:41 +02:00
# Module metadata.
__author__ = "@kocsisr"
2020-03-29 16:48:34 +02:00
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "app"
__version__text__ = "1"
2020-03-29 19:13:42 +02:00
# Initialise the Sentry SDK with this DSN; this runs as soon as the module is imported.
sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10")
2020-03-29 16:48:34 +02:00
2020-03-29 19:30:59 +02:00
2020-05-14 18:43:05 +02:00
def get_initial_ip_list() -> list:
    """
    Parse the INITIAL_SERVERS environment variable into a list of addresses.

    The variable holds IP addresses separated by commas. Whitespace around each
    address is stripped and empty entries (from an empty value, or from doubled
    or trailing commas) are dropped, so callers never receive blank addresses.

    :return: A list of the IP addresses provided by the environment variable
    :raises KeyError: If INITIAL_SERVERS is not set
    """
    raw_value = os.environ['INITIAL_SERVERS']
    # "".split(',') yields [''], and "a, b," yields padded/blank items; clean both.
    stripped = (address.strip() for address in raw_value.split(','))
    ip_list = [address for address in stripped if address]
    logging.debug('Initial ip list %s', ", ".join(ip_list))
    return ip_list
2020-03-29 19:13:42 +02:00
2020-03-29 19:26:52 +02:00
2020-05-08 20:47:02 +02:00
def _env_flag(name: str, default: bool = False) -> bool:
    """
    Interpret an environment variable as a boolean switch.

    Environment values are always strings, so ``bool(value)`` is True for any
    non-empty text, including "false" and "0". Here only an empty value, "0",
    "false", "no" and "off" (case-insensitive) count as False; every other
    value counts as True.

    :param name: Name of the environment variable to read
    :param default: Value returned when the variable is not set
    :return: The parsed boolean value
    """
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() not in ('', '0', 'false', 'no', 'off')


def main():
    """
    This is the main method of the scheduler application.

    This method does the following:
        * Sets up logging
        * Creates the RedisSuperStorage object from the `REDIS_URL` and
          `CUSTOMER_TIMEOUT` environment variables
        * Sets up the communicators and the IP watchdog
        * Performs an initial synchronization with every address returned by
          `get_initial_ip_list`
        * Starts the main loop, which repeats these steps forever:
            * Synchronizes with all other consumers
            * Checks if the IP address changed. If yes, pushes the update to all producers.
            * Waits `RUN_INTERVAL` seconds (provided by envvar, default 10)

    :raises KeyError: If `INITIAL_SERVERS` is not set
    :raises ValueError: If `CUSTOMER_TIMEOUT` or `RUN_INTERVAL` is not an integer
    """
    # No filename is given, so records go to the default stream handler.
    logging.basicConfig(level=logging.DEBUG)

    redis_storage = RedisSuperStorage(
        os.environ.get('REDIS_URL', "redis://localhost:6379/0"),
        int(os.environ.get("CUSTOMER_TIMEOUT", 30)),
    )

    # Parse the flag explicitly: bool("false") would be True.
    consumer_communicator = ConsumerCommunicator(redis_storage, _env_flag('FORCE_IP_OVERRIDE'))
    producer_communicator = ProducerCommunicator(redis_storage)
    ip_watchdog = IPWatchdog(redis_storage)

    # Read the interval once so a malformed value fails at startup, not mid-loop.
    run_interval = int(os.environ.get("RUN_INTERVAL", 10))

    logging.info("Syncing to initial consumer list")
    for ip in get_initial_ip_list():
        logging.debug("Syncing to %s", ip)
        consumer_communicator.targeted_snyc(ip)

    while True:
        logging.debug("Doing a sync")
        consumer_communicator.sync_all()

        ip_changed, ipaddr = ip_watchdog.ip_changed()
        if ip_changed:
            producer_communicator.push_ip_update(ipaddr)

        time.sleep(run_interval)
2020-05-08 20:47:02 +02:00
2020-03-29 19:26:52 +02:00
if __name__ == "__main__":
    # Run the scheduler only when this file is executed as a script.
    try:
        main()
    except KeyboardInterrupt:
        # A keyboard interrupt (Ctrl+C) ends the program quietly, without a traceback.
        pass