From 04e189f5415fe0638792a8068a2cdca987fe4848 Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 14 May 2020 18:11:56 +0200 Subject: [PATCH 1/6] Fixed sphinx config --- docs/conf.py | 17 +++++++++++------ docs/source/communicators.rst | 7 +++++++ docs/source/ip_watchdog.rst | 7 +++++++ docs/source/modules.rst | 3 +++ docs/source/redis_super_storage.rst | 7 +++++++ 5 files changed, 35 insertions(+), 6 deletions(-) create mode 100644 docs/source/communicators.rst create mode 100644 docs/source/ip_watchdog.rst create mode 100644 docs/source/redis_super_storage.rst diff --git a/docs/conf.py b/docs/conf.py index 1e0445d..57e0d2a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -10,9 +10,11 @@ # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. # -# import os -# import sys -# sys.path.insert(0, os.path.abspath('.')) +import os +import sys +sys.path.insert(0, os.path.abspath('..')) + +#import app # -- Project information ----------------------------------------------------- @@ -30,8 +32,7 @@ release = '0.1.0' # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [ -] +extensions = ['sphinx.ext.autodoc'] # Add any paths that contain templates here, relative to this directory. templates_path = [] @@ -52,4 +53,8 @@ html_theme = 'sphinx_rtd_theme' # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". 
-html_static_path = [] \ No newline at end of file +html_static_path = [] + +# master document +master_doc = 'index' +autoclass_content = 'both' \ No newline at end of file diff --git a/docs/source/communicators.rst b/docs/source/communicators.rst new file mode 100644 index 0000000..38cd902 --- /dev/null +++ b/docs/source/communicators.rst @@ -0,0 +1,7 @@ +communicators module +==================== + +.. automodule:: communicators + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/ip_watchdog.rst b/docs/source/ip_watchdog.rst new file mode 100644 index 0000000..6daecdf --- /dev/null +++ b/docs/source/ip_watchdog.rst @@ -0,0 +1,7 @@ +ip\_watchdog module +=================== + +.. automodule:: ip_watchdog + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/modules.rst b/docs/source/modules.rst index 7abee4e..09f1c7d 100644 --- a/docs/source/modules.rst +++ b/docs/source/modules.rst @@ -5,3 +5,6 @@ consumer-scheduler :maxdepth: 4 app + communicators + ip_watchdog + redis_super_storage \ No newline at end of file diff --git a/docs/source/redis_super_storage.rst b/docs/source/redis_super_storage.rst new file mode 100644 index 0000000..2ef2418 --- /dev/null +++ b/docs/source/redis_super_storage.rst @@ -0,0 +1,7 @@ +redis\_super\_storage module +============================ + +.. automodule:: redis_super_storage + :members: + :undoc-members: + :show-inheritance: From 108db57271f8d7f958f47751c695004503b4c340 Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 14 May 2020 18:26:30 +0200 Subject: [PATCH 2/6] Added docstrings to ipw --- ip_watchdog.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/ip_watchdog.py b/ip_watchdog.py index 9962f70..b2126a6 100644 --- a/ip_watchdog.py +++ b/ip_watchdog.py @@ -7,12 +7,35 @@ from redis_super_storage import RedisSuperStorage class IPWatchdog: + """ + This is very simple class, that is used to determine if the ip address of the host have changed. 
+ + Internally this class relies on `RedisSuperStorage` to fetch the last used ip address. + + The ip address of the current host is acquired using python's builtin `socket` interface, by requesting an address resolve against the localhost's host name. + In some scenarios this may result in wrongly identifying the loopback address instead of the assigned address. + In most cases, where the application is run inside a docker container this method will work just fine. + """ + def __init__(self, redis_store: RedisSuperStorage): + """ + During the construction of the object, the host name of the current machine is cached, for quicker lookups. + + :param redis_store: a RedisSuperStorage instance. + """ self._redis_store = redis_store self._host_name = socket.gethostname() def ip_changed(self) -> Tuple[bool, str]: + """ + This method fetches the last ip address from the RedisSuperStorage instance, and compares it to the current local address. + If the ip address changes the new value is automatically stored in the RedisSuperStorage instance. + + Detection is performed upon calling this method, as well as storing the updated address. + + :return: (changed, ip_address) A tuple with two members, where the first member indicates if the ip address is changed, the second member is the current ip address. + """ old_ip = self._redis_store.current_ip current_ip = socket.gethostbyname(self._host_name) From ac90ca829f3502e88560d16d7b2565d7f8d56c2c Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 14 May 2020 18:43:05 +0200 Subject: [PATCH 3/6] Added some docstrings --- app.py | 29 ++++++++++++++++++++++++----- communicators.py | 4 ++++ ip_watchdog.py | 3 +++ redis_super_storage.py | 3 +++ 4 files changed, 34 insertions(+), 5 deletions(-) diff --git a/app.py b/app.py index 6d23bc4..aef71e6 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,7 @@ #!/usr/bin/env python3 +""" +This is the top level module, containing the main application. 
Launching this file will launch the scheduler part of the consumer application. +""" import sentry_sdk import time import os @@ -8,10 +11,6 @@ from redis_super_storage import RedisSuperStorage from communicators import ConsumerCommunicator, ProducerCommunicator from ip_watchdog import IPWatchdog -""" -Scheduler -""" - __author__ = "@kocsisr" __copyright__ = "Copyright 2020, GoldenPogácsa Team" __module_name__ = "app" @@ -20,13 +19,33 @@ __version__text__ = "1" sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10") -def get_initial_ip_list(): +def get_initial_ip_list() -> list: + """ + This method is used to parse the content of the INITIAL_SERVERS environment variable. + The content of this variable is a list of ip addresses separated by commas. + This method returns a Python native `list` object containing the addresses provided. + + :return: A list of ip addresses provided by the environmental variable + """ ip_list = os.environ['INITIAL_SERVERS'].split(',') logging.debug('Initial ip list ' + ", ".join(ip_list)) return ip_list def main(): + """ + This is the main method of the scheduler application. + + This method basically does the following: + * Sets up logging + * Creates the RedisSuperStorage object (connecting to Redis database) + * Sets up communicators and IP watchdog + * Performs an initial synchronization to all other consumers. + * Starts the main loop which does roughly the following: + * Synchronizes with all other consumers + * Checks if the ip address changed. If yes, then pushes updates to all producers. + * Waits `RUN_INTERVAL` seconds (provided by envvar) + """ # set logging preferences logging.basicConfig(filename='', level=logging.DEBUG) diff --git a/communicators.py b/communicators.py index 0d8f910..edb84f0 100644 --- a/communicators.py +++ b/communicators.py @@ -1,4 +1,8 @@ #!/usr/bin/env python3 +""" +This module contains the classes used for communicating with other consumers as well as the producers themselves. 
+""" + import os import logging import requests diff --git a/ip_watchdog.py b/ip_watchdog.py index b2126a6..a05bfef 100644 --- a/ip_watchdog.py +++ b/ip_watchdog.py @@ -1,4 +1,7 @@ #!/usr/bin/env python3 +""" +This module contains the IPWatchdog class which is responsible for detecting ip address changes on the host machine. +""" from typing import Tuple import logging import socket diff --git a/redis_super_storage.py b/redis_super_storage.py index 46bc10c..1ea528a 100644 --- a/redis_super_storage.py +++ b/redis_super_storage.py @@ -1,4 +1,7 @@ #!/usr/bin/env python3 +""" +This module contains the RedisSuperStorage class which is responsible for storing and loading the data structure used by the consumer. +""" import redis import os import json From 1a486833bb81010d045577bc14eeec1172986c17 Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 14 May 2020 19:24:20 +0200 Subject: [PATCH 4/6] Documented rst --- redis_super_storage.py | 65 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/redis_super_storage.py b/redis_super_storage.py index 1ea528a..d7736de 100644 --- a/redis_super_storage.py +++ b/redis_super_storage.py @@ -9,12 +9,50 @@ import time class RedisSuperStorage: + """ + This class is the brain of the scheduler app, and probably half the brain of the whole consumer system. + + This class provides access to the data structures used in the consumer system through various getter and setter methods. + Kind of like an ORM would do. + + Sometimes it is necessary to access the redis database more than once while acquiring or setting some data. + In these cases some basic error handling is implemented to avoid most of the problems that could be caused by non-atomic operations. + """ def __init__(self, redis_url: str, timeout: int): + """ + During the object creation the Redis connection is attempted to be established. 
+ + :param redis_url: URL of the redis database in the following form: `redis://localhost:6379/0` + :param timeout: Timeout to be set on keys, where it matters. (See the documentation for each method) + """ self.r = redis.from_url(redis_url) self._timeout = timeout def get_consumer_list(self) -> dict: + """ + This function gets the list of consumers from the Redis database. + + Consumers are saved to the database either by the API endpoint, or by calling `update_consumer` of this class. + The consumers are volatile, they have a timeout set when they are updated. + If they are not updated within that timeout they are considered invalid, and are not returned by this method. + + + Each consumer is represented by the following structure:: + + { + "uuid" : "str", + "ip" : "str", + "last_seen": int + } + + Meaning of the fields: + * `uuid` - The UUID of the remote consumer + * `ip` - The IP address of the remote consumer + * `last_seen` - The UNIX timestamp when the consumer was last seen. + + :return: Despite its name, this function returns a dictionary in which the keys are the uuids of each consumer. + """ keys = self.r.keys('consumer_*') list_of_customers = {} @@ -28,6 +66,15 @@ class RedisSuperStorage: return list_of_customers def get_producer_list(self) -> dict: + """ + This method returns a list of producer ip addresses, nothing more. + The producer ip addresses are volatile, they have a timeout set when they are updated. + If they are not updated within that timeout they are considered invalid, and are not returned by this method. + + Producers are added to the redis database by the API endpoint with a timeout set on them. + + :return: Despite its name this function returns a dict... Similar to `get_consumer_list`. 
The keys are the keys stored in redis (lol) + """ keys = self.r.keys('producer_*') list_of_producer_ip = {} @@ -41,6 +88,13 @@ class RedisSuperStorage: return list_of_producer_ip def update_consumer(self, uuid: str, ip: str): + """ + Updates (or creates) the information of a specific consumer in the redis database. + The default timeout is set on the keys, when stored in the database. + + :param uuid: The uuid of the consumer to be updated. + :param ip: The ip address of that consumer. + """ cust_key = f"consumer_{uuid}" @@ -54,6 +108,11 @@ class RedisSuperStorage: self.r.expire(cust_key, self._timeout) def get_current_ip(self) -> str: + """ + This is a basic getter, which reads a single value from the Redis database. + + :return: The ip address of the consumer stored in the redis database. + """ ip = self.r.get('current_ip') if ip: @@ -62,6 +121,12 @@ class RedisSuperStorage: return ip def set_current_ip(self, ip: str): + """ + This is the most basic setter in the whole object. It stores a single value which is the ip address of the consumer. + + The current ip in the redis storage does not time out. + :param ip: IP address to be set. + """ self.r.set('current_ip', ip.encode('utf-8')) current_ip = property(get_current_ip, set_current_ip) From 5d2bed0f14f27418dababca42c58321c14575348 Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 14 May 2020 20:01:04 +0200 Subject: [PATCH 5/6] Added docstrings for communicators module. --- communicators.py | 86 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/communicators.py b/communicators.py index edb84f0..8d84fd4 100644 --- a/communicators.py +++ b/communicators.py @@ -11,12 +11,44 @@ from redis_super_storage import RedisSuperStorage class ProducerCommunicator: + """ + This class is used to communicate with producers. + The addresses of producers are fetched from `RedisSuperStorage`. + """ def __init__(self, redis_store: RedisSuperStorage): + """ + Upon creating this object. 
A requests session is created in order to take advantage of keep-alive connections. + + :param redis_store: A `RedisSuperStorage` instance. + """ self._redis_store = redis_store self._session = requests.Session() def push_ip_update(self, newip: str): + """ + This method is used to push an ip update to all known producers. + The list of producers is read from the `RedisSuperStorage` instance. + (The list of producers is maintained by the api endpoint.) + + The uuid of this consumer is acquired directly from the `LOCAL_UUID` envvar. + + A timeout of 5 seconds is hardcoded for each producer individually. Timeout is logged as warning. + + Called URL:: + + http://<producer ip>/ip + + Body:: + + { + "uuid" : "str: LOCAL_UUID", + "ip": "str: provided by param" + } + + + :param newip: New ip address to be announced. + """ for key, ip in self._redis_store.get_producer_list().items(): @@ -32,12 +64,44 @@ class ProducerCommunicator: class ConsumerCommunicator: + """ + This class is used to communicate with consumers. + The addresses of consumers are fetched from the `RedisSuperStorage`. + """ def __init__(self, redis_store: RedisSuperStorage): + """ + Upon creating this object, a requests session is created in order to take advantage of keep-alive connections. + + :param redis_store: A `RedisSuperStorage` instance. + """ self._redis_store = redis_store self._session = requests.Session() def targeted_snyc(self, ip: str): + """ + This method works similarly to `sync_all` however the target is not fetched from the `RedisSuperStorage` instance. + The results are processed the same way (saved to redis). + + A timeout of 5 seconds is hardcoded for this function. Timeout is logged as warning. + + This method is preferred when the remote is unknown (uuid is unknown). Mostly when the application just started up, + and an initial synchronization to all consumers is required. + + See `sync_all` for more information. 
+ + Called URL:: + + http://<consumer ip>/sync + + Body:: + + { + "uuid" : "str: LOCAL_UUID" + } + + :param ip: The ip address of the consumer to be synced to. + """ try: # request synchronization response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) @@ -50,6 +114,28 @@ class ConsumerCommunicator: self._redis_store.update_consumer(response.json()['uuid'], ip) def sync_all(self): + """ + This method is used to synchronize with each known consumer. + The list of consumers is acquired from the `RedisSuperStorage` instance. + + This synchronization run causes the following: + * Each consumer known by this consumer is checked for availability + * Each consumer this consumer communicated with updates the availability of this consumer. + * Each consumer which had no information of this consumer, now has it. + + A timeout of 5 seconds is hardcoded for each consumer individually. Timeout is logged as warning. + + Called URL:: + + http://<consumer ip>/sync + + Body:: + + { + "uuid" : "str: LOCAL_UUID" + } + + """ for uuid, info in self._redis_store.get_consumer_list().items(): ip = info['ip'] try: From 13842daf1c1033fa5d0aa1801f57346a2c85710f Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 14 May 2020 20:47:52 +0200 Subject: [PATCH 6/6] Updated docs --- README.rst | 77 +++++++++++++++++++++++++++++++++++++++++++++++-- docs/config.rst | 19 ++++++++++++ docs/index.rst | 2 ++ 3 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 docs/config.rst diff --git a/README.rst b/README.rst index 31ce0f0..55ce536 100644 --- a/README.rst +++ b/README.rst @@ -2,6 +2,79 @@ P2P Consumer Scheduler ====================== -Repository for the consumer's scheduler +The scheduler part of a consumer system for the P2P storage system. + +Produced by GoldenPogácsa Inc. + +Basics +------ +This is a component of a fully working consumer system. +In order to set up a consumer you will need the consumer API as well as a Redis database. 
+ +This component facilitates the automatic synchronization between consumers. +It also detects IP changes of the host it is running on. + +Part of a system +---------------- + +This component works as a complementary part of the consumer API. + +The information exchange between the API and the scheduler is solved by using a common Redis database. +This database must be shared between the API and the Scheduler instance. + +Also the UUID is shared between the two components. This is obvious since they are essentially both parts of the same system. + + +Communications +-------------- + +The communication between other consumers and producers is solved by their REST API endpoints. + +The following events will cause communication by this object: + * A synchronization task + * An IP address change detected + +Since the communication between consumers happens regularly the IP address change event does not cause an immediate synchronization. +However, communication originated by the scheduler toward producers only happens when the IP address change is detected. + +Synchronization +~~~~~~~~~~~~~~~ + +This call is originated by the scheduler and happens regularly (see Configuration). + +A synchronization task causes the following: + * Each consumer known by this consumer is checked for availability + * Each consumer this consumer communicated with updates the availability of this consumer. + * Each consumer which had no information of this consumer, now has it. + +Consumers learn about each other, and their IP addresses (and changes) during a synchronization. + +Called URL:: + + http://<consumer ip>/sync + +Body:: + + { + "uuid" : "str: LOCAL_UUID" + } + + +IP update +~~~~~~~~~ + +This communication is originated by the scheduler when an IP address change is detected. + +This call is used to let producers immediately know about an IP change of their consumer, so that they can converge faster. 
+ +Called URL:: + + http:///ip + +Body:: + + { + "uuid" : "str: LOCAL_UUID", + "ip": "str: provided by param" + } -Functionality: updates the database of the available consumers periodically diff --git a/docs/config.rst b/docs/config.rst new file mode 100644 index 0000000..be44f99 --- /dev/null +++ b/docs/config.rst @@ -0,0 +1,19 @@ +============= +Configuration +============= + +This software component can be configured via environmental variables, which are very useful in a containerized environment. + ++--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Variable | Default | Description | ++====================+==========================+=====================================================================================================================================================================+ +| `LOCAL_UUID` | **N/A** | The UUID of the consumer system. Must be the same as the API's | ++--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| `RUN_INTERVAL` | 10 | Interval between synchronizations, and local ip checks. | ++--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| `CUSTOMER_TIMEOUT` | 30 | Default timeout to be set for a consumer. If the timeout expires, the consumer will be considered invalid, and no further attempts will be made to contact with it. 
| ++--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| `REDIS_URL` | redis://localhost:6379/0 | URL of the Redis database shared with the consumer api | ++--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| `INITIAL_SERVERS` | **N/A** | A comma separated list of the initially known consumers (Can be empty, but must be specified) | ++--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 10efaa5..56ecc01 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -11,9 +11,11 @@ Welcome to Consumer Scheduler's documentation! :caption: Contents: readme + config source/modules + Indices and tables ==================