upload NK edge code

This commit is contained in:
Torma Kristóf 2020-07-13 19:42:32 +02:00
parent 52db1f209e
commit a2a6263eef
4 changed files with 275 additions and 0 deletions

View File

@ -0,0 +1,98 @@
import os
import alsaaudio
import audioop
import numpy
import shutil
import asyncio
import time
import datetime
import collections
import scipy.io.wavfile as wavfile
from pyAudioAnalysis import MidTermFeatures as mtF
from pyAudioAnalysis import audioTrainTest as aT
fs = 44100
start_time = 0
daily_counter = 0
std = None
mean = None
classifier = None
sec_60_que = None
class_names = None
microphone = 'hw:CARD=LX3000,DEV=0'
record_path = "Last60Sample/"
permanent_path = "DailySamples/"
conv = datetime.datetime.fromtimestamp
def background( f ):
def wrapped(*args,**kwargs):
return asyncio.get_event_loop().run_in_executor(None,f,*args,*kwargs)
return wrapped
def record_analyze_audio( mid_term_buffer_size_in_sec ):
print("Press Ctr+C to stop recording")
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NONBLOCK,device=microphone)
inp.setchannels(1)
inp.setrate(fs)
inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
inp.setperiodsize(512)
mid_term_buffer_size = int(mid_term_buffer_size_in_sec * fs)
cur_window = []
append = cur_window.append
mid_term_buffer = []
d = os.path.dirname(record_path)
if os.path.exists(d) and record_path!=".":
shutil.rmtree(record_path)
os.makedirs(record_path)
d = os.path.dirname(permanent_path)
if os.path.exists(d) and permanent_path!=".":
shutil.rmtree(permanent_path)
os.makedirs(permanent_path)
while 1:
l,data = inp.read()
if l :
[append(audioop.getsample(data, 2, i)) for i in range(l)]
if (len(cur_window)+len(mid_term_buffer)>mid_term_buffer_size):
samples_to_copy_to_mid_buffer = mid_term_buffer_size - len(mid_term_buffer)
else:
samples_to_copy_to_mid_buffer = len(cur_window)
mid_term_buffer += cur_window[0:samples_to_copy_to_mid_buffer]
del(cur_window[0:samples_to_copy_to_mid_buffer])
if len(mid_term_buffer) == mid_term_buffer_size:
predict(mid_term_buffer)
mid_term_buffer = []
@background
def predict( mid_term_buffer ):
global sec_60_que
global daily_counter
[mt_features, st_features, _] = mtF.mid_feature_extraction(mid_term_buffer, fs, 2.0*fs, 2.0*fs, 0.020*fs, 0.020*fs)
curFV = (mt_features[:,0] - mean) / std
[result,P] = aT.classifier_wrapper(classifier, 'svm', curFV)
cur_wav_file_name = "%s%s_%s.wav"%(record_path,conv(time.time()).strftime('%S,%f'),str(class_names[int(result)]))
permanent_name = "%s/%s_%s.wav"%(permanent_path,conv(time.time()).strftime('%H:%M_%S,%f'),str(class_names[int(result)]))
mid_term_buffer_array = numpy.int16(mid_term_buffer)
wav_to_remove=sec_60_que.popleft()
if os.path.exists(str(wav_to_remove)):
os.remove(wav_to_remove)
if daily_counter != conv(time.time()).strftime('%d'):
shutil.rmtree(permanent_path)
daily_counter = conv(time.time()).strftime('%d')
wavfile.write(cur_wav_file_name, fs, mid_term_buffer_array)
if not os.path.exists(permanent_path) :
os.makedirs(permanent_path)
wavfile.write(permanent_name, fs, mid_term_buffer_array)
sec_60_que.append(cur_wav_file_name)
def start( duration_of_record,name_of_model ):
global classifier
global mean
global std
global class_names
global sec_60_que
duration = int(duration_of_record)
model_name = name_of_model
sec_60_que = collections.deque(int(60/duration)*[0],int(60/duration))
[classifier, mean,std,class_names,mt_win,mt_step,st_win,st_step, compute_beat] = aT.load_model(model_name)
record_analyze_audio( mid_term_buffer_size_in_sec = duration )

25
EDGE_CODE/buzzer.py Normal file
View File

@ -0,0 +1,25 @@
import asyncio
import RPi.GPIO as GPIO
from time import sleep
class Buzzer:
def __init__(self):
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
self.buzzer=23
GPIO.setup(self.buzzer,GPIO.OUT)
def background(f):
def wrapped(*args,**kwargs):
return asyncio.get_event_loop().run_in_executor(None,f,*args,*kwargs)
return wrapped
@background
def beep(self):
i=0
while i < 3 :
GPIO.output(self.buzzer,GPIO.HIGH)
sleep(0.2)
GPIO.output(self.buzzer,GPIO.LOW)
sleep(0.8)
i+=1

9
EDGE_CODE/dht_22.py Normal file
View File

@ -0,0 +1,9 @@
import Adafruit_DHT
class dht22:
def __init__(self):
self.dht_sensor = Adafruit_DHT.DHT22
self.dht_pin = 4
def get_humidity_temperature(self):
return Adafruit_DHT.read_retry(self.dht_sensor, self.dht_pin)

