# legacy/EDGE_CODE/guard_node_device.py
#
# 143 lines
# 5.3 KiB
# Python

import time
import threading
import glob
import asyncio
import websockets
import buzzer
import dht_22
import audio_stream_classifier as a_s_c
from multiprocessing import Process
from os import listdir
from os.path import isfile,join,getctime
from scipy.io import wavfile
from azure.iot.device import IoTHubDeviceClient, Message, MethodResponse
# Device connection string handed to IoTHubDeviceClient.create_from_connection_string()
# in iothub_client_init().
# NOTE(review): this embeds the device SharedAccessKey in plain text in source
# control; consider loading it from the environment / a secret store and
# rotating the key.
CONNECTION_STRING = "HostName=AgriHub.azure-devices.net;DeviceId=Guard_Node_00001;SharedAccessKey=8zv3Rh3zmTu7Eqw75TgTBJ/8Dtepc3JvIUum9yLzmvs="
# Telemetry body template filled in by iothub_client_telemetry(): one counter
# per classifier label plus temperature ("tem") and humidity ("hum") rendered
# with one decimal. The doubled braces produce the literal outer "{" and "}".
MSG_TXT = '{{"d":{d},"h":{h},"t":{t},"b":{b},"st":{st},"s":{sw},"g":{g},"w":{w},"f":{f},"p":{sp},"c":{c},"tem":{tem:0.1f},"hum":{hum:0.1f}}}'
# Directory whose file names are counted per label by the telemetry loop.
# NOTE(review): presumably a rolling buffer holding the last 60 classified
# samples, written by audio_stream_classifier — confirm.
PATH='Last60Sample'
# Base number of seconds slept between two telemetry messages.
INTERVAL = 15
# Extra seconds added to the next sleep; the telemetry loop sets it to 15 when
# an alert is pending after a sleep and resets it to 0 otherwise.
EXTRA_INTERVAL = 0
# Set to True by the "RequestAlert" direct method; consumed (and reset) by the
# telemetry loop, which then beeps the buzzer and flags the outgoing message.
STURNUS_ALERT = False
# Websocket endpoint that send_wav() connects to.
WS_SERVER_URI= "ws://192.168.1.71:8765"
def iothub_client_init():
    """Build and return the IoT Hub device client.

    The client is created from the module-level ``CONNECTION_STRING``.

    Returns:
        The object returned by
        ``IoTHubDeviceClient.create_from_connection_string``.
    """
    return IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
async def send_wav(file_name):
    """Send the samples of a WAV file to the websocket server and print the reply.

    Opens a connection to ``WS_SERVER_URI``, reads ``file_name`` with
    ``scipy.io.wavfile.read``, sends the sample array's raw bytes as a single
    message and then waits for one reply, which is printed.

    The transfer is best-effort: any ordinary failure (connection, file read,
    protocol error) is reported on stdout and swallowed, so the coroutine
    does not raise for those cases. ``KeyboardInterrupt``, ``SystemExit`` and
    task cancellation are deliberately NOT swallowed.

    Args:
        file_name: Path of the WAV file to send.

    Returns:
        None.
    """
    print('Start Sending wav')
    try:
        async with websockets.connect(WS_SERVER_URI) as websocket:
            # Only the sample data is transmitted; the sample rate returned
            # by wavfile.read() is not used.
            _sample_rate, samples = wavfile.read(file_name)
            await websocket.send(samples.tobytes())
            print("Wav sent")
            reply = await websocket.recv()
            # recv() yields str for text frames and bytes for binary frames;
            # format() copes with both, whereas "..." + reply raised TypeError
            # on bytes.
            print("Reply arrived:{}".format(reply))
    except Exception as exc:
        # "except Exception" (not a bare except) lets Ctrl-C and cancellation
        # propagate while keeping the best-effort behaviour.
        print('Server unavailable!')
        # The failure may also be a file or protocol error, so show the cause.
        print('Reason: {!r}'.format(exc))
def start_recording():
    """Start the audio stream classifier, printing a line before and after.

    Delegates to ``audio_stream_classifier.start``.
    """
    print("Start Recording")
    # NOTE(review): the meaning of the two arguments is defined by
    # audio_stream_classifier.start; the second looks like a model name —
    # confirm against that module.
    a_s_c.start(1, "svm5ClassesAGRIREDUCED")
    print("Recording Started")
