青汁王子が移り住んだ家よりも安い所でずっと暮しているから、本人としては転落したように思えるのかもしれないけど、僕から見ると全くどん底ではないと思う。移り住めたのだから家はあるし、金がないのはそうかもしれないけれど、その日に使える手持ちのお金が0円という事もないだろう。女がないのはデフォルトだから転落でも何でもない。屋根と壁があって、シャワー、洗濯機、冷蔵庫、エアコンがあり、今日食べるものに困るわけでも、今日着る服がないわけでもない。その時点で僕からすると若干良い生活のように思える。
numpy.ndarrayの確認
>>> np.frombuffer(b"\x01\x00\x00\x00", dtype="int32").tobytes() b'\x01\x00\x00\x00'
フレーム
一定の間隔で収集された音声信号の断片をフレームと呼ぶ。各フレームの数値は、そのタイミングで収集できた音波の振幅を表す。
音声認識を確認する
引き続き、音声認識に取り組む。昨日、発話部分を切り取り、分割したそれぞれの音声を文字起こし用のモデルに食わせた。しかし、思ったほどの精度がない。切り取り部分はタイミング的には正しいように思える。雑音のせいなのか、又は別の理由なのかよく分からない。
そこで、どのような音声が、どのような文字起こしされているのか調べる事にした。分割した音声をQueueに入れており、それを1つずつ取り出して文字起こしモデルに処理させている。そこで音声と文字起こしの対応が作れる。それをファイルに出力させられれば、実際に耳で聞いて確認する事ができるだろう。
import time
import wave
import multiprocessing as mp
import multiprocessing.connection as mp_conn
import numpy as np
import sounddevice as sd
import torch
import queue
import time
import multiprocessing as mp
import multiprocessing.connection as mp_conn
import numpy as np
import sounddevice as sd
import pydub
import pydub.effects
from reazonspeech.nemo.asr.audio import audio_from_tensor, audio_from_numpy
from nemo.collections.asr.models import EncDecRNNTBPEModel
from reazonspeech.nemo.asr.interface import AudioData
from reazonspeech.nemo.asr.transcribe import transcribe
SAMPLING_RATE = 16000
WINDOW_SIZE_SAMPLES = 512
def run_read_sound(
ready_ev: mp.Event, shutdown_ev: mp.Event, out_conn: mp_conn.Connection
):
def _fn(indata: np.ndarray[np.ndarray[np.int16]], frames, time, status):
out_conn.send_bytes(indata.tobytes())
ready_ev.set()
with sd.InputStream(
samplerate=SAMPLING_RATE,
dtype="int16",
blocksize=WINDOW_SIZE_SAMPLES,
channels=1,
callback=_fn,
):
print("録音開始")
while not shutdown_ev.is_set():
sd.sleep(500)
print("録音終了")
shutdown_ev.set()
def run_voice_activity_detection(
ready_ev: mp.Event,
shutdown_ev: mp.Event,
in_conn: mp_conn.Connection,
out_q: mp.Queue,
):
vad_model, utils = torch.hub.load("snakers4/silero-vad", "silero_vad")
(get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils
sampling_rate = SAMPLING_RATE
window_size = WINDOW_SIZE_SAMPLES
chunk_list = []
no_speech_count = 0
already_warm_up = False
ready_ev.set()
with torch.no_grad():
# 認識開始
while not shutdown_ev.is_set():
buf = in_conn.recv_bytes()
wav_data = np.frombuffer(buf, dtype=np.int16)
writable_wav_data = np.copy(wav_data) # コピーして書き込み可能にする
sound_tensor = torch.Tensor(writable_wav_data)
for ii in range(0, len(sound_tensor), window_size):
chunk = sound_tensor[ii : ii + window_size]
print("----------------------------")
print(chunk.shape)
ret = vad_model(chunk, sampling_rate)
speech_prob = ret.item() # 閾値よりも大きな数字であれば発話と見做す
if speech_prob > 0.3: # 発話検出
no_speech_count = 0
else: # 発話なし
no_speech_count += 1
if no_speech_count < 50:
print("発話中")
chunk_list.append(chunk)
elif chunk_list: # 発話の区切り
array = torch.cat(chunk_list).cpu().numpy()
print(f"発話終了: {len(array)} {array.dtype} {array.shape}")
if already_warm_up:
out_q.put(array)
else: # 最初の要素は暖気として扱い無視する
already_warm_up = True
chunk_list = []
def run_speech_to_text(
ready_ev: mp.Event,
shutdown_ev: mp.Event,
in_q: mp.Queue,
out_q: mp.Queue,
):
rez_model = EncDecRNNTBPEModel.from_pretrained(
'reazon-research/reazonspeech-nemo-v2',
map_location="cpu")
ready_ev.set()
# 文字起こし開始
with torch.no_grad():
while not shutdown_ev.is_set():
try:
arr: np.ndarray = in_q.get(timeout=10)
except queue.Empty:
continue
audio_data: AudioData = audio_from_numpy(arr, SAMPLING_RATE)
ret = transcribe(rez_model, audio_data)
text = "".join(sg.text for sg in ret.segments)
print("_____________________________________")
print(text, end = "")
print("_____________________________________")
out_q.put(text)
filename = f"voice-{int(time.time())}"
filename_wav = f"{filename}.wav"
filename_txt = f"{filename}.txt"
with open(filename_txt, "w+") as fp:
fp.write(text)
# 保存用のデータは変換する
ka = np.clip(arr, -1.0, 1.0) # 振幅を-1.0から1.0に制限
int16_waveform = (ka * 32767.0).astype(np.int16) # int16の範囲に変換
with wave.open(filename_wav, "wb") as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(SAMPLING_RATE)
print(type(audio_data.waveform))
print(audio_data.waveform.shape)
print(type(audio_data.waveform[0]))
wav_file.writeframes(int16_waveform.tobytes())
def main():
ready_ev = mp.Event()
shutdown_ev = mp.Event()
sound_conn_w, sound_conn_r = mp.Pipe()
vad_ready_ev = mp.Event()
vad_q = mp.Queue()
p = mp.Process(target=run_read_sound, args=(ready_ev, shutdown_ev, sound_conn_w))
p_vad = mp.Process(
target=run_voice_activity_detection,
args=(vad_ready_ev, shutdown_ev, sound_conn_r, vad_q),
)
stt_ready_ev = mp.Event()
stt_q = mp.Queue()
stt_proc = mp.Process(
target=run_speech_to_text,
args=(stt_ready_ev, shutdown_ev, vad_q, stt_q),
)
stt_proc.start()
p_vad.start()
stt_ready_ev.wait(timeout=10)
vad_ready_ev.wait(timeout=10)
p.start()
try:
while not shutdown_ev.is_set():
try:
val = stt_q.get(timeout=10)
print(f"認識: {val}")
if "終了" in val:
shutdown_ev.set()
except queue.Empty:
continue
except KeyboardInterrupt:
shutdown_ev.set()
p.terminate()
p_vad.terminate()
stt_proc.terminate()
finally:
p.terminate()
p_vad.terminate()
stt_proc.terminate()
try:
p.join(timeout=10)
p_vad.join(timeout=10)
stt_proc.join(timeout=10)
except:
p.kill()
p_vad.kill()
stt_proc.kill()
if __name__ == "__main__":
main()