import logging
import os
import tempfile
import wave
from queue import Empty, Queue
from threading import Thread
from typing import Optional, Tuple

import alsaaudio

SAMPLE_RATE = 44100
MICROPHONE = 'default'
DEST_FOLDER = "/dev/shm/"


class SlicedRecorder(Thread):
    """Background thread that records from an ALSA device into fixed-size WAV slices.

    Audio is captured as mono, signed 16-bit little-endian PCM. Each time a
    slice reaches ``slice_size`` seconds worth of samples, its file is closed
    and its path is put on an internal queue, from which it can be fetched
    with :meth:`get_recording`.
    """

    BYTES_PER_SAMPLE = 2  # 16bit

    def __init__(self, slice_size: float, sample_rate=SAMPLE_RATE, microphone=MICROPHONE, dest_folder=DEST_FOLDER):
        """Open the capture device and prepare the recorder (does not start it).

        Args:
            slice_size: Length of one slice in seconds.
            sample_rate: Capture rate in samples per second.
            microphone: Name of the ALSA capture device.
            dest_folder: Directory in which the slice files are created.

        Raises:
            ValueError: If ``slice_size * sample_rate`` is less than one sample.
        """
        super().__init__()
        samples_per_slice = int(slice_size * sample_rate)
        if samples_per_slice < 1:
            # A slice of zero (or negative) samples can never be filled; fail
            # before the audio device is opened.
            raise ValueError("slice_size must cover at least one sample")

        self._output_queue = Queue()
        self._samples_per_slice = samples_per_slice
        self._dest_folder = dest_folder
        self._sample_rate = sample_rate

        self._inp = alsaaudio.PCM(
            alsaaudio.PCM_CAPTURE,
            alsaaudio.PCM_NORMAL,
            microphone,
            channels=1,
            rate=sample_rate,
            format=alsaaudio.PCM_FORMAT_S16_LE,  # = 2bytes
            periodsize=512
        )

        self._active = True

    def _request_new_file(self) -> Tuple[str, wave.Wave_write]:
        """Create a fresh, uniquely named WAV file in the destination folder.

        Returns:
            The path of the new file and a wave writer configured for mono
            audio at the recorder's sample rate and sample width.
        """
        file_handle, record_path = tempfile.mkstemp(prefix="rec", suffix=".wav", dir=self._dest_folder)
        # mkstemp is only used to reserve a unique name. Let `wave` open the
        # path itself: Wave_write.close() closes the underlying file only when
        # the wave module opened it.
        os.close(file_handle)

        wavefile = wave.open(record_path, 'wb')
        wavefile.setnchannels(1)  # mono
        wavefile.setframerate(self._sample_rate)
        wavefile.setsampwidth(self.BYTES_PER_SAMPLE)  # 16bit
        return record_path, wavefile

    def run(self):
        """Capture audio until stopped, queueing the path of every finished slice.

        The loop ends when :meth:`stop` is called or after 30 consecutive ALSA
        read errors. On exit the unfinished slice is closed and deleted (its
        path was never handed out) and the capture device is closed.
        """
        current_path, current_file = self._request_new_file()
        samples_in_file = 0
        read_failures = 0

        try:
            while self._active:
                try:
                    length, data = self._inp.read()
                except alsaaudio.ALSAAudioError as e:
                    logging.exception(e)
                    read_failures += 1
                    if read_failures >= 30:
                        logging.error("Too many ALSA Read errors. Bailing out...")
                        self._active = False
                        return
                    continue

                if length <= 0:
                    # Nothing usable was read; try again.
                    continue

                read_failures = 0
                consumed = 0  # samples of this read already written out

                # Fill and hand over slices for as long as the rest of this
                # read would overflow the current file.
                while samples_in_file + (length - consumed) > self._samples_per_slice:
                    room = self._samples_per_slice - samples_in_file
                    start = consumed * self.BYTES_PER_SAMPLE
                    end = (consumed + room) * self.BYTES_PER_SAMPLE
                    current_file.writeframes(data[start:end])
                    current_file.close()
                    self._output_queue.put(current_path)
                    # The queued file now belongs to the consumer; make sure
                    # the cleanup below never touches it.
                    current_file = None

                    current_path, current_file = self._request_new_file()
                    samples_in_file = 0
                    consumed += room

                # Whatever is left fits into the current slice.
                current_file.writeframes(data[consumed * self.BYTES_PER_SAMPLE:])
                samples_in_file += length - consumed
        finally:
            if current_file is not None:
                current_file.close()
                try:
                    os.unlink(current_path)
                except OSError as e:
                    logging.warning("Could not remove partial recording %s: %s", current_path, e)
            self._inp.close()

    def get_recording(self, blocking: bool) -> Optional[str]:
        """Return the path of the next finished slice.

        Args:
            blocking: If true, wait until a slice is available.

        Returns:
            The file path, or ``None`` if not blocking and no slice is queued.
        """
        try:
            return self._output_queue.get(block=blocking)
        except Empty:
            return None

    def get_queue_length(self) -> int:
        """Return the (approximate) number of finished slices waiting in the queue."""
        return self._output_queue.qsize()

    def stop(self):
        """Ask the recording loop to finish; does not wait for the thread to exit."""
        self._active = False


class BirbnetesIoTPlatformRecordDriver:
    """Class-level facade around a single shared :class:`SlicedRecorder`."""

    sliced_recorder = None

    # Upper bound on how long cleanup() waits for the recorder thread to exit.
    STOP_TIMEOUT_SEC = 5.0

    @classmethod
    def init(cls, sample_length_sec: float, sample_rate=SAMPLE_RATE, microphone=MICROPHONE, dest_folder=DEST_FOLDER):
        """Create the shared recorder and start recording.

        Args:
            sample_length_sec: Length of one recorded slice in seconds.
            sample_rate: Capture rate in samples per second.
            microphone: Name of the ALSA capture device.
            dest_folder: Directory in which the slice files are created.
        """
        cls.sliced_recorder = SlicedRecorder(sample_length_sec, sample_rate, microphone, dest_folder)
        cls.sliced_recorder.start()

    @classmethod
    def get_recording(cls, blocking: bool = False) -> Optional[str]:
        """Return the path of the next finished slice, or ``None`` if none is ready."""
        return cls.sliced_recorder.get_recording(blocking)

    @classmethod
    def get_queue_length(cls) -> int:
        """Return the number of finished slices waiting to be fetched."""
        return cls.sliced_recorder.get_queue_length()

    @classmethod
    def cleanup(cls):
        """Stop recording and delete every finished slice that was not fetched.

        Does nothing if :meth:`init` was never called.
        """
        recorder = cls.sliced_recorder
        if recorder is None:
            return

        recorder.stop()
        # Wait for the thread to wind down so it cannot queue another slice
        # after the queue has been drained below.
        if recorder.is_alive():
            recorder.join(timeout=cls.STOP_TIMEOUT_SEC)

        while True:
            fname = recorder.get_recording(False)
            if not fname:
                break
            os.unlink(fname)