Overview
This page provides complete, production-ready examples for integrating Smallest Self-Host into your applications.Python Examples
Basic Transcription
Copy
import requests
import os
LICENSE_KEY = os.getenv("LICENSE_KEY")
API_URL = "http://localhost:7100"
def transcribe_audio(audio_url):
response = requests.post(
f"{API_URL}/v1/listen",
headers={
"Authorization": f"Token {LICENSE_KEY}",
"Content-Type": "application/json"
},
json={"url": audio_url}
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Transcription failed: {response.text}")
result = transcribe_audio("https://example.com/audio.wav")
print(f"Transcription: {result['text']}")
print(f"Confidence: {result['confidence']}")
With Retry Logic
Copy
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
class SmallestClient:
def __init__(self, api_url, license_key):
self.api_url = api_url
self.license_key = license_key
self.session = requests.Session()
retry = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
def transcribe(self, audio_url, **kwargs):
headers = {
"Authorization": f"Token {self.license_key}",
"Content-Type": "application/json"
}
payload = {"url": audio_url, **kwargs}
response = self.session.post(
f"{self.api_url}/v1/listen",
headers=headers,
json=payload,
timeout=300
)
response.raise_for_status()
return response.json()
client = SmallestClient(
api_url="http://localhost:7100",
license_key=os.getenv("LICENSE_KEY")
)
result = client.transcribe(
"https://example.com/audio.wav",
punctuate=True,
timestamps=True
)
print(result['text'])
Async Processing with Webhook
Copy
from flask import Flask, request, jsonify
import requests
import os
app = Flask(__name__)
LICENSE_KEY = os.getenv("LICENSE_KEY")
API_URL = "http://localhost:7100"
@app.route('/webhook/transcription', methods=['POST'])
def transcription_webhook():
data = request.json
job_id = data['job_id']
status = data['status']
if status == 'completed':
result = data['result']
print(f"Job {job_id} completed: {result['text']}")
elif status == 'failed':
print(f"Job {job_id} failed: {data['error']}")
return jsonify({"received": True})
def submit_async_transcription(audio_url):
response = requests.post(
f"{API_URL}/v1/listen",
headers={
"Authorization": f"Token {LICENSE_KEY}",
"Content-Type": "application/json"
},
json={
"url": audio_url,
"callback_url": "https://myapp.com/webhook/transcription"
}
)
return response.json()
if __name__ == '__main__':
job = submit_async_transcription("https://example.com/long-audio.mp3")
print(f"Job submitted: {job['job_id']}")
app.run(port=5000)
Batch Processing
Copy
import concurrent.futures
import requests
import os
LICENSE_KEY = os.getenv("LICENSE_KEY")
API_URL = "http://localhost:7100"
def transcribe_single(audio_url):
try:
response = requests.post(
f"{API_URL}/v1/listen",
headers={
"Authorization": f"Token {LICENSE_KEY}",
"Content-Type": "application/json"
},
json={"url": audio_url},
timeout=300
)
response.raise_for_status()
return {
"url": audio_url,
"success": True,
"result": response.json()
}
except Exception as e:
return {
"url": audio_url,
"success": False,
"error": str(e)
}
audio_urls = [
"https://example.com/audio1.wav",
"https://example.com/audio2.wav",
"https://example.com/audio3.wav",
]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(transcribe_single, audio_urls))
for result in results:
if result['success']:
print(f"{result['url']}: {result['result']['text']}")
else:
print(f"{result['url']}: ERROR - {result['error']}")
JavaScript/Node.js Examples
Basic Transcription
Copy
const axios = require('axios');
const LICENSE_KEY = process.env.LICENSE_KEY;
const API_URL = 'http://localhost:7100';
async function transcribeAudio(audioUrl) {
try {
const response = await axios.post(
`${API_URL}/v1/listen`,
{ url: audioUrl },
{
headers: {
'Authorization': `Token ${LICENSE_KEY}`,
'Content-Type': 'application/json'
}
}
);
return response.data;
} catch (error) {
console.error('Transcription failed:', error.response?.data || error.message);
throw error;
}
}
transcribeAudio('https://example.com/audio.wav')
.then(result => {
console.log('Transcription:', result.text);
console.log('Confidence:', result.confidence);
});
TypeScript Client Class
Copy
import axios, { AxiosInstance } from 'axios';
interface TranscriptionOptions {
url?: string;
file?: File;
language?: string;
punctuate?: boolean;
diarize?: boolean;
timestamps?: boolean;
callback_url?: string;
}
interface TranscriptionResult {
request_id: string;
text: string;
confidence: number;
duration: number;
language: string;
words?: Array<{
word: string;
start: number;
end: number;
confidence: number;
}>;
}
class SmallestClient {
private client: AxiosInstance;
constructor(apiUrl: string, licenseKey: string) {
this.client = axios.create({
baseURL: apiUrl,
headers: {
'Authorization': `Token ${licenseKey}`,
'Content-Type': 'application/json'
},
timeout: 300000
});
}
async transcribe(options: TranscriptionOptions): Promise<TranscriptionResult> {
const response = await this.client.post('/v1/listen', options);
return response.data;
}
async health(): Promise<{ status: string }> {
const response = await this.client.get('/health');
return response.data;
}
}
const client = new SmallestClient(
process.env.API_URL || 'http://localhost:7100',
process.env.LICENSE_KEY!
);
async function main() {
const result = await client.transcribe({
url: 'https://example.com/audio.wav',
punctuate: true,
timestamps: true
});
console.log(result.text);
}
main();
Express.js API Integration
Copy
const express = require('express');
const axios = require('axios');
const multer = require('multer');
const FormData = require('form-data');
const app = express();
const upload = multer({ storage: multer.memoryStorage() });
const LICENSE_KEY = process.env.LICENSE_KEY;
const API_URL = 'http://localhost:7100';
app.post('/transcribe', upload.single('audio'), async (req, res) => {
try {
let result;
if (req.file) {
const formData = new FormData();
formData.append('audio', req.file.buffer, req.file.originalname);
const response = await axios.post(
`${API_URL}/v1/listen`,
formData,
{
headers: {
'Authorization': `Token ${LICENSE_KEY}`,
...formData.getHeaders()
}
}
);
result = response.data;
} else if (req.body.url) {
const response = await axios.post(
`${API_URL}/v1/listen`,
{ url: req.body.url },
{
headers: {
'Authorization': `Token ${LICENSE_KEY}`,
'Content-Type': 'application/json'
}
}
);
result = response.data;
} else {
return res.status(400).json({ error: 'No audio file or URL provided' });
}
res.json(result);
} catch (error) {
console.error('Transcription error:', error.response?.data || error.message);
res.status(500).json({ error: 'Transcription failed' });
}
});
app.listen(3000, () => {
console.log('Server running on port 3000');
});
Go Examples
Basic Client
Copy
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"
)
type TranscriptionRequest struct {
URL string `json:"url"`
Punctuate bool `json:"punctuate,omitempty"`
Language string `json:"language,omitempty"`
}
type TranscriptionResult struct {
RequestID string `json:"request_id"`
Text string `json:"text"`
Confidence float64 `json:"confidence"`
Duration float64 `json:"duration"`
}
type SmallestClient struct {
APIUrl string
LicenseKey string
HTTPClient *http.Client
}
func NewClient(apiURL, licenseKey string) *SmallestClient {
return &SmallestClient{
APIUrl: apiURL,
LicenseKey: licenseKey,
HTTPClient: &http.Client{Timeout: 5 * time.Minute},
}
}
func (c *SmallestClient) Transcribe(req TranscriptionRequest) (*TranscriptionResult, error) {
jsonData, err := json.Marshal(req)
if err != nil {
return nil, err
}
httpReq, err := http.NewRequest("POST", c.APIUrl+"/v1/listen", bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}
httpReq.Header.Set("Authorization", "Token "+c.LicenseKey)
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.HTTPClient.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("API error: %s", string(body))
}
var result TranscriptionResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return &result, nil
}
func main() {
client := NewClient(
"http://localhost:7100",
os.Getenv("LICENSE_KEY"),
)
result, err := client.Transcribe(TranscriptionRequest{
URL: "https://example.com/audio.wav",
Punctuate: true,
Language: "en",
})
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Transcription: %s\n", result.Text)
fmt.Printf("Confidence: %.2f\n", result.Confidence)
}
Best Practices
Use Environment Variables
Use Environment Variables
Never hardcode credentials:
Copy
export LICENSE_KEY="your-license-key"
export API_URL="https://api.example.com"
Implement Error Handling
Implement Error Handling
Always handle errors gracefully:
Copy
try:
result = client.transcribe(audio_url)
except requests.exceptions.Timeout:
print("Request timed out")
except requests.exceptions.HTTPError as e:
print(f"HTTP error: {e.response.status_code}")
except Exception as e:
print(f"Unexpected error: {e}")
Add Retry Logic
Add Retry Logic
Implement exponential backoff:
Copy
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def transcribe_with_retry(audio_url):
return client.transcribe(audio_url)
Use Connection Pooling
Use Connection Pooling
Reuse connections for better performance:
Copy
session = requests.Session()
session.mount('http://', HTTPAdapter(pool_connections=10, pool_maxsize=10))
Monitor and Log
Monitor and Log
Track API usage and errors:
Copy
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Transcribing: {audio_url}")
result = client.transcribe(audio_url)
logger.info(f"Success: {result['request_id']}")

