« ^ »

今日やった事 - 20240901

所要時間: 約 4分

これまで、ある目的のために一生懸命やってきた。そのために、いろんな事を我慢したり、後回しにしたり、保留したりしてきた。結局その目的は、ぶち壊されてしまった。我慢したり、後回しにしたり、保留したりしてきた事が本当に無意味だった。この事で自分の中での物事の優先順位は大きく変わった。だから、気になった事を気になったままやろうと思う。そして、自分以外の誰かや何かを優先する事のないようにしようと思う。これまで出来なさそうな事も何とか喰らい付いて、出来ないなりに何とかやってきた。その必要ももうない。これからは出来ない事は出来ないと言うし、しない。

Pythonのmultiprocessing.Pipeでデータを受け渡す

スレッドを使おうかとも考えたがGILを回避したかったため、プロセスを分ける事にした。そこでmultiprocessingの使い方についておさらいする。

import time
import multiprocessing as mp
import multiprocessing.connection as mp_conn

def read_sound(conn: mp_conn.Connection):
    """Producer loop: push one byte down the pipe every second, forever."""
    interval_sec = 1
    while True:
        conn.send_bytes(b"a")
        time.sleep(interval_sec)

def process_sound(conn: mp_conn.Connection):
    """Consumer loop: block on the pipe and echo every payload received."""
    while True:
        payload = conn.recv_bytes()
        print(payload)