def downlink_message_listener(device_client):
    """Serve IoT Hub direct-method requests on ``device_client`` forever.

    Intended to run in its own thread (``iothub_client_telemetry`` starts it
    as a daemon thread). Each received request is answered with a
    ``MethodResponse``:

    * ``RequestAudioSample`` - sends the most recently created
      ``*sturnus.wav`` file found in ``PATH`` through ``send_wav``.
      Responds 200 on completion, 400 when no such file exists, and 500 when
      the file disappears while it is being selected.
    * ``RequestAlert`` - sets the module-level ``STURNUS_ALERT`` flag, which
      the telemetry loop consumes. Responds 200.
    * anything else - responds 404.

    Args:
        device_client: Client object providing ``receive_method_request()``
            and ``send_method_response()``.
    """
    global STURNUS_ALERT
    while True:
        # Blocks until the hub delivers the next direct-method request.
        method_request = device_client.receive_method_request()
        print (
            "\nDownlink message arrived with with:\nmethodName = {method_name}".format(
                method_name=method_request.name
            )
        )
        if method_request.name == "RequestAudioSample":
            try:
                list_of_files = glob.glob(join(PATH, "*sturnus.wav"))
                # max() raises ValueError on an empty list (no sturnus sample).
                last_sturnus = max(list_of_files, key=getctime)
                # NOTE(review): send_wav() reports its own failures on stdout
                # and does not signal them, so 200 is returned even when the
                # websocket server was unreachable.
                asyncio.run(send_wav(file_name=last_sturnus))
            except ValueError:
                response_payload = {"Response": "Invalid parameter"}
                response_status = 400
            except OSError:
                # getctime() fails if a file is removed between glob() and
                # max(); answer the request instead of letting the listener
                # thread die without responding.
                response_payload = {"Response": "Audio sample unavailable"}
                response_status = 500
            else:
                response_payload = {"Response": "Executed direct method {}".format(method_request.name)}
                response_status = 200
        elif method_request.name == "RequestAlert":
            # A plain assignment cannot fail, so no error branch is needed.
            STURNUS_ALERT = True
            response_payload = {"Response": "Executed direct method {}".format(method_request.name)}
            response_status = 200
        else:
            response_payload = {"Response": "Direct method {} not defined".format(method_request.name)}
            response_status = 404
        method_response = MethodResponse(method_request.request_id, response_status, payload=response_payload)
        device_client.send_method_response(method_response)
def iothub_client_telemetry():
    """Send periodic telemetry to IoT Hub until interrupted with Ctrl-C.

    Setup: creates the buzzer, the DHT22 sensor wrapper and the hub client,
    starts ``downlink_message_listener`` in a daemon thread, then sleeps 61
    seconds before the first message.

    Each cycle:
      1. Lists the regular files in ``PATH`` and reduces every name to
         ``name[10:-4]``, then counts how many names match each known label.
         NOTE(review): the slice presumably drops a 10-character prefix and
         the ".wav" extension — confirm against the classifier's file naming.
      2. Reads humidity and temperature from the sensor.
      3. Builds the message from ``MSG_TXT`` and sets its ``sturnus_alert``
         custom property; when the ``STURNUS_ALERT`` flag is set the buzzer
         beeps and the flag is cleared.
      4. Sends the message and sleeps ``INTERVAL + EXTRA_INTERVAL`` seconds.
      5. Sets ``EXTRA_INTERVAL`` to 15 if an alert was raised during the
         sleep, otherwise to 0.
    """
    global STURNUS_ALERT
    global EXTRA_INTERVAL
    # (label as it appears in the file name, placeholder name in MSG_TXT)
    label_fields = (
        ("dog", "d"),
        ("hoopoe", "h"),
        ("tractor", "t"),
        ("blackbird", "b"),
        ("sturnus", "st"),
        ("swallow", "sw"),
        ("goose", "g"),
        ("wind", "w"),
        ("field", "f"),
        ("sparrow", "sp"),
        ("crushat", "c"),
    )
    try:
        alarm = buzzer.Buzzer()
        sensor = dht_22.dht22()
        client = iothub_client_init()
        # Listen for downlink (direct-method) messages in the background; the
        # thread shares STURNUS_ALERT with this loop.
        listener = threading.Thread(
            target=downlink_message_listener, args=(client,), daemon=True
        )
        listener.start()
        print("Waiting 60 seconds to fill the buffer")
        time.sleep(61)
        print("IoT Hub device sending periodic messages, press Ctrl-C to exit")
        while True:
            labels = [
                entry[10:-4] for entry in listdir(PATH) if isfile(join(PATH, entry))
            ]
            counts = {field: labels.count(label) for label, field in label_fields}
            humidity, temperature = sensor.get_humidity_temperature()
            message = Message(MSG_TXT.format(tem=temperature, hum=humidity, **counts))
            # Custom application property: read the shared flag once, then
            # mirror it on the message.
            alert_active = bool(STURNUS_ALERT)
            message.custom_properties["sturnus_alert"] = alert_active
            if alert_active:
                alarm.beep()
                STURNUS_ALERT = False
            print("Sending message: {}".format(message))
            client.send_message(message)
            print("Message sent")
            pause = INTERVAL + EXTRA_INTERVAL
            print("Sleeping: ", pause)
            time.sleep(pause)
            # An alert that arrived during the sleep lengthens the next one.
            EXTRA_INTERVAL = 15 if STURNUS_ALERT else 0
    except KeyboardInterrupt:
        print("IoTHubClient stopped")
def runInParallel(*fns):
    """Run each callable in its own process and wait for all of them.

    Args:
        *fns: Zero-argument callables, each used as the ``target`` of a
            separate ``multiprocessing.Process``.

    Returns:
        None, once every started process has been joined.
    """
    workers = []
    for target in fns:
        worker = Process(target=target)
        worker.start()
        workers.append(worker)
    # Block until every child has finished.
    for worker in workers:
        worker.join()
if __name__ == "__main__":
    # Script entry point: the recorder and the telemetry sender each run in
    # their own process.
    print("Press Ctrl-C to exit")
    runInParallel(start_recording, iothub_client_telemetry)