import asyncio
import csv
import json
from urllib.parse import urlencode

import websockets
from jiwer import wer
from whisper_normalizer.basic import BasicTextNormalizer
from whisper_normalizer.english import EnglishTextNormalizer
# Text normalizers are built once at import time and reused by calculate_wer.
# English-specific normalizer: applied when the language is "en".
english_normalizer = EnglishTextNormalizer()
# Basic normalizer: applied for every language other than "en".
other_language_normalizer = BasicTextNormalizer()
async def transcribe_audio(api_key, audio_file, language="en", sample_rate=16000):
    """Stream an audio file to the Waves ASR websocket and return the transcript.

    The file is read fully into memory and sent as binary frames holding 0.3 s
    of audio each, with a 0.3 s pause after every frame so the upload is paced
    like live playback (assuming the file really matches the declared format).
    An empty frame marks the end of the stream. Every received JSON object
    that has a "text" key contributes one piece of the result.

    Args:
        api_key: API key, sent as the ``api_key`` query parameter.
        audio_file: Path of the audio file. Its bytes are sent as-is and are
            declared to the server as mono 16-bit linear PCM.
        language: Value of the ``audioLanguage`` query parameter.
        sample_rate: Sample rate in Hz. Drives both the ``audioSampleRate``
            query parameter and the chunk size, so the two cannot drift apart.

    Returns:
        All received "text" values joined with single spaces, or "" if none
        arrived.
    """
    chunk_seconds = 0.3
    bytes_per_sample = 2  # linear16 = 16-bit PCM
    drain_seconds = 2  # max wait for trailing results after end-of-stream

    with open(audio_file, "rb") as f:
        audio_data = f.read()

    params = {
        "audioLanguage": language,
        "audioEncoding": "linear16",
        "audioSampleRate": str(sample_rate),
        "audioChannels": "1",
        "addPunctuation": "true",
        "api_key": api_key,
    }
    # urlencode percent-escapes reserved characters (e.g. in the API key)
    # that a plain "k=v" join would pass through and corrupt the URL.
    url = f"wss://waves-api.smallest.ai/api/v1/asr?{urlencode(params)}"

    # Whole samples per chunk: sample_rate × 0.3 s, then × 2 bytes per sample.
    samples_per_chunk = max(1, round(sample_rate * chunk_seconds))
    chunk_size = samples_per_chunk * bytes_per_sample

    transcription = []
    async with websockets.connect(url) as ws:

        async def listen():
            # Collect the "text" field of every JSON object the server sends.
            async for message in ws:
                response = json.loads(message)
                if isinstance(response, dict) and "text" in response:
                    transcription.append(response["text"])

        listen_task = asyncio.create_task(listen())
        try:
            # Index-based slicing copies each chunk once; re-slicing the
            # remainder on every iteration would copy the tail repeatedly.
            for start in range(0, len(audio_data), chunk_size):
                await ws.send(audio_data[start:start + chunk_size])
                await asyncio.sleep(chunk_seconds)
            await ws.send(b"")  # End of stream

            # Give the server up to drain_seconds to deliver remaining
            # results; returns early if the listener finishes first.
            await asyncio.wait([listen_task], timeout=drain_seconds)
        finally:
            # Always stop and reap the listener, even when sending failed.
            # Listener errors are deliberately not re-raised (best effort):
            # whatever text arrived before the failure is still returned.
            listen_task.cancel()
            await asyncio.gather(listen_task, return_exceptions=True)

    return " ".join(transcription)
def calculate_wer(reference, hypothesis, language="en"):
    """Return the word error rate between two texts after normalization.

    Both texts go through the same normalizer before scoring: the English
    normalizer when ``language`` is "en", the basic normalizer otherwise.

    Args:
        reference: Ground-truth text.
        hypothesis: Text to score against the reference.
        language: Language code used only to pick the normalizer.

    Returns:
        The value produced by ``jiwer.wer`` for the normalized pair.
    """
    normalize = english_normalizer if language == "en" else other_language_normalizer
    return wer(normalize(reference), normalize(hypothesis))
async def main():
    """Transcribe every audio file listed in the input CSV and report WER.

    Reads ``fleurs_dataset.csv``, which must have an ``audio_path`` column and
    may have a ``text`` column holding the reference transcript. Each file is
    transcribed in turn, and the row is extended with ``transcript`` and
    ``wer`` columns. All rows are then written to
    ``transcription_results_streaming.csv`` and the mean of the per-file WER
    values is printed. Nothing is written or printed when the input has no
    data rows.
    """
    api_key = "your_api_key_here"
    input_csv = "fleurs_dataset.csv"  # input CSV
    output_csv = "transcription_results_streaming.csv"

    results = []
    # newline="" lets the csv module handle embedded newlines itself, and an
    # explicit UTF-8 encoding keeps non-ASCII text independent of the locale.
    with open(input_csv, "r", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            # DictReader fills missing trailing fields with None, so fall
            # back to "" for both an absent column and a short row.
            reference_text = row.get("text") or ""
            transcript = await transcribe_audio(api_key, row["audio_path"])
            row["transcript"] = transcript
            row["wer"] = calculate_wer(reference_text, transcript)
            results.append(row)

    if not results:
        return

    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        # Column order: the input columns, then "transcript" and "wer".
        writer = csv.DictWriter(f, fieldnames=list(results[0].keys()))
        writer.writeheader()
        writer.writerows(results)

    # Unweighted mean of the per-file scores (not a corpus-level WER).
    avg_wer = sum(row["wer"] for row in results) / len(results)
    print(f"Average WER: {avg_wer:.3f}")
# Script entry point: run the async pipeline only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())