def main():
    """Wire a producer and a consumer process together with a duplex Pipe."""
    conn1, conn2 = mp.Pipe()

    workers = [
        mp.Process(target=read_sound, args=(conn1,)),
        mp.Process(target=process_sound, args=(conn2,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

if __name__ == "__main__":
    main()

音声データを受け渡す

import time
import multiprocessing as mp
import multiprocessing.connection as mp_conn

import numpy as np
import sounddevice as sd

# Capture parameters: 16 kHz mono, 512 int16 samples per callback block.
SAMPLING_RATE = 16000
WINDOW_SIZE_SAMPLES = 512


def read_sound(conn: mp_conn.Connection):
    """Record ~10 seconds of microphone input, forwarding each block's raw
    int16 bytes through the pipe."""
    def _on_block(indata: np.ndarray, frames, time, status):
        # One callback per captured block; ship the raw PCM bytes as-is.
        conn.send_bytes(indata.tobytes())

    stream = sd.InputStream(
        samplerate=SAMPLING_RATE,
        dtype="int16",
        blocksize=WINDOW_SIZE_SAMPLES,
        channels=1,
        callback=_on_block,
    )
    with stream:
        print("録音開始")
        sd.sleep(10 * 1000)
        print("録音終了")


def process_sound(conn: mp_conn.Connection):
    """Drain the pipe forever, reporting the byte length of each block."""
    while True:
        chunk = conn.recv_bytes()
        print(len(chunk))


def main():
    """Spawn the recorder and the byte-counting consumer, then wait on both."""
    conn1, conn2 = mp.Pipe()

    workers = [
        mp.Process(target=read_sound, args=(conn1,)),
        mp.Process(target=process_sound, args=(conn2,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()

リアルタイムの単純な文字起こし

import time
import multiprocessing as mp
import multiprocessing.connection as mp_conn

import numpy as np
import sounddevice as sd

import pydub
import pydub.effects

from reazonspeech.nemo.asr.audio import audio_from_tensor, audio_from_numpy
from nemo.collections.asr.models import EncDecRNNTBPEModel
from reazonspeech.nemo.asr.interface import AudioData
from reazonspeech.nemo.asr.transcribe import transcribe


SAMPLING_RATE = 16000          # Hz — the rate the ASR model expects
WINDOW_SIZE_SAMPLES = 512      # samples delivered per capture callback


def read_sound(conn: mp_conn.Connection, shutdown_event: mp.Event):
    """Stream microphone blocks into the pipe until shutdown is requested."""
    def _audio_callback(indata: np.ndarray, frames, time, status):
        # Stop forwarding as soon as shutdown has been signalled.
        if conn and not shutdown_event.is_set():
            conn.send_bytes(indata.tobytes())

    stream = sd.InputStream(
        samplerate=SAMPLING_RATE,
        dtype="int16",
        blocksize=WINDOW_SIZE_SAMPLES,
        channels=1,
        callback=_audio_callback,
    )
    with stream:
        try:
            print("録音開始")
            while not shutdown_event.is_set():
                sd.sleep(1000)
        finally:
            print("録音終了")
            # Propagate the stop signal to the sibling process.
            shutdown_event.set()

def process_sound(conn: mp_conn.Connection, shutdown_event: mp.Event):
    """Accumulate pipe audio into 10-second chunks and transcribe each one."""
    pending = b""
    buffer_size = 10 * SAMPLING_RATE  # chunk length in samples
    chunk_bytes = buffer_size * 2     # int16 → 2 bytes per sample on the wire

    rez_model = EncDecRNNTBPEModel.from_pretrained(
        'reazon-research/reazonspeech-nemo-v2',
        map_location="cpu")

    while not shutdown_event.is_set():
        if not conn.poll(timeout=1):
            print("no data")
            continue

        pending += conn.recv_bytes()
        if len(pending) < chunk_bytes:
            continue

        samples = np.frombuffer(pending[:chunk_bytes], dtype=np.int16)
        pending = pending[chunk_bytes:]

        # Skip chunks that are pure silence.
        if np.abs(samples).max() == 0:
            continue

        # int16 PCM → float32 in [-1, 1) as the model expects.
        floats = samples.astype('float32') / 32768.0
        audio_data: AudioData = audio_from_numpy(floats, SAMPLING_RATE)
        result = transcribe(rez_model, audio_data)
        text = "".join(seg.text for seg in result.segments)
        print("_____________________________________")
        print(text, end="")
        print("_____________________________________")

def main():
    """Launch recorder + transcriber; Ctrl-C requests a clean shutdown."""
    conn1, conn2 = mp.Pipe()
    shutdown_event = mp.Event()

    recorder = mp.Process(target=read_sound, args=(conn1, shutdown_event))
    transcriber = mp.Process(target=process_sound, args=(conn2, shutdown_event))

    # Bring the (slow-loading) transcriber up before the recorder.
    transcriber.start()
    recorder.start()

    try:
        recorder.join()
        transcriber.join()
    except KeyboardInterrupt:
        shutdown_event.set()
        recorder.terminate()
        transcriber.terminate()
    finally:
        recorder.join()
        transcriber.join()

if __name__ == "__main__":
    main()

リアルタイムで音声の質問に文字で答える

import time
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_conn

import numpy as np
import sounddevice as sd

import pydub
import pydub.effects

from reazonspeech.nemo.asr.audio import audio_from_tensor, audio_from_numpy
from nemo.collections.asr.models import EncDecRNNTBPEModel
from reazonspeech.nemo.asr.interface import AudioData
from reazonspeech.nemo.asr.transcribe import transcribe

import mlx_lm
from mlx_lm import load, generate



SAMPLING_RATE = 16000          # Hz — the rate the ASR model expects
WINDOW_SIZE_SAMPLES = 512      # samples delivered per capture callback


def read_sound(shutdown_event: mp.Event, conn: mp_conn.Connection):
    """Forward microphone blocks into the pipe until shutdown is requested."""
    def _audio_callback(indata: np.ndarray, frames, time, status):
        # Drop blocks once shutdown has been signalled.
        if conn and not shutdown_event.is_set():
            conn.send_bytes(indata.tobytes())

    stream = sd.InputStream(
        samplerate=SAMPLING_RATE,
        dtype="int16",
        blocksize=WINDOW_SIZE_SAMPLES,
        channels=1,
        callback=_audio_callback,
    )
    with stream:
        try:
            print("録音開始")
            while not shutdown_event.is_set():
                sd.sleep(1000)
        finally:
            print("録音終了")
            # Propagate the stop signal to the sibling processes.
            shutdown_event.set()

def process_sound(shutdown_event: mp.Event, conn: mp_conn.Connection,
                  question_q: mp.Queue):
    """Batch pipe audio into 10-second chunks, transcribe each, and put the
    resulting text on question_q for the LLM stage."""
    pending = b""
    buffer_size = 10 * SAMPLING_RATE  # chunk length in samples
    chunk_bytes = buffer_size * 2     # int16 → 2 bytes per sample on the wire

    rez_model = EncDecRNNTBPEModel.from_pretrained(
        'reazon-research/reazonspeech-nemo-v2',
        map_location="cpu")

    while not shutdown_event.is_set():
        if not conn.poll(timeout=1):
            print("no data")
            continue

        pending += conn.recv_bytes()
        if len(pending) < chunk_bytes:
            continue

        samples = np.frombuffer(pending[:chunk_bytes], dtype=np.int16)
        pending = pending[chunk_bytes:]

        # Skip chunks that are pure silence.
        if np.abs(samples).max() == 0:
            continue

        # int16 PCM → float32 in [-1, 1) as the model expects.
        floats = samples.astype('float32') / 32768.0
        audio_data: AudioData = audio_from_numpy(floats, SAMPLING_RATE)
        result = transcribe(rez_model, audio_data)
        text = "".join(seg.text for seg in result.segments)
        print("_____________________________________")
        print(text, end="")
        print("_____________________________________")
        question_q.put(text)


def process_sentence(shutdown_event, question_q):
    """Answer transcribed questions with a local LLM, streaming text to stdout.

    Loads the model once, then pulls question strings off ``question_q``
    until ``shutdown_event`` is set.
    """
    model, tokenizer = load("mlx-community/Llama-3-Swallow-8B-Instruct-v0.1-8bit")

    while not shutdown_event.is_set():
        try:
            question = question_q.get(timeout=3)
        except queue.Empty:
            continue

        print("*************************************")
        print(f"QUESTION: {question}")
        print("*************************************")
        prompt = [
          {"role": "system", "content": "あなたは日本語に堪能な魔法使いです。英語を使ってはいけません。全て日本語で回答します。"},
          {"role": "user", "content": question},
          {"role": "system", "content": "この文章に対しての回答を、日本語で1つだけ示します。"},
        ]
        # BUG FIX: the keyword is add_generation_prompt; the misspelled
        # add_generate_prompt was silently ignored, so no assistant-turn
        # header was appended to the prompt.
        applied_prompt = tokenizer.apply_chat_template(
            prompt, tokenize=False, add_generation_prompt=True)

        ai_response_stream = mlx_lm.stream_generate(
            model=model, tokenizer=tokenizer,
            prompt=applied_prompt, max_tokens=128)

        # Stream tokens straight to stdout (the accumulated copy kept by the
        # original was never used and has been dropped).
        for resp in ai_response_stream:
            print(resp, end="")
        print()


def main():
    """Wire recorder → transcriber → LLM responder and supervise them."""
    conn1, conn2 = mp.Pipe()
    question_q = mp.Queue()
    answer_q = mp.Queue()

    shutdown_event = mp.Event()
    children = [
        mp.Process(target=read_sound,
                   args=(shutdown_event, conn1)),
        mp.Process(target=process_sound,
                   args=(shutdown_event, conn2, question_q)),
        mp.Process(target=process_sentence,
                   args=(shutdown_event, question_q)),
    ]

    # Start downstream consumers first so nothing queued gets lost.
    for child in reversed(children):
        child.start()

    try:
        for child in children:
            child.join()
    except KeyboardInterrupt:
        shutdown_event.set()
        for child in children:
            child.terminate()
    finally:
        for child in children:
            child.join()

if __name__ == "__main__":
    main()

文章を読み上げる

import librosa
import melo.api
import numpy as np
import sounddevice as sd

# Sentence to synthesize and play back.
sentence = "ぼくの名前はしむどんです。"

# MeloTTS Japanese model on the Apple-Silicon GPU ("mps" device).
model = melo.api.TTS(language="JP", device="mps")

# Synthesize to a float waveform (MeloTTS outputs 44.1 kHz), resample to
# 16 kHz, then scale to int16 PCM for sounddevice playback.
voice_data = model.tts_to_file(
    sentence, model.hps.data.spk2id["JP"], quiet=True)
voice_data2 = librosa.resample(voice_data, orig_sr=44100, target_sr=16000)
voice_data3 = (voice_data2 * 32768).astype(np.int16)

# Playback cursor into voice_data3; advanced by the output callback below.
index = 0

def callback(outdata, frames, time, status):
    """Output-stream callback: copy the next `frames` samples into outdata."""
    global index

    if status:
        print(status, flush=True)


    chunk = voice_data3[index:index + frames]
    if len(chunk) < frames:
        # Final partial block: zero-pad the tail, then stop the stream.
        outdata[:len(chunk), 0] = chunk
        outdata[len(chunk):, 0] = 0
        raise sd.CallbackStop
    else:
        outdata[:, 0] = chunk
    index += frames

# Play for up to 5 seconds; CallbackStop ends output earlier if the
# waveform runs out first.
with sd.OutputStream(
        samplerate=16000,
        channels=1,
        dtype='int16',
        callback=callback
    ):
    sd.sleep(5000)

文字で質問を入力し音声で答える

import re
from dataclasses import dataclass
import multiprocessing as mp
import queue

import librosa
import melo.api
import numpy as np
import sounddevice as sd

import mlx_lm
from mlx_lm import load, generate


@dataclass
class ProcessState:
    """Per-worker wiring: shared shutdown flag, readiness flag, and queues."""
    shutdown_ev: mp.Event  # shared: set by main (or a worker) to stop all loops
    ready_ev: mp.Event     # set by the worker once its model/stream is loaded
    in_q: mp.Queue         # work items consumed by this worker
    out_q: mp.Queue        # results handed to the next stage (None for the sink)


def run_question_to_answer(state: ProcessState):
    """Worker: turn question strings from ``state.in_q`` into answer strings
    on ``state.out_q``.

    Loads the LLM once, signals readiness, then loops until shutdown.
    """
    model, tokenizer = load("mlx-community/Llama-3-Swallow-8B-Instruct-v0.1-8bit")
    state.ready_ev.set()

    while not state.shutdown_ev.is_set():
        try:
            question: str = state.in_q.get(timeout=1)
        except queue.Empty:
            continue

        print(f"START run_question_to_answer: {question}")
        prompt = [
          {"role": "system", "content": "あなたは日本語に堪能な魔法使いです。英語を使ってはいけません。全て日本語で回答します。"},
          {"role": "user", "content": question},
          {"role": "system", "content": "この文章に対しての回答を、日本語で1つだけ示します。"},
        ]
        # BUG FIX: the keyword is add_generation_prompt; the misspelled
        # add_generate_prompt was silently ignored by apply_chat_template.
        applied_prompt = tokenizer.apply_chat_template(
            prompt, tokenize=False, add_generation_prompt=True)

        ai_response_stream = mlx_lm.stream_generate(
            model=model, tokenizer=tokenizer,
            prompt=applied_prompt, max_tokens=100)

        long_answer = ""
        for resp in ai_response_stream:
            long_answer += resp
        # Pull the assistant turn out of the template-tagged raw output.
        m = re.search(r"\<\|eot_id\|\>user\<\|eot_id\|\>.*?\<\|eot_id\|\>",
                      long_answer)
        if not m:
            continue

        # BUG FIX: m.string is the *entire* searched text, not the match;
        # use m.group(0) so only the matched span is forwarded downstream.
        answer = m.group(0)
        state.out_q.put(answer)
        print(f"END run_question_to_answer: {answer}")

def run_answer_to_sound(state: ProcessState):
    """Worker: synthesize each answer string into 16 kHz int16 PCM."""
    model = melo.api.TTS(language="JP", device="mps")

    state.ready_ev.set()

    while not state.shutdown_ev.is_set():
        try:
            answer: str = state.in_q.get(timeout=1)
        except queue.Empty:
            continue
        print(f"START run_answer_to_sound: {answer}")
        # MeloTTS renders at 44.1 kHz; resample down and quantize to int16.
        raw_wave = model.tts_to_file(answer,
                                     model.hps.data.spk2id["JP"],
                                     quiet=True)
        resampled = librosa.resample(raw_wave,
                                     orig_sr=44100,
                                     target_sr=16000)
        pcm16: np.ndarray = (resampled * 32768).astype(np.int16)
        state.out_q.put(pcm16)
        print(f"END run_answer_to_sound: {pcm16.shape}")


def run_speech(state: ProcessState):
    """Worker (sink): play each PCM chunk from in_q on the default output."""
    with sd.OutputStream(samplerate=16000, channels=1, dtype='int16') as stream:
        state.ready_ev.set()
        while not state.shutdown_ev.is_set():
            try:
                pcm = state.in_q.get(timeout=1)
            except queue.Empty:
                continue
            print(f"START run_speech: {pcm.shape}")
            stream.write(pcm)
            print(f"END run_speech: {pcm.shape}")
            sd.sleep(500)

def main():
    """Build the question → answer → speech pipeline and feed it from stdin.

    Three worker processes are chained by queues (each stage's out_q is the
    next stage's in_q). The parent reads questions from stdin forever; any
    exception — including Ctrl-C — triggers a cooperative shutdown.
    """
    shutdown_ev = mp.Event()
    question_to_answer_state = ProcessState(
        shutdown_ev=shutdown_ev,
        ready_ev=mp.Event(),
        in_q=mp.Queue(),
        out_q=mp.Queue(),
    )
    answer_to_sound_state = ProcessState(
        shutdown_ev=shutdown_ev,
        ready_ev=mp.Event(),
        in_q=question_to_answer_state.out_q,
        out_q=mp.Queue(),
    )
    speech_state = ProcessState(
        shutdown_ev=shutdown_ev,
        ready_ev=mp.Event(),
        in_q=answer_to_sound_state.out_q,
        out_q=None,  # final sink: nothing downstream
    )
    state_list = [
        question_to_answer_state,
        answer_to_sound_state,
        speech_state,
    ]
    children = [
        mp.Process(target=run_question_to_answer, args=(question_to_answer_state,)),
        mp.Process(target=run_answer_to_sound, args=(answer_to_sound_state,)),
        mp.Process(target=run_speech, args=(speech_state,)),
    ]
    try:
        for child in children:
            child.start()

        # Give every worker up to a minute to finish loading its model.
        for state in state_list:
            state.ready_ev.wait(timeout=60)

        while True:
            t = input("??? ")
            question_to_answer_state.in_q.put(t)
    except BaseException:
        # Was a bare `except:` — made explicit but behavior-preserving: it
        # must also catch KeyboardInterrupt/EOFError raised by input().
        shutdown_ev.set()
        for child in children:
            child.terminate()
    finally:
        shutdown_ev.set()
        for child in children:
            child.join(timeout=60)


if __name__ == "__main__":
    main()

話し中かどうか、もう一度

音声を処理して文字起こしモデルに食わせているが精度がイマイチだ。発話検出との繋がりが上手くできていないから省略していたが、もう一度実装して動きを確認しようと思う。

ただ、ここで力尽きた。