Working code examples for implementing the ASR WebSocket API
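The browser example below captures microphone audio with the Web Audio API, converts it to 16-bit PCM, and streams it over the WebSocket; transcription results arrive as JSON messages on the same connection.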
// Browser-based ASR with microphone input
let ws;
let audioContext;
let processor;
let source;
let stream;

async function startASR() {
  const apiKey = 'your-api-key'; // Replace with your API key
  const baseUrl = 'wss://waves-api.smallest.ai/api/v1/asr';

  // Configure parameters
  const params = new URLSearchParams({
    api_key: apiKey,
    audioEncoding: 'linear16',
    audioSampleRate: '16000',
    audioChannels: '1',
    addPunctuation: 'true',
    speechEndpointing: '300'
  });
  const url = `${baseUrl}?${params}`;

  ws = new WebSocket(url);

  ws.onopen = async () => {
    console.log('✅ Connected to ASR service');
    await setupMicrophone();
  };

  ws.onmessage = (event) => {
    try {
      const response = JSON.parse(event.data);
      handleTranscription(response);
    } catch (err) {
      console.error('❌ Parse error:', err);
    }
  };

  ws.onerror = (error) => {
    console.error('❌ WebSocket error:', error);
  };

  ws.onclose = (event) => {
    console.log(`🔌 Connection closed: ${event.code} - ${event.reason}`);
    stopASR();
  };
}
async function setupMicrophone() {
  try {
    // Request microphone access
    stream = await navigator.mediaDevices.getUserMedia({
      audio: {
        sampleRate: 16000,
        channelCount: 1,
        echoCancellation: true,
        noiseSuppression: true
      }
    });

    // Create audio context
    audioContext = new AudioContext({ sampleRate: 16000 });
    source = audioContext.createMediaStreamSource(stream);

    // Create audio processor
    // Note: ScriptProcessorNode is deprecated; an AudioWorklet is the modern
    // replacement, but ScriptProcessorNode keeps this example short.
    processor = audioContext.createScriptProcessor(4096, 1, 1);

    processor.onaudioprocess = (e) => {
      if (ws.readyState === WebSocket.OPEN) {
        const inputData = e.inputBuffer.getChannelData(0);

        // Convert float samples in [-1, 1] to 16-bit PCM
        const int16Data = new Int16Array(inputData.length);
        for (let i = 0; i < inputData.length; i++) {
          int16Data[i] = Math.max(-32768, Math.min(32767, inputData[i] * 32768));
        }

        // Send audio data
        ws.send(int16Data.buffer);
      }
    };

    // Connect audio nodes
    source.connect(processor);
    processor.connect(audioContext.destination);

    console.log('🎤 Recording started. Speak now...');
  } catch (err) {
    console.error('❌ Microphone error:', err);
    alert('Microphone access required for ASR functionality');
  }
}
function handleTranscription(response) {
  console.log('📝 Response:', response);

  if (response.error) {
    console.error('❌ API Error:', response);
    return;
  }

  if (response.text) {
    const endOfTurn = response.isEndOfTurn ? ' [END_OF_TURN]' : '';
    console.log(`📝 ${response.text}${endOfTurn}`);
    updateTranscriptionDisplay(response.text);
  }
}

function updateTranscriptionDisplay(text) {
  const container = document.getElementById('transcription');
  if (!container) return;

  const finalDiv = document.createElement('div');
  finalDiv.className = 'final-transcription';
  finalDiv.textContent = text;
  container.appendChild(finalDiv);
}
function stopASR() {
  if (processor) {
    processor.disconnect();
    processor = null;
  }
  if (source) {
    source.disconnect();
    source = null;
  }
  if (audioContext) {
    audioContext.close();
    audioContext = null;
  }
  if (stream) {
    stream.getTracks().forEach(track => track.stop());
    stream = null;
  }
  if (ws) {
    ws.close();
    ws = null;
  }
  console.log('⏹️ ASR stopped');
}
// Usage: wire the controls to buttons with ids "start-asr" and "stop-asr";
// transcriptions are appended to an element with id "transcription".
document.addEventListener('DOMContentLoaded', () => {
  const startBtn = document.getElementById('start-asr');
  const stopBtn = document.getElementById('stop-asr');

  startBtn?.addEventListener('click', startASR);
  stopBtn?.addEventListener('click', stopASR);
});
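The Python script below talks to the same endpoint from the command line: it can stream a local audio file or capture live microphone audio through Sox or FFmpeg, printing transcription messages as they arrive.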
#!/usr/bin/env python3
import asyncio
import websockets
import json
import sys
import os
import subprocess
import signal
import platform
from pathlib import Path
import argparse
class WavesASR:
    def __init__(self, api_key):
        self.api_key = api_key
        self.ws = None
        self.connected = False
        self.transcription_complete = False
        self.transcription_text = []

    async def connect(self, params=None):
        if params is None:
            params = {}

        base_url = "wss://waves-api.smallest.ai/api/v1/asr"
        default_params = {
            "audioLanguage": "en",
            "audioEncoding": "linear16",
            "audioSampleRate": "24000",
            "audioChannels": "1",
            "addPunctuation": "true",
        }
        all_params = {**default_params, **params, "api_key": self.api_key}
        query_string = "&".join([f"{key}={value}" for key, value in all_params.items()])
        url = f"{base_url}?{query_string}"

        print("🔌 Connecting to ASR service...")
        try:
            self.ws = await websockets.connect(url)
            print("✅ Connected to ASR service")
            self.connected = True
            # Start listening for messages
            asyncio.create_task(self._listen_for_messages())
        except Exception as error:
            print(f"❌ WebSocket error: {error}")
            self.connected = False
            raise error
    async def _listen_for_messages(self):
        try:
            async for message in self.ws:
                try:
                    response = json.loads(message)
                    self._handle_response(response)
                except json.JSONDecodeError as error:
                    print(f"❌ JSON parse error: {error}")
        except websockets.exceptions.ConnectionClosed:
            print("🔌 Connection closed")
        # Whether the connection closed cleanly or with an error, signal completion
        self.connected = False
        self.transcription_complete = True
    def _handle_response(self, response):
        if "error" in response:
            print(f"❌ API Error: {response}")
            return

        if "text" in response:
            end_marker = " [END]" if response.get("isEndOfTurn", False) else ""
            print(f"📝 {response['text']}{end_marker}")
            self.transcription_text.append(response["text"])

    async def send_audio_file(self, file_path, chunk_size=32000):
        if not self.connected:
            raise Exception("Not connected to ASR service")
        if not Path(file_path).exists():
            raise Exception(f"Audio file not found: {file_path}")

        print(f"📂 Processing audio file: {file_path}")
        with open(file_path, "rb") as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                if self.connected:
                    await self.ws.send(chunk)
                else:
                    raise Exception("Connection lost during transmission")
        print("✅ Audio file transmission complete")
    async def start_microphone_recording(self, duration=10):
        if not self.connected:
            raise Exception("Not connected to ASR service")

        print(f"🎤 Starting microphone recording for {duration} seconds...")
        print("Press Ctrl+C to stop recording early")

        # Try Sox first, then FFmpeg as fallback
        try:
            await self._try_recording_with_sox(duration)
        except Exception as sox_error:
            print("📝 Sox not available, trying FFmpeg...")
            try:
                await self._try_recording_with_ffmpeg(duration)
            except Exception as ffmpeg_error:
                raise Exception(f"""Recording failed. Please install either Sox or FFmpeg:
macOS:
    brew install sox
    # or
    brew install ffmpeg
Linux:
    sudo apt-get install sox
    # or
    sudo apt-get install ffmpeg
Windows:
    Download from: https://sox.sourceforge.io/
    # or
    Download from: https://ffmpeg.org/
Original errors:
- Sox: {sox_error}
- FFmpeg: {ffmpeg_error}""")
    async def _try_recording_with_sox(self, duration):
        cmd = [
            "sox",
            "-d",                        # default audio device (microphone)
            "-t", "raw",                 # output format: raw
            "-r", "24000",               # sample rate: 24000 Hz
            "-e", "signed-integer",      # encoding: signed integer
            "-b", "16",                  # bit depth: 16 bits
            "-c", "1",                   # channels: mono
            "-",                         # output to stdout
            "trim", "0", str(duration)   # record for the specified duration
        ]
        await self._run_recording_process(cmd, "Sox")

    async def _try_recording_with_ffmpeg(self, duration):
        audio_format = "avfoundation" if platform.system() == "Darwin" else "pulse"
        audio_input = ":0" if platform.system() == "Darwin" else "default"
        cmd = [
            "ffmpeg",
            "-f", audio_format,    # audio input format
            "-i", audio_input,     # default microphone
            "-ar", "24000",        # sample rate
            "-ac", "1",            # mono channel
            "-f", "s16le",         # 16-bit little-endian format
            "-t", str(duration),   # duration
            "-"                    # output to stdout
        ]
        await self._run_recording_process(cmd, "FFmpeg")
    async def _run_recording_process(self, cmd, tool_name):
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # Handle Ctrl+C gracefully
        def signal_handler(sig, frame):
            print("\n🛑 Stopping recording...")
            process.terminate()

        signal.signal(signal.SIGINT, signal_handler)

        try:
            while True:
                chunk = await process.stdout.read(32000)
                if not chunk:
                    break
                if self.connected:
                    await self.ws.send(chunk)
                else:
                    raise Exception("Connection lost during recording")

            await process.wait()
            if process.returncode == 0:
                print("✅ Microphone recording complete")
            else:
                stderr = await process.stderr.read()
                raise Exception(f"{tool_name} process exited with code {process.returncode}: {stderr.decode()}")
        except Exception as e:
            process.terminate()
            raise e

    async def wait_for_transcription(self):
        while not self.transcription_complete:
            await asyncio.sleep(0.1)
        return " ".join(self.transcription_text)

    async def close(self):
        if self.ws:
            await self.ws.close()
        self.connected = False
async def transcribe_file(file_path, api_key):
    asr = WavesASR(api_key)
    try:
        await asr.connect({
            "audioLanguage": "en",
            "addPunctuation": "true",
        })
        await asr.send_audio_file(file_path)
        full_transcription = await asr.wait_for_transcription()
        print("\n🎯 Complete Transcription:")
        print(full_transcription)
        return full_transcription
    except Exception as error:
        print(f"❌ Transcription error: {error}")
        raise error
    finally:
        await asr.close()


async def transcribe_from_microphone(duration=10, api_key=None):
    asr = WavesASR(api_key)
    try:
        await asr.connect({
            "audioLanguage": "en",
            "addPunctuation": "true",
            "audioSampleRate": "24000",
        })
        await asr.start_microphone_recording(duration)
        full_transcription = await asr.wait_for_transcription()
        print("\n🎯 Complete Transcription:")
        print(full_transcription)
        return full_transcription
    except Exception as error:
        print(f"❌ Transcription error: {error}")
        raise error
    finally:
        await asr.close()
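

# Optional helper, not part of the Waves API: send_audio_file() streams the
# file's bytes verbatim, so the file should already be raw 16-bit mono PCM at
# the sample rate passed as audioSampleRate. This sketch transcodes arbitrary
# audio with FFmpeg (already used above for microphone capture); the function
# name and defaults are just this script's own convention.
def convert_to_linear16(src_path, dst_path, sample_rate=24000):
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-i", str(src_path),       # any input format FFmpeg understands
            "-ar", str(sample_rate),   # match the audioSampleRate query param
            "-ac", "1",                # mono
            "-f", "s16le",             # raw 16-bit little-endian PCM (linear16)
            str(dst_path),
        ],
        check=True,
        capture_output=True,
    )
    return dst_path
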
def main():
    parser = argparse.ArgumentParser(
        description="🎤 Audio Transcription Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python simple_transcribe.py recording.wav
    python simple_transcribe.py --mic 5
    python simple_transcribe.py --mic
"""
    )
    parser.add_argument("file", nargs="?", help="Audio file to transcribe")
    parser.add_argument("--mic", "-m", type=int, nargs="?", const=10,
                        help="Record from microphone (default: 10 seconds)")
    args = parser.parse_args()

    # API key: read from the environment, or paste it here directly
    # (the environment variable name is just this script's convention)
    api_key = os.environ.get("SMALLEST_API_KEY", "")
    if not api_key:
        print("❌ API key is required (set SMALLEST_API_KEY or edit this script)")
        sys.exit(1)

    async def run():
        try:
            if args.mic is not None:
                duration = args.mic
                print(f"🚀 Starting microphone recording for {duration} seconds...")
                await transcribe_from_microphone(duration, api_key)
            elif args.file:
                file_path = args.file
                print(f"🚀 Starting transcription of: {file_path}")
                await transcribe_file(file_path, api_key)
            else:
                # Default to recording.wav if no arguments are given
                file_path = "recording.wav"
                print(f"🚀 Starting transcription of: {file_path}")
                await transcribe_file(file_path, api_key)
            print("\n✅ Transcription completed successfully!")
        except Exception as error:
            print(f"❌ Failed to transcribe: {error}")
            sys.exit(1)

    asyncio.run(run())


if __name__ == "__main__":
    main()
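If you prefer to call the helpers directly instead of going through the CLI, a minimal sketch looks like the following. It assumes the script above is saved as simple_transcribe.py (the name used in its own usage examples) and is importable from the working directory.

import asyncio
from simple_transcribe import transcribe_file, transcribe_from_microphone

async def demo():
    api_key = "your-api-key"  # replace with your Waves API key

    # Transcribe a local raw/linear16 audio file
    text = await transcribe_file("recording.wav", api_key)
    print("File transcription:", text)

    # Or capture 5 seconds from the default microphone (requires Sox or FFmpeg)
    text = await transcribe_from_microphone(duration=5, api_key=api_key)
    print("Microphone transcription:", text)

if __name__ == "__main__":
    asyncio.run(demo())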