def get_usb_audio():
def piper_tts(text):
"""
-Generate speech using Piper TTS with proper pipeline handling
+Generate speech using Piper TTS with proper pipeline handling and pitch variation
"""
+import random
logger = logging.getLogger(__name__)
text_regex = re.compile(r'[^A
-Za
-z0
-9s.,'"
-?!]
+')
text = text_regex.sub('', text)
def piper_tts(text):
try:
plughwstr = f'plughw:{AUDIO_DEVICE_INDEX},0'
-# Create pipeline
+# Generate pitch variation for cat-like expressiveness
+# Base pitch shift: +2 semitones (higher voice)
+# Random variation: ±1 semitone for expressiveness
+base_pitch_shift = 2.0# Higher base pitch
+random_variation = random.uniform(
-1.0, 1.0)# Random variation
+total_pitch_shift = base_pitch_shift
+ random_variation
+
+# Slight tempo variation for more natural speech (0.95-1.05x speed)
+tempo_variation = random.uniform(0.95, 1.05)
+
+logger.info(f"TTS Pitch:
+{total_pitch_shift:.1f} semitones, Tempo: {tempo_variation:.2f}x")
+
+# Create pipeline: echo -> piper -> sox -> aplay
p1 = subprocess.Popen(
['echo', sentence],
stdout=subprocess.PIPE,
def piper_tts(text):
# Allow p1 to receive a SIGPIPE if p2 exits
p1.stdout.close()
-p3 = subprocess.Popen(
+# Add SoX for pitch and tempo processing
+p3 = subprocess.Popen([
+'sox',
+'
-t', 'raw',# Input type: raw
+'
-r', '22050',# Sample rate: 22050 Hz
+'
-e', 'signed', # Encoding: signed
+'
-b', '16', # Bits: 16
-bit
+'
-c', '1',# Channels: mono
+'
-',# Input from stdin
+'
-t', 'raw',# Output type: raw
+'
-r', '22050',# Keep same sample rate
+'
-e', 'signed', # Keep same encoding
+'
-b', '16', # Keep same bits
+'
-c', '1',# Keep mono
+'
-',# Output to stdout
+'pitch', str(int(total_pitch_shift * 100)),# Pitch shift in cents (100 cents = 1 semitone)
+'tempo', str(tempo_variation), # Tempo adjustment
+], stdin=p2.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+# Allow p2 to receive SIGPIPE if p3 exits
+p2.stdout.close()
+
+p4 = subprocess.Popen(
[
'/usr/bin/aplay',
'
-r', '22050',
-'
-f', 'S16_LE',
+'
-f', 'S16_LE',
'
-t', 'raw',
'
-c', '1',
'
-v',
'
-device', plughwstr
],
-stdin=p2.stdout,
+stdin=p3.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
-# Allow p2 to receive SIGPIPE if p3 exits
-p2.stdout.close()
+# Allow p3 to receive SIGPIPE if p4 exits
+p3.stdout.close()
-# Capture stderr from p2 before it's closed
+# Capture stderr from p2 and p3 before they're closed
p2_stderr_data = p2.stderr.read()
p2.stderr.close()
# Wait for completion and check errors
-_, stderr3 = p3.communicate()
-p2.wait()
+_, stderr4 = p4.communicate()
+p3.wait()
+p2.wait()
p1.wait()
if p1.returncode != 0:
def piper_tts(text):
logger.error("Piper stderr: %s", p2_stderr_data.decode() if p2_stderr_data else "")
raise subprocess.CalledProcessError(p2.returncode, 'piper')
if p3.returncode != 0:
-raise subprocess.CalledProcessError(p3.returncode, 'aplay')
+logger.error("SoX failed with return code: %d", p3.returncode)
+raise subprocess.CalledProcessError(p3.returncode, 'sox')
+if p4.returncode != 0:
+logger.error("aplay failed with return code: %d", p4.returncode)
+raise subprocess.CalledProcessError(p4.returncode, 'aplay')
time.sleep(sentence_sleep)
except Exception as e:
logger.error("TTS Pipeline Error: %s", str(e))
-for p in [p1, p2, p3]:
+for p in [p1, p2, p3, p4]:
try:
p.kill()
except Exception as ke: