iot-platform-raspberry/birbnetes_iot_platform_rasp.../record_stuff.py

124 lines
4.2 KiB
Python

import logging
from typing import Optional, Tuple
import alsaaudio
import tempfile
from queue import Queue, Empty
from threading import Thread
import wave
import os
SAMPLE_RATE = 44100
MICROPHONE = 'default'
DEST_FOLDER = "/dev/shm/"
class SlicedRecorder(Thread):
BYTES_PER_SAMPLE = 2 # 16bit
def __init__(self, slice_size: float, sample_rate=SAMPLE_RATE, microphone=MICROPHONE, dest_folder=DEST_FOLDER):
super().__init__()
self._output_queue = Queue()
self._samples_per_slice = int(slice_size * sample_rate)
self._dest_folder = dest_folder
self._sample_rate = sample_rate
self._inp = alsaaudio.PCM(
alsaaudio.PCM_CAPTURE,
alsaaudio.PCM_NORMAL,
microphone,
channels=1,
rate=sample_rate,
format=alsaaudio.PCM_FORMAT_S16_LE, # = 2bytes
periodsize=512
)
self._active = True
def _request_new_file(self) -> Tuple[str, wave.Wave_write]:
file_handle, record_path = tempfile.mkstemp(prefix="rec", suffix=".wav", dir=self._dest_folder)
wavefile = wave.open(open(file_handle, 'wb'), 'wb')
wavefile.setnchannels(1) # mono
wavefile.setframerate(self._sample_rate)
wavefile.setsampwidth(self.BYTES_PER_SAMPLE) # 16bit
return record_path, wavefile
def run(self):
current_working_file_name, current_working_file = self._request_new_file()
current_samples_saved = 0
read_failures = 0
while self._active:
try:
length, data = self._inp.read()
except alsaaudio.ALSAAudioError as e:
logging.exception(e)
read_failures += 1
if read_failures >= 30:
logging.error("Too many ALSA Read errors. Bailing out...")
self._active = False
return
continue
if length > 0: # will be always larger than zero (except on error)
read_failures = 0
file_size_after_append = current_samples_saved + length
if file_size_after_append > self._samples_per_slice: # Appending this would cause a too big slice
samples_append_to_this_slice = self._samples_per_slice - current_samples_saved
current_working_file.writeframes(data[:(samples_append_to_this_slice * self.BYTES_PER_SAMPLE)])
current_working_file.close()
self._output_queue.put(current_working_file_name)
current_working_file_name, current_working_file = self._request_new_file()
current_working_file.writeframes(data[(samples_append_to_this_slice * self.BYTES_PER_SAMPLE):])
current_samples_saved = length - samples_append_to_this_slice
else: # it is safe to append, the resulting file will be smaller than the slice
current_working_file.writeframes(data)
current_samples_saved = file_size_after_append
def get_recording(self, blocking: bool) -> Optional[str]:
try:
return self._output_queue.get(block=blocking)
except Empty:
return None
def get_queue_length(self) -> int:
return self._output_queue.qsize()
def stop(self):
self._active = False
class BirbnetesIoTPlatformRecordDriver:
sliced_recorder = None
@classmethod
def init(cls, sample_length_sec: float, sample_rate=SAMPLE_RATE, microphone=MICROPHONE, dest_folder=DEST_FOLDER):
cls.sliced_recorder = SlicedRecorder(sample_length_sec, sample_rate, microphone, dest_folder)
cls.sliced_recorder.start()
@classmethod
def get_recording(cls, blocking: bool = False) -> Optional[str]:
return cls.sliced_recorder.get_recording(blocking)
@classmethod
def get_queue_length(cls) -> int:
return cls.sliced_recorder.get_queue_length()
@classmethod
def cleanup(cls):
cls.sliced_recorder.stop()
while True:
fname = cls.sliced_recorder.get_recording(False)
if fname:
os.unlink(fname)
else:
break