Merge branch 'dev'
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Pünkösd Marcell 2020-05-14 20:48:57 +02:00
commit 3cefcabad2
12 changed files with 338 additions and 14 deletions

View File

@ -2,8 +2,79 @@
P2P Consumer Scheduler P2P Consumer Scheduler
====================== ======================
Repository for the consumer's scheduler The scheduler part of a consumer system for the P2P storage system.
Functionality: updates the database of the available consumers periodically
Produced by GoldenPogácsa Inc. Produced by GoldenPogácsa Inc.
Basics
------
This is a component of a fully working consumer system.
In order to setup a consumer you will need the consumer API as well and a Redis database.
This component faciliates the automatic synchronization between consumers.
As well as detecting IP changes of the host which is running on.
Part of a system
----------------
This component works as a complimentary part for the consumer API.
The information exchange between the API is solved by using a Common Redis database.
This database must be shared between the API and the Scheduler instance.
Also the UUID is shared between the two components. This is obvious since they are essentially both parts of the same system.
Communications
--------------
The communication between other consumers and producers are solved by their REST API endpoints.
The follwing events will cause communication by this object:
* A sychronization task
* An IP address change detected
Since the communication between consumers happens regularly the IP address change event does not cause an immediate synchronization
However communication originated by scheduler toward producers only happens when the IP address change is detected.
Synchronization
~~~~~~~~~~~~~~~
This call originated by the scheduler and happens regularly (See. configuration).
A synchronization task causes the followings:
* Each consumer known by this consumer is checked for availability
* Each consumer this consumer communicated with updated the availability of this consumer.
* Each consumer which had no information of this consumer, now have.
Consumers learn about each other, and their IP addresses (and changes) during a synchronization.
Called URL::
http://<consumer ip>/sync
Body::
{
"uuid" : "str: LOCAL_UUID"
}
IP update
~~~~~~~~~
This communication is originated by the scheduler when an IP address change is detected.
This call is used to let producers immanently know about an IP change of their consumer, so that they can converge faster.
Called URL::
http://<producer ip>/ip
Body::
{
"uuid" : "str: LOCAL_UUID",
"ip": "str: provided by param"
}

