import time from typing import Optional, Tuple import tempfile from queue import Queue, Empty from threading import Thread import wave import os SAMPLE_RATE = 44100 MICROPHONE = 'default' DEST_FOLDER = "/dev/shm/" class SlicedPlayer(Thread): BYTES_PER_SAMPLE = 2 # 16bit def __init__(self, slice_size: float, sample_rate=SAMPLE_RATE, microphone=MICROPHONE, dest_folder=DEST_FOLDER): super().__init__() self._output_queue = Queue() self._active = True self._wavefile = wave.open(microphone, "rb") self._sample_rate = self._wavefile.getframerate() self._dest_folder = dest_folder self._samples_per_slice = int(self._sample_rate * slice_size) self._slice_size = slice_size # length in seconds def _request_new_file(self) -> Tuple[str, wave.Wave_write]: file_handle, record_path = tempfile.mkstemp(prefix="rec", suffix=".wav", dir=self._dest_folder) wavefile = wave.open(open(file_handle, 'wb'), 'wb') wavefile.setnchannels(self._wavefile.getnchannels()) wavefile.setframerate(self._sample_rate) wavefile.setsampwidth(self.BYTES_PER_SAMPLE) # 16bit return record_path, wavefile def run(self): while self._active: time_started = time.time() current_working_file_name, current_working_file = self._request_new_file() sound_data = bytes() while True: sound_data += self._wavefile.readframes(self._samples_per_slice - len(sound_data)) if len(sound_data) < self._samples_per_slice: self._wavefile.rewind() else: break current_working_file.writeframes(sound_data) current_working_file.close() self._output_queue.put(current_working_file_name) time.sleep(self._slice_size - (time.time() - time_started)) # Emulate recording time def get_recording(self, blocking: bool) -> Optional[str]: try: return self._output_queue.get(block=blocking) except Empty: return None def get_queue_length(self) -> int: return self._output_queue.qsize() def stop(self): self._active = False class BirbnetesIoTPlatformRecordDriver: sliced_recorder = None @classmethod def init(cls, sample_length_sec: float, sample_rate=SAMPLE_RATE, microphone=MICROPHONE, dest_folder=DEST_FOLDER): cls.sliced_recorder = SlicedPlayer(sample_length_sec, sample_rate, microphone, dest_folder) cls.sliced_recorder.start() @classmethod def get_recording(cls, blocking: bool = False) -> Optional[str]: return cls.sliced_recorder.get_recording(blocking) @classmethod def get_queue_length(cls) -> int: return cls.sliced_recorder.get_queue_length() @classmethod def cleanup(cls): cls.sliced_recorder.stop() while True: fname = cls.sliced_recorder.get_recording(False) if fname: os.unlink(fname) else: break