IP is now saved to redis
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				continuous-integration/drone/push Build is passing
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	continuous-integration/drone/push Build is passing
				
			This commit is contained in:
		
							
								
								
									
										37
									
								
								app.py
									
									
									
									
									
								
							
							
						
						
									
										37
									
								
								app.py
									
									
									
									
									
								
							@@ -26,7 +26,7 @@ def main():
 | 
				
			|||||||
    logging.basicConfig(filename='', level=logging.DEBUG)
 | 
					    logging.basicConfig(filename='', level=logging.DEBUG)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # connect to redis
 | 
					    # connect to redis
 | 
				
			||||||
    r = redis.from_url(os.get('REDIS_URL', "redis://localhost:6379/0"))
 | 
					    r = redis.from_url(os.environ.get('REDIS_URL', "redis://localhost:6379/0"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # set initial consumer addresses
 | 
					    # set initial consumer addresses
 | 
				
			||||||
    ip_list = os.environ['INITIAL_SERVERS'].split(',')
 | 
					    ip_list = os.environ['INITIAL_SERVERS'].split(',')
 | 
				
			||||||
@@ -37,13 +37,13 @@ def main():
 | 
				
			|||||||
    temp_dict = {}
 | 
					    temp_dict = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    host_name = socket.gethostname()
 | 
					    host_name = socket.gethostname()
 | 
				
			||||||
    current_ip = socket.gethostbyname(host_name)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for ip in ip_list:
 | 
					    for ip in ip_list:
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            # request synchronization
 | 
					            # request synchronization
 | 
				
			||||||
            response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']})
 | 
					            response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
 | 
				
			||||||
        except requests.exceptions.ConnectionError:
 | 
					        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
 | 
				
			||||||
 | 
					            logging.error(f"Error while syncing to {ip}: {str(e)}")
 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if response.status_code == 200:
 | 
					        if response.status_code == 200:
 | 
				
			||||||
@@ -61,8 +61,9 @@ def main():
 | 
				
			|||||||
            ip = info['ip']
 | 
					            ip = info['ip']
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                # request synchronization
 | 
					                # request synchronization
 | 
				
			||||||
                response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']})
 | 
					                response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
 | 
				
			||||||
            except requests.exceptions.ConnectionError:
 | 
					            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
 | 
				
			||||||
 | 
					                logging.error(f"Error while syncing to {ip}: {str(e)}")
 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if response.status_code == 200:
 | 
					            if response.status_code == 200:
 | 
				
			||||||
@@ -73,12 +74,18 @@ def main():
 | 
				
			|||||||
        r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8'))
 | 
					        r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8'))
 | 
				
			||||||
        logging.debug('Update redis consumer ip list from answers: Done')
 | 
					        logging.debug('Update redis consumer ip list from answers: Done')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        logging.debug('Waiting for next turn')
 | 
					        # Test ip change stuff
 | 
				
			||||||
        # wait for the next update time
 | 
					
 | 
				
			||||||
 | 
					        current_ip = r.get('current_ip')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not current_ip:  # Not set yet. I this case no update required
 | 
				
			||||||
 | 
					            current_ip = socket.gethostbyname(host_name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if current_ip != socket.gethostbyname(host_name):
 | 
					        if current_ip != socket.gethostbyname(host_name):
 | 
				
			||||||
            logging.debug('IP changed. Pushing updates...')
 | 
					            logging.info('IP changed. Pushing updates...')
 | 
				
			||||||
            current_ip = socket.gethostbyname(host_name)
 | 
					            current_ip = socket.gethostbyname(host_name)
 | 
				
			||||||
 | 
					            r.set('current_ip', current_ip.encode('utf-8'))
 | 
				
			||||||
 | 
					            # pushing update...
 | 
				
			||||||
            keys = r.keys('producer_*')
 | 
					            keys = r.keys('producer_*')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            logging.debug(f'Pushing update to the following producers: ' + ', '.join(k.decode('utf-8') for k in keys))
 | 
					            logging.debug(f'Pushing update to the following producers: ' + ', '.join(k.decode('utf-8') for k in keys))
 | 
				
			||||||
@@ -91,16 +98,20 @@ def main():
 | 
				
			|||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
                    response = requests.post(f"http://{ip}/ip",
 | 
					                    response = requests.post(
 | 
				
			||||||
                                             json={'uuid': os.environ['LOCAL_UUID'], 'ip': current_ip})
 | 
					                        f"http://{ip}/ip",
 | 
				
			||||||
 | 
					                        json={'uuid': os.environ['LOCAL_UUID'], 'ip': current_ip},
 | 
				
			||||||
 | 
					                        timeout=5
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
                    logging.debug(f"Pushed update to {key.decode('utf-8')} at {ip}. Response: {response.status_code}")
 | 
					                    logging.debug(f"Pushed update to {key.decode('utf-8')} at {ip}. Response: {response.status_code}")
 | 
				
			||||||
                except requests.exceptions.ConnectionError as e:
 | 
					                except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
 | 
				
			||||||
                    logging.warning(f"Could not push update to {key.decode('utf-8')}: {str(e)}")
 | 
					                    logging.warning(f"Could not push update to {key.decode('utf-8')}: {str(e)}")
 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            logging.debug('IP unchanged.')
 | 
					            logging.debug('IP unchanged.')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        time.sleep(30)
 | 
					        logging.debug('Waiting for next turn')
 | 
				
			||||||
 | 
					        time.sleep(os.environ.get("RUN_INTERVAL", 30))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user