View File

@ -0,0 +1,143 @@
import time
import threading
import glob
import asyncio
import websockets
import buzzer
import dht_22
import audio_stream_classifier as a_s_c
from multiprocessing import Process
from os import listdir
from os.path import isfile,join,getctime
from scipy.io import wavfile
from azure.iot.device import IoTHubDeviceClient, Message, MethodResponse
CONNECTION_STRING = "HostName=AgriHub.azure-devices.net;DeviceId=Guard_Node_00001;SharedAccessKey=8zv3Rh3zmTu7Eqw75TgTBJ/8Dtepc3JvIUum9yLzmvs="
MSG_TXT = '{{"d":{d},"h":{h},"t":{t},"b":{b},"st":{st},"s":{sw},"g":{g},"w":{w},"f":{f},"p":{sp},"c":{c},"tem":{tem:0.1f},"hum":{hum:0.1f}}}'
PATH='Last60Sample'
INTERVAL = 15
EXTRA_INTERVAL = 0
STURNUS_ALERT = False
WS_SERVER_URI= "ws://192.168.1.71:8765"
def iothub_client_init():
client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
return client
async def send_wav(file_name):
print('Start Sending wav')
try:
async with websockets.connect(WS_SERVER_URI) as websocket:
sr, x = wavfile.read(file_name)
byte_message = x.tobytes()
await websocket.send(byte_message)
print("Wav sent")
reply = await websocket.recv()
print("Reply arrived:" + reply)
except:
print('Server unavailable!')
def start_recording():
print ("Start Recording")
a_s_c.start(1,"svm5ClassesAGRIREDUCED")
print ("Recording Started")
def downlink_message_listener(device_client):
while True:
method_request = device_client.receive_method_request()
print (
"\nDownlink message arrived with with:\nmethodName = {method_name}".format(
method_name=method_request.name
)
)
if method_request.name == "RequestAudioSample":
try:
list_of_files = glob.glob("Last60Sample/*sturnus.wav")
last_sturnus = max(list_of_files,key=getctime)
asyncio.run(send_wav(file_name=last_sturnus))
except ValueError:
response_payload = {"Response": "Invalid parameter"}
response_status = 400
else:
response_payload = {"Response": "Executed direct method {}".format(method_request.name)}
response_status = 200
elif method_request.name == "RequestAlert":
try:
global STURNUS_ALERT
STURNUS_ALERT = True
except ValueError:
response_payload = {"Response": "Invalid parameter"}
response_status = 400
else:
response_payload = {"Response": "Executed direct method {}".format(method_request.name)}
response_status = 200
else:
response_payload = {"Response": "Direct method {} not defined".format(method_request.name)}
response_status = 404
method_response = MethodResponse(method_request.request_id, response_status, payload=response_payload)
device_client.send_method_response(method_response)
def iothub_client_telemetry():
global STURNUS_ALERT
global EXTRA_INTERVAL
try:
BUZZER = buzzer.Buzzer()
DHT22 = dht_22.dht22()
client = iothub_client_init()
# Start a thread to listen downlink messages
device_method_thread = threading.Thread(target=downlink_message_listener, args=(client,))
device_method_thread.daemon = True
device_method_thread.start()
print("Waiting 60 seconds to fill the buffer")
time.sleep(61)
print ( "IoT Hub device sending periodic messages, press Ctrl-C to exit" )
while True:
only_files = [f[10:-4] for f in listdir(PATH) if isfile(join(PATH,f))]
DOG = only_files.count('dog')
HOOPOE = only_files.count('hoopoe')
TRACTOR = only_files.count('tractor')
BLACKBIRD = only_files.count('blackbird')
STURNUS = only_files.count('sturnus')
SWALLOW = only_files.count('swallow')
GOOSE = only_files.count('goose')
WIND = only_files.count('wind')
FIELD = only_files.count('field')
SPARROW = only_files.count('sparrow')
CRUSHAT = only_files.count('crushat')
HUMIDITY, TEMPERATURE = DHT22.get_humidity_temperature()
msg_txt_formatted = MSG_TXT.format(d=DOG,h=HOOPOE,t=TRACTOR,b=BLACKBIRD,st=STURNUS,sw=SWALLOW,g=GOOSE,w=WIND,f=FIELD,sp=SPARROW,c=CRUSHAT,tem=TEMPERATURE,hum=HUMIDITY)
message = Message(msg_txt_formatted)
# Add a custom application property to the message.
if STURNUS_ALERT == True:
message.custom_properties["sturnus_alert"] = True
BUZZER.beep()
STURNUS_ALERT = False
else:
message.custom_properties["sturnus_alert"] = False
# Send the message.
print( "Sending message: {}".format(message) )
client.send_message(message)
print( "Message sent" )
print( "Sleeping: ", (INTERVAL + EXTRA_INTERVAL))
time.sleep(INTERVAL + EXTRA_INTERVAL)
EXTRA_INTERVAL = 0
if(STURNUS_ALERT):
EXTRA_INTERVAL = 15
except KeyboardInterrupt:
print ( "IoTHubClient stopped" )
def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()
if __name__ == '__main__':
print ( "Press Ctrl-C to exit" )
runInParallel(start_recording, iothub_client_telemetry)