29
app.py
View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
This is the top level module, containing the main application. Launching this file will launch the scheduler part of the consumer application.
"""
import sentry_sdk import sentry_sdk
import time import time
import os import os
@ -8,10 +11,6 @@ from redis_super_storage import RedisSuperStorage
from communicators import ConsumerCommunicator, ProducerCommunicator from communicators import ConsumerCommunicator, ProducerCommunicator
from ip_watchdog import IPWatchdog from ip_watchdog import IPWatchdog
"""
Scheduler
"""
__author__ = "@kocsisr" __author__ = "@kocsisr"
__copyright__ = "Copyright 2020, GoldenPogácsa Team" __copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "app" __module_name__ = "app"
@ -20,13 +19,33 @@ __version__text__ = "1"
sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10") sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10")
def get_initial_ip_list(): def get_initial_ip_list() -> list:
"""
This method is used to parse the content of INITIAL_SERVERS environment variable.
The contents of this variable is a list of ip addresses separated by commas.
This mehod returns a Python native `list` object containing the addreseses provided.
:return: A list of ip addresses provided by the environmental variable
"""
ip_list = os.environ['INITIAL_SERVERS'].split(',') ip_list = os.environ['INITIAL_SERVERS'].split(',')
logging.debug('Initial ip list ' + ", ".join(ip_list)) logging.debug('Initial ip list ' + ", ".join(ip_list))
return ip_list return ip_list
def main(): def main():
"""
This is the main method of the scheduler application.
This method does basically the followings:
* Sets up logging
* Creates the RedisSuperStorage object (connecting to Redis database)
* Sets up communicators and IP watchdog
* Performs an initial synchronization to all other consumers.
* Starts the main loop which does roughly the following:
* Syncrhronizes with all other consumers
* Check if ip address changed. If yes, then push updates to all producers.
* Wait `RUN_INTERVAL` seconds (provided by envvar)
"""
# set logging preferences # set logging preferences
logging.basicConfig(filename='', level=logging.DEBUG) logging.basicConfig(filename='', level=logging.DEBUG)

View File

@ -1,4 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
This module contains the classes used for communicating with other consumers as well as the producers themselves.
"""
import os import os
import logging import logging
import requests import requests
@ -7,12 +11,44 @@ from redis_super_storage import RedisSuperStorage
class ProducerCommunicator: class ProducerCommunicator:
"""
This class is used to communicate with producers.
The addresses of producers are fetched from `RedisSuperStorage`.
"""
def __init__(self, redis_store: RedisSuperStorage): def __init__(self, redis_store: RedisSuperStorage):
"""
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
:param redis_store: A `RedisSuperStorage` instance.
"""
self._redis_store = redis_store self._redis_store = redis_store
self._session = requests.Session() self._session = requests.Session()
def push_ip_update(self, newip: str): def push_ip_update(self, newip: str):
"""
This method is used to push an ip update to all known consumers.
The list of consumers are read from the `RedisSuperStorage` instance.
(The list of producers are maintained by the api endpoint.)
The uuid of this consumer is acquired directly from the `LOCAL_UUID` envvar.
A timeout of 5 seconds is hardcoded for each producer individually. Timeout is logged as warning.
Called URL::
http://<producer ip>/ip
Body::
{
"uuid" : "str: LOCAL_UUID",
"ip": "str: provided by param"
}
:param newip: New ipaddress to be annouced.
"""
for key, ip in self._redis_store.get_producer_list().items(): for key, ip in self._redis_store.get_producer_list().items():
@ -28,12 +64,44 @@ class ProducerCommunicator:
class ConsumerCommunicator: class ConsumerCommunicator:
"""
This class is used to communicate with consumers.
The addresses of consumers are fetched from the `RedisSuperStorage`.
"""
def __init__(self, redis_store: RedisSuperStorage): def __init__(self, redis_store: RedisSuperStorage):
"""
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
:param redis_store: A `RedisSuperStorage` instance.
"""
self._redis_store = redis_store self._redis_store = redis_store
self._session = requests.Session() self._session = requests.Session()
def targeted_snyc(self, ip: str): def targeted_snyc(self, ip: str):
"""
This method works similarly to `sync_all` however the target is not fetched from the `RedisSuperStorage` instance.
The results are processed the same way (saved to redis).
A timeout of 5 seconds is hardcoded for this function. Timeout is logged as warning.
This method is preferred when the remote is unknown (uuid is unknown). Mostly when the application just started up,
and an initial syncrhronization to all consumers is required.
See `sync_all` for more information.
Called URL::
http://<consumer ip>/sync
Body::
{
"uuid" : "str: LOCAL_UUID"
}
:param ip: The ip address of the consumer to be synced to.
"""
try: try:
# request synchronization # request synchronization
response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
@ -46,6 +114,28 @@ class ConsumerCommunicator:
self._redis_store.update_consumer(response.json()['uuid'], ip) self._redis_store.update_consumer(response.json()['uuid'], ip)
def sync_all(self): def sync_all(self):
"""
This method is used to syncronize with each known consumer.
The list of consumers are acquired from the `RedisSuperStorage` instance.
This syncrhonization run causes the followings:
* Each consumer known by this consumer is checked for availability
* Each consumer this consumer communicated with updated the availability of this consumer.
* Each consumer which had no information of this consumer, now have.
A timeout of 5 seconds is hardcoded for each consumer individually. Timeout is logged as warning.
Called URL::
http://<consumer ip>/sync
Body::
{
"uuid" : "str: LOCAL_UUID"
}
"""
for uuid, info in self._redis_store.get_consumer_list().items(): for uuid, info in self._redis_store.get_consumer_list().items():
ip = info['ip'] ip = info['ip']
try: try:

View File

