Until now I'd been working hard toward a particular goal. For its sake I put up with things, pushed things back, put things on hold. In the end, that goal was smashed to pieces. Everything I endured, postponed, and shelved turned out to be utterly meaningless. This changed my personal priorities in a big way. So from now on, when something catches my interest, I'll pursue it then and there. And I'll stop putting anyone or anything else ahead of myself. Until now, even with things that looked beyond me, I somehow clung on and muddled through. There's no need for that anymore. From now on, if I can't do something, I'll say so, and I won't do it.
Passing data with Python's multiprocessing.Pipe
I considered using threads, but I wanted to avoid the GIL, so I decided to split the work across processes instead. So, first, a refresher on how to use multiprocessing.
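As a quick sanity check of that choice, here is a rough benchmark sketch (my own aside, not part of the project; absolute numbers will vary by machine): the same CPU-bound loop is serialized by the GIL when run in threads, but runs in parallel when run in processes.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def busy(n: int) -> int:
    # Pure-Python CPU-bound work; holds the GIL the whole time.
    total = 0
    for i in range(n):
        total += i * i
    return total


def bench(executor_cls) -> float:
    start = time.perf_counter()
    with executor_cls(max_workers=4) as ex:
        list(ex.map(busy, [2_000_000] * 4))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"threads:   {bench(ThreadPoolExecutor):.2f}s")   # roughly serial
    print(f"processes: {bench(ProcessPoolExecutor):.2f}s")  # roughly parallel

With that settled, a minimal Pipe example: one process sends a byte every second, the other receives and prints it.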
import time
import multiprocessing as mp
import multiprocessing.connection as mp_conn


def read_sound(conn: mp_conn.Connection):
    while True:
        conn.send_bytes(b"a")
        time.sleep(1)


def process_sound(conn: mp_conn.Connection):
    while True:
        b = conn.recv_bytes()
        print(b)


def main():
    conn1, conn2 = mp.Pipe()
    p1 = mp.Process(target=read_sound, args=(conn1,))
    p2 = mp.Process(target=process_sound, args=(conn2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == "__main__":
    main()
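A small aside: Pipe() is duplex by default, but these examples only ever send in one direction. mp.Pipe(duplex=False) makes the direction explicit; the first connection returned is the receive end.

import multiprocessing as mp

# One-way pipe: the first connection can only receive,
# the second can only send.
recv_conn, send_conn = mp.Pipe(duplex=False)
send_conn.send_bytes(b"a")
print(recv_conn.recv_bytes())  # b"a"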
Passing audio data
import multiprocessing as mp
import multiprocessing.connection as mp_conn
import numpy as np
import sounddevice as sd

SAMPLING_RATE = 16000
WINDOW_SIZE_SAMPLES = 512


def read_sound(conn: mp_conn.Connection):
    # sounddevice calls this from its own audio thread every
    # WINDOW_SIZE_SAMPLES frames (512 int16 samples = 1024 bytes).
    def _audio_callback(indata: np.ndarray, frames, time, status):
        conn.send_bytes(indata.tobytes())

    with sd.InputStream(
        samplerate=SAMPLING_RATE,
        dtype="int16",
        blocksize=WINDOW_SIZE_SAMPLES,
        channels=1,
        callback=_audio_callback,
    ):
        print("録音開始")
        sd.sleep(10 * 1000)
        print("録音終了")


def process_sound(conn: mp_conn.Connection):
    while True:
        b = conn.recv_bytes()
        print(len(b))


def main():
    conn1, conn2 = mp.Pipe()
    p1 = mp.Process(target=read_sound, args=(conn1,))
    p2 = mp.Process(target=process_sound, args=(conn2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == "__main__":
    main()
Simple real-time transcription
import multiprocessing as mp
import multiprocessing.connection as mp_conn

import numpy as np
import sounddevice as sd
from nemo.collections.asr.models import EncDecRNNTBPEModel
from reazonspeech.nemo.asr.audio import audio_from_numpy
from reazonspeech.nemo.asr.interface import AudioData
from reazonspeech.nemo.asr.transcribe import transcribe

SAMPLING_RATE = 16000
WINDOW_SIZE_SAMPLES = 512


def read_sound(conn: mp_conn.Connection, shutdown_event):
    def _audio_callback(indata: np.ndarray, frames, time, status):
        if conn and not shutdown_event.is_set():
            conn.send_bytes(indata.tobytes())

    with sd.InputStream(samplerate=SAMPLING_RATE, dtype="int16",
                        blocksize=WINDOW_SIZE_SAMPLES,
                        channels=1, callback=_audio_callback):
        try:
            print("録音開始")
            while not shutdown_event.is_set():
                sd.sleep(1000)
        finally:
            print("録音終了")
            shutdown_event.set()


def process_sound(conn: mp_conn.Connection, shutdown_event):
    audio_buffer = b""
    buffer_size = 10 * SAMPLING_RATE  # transcribe in 10-second chunks
    rez_model = EncDecRNNTBPEModel.from_pretrained(
        "reazon-research/reazonspeech-nemo-v2",
        map_location="cpu")
    while not shutdown_event.is_set():
        if not conn.poll(timeout=1):
            print("no data")
            continue
        b: bytes = conn.recv_bytes()
        audio_buffer += b
        # int16 is 2 bytes per sample, so wait for buffer_size * 2 bytes.
        if len(audio_buffer) < buffer_size * 2:
            continue
        data = np.frombuffer(audio_buffer[:buffer_size * 2], dtype=np.int16)
        audio_buffer = audio_buffer[buffer_size * 2:]
        if np.abs(data).max() == 0:
            continue  # skip pure silence
        data = data.astype("float32") / 32768.0
        audio_data: AudioData = audio_from_numpy(data, SAMPLING_RATE)
        ret = transcribe(rez_model, audio_data)
        text = "".join(sg.text for sg in ret.segments)
        print("_____________________________________")
        print(text)
        print("_____________________________________")


def main():
    conn1, conn2 = mp.Pipe()
    shutdown_event = mp.Event()
    p1 = mp.Process(target=read_sound, args=(conn1, shutdown_event))
    p2 = mp.Process(target=process_sound, args=(conn2, shutdown_event))
    # Start the consumer first so nothing it is sent is missed.
    p2.start()
    p1.start()
    try:
        p1.join()
        p2.join()
    except KeyboardInterrupt:
        shutdown_event.set()
        p1.terminate()
        p2.terminate()
    finally:
        p1.join()
        p2.join()


if __name__ == "__main__":
    main()
Answering spoken questions with text, in real time
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_conn

import numpy as np
import sounddevice as sd
from nemo.collections.asr.models import EncDecRNNTBPEModel
from reazonspeech.nemo.asr.audio import audio_from_numpy
from reazonspeech.nemo.asr.interface import AudioData
from reazonspeech.nemo.asr.transcribe import transcribe
import mlx_lm
from mlx_lm import load

SAMPLING_RATE = 16000
WINDOW_SIZE_SAMPLES = 512


def read_sound(shutdown_event, conn: mp_conn.Connection):
    def _audio_callback(indata: np.ndarray, frames, time, status):
        if conn and not shutdown_event.is_set():
            conn.send_bytes(indata.tobytes())

    with sd.InputStream(samplerate=SAMPLING_RATE, dtype="int16",
                        blocksize=WINDOW_SIZE_SAMPLES,
                        channels=1, callback=_audio_callback):
        try:
            print("録音開始")
            while not shutdown_event.is_set():
                sd.sleep(1000)
        finally:
            print("録音終了")
            shutdown_event.set()


def process_sound(shutdown_event, conn: mp_conn.Connection,
                  question_q: mp.Queue):
    audio_buffer = b""
    buffer_size = 10 * SAMPLING_RATE  # transcribe in 10-second chunks
    rez_model = EncDecRNNTBPEModel.from_pretrained(
        "reazon-research/reazonspeech-nemo-v2",
        map_location="cpu")
    while not shutdown_event.is_set():
        if not conn.poll(timeout=1):
            print("no data")
            continue
        b: bytes = conn.recv_bytes()
        audio_buffer += b
        # int16 is 2 bytes per sample, so wait for buffer_size * 2 bytes.
        if len(audio_buffer) < buffer_size * 2:
            continue
        data = np.frombuffer(audio_buffer[:buffer_size * 2], dtype=np.int16)
        audio_buffer = audio_buffer[buffer_size * 2:]
        if np.abs(data).max() == 0:
            continue  # skip pure silence
        data = data.astype("float32") / 32768.0
        audio_data: AudioData = audio_from_numpy(data, SAMPLING_RATE)
        ret = transcribe(rez_model, audio_data)
        text = "".join(sg.text for sg in ret.segments)
        print("_____________________________________")
        print(text)
        print("_____________________________________")
        question_q.put(text)


def process_sentence(shutdown_event, question_q):
    model, tokenizer = load("mlx-community/Llama-3-Swallow-8B-Instruct-v0.1-8bit")
    while not shutdown_event.is_set():
        try:
            question = question_q.get(timeout=3)
        except queue.Empty:
            continue
        print("*************************************")
        print(f"QUESTION: {question}")
        print("*************************************")
        prompt = [
            {"role": "system", "content": "あなたは日本語に堪能な魔法使いです。英語を使ってはいけません。全て日本語で回答します。"},
            {"role": "user", "content": question},
            {"role": "system", "content": "この文章に対しての回答を、日本語で1つだけ示します。"},
        ]
        applied_prompt = tokenizer.apply_chat_template(
            prompt, tokenize=False, add_generation_prompt=True)
        ai_response_stream = mlx_lm.stream_generate(
            model=model, tokenizer=tokenizer,
            prompt=applied_prompt, max_tokens=128)
        response = ""
        for resp in ai_response_stream:
            print(resp, end="")
            response += resp
        print()


def main():
    conn1, conn2 = mp.Pipe()
    question_q = mp.Queue()
    shutdown_event = mp.Event()
    p1 = mp.Process(target=read_sound,
                    args=(shutdown_event, conn1))
    p2 = mp.Process(target=process_sound,
                    args=(shutdown_event, conn2, question_q))
    p3 = mp.Process(target=process_sentence,
                    args=(shutdown_event, question_q))
    # Start the consumers first so nothing they are sent is missed.
    p3.start()
    p2.start()
    p1.start()
    try:
        p1.join()
        p2.join()
        p3.join()
    except KeyboardInterrupt:
        shutdown_event.set()
        p1.terminate()
        p2.terminate()
        p3.terminate()
    finally:
        p1.join()
        p2.join()
        p3.join()


if __name__ == "__main__":
    main()
Reading a sentence aloud
import librosa
import melo.api
import numpy as np
import sounddevice as sd

sentence = "ぼくの名前はしむどんです。"

# With no output path, tts_to_file returns the waveform instead of saving it.
model = melo.api.TTS(language="JP", device="mps")
voice_data = model.tts_to_file(
    sentence, model.hps.data.spk2id["JP"], quiet=True)
# MeloTTS outputs 44.1 kHz float32; convert to 16 kHz int16 for playback.
voice_data2 = librosa.resample(voice_data, orig_sr=44100, target_sr=16000)
voice_data3 = (voice_data2 * 32768).astype(np.int16)

index = 0


def callback(outdata, frames, time, status):
    global index
    if status:
        print(status, flush=True)
    chunk = voice_data3[index:index + frames]
    if len(chunk) < frames:
        # Final partial block: pad with silence and stop the stream.
        outdata[:len(chunk), 0] = chunk
        outdata[len(chunk):, 0] = 0
        raise sd.CallbackStop
    else:
        outdata[:, 0] = chunk
        index += frames


with sd.OutputStream(
    samplerate=16000,
    channels=1,
    dtype="int16",
    callback=callback,
):
    sd.sleep(5000)
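If the callback bookkeeping isn't needed, sounddevice's blocking helpers do the same job in two calls. A self-contained sketch, with a generated test tone standing in for voice_data3 above:

import numpy as np
import sounddevice as sd

# One second of a 440 Hz tone in the same 16 kHz int16 format as voice_data3.
t = np.linspace(0, 1.0, 16000, endpoint=False)
tone = (np.sin(2 * np.pi * 440 * t) * 0.2 * 32767).astype(np.int16)

sd.play(tone, samplerate=16000)  # starts playback in the background
sd.wait()                        # blocks until playback finishes

For one-shot playback this is simpler; the callback version gives finer control over exactly when the stream stops.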
Typing a question and getting a spoken answer
import re
import queue
from dataclasses import dataclass
import multiprocessing as mp

import librosa
import melo.api
import numpy as np
import sounddevice as sd
import mlx_lm
from mlx_lm import load


@dataclass
class ProcessState:
    """Handles for one pipeline stage: a shared shutdown event, a ready
    event the stage sets once its model is loaded, and its in/out queues."""
    shutdown_ev: mp.Event
    ready_ev: mp.Event
    in_q: mp.Queue
    out_q: mp.Queue


def run_question_to_answer(state: ProcessState):
    model, tokenizer = load("mlx-community/Llama-3-Swallow-8B-Instruct-v0.1-8bit")
    state.ready_ev.set()
    while not state.shutdown_ev.is_set():
        try:
            question: str = state.in_q.get(timeout=1)
        except queue.Empty:
            continue
        print(f"START run_question_to_answer: {question}")
        prompt = [
            {"role": "system", "content": "あなたは日本語に堪能な魔法使いです。英語を使ってはいけません。全て日本語で回答します。"},
            {"role": "user", "content": question},
            {"role": "system", "content": "この文章に対しての回答を、日本語で1つだけ示します。"},
        ]
        applied_prompt = tokenizer.apply_chat_template(
            prompt, tokenize=False, add_generation_prompt=True)
        ai_response_stream = mlx_lm.stream_generate(
            model=model, tokenizer=tokenizer,
            prompt=applied_prompt, max_tokens=100)
        long_answer = ""
        for resp in ai_response_stream:
            long_answer += resp
            # Stop as soon as a complete answer span appears in the stream.
            m = re.search(r"\<\|eot_id\|\>user\<\|eot_id\|\>(.*?)\<\|eot_id\|\>",
                          long_answer, re.DOTALL)
            if not m:
                continue
            answer = m.group(1)
            break
        else:
            # No marker found by the end of the stream; use everything.
            answer = long_answer
        state.out_q.put(answer)
        print(f"END run_question_to_answer: {answer}")


def run_answer_to_sound(state: ProcessState):
    model = melo.api.TTS(language="JP", device="mps")
    state.ready_ev.set()
    while not state.shutdown_ev.is_set():
        try:
            answer: str = state.in_q.get(timeout=1)
        except queue.Empty:
            continue
        print(f"START run_answer_to_sound: {answer}")
        voice_data = model.tts_to_file(answer,
                                       model.hps.data.spk2id["JP"],
                                       quiet=True)
        voice_data2 = librosa.resample(voice_data,
                                       orig_sr=44100,
                                       target_sr=16000)
        voice_data3: np.ndarray = (voice_data2 * 32768).astype(np.int16)
        state.out_q.put(voice_data3)
        print(f"END run_answer_to_sound: {voice_data3.shape}")


def run_speech(state: ProcessState):
    with sd.OutputStream(samplerate=16000, channels=1, dtype="int16") as stream:
        state.ready_ev.set()
        while not state.shutdown_ev.is_set():
            try:
                sound = state.in_q.get(timeout=1)
            except queue.Empty:
                continue
            print(f"START run_speech: {sound.shape}")
            stream.write(sound)
            print(f"END run_speech: {sound.shape}")
        sd.sleep(500)  # let the last buffer drain before the stream closes


def main():
    shutdown_ev = mp.Event()
    question_to_answer_state = ProcessState(
        shutdown_ev=shutdown_ev,
        ready_ev=mp.Event(),
        in_q=mp.Queue(),
        out_q=mp.Queue(),
    )
    answer_to_sound_state = ProcessState(
        shutdown_ev=shutdown_ev,
        ready_ev=mp.Event(),
        # Each stage reads from the previous stage's output queue.
        in_q=question_to_answer_state.out_q,
        out_q=mp.Queue(),
    )
    speech_state = ProcessState(
        shutdown_ev=shutdown_ev,
        ready_ev=mp.Event(),
        in_q=answer_to_sound_state.out_q,
        out_q=None,
    )
    state_list = [
        question_to_answer_state,
        answer_to_sound_state,
        speech_state,
    ]
    children = [
        mp.Process(target=run_question_to_answer, args=(question_to_answer_state,)),
        mp.Process(target=run_answer_to_sound, args=(answer_to_sound_state,)),
        mp.Process(target=run_speech, args=(speech_state,)),
    ]
    try:
        for child in children:
            child.start()
        # Wait until every stage has loaded its model before taking input.
        for state in state_list:
            state.ready_ev.wait(timeout=60)
        while True:
            t = input("??? ")
            question_to_answer_state.in_q.put(t)
    except (KeyboardInterrupt, EOFError):  # Ctrl-C or end of stdin
        shutdown_ev.set()
        for child in children:
            child.terminate()
    finally:
        shutdown_ev.set()
        for child in children:
            child.join(timeout=60)


if __name__ == "__main__":
    main()
Whether someone is speaking, once more
The audio is being processed and fed into the transcription model, but the accuracy is mediocre. I had skipped voice activity detection because I couldn't get it to hook into the rest of the pipeline properly, but I want to implement it one more time and check how it behaves.
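For the record, here is a rough, untested sketch of how Silero VAD might slot in. It scores each 512-sample window (Silero expects exactly 512 samples at 16 kHz, which is what WINDOW_SIZE_SAMPLES already is) and buffers audio only while someone is speaking; flush_to_transcriber is a hypothetical hook standing in for the transcription side.

import numpy as np
import torch

SAMPLING_RATE = 16000
WINDOW_SIZE_SAMPLES = 512  # Silero VAD wants 512-sample windows at 16 kHz

# Load Silero VAD from torch.hub (the published repo/model names).
vad_model, _utils = torch.hub.load("snakers4/silero-vad", "silero_vad")


def is_speech(chunk: bytes, threshold: float = 0.5) -> bool:
    # Score one 512-sample int16 chunk; the model returns a speech probability.
    data = np.frombuffer(chunk, dtype=np.int16).astype("float32") / 32768.0
    prob = vad_model(torch.from_numpy(data), SAMPLING_RATE).item()
    return prob >= threshold


def gate_chunks(chunks, flush_to_transcriber):
    # Accumulate audio only while speech is detected; once silence returns,
    # hand the whole utterance to the transcriber in one piece.
    speaking = False
    audio_buffer = b""
    for chunk in chunks:
        if is_speech(chunk):
            speaking = True
            audio_buffer += chunk
        elif speaking:
            speaking = False
            flush_to_transcriber(audio_buffer)  # hypothetical hook
            audio_buffer = b""

In process_sound this would replace the fixed 10-second buffering, so utterances should no longer get cut off mid-word at chunk boundaries.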
But this is where I ran out of steam.