legacy/EDGE_CODE/audio_stream_classifier.py

98 lines
3.5 KiB
Python

import os
import alsaaudio
import audioop
import numpy
import shutil
import asyncio
import time
import datetime
import collections
import scipy.io.wavfile as wavfile
from pyAudioAnalysis import MidTermFeatures as mtF
from pyAudioAnalysis import audioTrainTest as aT
# --- Module-level configuration and shared state ---------------------------

# Sampling rate in Hz; used for capture, feature extraction and WAV output.
fs = 44100
# Not referenced by any function in the visible source.
start_time = 0
# Day marker compared by predict() against strftime('%d'). It starts as the
# int 0 and is later replaced by a string, so the first comparison in
# predict() always reports a change.
daily_counter = 0
# Normalisation statistics and classifier; assigned by start() from the
# values returned by aT.load_model().
std = None
mean = None
classifier = None
# Bounded deque of recently written WAV file names; created by start().
sec_60_que = None
# Class labels indexed by the classifier result; assigned by start().
class_names = None
# ALSA device string passed to alsaaudio.PCM in record_analyze_audio().
microphone = 'hw:CARD=LX3000,DEV=0'
# Output directories, relative to the working directory. Both are deleted
# (if present) and recreated when record_analyze_audio() starts.
record_path = "Last60Sample/"
permanent_path = "DailySamples/"
# Shorthand: POSIX timestamp -> naive local datetime.
conv = datetime.datetime.fromtimestamp
def background(f):
    """Decorator that runs ``f`` on the event loop's default executor.

    Calling the decorated function schedules ``f(*args, **kwargs)`` on a
    worker thread and returns immediately with the ``asyncio.Future``
    produced by ``loop.run_in_executor``.

    Note:
        The returned future is only resolved when the event loop runs.  If
        nobody awaits it, the return value of ``f`` and any exception it
        raises go unobserved by the caller.
    """
    # Imported locally so this block does not rely on a module-level import.
    import functools

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop: happens in non-main threads and on Python
            # versions where get_event_loop() no longer creates one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # run_in_executor() forwards positional arguments only, so keyword
        # arguments are bound up front with functools.partial.
        call = functools.partial(f, *args, **kwargs)
        return loop.run_in_executor(None, call)

    return wrapped
def _recreate_dir(path):
    """Remove ``path`` if it is an existing directory, then ensure it exists."""
    # "." is never deleted; exist_ok keeps makedirs from failing in that case.
    if path != "." and os.path.isdir(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)


def record_analyze_audio(mid_term_buffer_size_in_sec):
    """Capture audio from the ALSA device forever and classify it in segments.

    Opens ``microphone`` in non-blocking capture mode (mono, ``fs`` Hz,
    signed 16-bit little-endian, period size 512), wipes and recreates
    ``record_path`` and ``permanent_path``, then loops endlessly: captured
    samples are accumulated until exactly
    ``int(mid_term_buffer_size_in_sec * fs)`` of them are available, at which
    point the segment is handed to ``predict`` and a new segment is started.

    Args:
        mid_term_buffer_size_in_sec: Length of one classified segment, in
            seconds.

    This function does not return; it runs until interrupted (e.g. Ctrl+C).
    """
    print("Press Ctr+C to stop recording")
    inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NONBLOCK, device=microphone)
    inp.setchannels(1)
    inp.setrate(fs)
    inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
    inp.setperiodsize(512)

    mid_term_buffer_size = int(mid_term_buffer_size_in_sec * fs)
    cur_window = []        # samples read but not yet moved into the segment
    mid_term_buffer = []   # segment currently being filled

    _recreate_dir(record_path)
    _recreate_dir(permanent_path)

    while True:
        length, data = inp.read()
        if length <= 0:
            # Non-blocking read: 0 means no period is ready yet; a negative
            # value is an error code (per pyalsaaudio docs, e.g. overrun).
            # Sleep briefly instead of spinning at full CPU.
            time.sleep(0.001)
            continue

        # Decode all frames of this period at once; "<i2" matches the
        # PCM_FORMAT_S16_LE format configured above.
        cur_window.extend(numpy.frombuffer(data, dtype="<i2", count=length).tolist())

        # Move only as many samples as still fit into the current segment.
        room = mid_term_buffer_size - len(mid_term_buffer)
        take = min(room, len(cur_window))
        mid_term_buffer += cur_window[:take]
        del cur_window[:take]

        if len(mid_term_buffer) == mid_term_buffer_size:
            predict(mid_term_buffer)
            # Rebind rather than clear: predict() may still be using the
            # list it was handed.
            mid_term_buffer = []