@ -10,9 +10,11 @@
# add these directories to sys.path here. If the directory is relative to the # add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here. # documentation root, use os.path.abspath to make it absolute, like shown here.
# #
# import os import os
# import sys import sys
# sys.path.insert(0, os.path.abspath('.')) sys.path.insert(0, os.path.abspath('..'))
#import app
# -- Project information ----------------------------------------------------- # -- Project information -----------------------------------------------------
@ -30,8 +32,7 @@ release = '0.1.0'
# Add any Sphinx extension module names here, as strings. They can be # Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones. # ones.
extensions = [ extensions = ['sphinx.ext.autodoc']
]
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.
templates_path = [] templates_path = []
@ -53,3 +54,7 @@ html_theme = 'sphinx_rtd_theme'
# relative to this directory. They are copied after the builtin static files, # relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css". # so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = [] html_static_path = []
# master document
master_doc = 'index'
autoclass_content = 'both'

19
docs/config.rst Normal file
View File

@ -0,0 +1,19 @@
=============
Configuration
=============
This software component can be configured via environmental variables, which are very useful in a containerized environment.
+--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Variable | Default | Description |
+====================+==========================+=====================================================================================================================================================================+
| `LOCAL_UUID` | **N/A** | The UUID of the consumer system. Must be the same as the API's |
+--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `RUN_INTERVAL` | 10 | Interval between synchronizations, and local ip checks. |
+--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `CUSTOMER_TIMEOUT` | 30 | Default timeout to be set for a consumer. If the timeout expires, the consumer will be considered invalid, and no further attempts will be made to contact with it. |
+--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `REDIS_URL` | redis://localhost:6379/0 | URL of the Redis database shared with the consumer api |
+--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `INITIAL_SERVERS` | **N/A** | A comma separated list of the initially known consumers (Can be empty, but must be specified) |
+--------------------+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -11,9 +11,11 @@ Welcome to Consumer Scheduler's documentation!
:caption: Contents: :caption: Contents:
readme readme
config
source/modules source/modules
Indices and tables Indices and tables
================== ==================

View File

@ -0,0 +1,7 @@
communicators module
====================
.. automodule:: communicators
:members:
:undoc-members:
:show-inheritance:

View File

@ -0,0 +1,7 @@
ip\_watchdog module
===================
.. automodule:: ip_watchdog
:members:
:undoc-members:
:show-inheritance:

View File

@ -5,3 +5,6 @@ consumer-scheduler
:maxdepth: 4 :maxdepth: 4
app app
communicators
ip_watchdog
redis_super_storage

View File

@ -0,0 +1,7 @@
redis\_super\_storage module
============================
.. automodule:: redis_super_storage
:members:
:undoc-members:
:show-inheritance:

View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
This module contains the IPWatchdog class which is responsible for detecting ip address changes on the host machine.
"""
from typing import Tuple from typing import Tuple
import logging import logging
import socket import socket
@ -7,12 +10,35 @@ from redis_super_storage import RedisSuperStorage
class IPWatchdog: class IPWatchdog:
"""
This is very simple class, that is used to determine if the ip address of the host have changed.
Internally this class relies on `RedisSuperStorage` to fetch the last used ip address.
The ip address of the current host is acquired using python's builtin `socket` interface, by requesting an address resolve agains the localhost's host name.
In some scenarios this may result in wrongly identifying the loopback address instead of the assigned address.
In most cases, where the application is run inside a docker container this method will work just fine.
"""
def __init__(self, redis_store: RedisSuperStorage): def __init__(self, redis_store: RedisSuperStorage):
"""
During the construction of the object, the host name is of the current machine is cached, for quicker lookups.
:param redis_store: a RedisSuperStorage instance.
"""
self._redis_store = redis_store self._redis_store = redis_store
self._host_name = socket.gethostname() self._host_name = socket.gethostname()
def ip_changed(self) -> Tuple[bool, str]: def ip_changed(self) -> Tuple[bool, str]:
"""
This method fetches the last ip address from the RedisSuperStorage instance, and compares it to the current local address.
If the ip address changes the new value is automatically stored in the RedisSuperStorage instance.
Detection is performed upon calling this method, as well as storing the updated address.
:return: (changed, ip_address) A tuple with two members, where the first member indicates if the ip address is changed, the second member is the current ip address.
"""
old_ip = self._redis_store.current_ip old_ip = self._redis_store.current_ip
current_ip = socket.gethostbyname(self._host_name) current_ip = socket.gethostbyname(self._host_name)

View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
This module contains the RedisSuperStorage module which is responsible to store and load the data structure used by the constumer.
"""
import redis import redis
import os import os
import json import json
@ -6,12 +9,50 @@ import time
class RedisSuperStorage: class RedisSuperStorage:
"""
This class is the brain of the scheduler app, and probably half the brain of the whole consumer system.
This class provides access to the data structures used in the consumer system trough various getter and setter methods.
Kind of like an ORM would do.
Sometimes it is necessary to access more than one time to the redis database while qcquiring or setting some data.
In this cases some basic error handling is implemented to avoid most of the problems that could be caused by non-atomic operations.
"""
def __init__(self, redis_url: str, timeout: int): def __init__(self, redis_url: str, timeout: int):
"""
During the object creation the Redis connection is attempted to be established.
:param redis_url: URL of the redis database in the following form: `redis://localhost:6379/0`
:param timeout: Timeout to be set on keys, where it matters. (See the documentation for each method)
"""
self.r = redis.from_url(redis_url) self.r = redis.from_url(redis_url)
self._timeout = timeout self._timeout = timeout
def get_consumer_list(self) -> dict: def get_consumer_list(self) -> dict:
"""
This function gets the list of consumers from the Redis database.
Consumers are saved to the database either by the API endpoint, or calling `update_consumer` of this class.
The consumers are volatile, they have a timeout set when they updated.
If they are not updated within that timeout they considered invalid, and does not returned by this method.
Each consumer is represented by the following structure::
{
"uuid" : "str",
"ip" : "str",
"last_seen": int
}
Meaning of the fields:
* `uuid` - The UUID of the remote consumer
* `ip` - The IP address of the remote consumer
* `last_seen` - The UNIX timestamp when the consumer was last seen.
:return: Despite it's name, this function returns a dictionary in which the keys are the uuids of each consumer.
"""
keys = self.r.keys('consumer_*') keys = self.r.keys('consumer_*')
list_of_customers = {} list_of_customers = {}
@ -25,6 +66,15 @@ class RedisSuperStorage:
return list_of_customers return list_of_customers
def get_producer_list(self) -> dict: def get_producer_list(self) -> dict:
"""
This method returns a list of producer ip addresses, nothing more.
The producer ip addresses are volatile, they have a timeout set when they updated.
If they are not updated within that timeout they considered invalid, and does not returned by this method.
Producers are added to the redis database by the API endpoint with a timeout set on them.
:return: Despite it's name this function returns a dict... Similar to `get_consumer_list`. The keys are the keys stored in redis (lol)
"""
keys = self.r.keys('producer_*') keys = self.r.keys('producer_*')
list_of_producer_ip = {} list_of_producer_ip = {}
@ -38,6 +88,13 @@ class RedisSuperStorage:
return list_of_producer_ip return list_of_producer_ip
def update_consumer(self, uuid: str, ip: str): def update_consumer(self, uuid: str, ip: str):
"""
Updates (or creates) informations of a specific consumer in the redis database.
The default timeout is set on the keys, when stored in the database.
:param uuid: The uuid of the consumer to be updated.
:param ip: The ip address of that consumer.
"""
cust_key = f"consumer_{uuid}" cust_key = f"consumer_{uuid}"
@ -51,6 +108,11 @@ class RedisSuperStorage:
self.r.expire(cust_key, self._timeout) self.r.expire(cust_key, self._timeout)
def get_current_ip(self) -> str: def get_current_ip(self) -> str:
"""
This is a basic getter, which reads a single value from the Redis database.
:return: The ip address of the consumer stored in the redis database.
"""
ip = self.r.get('current_ip') ip = self.r.get('current_ip')
if ip: if ip:
@ -59,6 +121,12 @@ class RedisSuperStorage:
return ip return ip
def set_current_ip(self, ip: str): def set_current_ip(self, ip: str):
"""
This is the most basic setter in the whole object. This is stores a single value which is the ip address of the consumer.
The current ip in the redis storage does not time out.
:param ip: IP address to be set.
"""
self.r.set('current_ip', ip.encode('utf-8')) self.r.set('current_ip', ip.encode('utf-8'))
current_ip = property(get_current_ip, set_current_ip) current_ip = property(get_current_ip, set_current_ip)