« ^ »

今日やった事 - 20240903

所要時間: 約 3分

青汁王子が移り住んだ家よりも安い所でずっと暮しているから、本人としては転落したように思えるのかもしれないけど、僕から見ると全くどん底ではないと思う。移り住めたのだから家はあるし、金がないのはそうかもしれないけれど、その日に使える手持ちのお金が0円という事もないだろう。女がないのはデフォルトだから転落でも何でもない。屋根と壁があって、シャワー、洗濯機、冷蔵庫、エアコンがあり、今日食べるものに困るわけでも、今日着る服がないわけでもない。その時点で僕からすると若干良い生活のように思える。

numpy.ndarrayの確認

>>> np.frombuffer(b"\x01\x00\x00\x00", dtype="int32").tobytes()
b'\x01\x00\x00\x00'

フレーム

一定の間隔で収集された音声信号の断片をフレームと呼ぶ。各フレームの数値は、そのタイミングで収集できた音波の振幅を表す。

音声認識を確認する

引き続き、音声認識に取り組む。昨日、発話部分を切り取り、分割したそれぞれの音声を文字起こし用のモデルに食わせた。しかし、思ったほどの精度がない。切り取り部分はタイミング的には正しいように思える。雑音のせいなのか、又は別の理由なのかよく分からない。

そこで、どのような音声が、どのような文字起こしされているのか調べる事にした。分割した音声をQueueに入れており、それを1つずつ取り出して文字起こしモデルに処理させている。そこで音声と文字起こしの対応が作れる。それをファイルに出力させられれば、実際に耳で聞いて確認する事ができるだろう。

import time
import wave
import multiprocessing as mp
import multiprocessing.connection as mp_conn
import numpy as np
import sounddevice as sd
import torch
import queue

import time
import multiprocessing as mp
import multiprocessing.connection as mp_conn

import numpy as np
import sounddevice as sd

import pydub
import pydub.effects

from reazonspeech.nemo.asr.audio import audio_from_tensor, audio_from_numpy
from nemo.collections.asr.models import EncDecRNNTBPEModel
from reazonspeech.nemo.asr.interface import AudioData
from reazonspeech.nemo.asr.transcribe import transcribe



SAMPLING_RATE = 16000
WINDOW_SIZE_SAMPLES = 512


def run_read_sound(
    ready_ev: mp.Event, shutdown_ev: mp.Event, out_conn: mp_conn.Connection
):
    def _fn(indata: np.ndarray[np.ndarray[np.int16]], frames, time, status):
        out_conn.send_bytes(indata.tobytes())

    ready_ev.set()
    with sd.InputStream(
        samplerate=SAMPLING_RATE,
        dtype="int16",
        blocksize=WINDOW_SIZE_SAMPLES,
        channels=1,
        callback=_fn,
    ):
        print("録音開始")
        while not shutdown_ev.is_set():
            sd.sleep(500)
        print("録音終了")
    shutdown_ev.set()