@background
def predict(mid_term_buffer):
    """Classify one audio segment and store it as WAV in both output folders.

    Because of the ``@background`` decorator, a call returns immediately and
    the body runs on an executor thread.

    Steps:
        1. Extract mid-term features and normalise the first feature vector
           with the module-level ``mean`` / ``std``.
        2. Classify it with the loaded SVM model and look up the label in
           ``class_names``.
        3. Pop the oldest entry from ``sec_60_que`` and delete that file.
        4. When the day of month differs from ``daily_counter``, delete
           ``permanent_path`` and remember the new day.
        5. Write the segment to ``record_path`` and ``permanent_path`` and
           append the ``record_path`` file name to ``sec_60_que``.

    Args:
        mid_term_buffer: Sequence of 16-bit sample values for one segment.
    """
    global daily_counter

    mt_features, _, _ = mtF.mid_feature_extraction(
        mid_term_buffer, fs, 2.0 * fs, 2.0 * fs, 0.020 * fs, 0.020 * fs
    )
    cur_fv = (mt_features[:, 0] - mean) / std
    result, _ = aT.classifier_wrapper(classifier, 'svm', cur_fv)
    label = str(class_names[int(result)])

    # Sample the clock once so both file names and the day check agree.
    now = conv(time.time())
    cur_wav_file_name = "%s%s_%s.wav" % (record_path, now.strftime('%S,%f'), label)
    permanent_name = "%s/%s_%s.wav" % (permanent_path, now.strftime('%H:%M_%S,%f'), label)
    samples = numpy.int16(mid_term_buffer)

    # Drop the oldest file of the rolling window.  The deque is pre-filled
    # with non-string placeholders, which must not be treated as paths.
    oldest = sec_60_que.popleft()
    if isinstance(oldest, str):
        try:
            os.remove(oldest)
        except FileNotFoundError:
            pass  # already gone; same outcome as the previous exists() check

    # On a change of day, start the daily folder from scratch.
    today = now.strftime('%d')
    if daily_counter != today:
        try:
            shutil.rmtree(permanent_path)
        except FileNotFoundError:
            pass  # nothing to clear
        daily_counter = today

    wavfile.write(cur_wav_file_name, fs, samples)
    # exist_ok avoids a check-then-create race between executor threads.
    os.makedirs(permanent_path, exist_ok=True)
    wavfile.write(permanent_name, fs, samples)
    sec_60_que.append(cur_wav_file_name)
def start(duration_of_record, name_of_model):
    """Load the model, set up the rolling file window and start recording.

    Populates the module-level ``classifier``, ``mean``, ``std`` and
    ``class_names`` from ``aT.load_model`` and creates ``sec_60_que``, a
    bounded deque pre-filled with ``0`` placeholders, before handing control
    to ``record_analyze_audio`` (which does not return).

    Args:
        duration_of_record: Segment length in seconds; converted with
            ``int()``.
        name_of_model: Model name/path passed to ``aT.load_model``.

    Raises:
        ValueError: If ``duration_of_record`` is negative or cannot be
            converted to ``int``.
        ZeroDivisionError: If ``duration_of_record`` is zero.
    """
    global classifier, mean, std, class_names, sec_60_que

    duration = int(duration_of_record)
    if duration < 0:
        raise ValueError("duration_of_record must be a positive number of seconds")

    # Number of segments that fit into 60 seconds.  Clamp to one slot so that
    # segments longer than 60 s do not yield an empty, zero-length deque
    # (popleft() on it would raise).  A zero duration raises
    # ZeroDivisionError here.
    slots = max(1, int(60 / duration))
    sec_60_que = collections.deque([0] * slots, slots)

    # Only the first four values returned by load_model are used here.
    classifier, mean, std, class_names, *_ = aT.load_model(name_of_model)

    record_analyze_audio(mid_term_buffer_size_in_sec=duration)