def run_voice_activity_detection(
    ready_ev: mp.Event,
    shutdown_ev: mp.Event,
    in_conn: mp_conn.Connection,
    out_q: mp.Queue,
):
    vad_model, utils = torch.hub.load("snakers4/silero-vad", "silero_vad")

    (get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = utils

    sampling_rate = SAMPLING_RATE
    window_size = WINDOW_SIZE_SAMPLES

    chunk_list = []
    no_speech_count = 0

    already_warm_up = False

    ready_ev.set()
    with torch.no_grad():
        # 認識開始
        while not shutdown_ev.is_set():
            buf = in_conn.recv_bytes()
            wav_data = np.frombuffer(buf, dtype=np.int16)
            writable_wav_data = np.copy(wav_data)  # コピーして書き込み可能にする
            sound_tensor = torch.Tensor(writable_wav_data)
            for ii in range(0, len(sound_tensor), window_size):
                chunk = sound_tensor[ii : ii + window_size]
                print("----------------------------")
                print(chunk.shape)
                ret = vad_model(chunk, sampling_rate)
                speech_prob = ret.item()  # 閾値よりも大きな数字であれば発話と見做す
                if speech_prob > 0.3:  # 発話検出
                    no_speech_count = 0
                else:  # 発話なし
                    no_speech_count += 1

                if no_speech_count < 50:
                    print("発話中")
                    chunk_list.append(chunk)
                elif chunk_list:  # 発話の区切り
                    array = torch.cat(chunk_list).cpu().numpy()
                    print(f"発話終了: {len(array)} {array.dtype} {array.shape}")

                    if already_warm_up:
                        out_q.put(array)
                    else:  # 最初の要素は暖気として扱い無視する
                        already_warm_up = True
                    chunk_list = []

def run_speech_to_text(
    ready_ev: mp.Event,
    shutdown_ev: mp.Event,
    in_q: mp.Queue,
    out_q: mp.Queue,
):
    rez_model = EncDecRNNTBPEModel.from_pretrained(
        'reazon-research/reazonspeech-nemo-v2',
        map_location="cpu")

    ready_ev.set()

        # 文字起こし開始
    with torch.no_grad():
        while not shutdown_ev.is_set():
            try:
                arr: np.ndarray = in_q.get(timeout=10)
            except queue.Empty:
                continue

            audio_data: AudioData = audio_from_numpy(arr, SAMPLING_RATE)
            ret = transcribe(rez_model, audio_data)
            text = "".join(sg.text for sg in ret.segments)
            print("_____________________________________")
            print(text, end = "")
            print("_____________________________________")
            out_q.put(text)

            filename = f"voice-{int(time.time())}"
            filename_wav = f"{filename}.wav"
            filename_txt = f"{filename}.txt"
            
            with open(filename_txt, "w+") as fp:
                fp.write(text)

            # 保存用のデータは変換する
            ka = np.clip(arr, -1.0, 1.0)  # 振幅を-1.0から1.0に制限
            int16_waveform = (ka * 32767.0).astype(np.int16)  # int16の範囲に変換
            
            with wave.open(filename_wav, "wb") as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(SAMPLING_RATE)
                print(type(audio_data.waveform))
                print(audio_data.waveform.shape)
                print(type(audio_data.waveform[0]))
                wav_file.writeframes(int16_waveform.tobytes())

def main():
    ready_ev = mp.Event()
    shutdown_ev = mp.Event()
    sound_conn_w, sound_conn_r = mp.Pipe()

    vad_ready_ev = mp.Event()
    vad_q = mp.Queue()

    p = mp.Process(target=run_read_sound, args=(ready_ev, shutdown_ev, sound_conn_w))
    p_vad = mp.Process(
        target=run_voice_activity_detection,
        args=(vad_ready_ev, shutdown_ev, sound_conn_r, vad_q),
    )


    stt_ready_ev = mp.Event()
    stt_q = mp.Queue()
    stt_proc = mp.Process(
        target=run_speech_to_text,
        args=(stt_ready_ev, shutdown_ev, vad_q, stt_q),
    )

    stt_proc.start()
    p_vad.start()
    stt_ready_ev.wait(timeout=10)
    vad_ready_ev.wait(timeout=10)
    p.start()
    try:
        while not shutdown_ev.is_set():
            try:
                val = stt_q.get(timeout=10)
                print(f"認識: {val}")
                if "終了" in val:
                    shutdown_ev.set()
            except queue.Empty:
                continue
    except KeyboardInterrupt:
        shutdown_ev.set()
        p.terminate()
        p_vad.terminate()
        stt_proc.terminate()
    finally:
        p.terminate()
        p_vad.terminate()
        stt_proc.terminate()

    try:
        p.join(timeout=10)
        p_vad.join(timeout=10)
        stt_proc.join(timeout=10)
    except:
        p.kill()
        p_vad.kill()
        stt_proc.kill()


if __name__ == "__main__":
    main()