diff --git a/run.bash b/run.bash
new file mode 100644
index 0000000..9718f5e
--- /dev/null
+++ b/run.bash
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+# --- USER CONFIGURATION ---
+VPN_SERVER="mcipower1.apibaz.info"
+EXCLUDE_HOST="2268-host.com" # or use IP directly
+VPN_INTERFACE_SCRIPT="/etc/vpnc/vpnc-script"
+# ---------------------------
+
+echo "[+] Resolving IP address of $EXCLUDE_HOST..."
+EXCLUDE_IP=$(getent ahosts "$EXCLUDE_HOST" | awk '{print $1; exit}')
+
+if [[ -z "$EXCLUDE_IP" ]]; then
+    echo "[!] Could not resolve IP for $EXCLUDE_HOST"
+    exit 1
+fi
+
+echo "[+] Found IP: $EXCLUDE_IP"
+
+echo "[+] Detecting your current default gateway..."
+LOCAL_GATEWAY=$(ip route | grep default | awk '{print $3; exit}')
+
+if [[ -z "$LOCAL_GATEWAY" ]]; then
+    echo "[!] Could not determine local gateway."
+    exit 1
+fi
+
+echo "[+] Local gateway is: $LOCAL_GATEWAY"
+
+echo "[+] Connecting to VPN: $VPN_SERVER ..."
+sudo openconnect --script "$VPN_INTERFACE_SCRIPT" "$VPN_SERVER" &
+
+VPN_PID=$!
+echo "[+] Waiting for VPN to establish..."
+sleep 10  # fixed wait; increase if the tunnel takes longer to come up
+
+echo "[+] Adding route to $EXCLUDE_IP via $LOCAL_GATEWAY to bypass VPN..."
+sudo ip route add "$EXCLUDE_IP" via "$LOCAL_GATEWAY"
+
+echo "[+] VPN is running. Traffic to $EXCLUDE_IP will bypass the VPN (including port 2268)."
+wait $VPN_PID
diff --git a/vosk/test_files/OPTIMIZATION_GUIDE.md b/vosk/test_files/OPTIMIZATION_GUIDE.md
new file mode 100644
index 0000000..2d642ae
--- /dev/null
+++ b/vosk/test_files/OPTIMIZATION_GUIDE.md
@@ -0,0 +1,243 @@
+# 192-Core Optimization Guide
+
+This guide explains how to optimize the audio processing pipeline so that it keeps all 192 CPU cores busy.
+
+## 🚀 Quick Start
+
+1. **Install dependencies:**
+   ```bash
+   pip install -r requirements_optimized.txt
+   ```
+
+2. **Run the optimized pipeline:**
+   ```bash
+   ./run_optimized_192cores.sh
+   ```
+
+3. **Monitor performance:**
+   ```bash
+   python monitor_performance.py
+   ```
+
+## 📊 Key Optimizations Implemented
+
+### 1. **Asynchronous Processing**
+- **aiohttp** for concurrent HTTP requests
+- **asyncio** for non-blocking I/O operations
+- **ProcessPoolExecutor** for CPU-intensive tasks
+
+### 2. **Parallel Processing Strategy**
+```python
+# Configuration for 192 cores
+NUM_CORES = 192
+BATCH_SIZE = 32  # Increased for better throughput
+MAX_CONCURRENT_REQUESTS = 48  # one quarter of the core count
+```
+
+### 3. **Memory-Efficient Processing**
+- Streaming data processing
+- Chunked batch processing
+- Parallel file I/O operations (combined with the async layer in the sketch below)
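+
+The three pieces above combine into one pattern: an asyncio event loop drives the network calls while a process pool handles the CPU-bound work in bounded chunks. A minimal, self-contained sketch (the names `cpu_heavy` and `process_all` are illustrative placeholders, not functions from this repo):
+
+```python
+import asyncio
+from concurrent.futures import ProcessPoolExecutor
+
+NUM_CORES = 192
+CHUNK_SIZE = 1000
+
+def cpu_heavy(item):
+    return item * 2  # stand-in for audio decoding / feature extraction
+
+async def process_all(items):
+    loop = asyncio.get_running_loop()
+    results = []
+    with ProcessPoolExecutor(max_workers=NUM_CORES) as pool:
+        # One chunk at a time keeps the number of pending futures (and the
+        # memory holding their inputs and outputs) bounded.
+        for start in range(0, len(items), CHUNK_SIZE):
+            chunk = items[start:start + CHUNK_SIZE]
+            futures = [loop.run_in_executor(pool, cpu_heavy, it) for it in chunk]
+            results.extend(await asyncio.gather(*futures))
+    return results
+
+if __name__ == "__main__":
+    print(asyncio.run(process_all(list(range(10)))))
+```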
+
+### 4. **System-Level Optimizations**
+- CPU governor set to performance mode
+- Increased file descriptor limits
+- Process priority optimization
+- Environment variables for thread optimization
+
+## 🔧 Configuration Details
+
+### Batch Processing
+- **Batch Size**: 32 samples per batch
+- **Concurrent Requests**: 48 simultaneous API calls
+- **Process Pool Workers**: 192 parallel processes
+
+### Memory Management
+- **Chunk Size**: 1000 samples per chunk
+- **Streaming**: True for large datasets
+- **Parallel Sharding**: 50 shards for optimal I/O
+
+### Network Optimization
+- **Connection Pool**: 48 concurrent connections
+- **Timeout**: 120 seconds per request
+- **Retry Logic**: Built-in error handling
+
+## 📈 Performance Monitoring
+
+### Real-time Monitoring
+```bash
+python monitor_performance.py
+```
+
+### Metrics Tracked
+- CPU utilization per core
+- Memory usage
+- Network I/O
+- Disk I/O
+- Load average
+
+### Performance Targets
+- **CPU Utilization**: >90% across all cores
+- **Memory Usage**: <80% of available RAM
+- **Processing Rate**: >1000 samples/second
+
+## 🛠️ Troubleshooting
+
+### Low CPU Utilization (<50%)
+1. **Increase batch size:**
+   ```python
+   BATCH_SIZE = 64  # or higher
+   ```
+
+2. **Increase concurrent requests:**
+   ```python
+   MAX_CONCURRENT_REQUESTS = 96  # 192/2
+   ```
+
+3. **Check I/O bottlenecks:**
+   - Monitor disk usage
+   - Check network bandwidth
+   - Verify API response times
+
+### High Memory Usage (>90%)
+1. **Reduce batch size:**
+   ```python
+   BATCH_SIZE = 16  # or lower
+   ```
+
+2. **Enable streaming:**
+   ```python
+   ds = load_dataset(..., streaming=True)
+   ```
+
+3. **Process in smaller chunks:**
+   ```python
+   CHUNK_SIZE = 500  # reduce from 1000
+   ```
+
+### Network Bottlenecks
+1. **Reduce concurrent requests:**
+   ```python
+   MAX_CONCURRENT_REQUESTS = 24  # reduce from 48
+   ```
+
+2. **Increase timeout:**
+   ```python
+   timeout=aiohttp.ClientTimeout(total=300)
+   ```
+
+3. **Use connection pooling:**
+   ```python
+   connector=aiohttp.TCPConnector(limit=MAX_CONCURRENT_REQUESTS)
+   ```
+
+## 🔄 Advanced Optimizations
+
+### 1. **Custom Process Pool Configuration**
+```python
+# For CPU-intensive tasks
+with ProcessPoolExecutor(
+    max_workers=NUM_CORES,
+    mp_context=mp.get_context('spawn')
+) as executor:
+    results = executor.map(process_function, data)
+```
+
+### 2. **Memory-Mapped Files**
+```python
+import mmap
+
+def process_large_file(filename):
+    with open(filename, 'rb') as f:
+        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
+            # Process memory-mapped file
+            pass
+```
+
+### 3. **NUMA Optimization** (for multi-socket systems)
+```bash
+# Bind processes to specific NUMA nodes
+numactl --cpunodebind=0 --membind=0 python script.py
+```
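+
+The same idea is available from inside Python: a worker can pin itself to a core with `os.sched_setaffinity` (Linux-only) so the scheduler does not migrate it across NUMA nodes. A hedged sketch: the shipped scripts rely on `numactl` and the OS scheduler instead, and `worker`/`pin_to_core` are illustrative names.
+
+```python
+import os
+from concurrent.futures import ProcessPoolExecutor
+
+def pin_to_core(core_id):
+    os.sched_setaffinity(0, {core_id})  # pid 0 = the calling process
+
+def worker(task_and_core):
+    task, core_id = task_and_core
+    pin_to_core(core_id)
+    return task ** 2  # stand-in for real work
+
+if __name__ == "__main__":
+    tasks = [(t, t % os.cpu_count()) for t in range(8)]
+    with ProcessPoolExecutor(max_workers=os.cpu_count()) as pool:
+        print(list(pool.map(worker, tasks)))
+```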
+
+### 4. **GPU Acceleration** (if available)
+```python
+# Use GPU for audio processing if available
+import torch
+
+if torch.cuda.is_available():
+    device = torch.device('cuda')
+    # Move audio processing to GPU
+```
+
+## 📊 Expected Performance
+
+### Baseline Performance
+- **192 cores**: 100% utilization target
+- **Processing rate**: 1000-2000 samples/second
+- **Memory usage**: 60-80% of available RAM
+- **Network throughput**: 1-2 GB/s
+
+### Optimization Targets
+- **CPU Efficiency**: >95%
+- **Memory Efficiency**: >85%
+- **I/O Efficiency**: >90%
+- **Network Efficiency**: >80%
+
+## 🎯 Monitoring Commands
+
+### System Resources
+```bash
+# CPU usage (pgrep -d, joins the PIDs with commas, as htop -p expects)
+htop -p $(pgrep -d, -f "python.*batch_confirm")
+
+# Memory usage
+free -h
+
+# Network I/O
+iftop
+
+# Disk I/O
+iotop
+```
+
+### Process Monitoring
+```bash
+# Process tree (-o picks the oldest matching PID; pstree takes a single PID)
+pstree -p $(pgrep -o -f "python.*batch_confirm")
+
+# Resource usage per process
+ps aux | grep python
+```
+
+## 🔧 System Requirements
+
+### Minimum Requirements
+- **CPU**: 192 cores (any architecture)
+- **RAM**: 256 GB
+- **Storage**: 1 TB SSD
+- **Network**: 10 Gbps
+
+### Recommended Requirements
+- **CPU**: 192 cores (AMD EPYC or Intel Xeon)
+- **RAM**: 512 GB
+- **Storage**: 2 TB NVMe SSD
+- **Network**: 25 Gbps
+
+## 🚨 Important Notes
+
+1. **Memory Management**: Monitor memory usage closely
+2. **Network Limits**: Ensure sufficient bandwidth
+3. **API Limits**: Check Vosk service capacity
+4. **Storage I/O**: Use fast storage for temporary files
+5. **Process Limits**: Increase system limits if needed (see the sketch below)
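+
+Raising the open-file limit can also be done from inside the process, which avoids depending on the invoking shell. A minimal sketch using the standard `resource` module (the 65536 figure is just an example; the hard limit still caps what you can request):
+
+```python
+import resource
+
+soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+print(f"current limits: soft={soft}, hard={hard}")
+resource.setrlimit(resource.RLIMIT_NOFILE, (min(65536, hard), hard))
+```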
+
+## 📞 Support
+
+If you encounter issues:
+1. Check the performance logs
+2. Monitor system resources
+3. Adjust configuration parameters
+4. Review the troubleshooting section
+
+For optimal performance, ensure your system meets the recommended requirements and follow the monitoring guidelines.
\ No newline at end of file
diff --git a/vosk/test_files/batch_confirm_hf_optimized.py b/vosk/test_files/batch_confirm_hf_optimized.py
new file mode 100644
index 0000000..ffe99b3
--- /dev/null
+++ b/vosk/test_files/batch_confirm_hf_optimized.py
@@ -0,0 +1,314 @@
+import asyncio
+import aiohttp
+import multiprocessing as mp
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+import soundfile as sf
+import os
+from tqdm import tqdm
+import pandas as pd
+import json
+import pyarrow as pa
+import pyarrow.parquet as pq
+import numpy as np
+from huggingface_hub import HfApi, create_repo
+from datasets import load_dataset, Audio, Dataset
+import time
+import logging
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# Configuration for 192 cores
+NUM_CORES = 192
+BATCH_SIZE = 32  # Increased batch size for better throughput
+MAX_CONCURRENT_REQUESTS = 48  # 192/4: one request slot per four cores
+CHUNK_SIZE = 1000  # Process data in chunks to manage memory
+
+# The dataset is loaded in __main__ rather than at module scope: with the
+# 'spawn' start method every worker process re-imports this module, and a
+# module-level load_dataset() would run once per worker.
+ds = None
+
+output_dir = "confirmed_dataset"
+os.makedirs(output_dir, exist_ok=True)
+
+API_URL = "http://localhost:5000/batch_confirm"
+
+# Hugging Face configuration
+HF_DATASET_NAME = "dpr2000/persian-cv17-confirmed"
+HF_PRIVATE = True
+
+def save_flac(audio_array, path):
+    """Save audio array as FLAC file"""
+    sf.write(path, audio_array, 16000, format="FLAC")
+
+def process_audio_chunk(audio_data):
+    """Process a single audio item; designed for multiprocessing"""
+    audio, sentence = audio_data
+    # `audio` is a dict with an "array" key; hash the raw samples (not the
+    # dict itself) to derive a unique temp-file name.
+    flac_path = f"temp_{hash(audio['array'].tobytes())}.flac"
+    save_flac(audio["array"], flac_path)
+    return {
+        'flac_path': flac_path,
+        'sentence': sentence,
+        'audio_array': audio["array"]
+    }
+
+async def send_batch_request(session, batch_data, batch_id):
+    """Send a single batch request asynchronously"""
+    references = []
+    temp_flacs = []
+    audio_arrays = []
+    file_handles = []
+
+    # aiohttp has no `files=` keyword (that is the requests API); multipart
+    # uploads are assembled with aiohttp.FormData instead.
+    form = aiohttp.FormData()
+    for j, item in enumerate(batch_data):
+        fh = open(item['flac_path'], "rb")
+        file_handles.append(fh)
+        form.add_field(f"audio{j}", fh, filename=f"audio{j}.flac",
+                       content_type="audio/flac")
+        references.append(item['sentence'])
+        temp_flacs.append(item['flac_path'])
+        audio_arrays.append(item['audio_array'])
+    form.add_field("references", json.dumps(references))
+
+    try:
+        async with session.post(API_URL, data=form,
+                                timeout=aiohttp.ClientTimeout(total=120)) as response:
+            if response.status == 200:
+                resp_json = await response.json()
+                if "results" in resp_json:
+                    results = resp_json["results"]
+                else:
+                    logger.warning(f"Batch {batch_id} failed: 'results' key missing")
+                    results = [None] * len(references)
+            else:
+                logger.error(f"Batch {batch_id} failed: HTTP {response.status}")
+                results = [None] * len(references)
+    except Exception as e:
+        logger.error(f"Batch {batch_id} failed: {e}")
+        results = [None] * len(references)
+    finally:
+        # Clean up file handles and temporary FLAC files
+        for fh in file_handles:
+            fh.close()
+        for flac_path in temp_flacs:
+            try:
+                os.remove(flac_path)
+            except OSError:
+                pass
+
+    # Keep only the samples the service confirmed
+    confirmed_items = []
+    for j, result in enumerate(results):
+        if result and result.get("confirmed"):
+            confirmed_items.append({
+                "audio": audio_arrays[j],
+                "transcription": references[j]
+            })
+
+    return confirmed_items
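+
+# Optional sketch (not wired in above): exponential-backoff retry around a
+# batch request. Only useful if the send function re-raises on failure
+# instead of returning placeholder results; `send_fn` would be a zero-arg
+# callable such as `lambda: send_batch_request(session, batch_data, batch_id)`.
+async def send_with_retries(send_fn, attempts=3, base_delay=2.0):
+    for attempt in range(attempts):
+        try:
+            return await send_fn()
+        except aiohttp.ClientError as e:
+            if attempt == attempts - 1:
+                raise
+            delay = base_delay * (2 ** attempt)
+            logger.warning(f"Request failed ({e}); retrying in {delay:.0f}s")
+            await asyncio.sleep(delay)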
+
+async def process_dataset_async():
+    """Main async processing function"""
+    confirmed = []
+
+    # Prepare all audio data first using multiprocessing. Note that this
+    # decodes every clip into memory up front; for very large splits,
+    # iterate in CHUNK_SIZE slices instead.
+    print("Preparing audio data with multiprocessing...")
+    audio_data = [(ds[i]["audio"], ds[i]["sentence"]) for i in range(len(ds))]
+
+    # Use ProcessPoolExecutor for CPU-intensive audio processing
+    with ProcessPoolExecutor(max_workers=NUM_CORES) as executor:
+        processed_audio = list(tqdm(
+            executor.map(process_audio_chunk, audio_data),
+            total=len(audio_data),
+            desc="Processing audio files"
+        ))
+
+    # Create batches
+    batches = []
+    for i in range(0, len(processed_audio), BATCH_SIZE):
+        batch = processed_audio[i:i+BATCH_SIZE]
+        batches.append((batch, i // BATCH_SIZE))
+
+    print(f"Processing {len(batches)} batches with {MAX_CONCURRENT_REQUESTS} concurrent requests...")
+
+    # Process batches asynchronously
+    async with aiohttp.ClientSession(
+        connector=aiohttp.TCPConnector(limit=MAX_CONCURRENT_REQUESTS),
+        timeout=aiohttp.ClientTimeout(total=300)
+    ) as session:
+        tasks = []
+        for batch_data, batch_id in batches:
+            task = send_batch_request(session, batch_data, batch_id)
+            tasks.append(task)
+
+        # Await in chunks to avoid overwhelming the system
+        chunk_size = MAX_CONCURRENT_REQUESTS
+        for i in range(0, len(tasks), chunk_size):
+            chunk_tasks = tasks[i:i+chunk_size]
+            results = await asyncio.gather(*chunk_tasks, return_exceptions=True)
+
+            for result in results:
+                if isinstance(result, Exception):
+                    logger.error(f"Task failed: {result}")
+                else:
+                    confirmed.extend(result)
+
+            print(f"Processed {min(i+chunk_size, len(tasks))}/{len(tasks)} batches, confirmed: {len(confirmed)}")
+
+    return confirmed
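+
+# Alternative sketch (not used above): rather than awaiting fixed-size chunks,
+# an asyncio.Semaphore caps the number of in-flight requests while letting a
+# new one start the moment a slot frees up, which keeps the pipeline fuller.
+async def gather_with_limit(coros, limit=MAX_CONCURRENT_REQUESTS):
+    sem = asyncio.Semaphore(limit)
+
+    async def run(coro):
+        async with sem:
+            return await coro
+
+    return await asyncio.gather(*(run(c) for c in coros))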
+
+def save_confirmed_data_parallel(confirmed):
+    """Save confirmed data using parallel processing"""
+    if not confirmed:
+        print("❌ No confirmed samples to save")
+        return 0
+
+    print(f"\n🔄 Saving {len(confirmed)} confirmed samples...")
+
+    def extract_minimal(example):
+        """Convert audio to int16 format"""
+        audio_float32 = np.array(example["audio"], dtype=np.float32)
+        audio_float32 = np.clip(audio_float32, -1.0, 1.0)
+        audio_int16 = (audio_float32 * 32767).astype(np.int16)
+        return {
+            "audio": audio_int16.tobytes(),
+            "text": example["transcription"]
+        }
+
+    # Create dataset from confirmed samples
+    confirmed_dataset = Dataset.from_list(confirmed)
+    confirmed_dataset = confirmed_dataset.map(
+        extract_minimal,
+        remove_columns=confirmed_dataset.column_names,
+        num_proc=NUM_CORES  # Use all cores for dataset processing
+    )
+
+    # Optimize sharding for parallel writing
+    num_shards = min(50, len(confirmed))  # More shards for better parallelization
+    shard_size = len(confirmed_dataset) // num_shards + 1
+
+    def write_shard(shard_info):
+        """Write a single shard"""
+        i, start, end = shard_info
+        if start >= len(confirmed_dataset):
+            return None
+
+        shard = confirmed_dataset.select(range(start, end))
+        table = pa.Table.from_pandas(shard.to_pandas())
+
+        shard_path = os.path.join(output_dir, f"confirmed_shard_{i:03}.parquet")
+
+        pq.write_table(
+            table,
+            shard_path,
+            compression="zstd",
+            compression_level=22,
+            use_dictionary=True,
+            version="2.6"
+        )
+
+        return f"Shard {i+1}: {len(shard)} samples saved to {shard_path}"
+
+    # Prepare shard information
+    shard_info = []
+    for i in range(num_shards):
+        start = i * shard_size
+        end = min(len(confirmed_dataset), (i + 1) * shard_size)
+        shard_info.append((i, start, end))
+
+    # Write shards in parallel. Threads (not processes) are used here: a
+    # nested function and the dataset it closes over cannot be pickled under
+    # the 'spawn' start method, and pyarrow releases the GIL while compressing.
+    print(f"Writing {num_shards} shards in parallel...")
+    with ThreadPoolExecutor(max_workers=num_shards) as executor:
+        results = list(tqdm(
+            executor.map(write_shard, shard_info),
+            total=len(shard_info),
+            desc="Writing shards"
+        ))
+
+    # Print results
+    for result in results:
+        if result:
+            print(f"🔹 {result}")
+
+    print(f"\n✅ All confirmed data saved in {num_shards} shards in `{output_dir}/`")
+
+    return num_shards
+
+async def upload_to_hf(num_shards, total_samples):
+    """Upload to Hugging Face Hub"""
+    print(f"\n🚀 Pushing dataset to Hugging Face Hub as '{HF_DATASET_NAME}'...")
+    try:
+        api = HfApi(token=os.getenv("HF_TOKEN"))
+
+        # Create repository
+        try:
+            create_repo(
+                repo_id=HF_DATASET_NAME,
+                repo_type="dataset",
+                private=HF_PRIVATE,
+                exist_ok=True
+            )
+            print(f"✅ Repository '{HF_DATASET_NAME}' created/verified")
+        except Exception as e:
+            print(f"⚠️ Repository creation failed: {e}")
+            return
+
+        # Create dataset info
+        dataset_info = {
+            "dataset_name": HF_DATASET_NAME,
+            "description": "Persian Common Voice confirmed samples for Whisper fine-tuning",
+            "total_samples": total_samples,
+            "num_shards": num_shards,
+            "audio_format": "int16 PCM, 16kHz",
+            "columns": ["audio", "text"],
+            "source_dataset": "Ashegh-Sad-Warrior/Persian_Common_Voice_17_0",
+            "processing": "Vosk API batch confirmation (optimized for 192 cores)"
+        }
+
+        info_path = os.path.join(output_dir, "dataset_info.json")
+        with open(info_path, 'w', encoding='utf-8') as f:
+            json.dump(dataset_info, f, indent=2, ensure_ascii=False)
+
+        # Upload folder
+        api.upload_folder(
+            folder_path=output_dir,
+            repo_id=HF_DATASET_NAME,
+            repo_type="dataset",
+        )
+
+        print(f"🎉 Dataset successfully pushed to: https://huggingface.co/datasets/{HF_DATASET_NAME}")
+
+    except Exception as e:
+        print(f"❌ Failed to push to Hugging Face: {e}")
+
+async def main():
+    """Main function"""
+    start_time = time.time()
+
+    print(f"🚀 Starting optimized processing with {NUM_CORES} cores")
+    print(f"📊 Dataset size: {len(ds)} samples")
+    print(f"⚙️ Batch size: {BATCH_SIZE}")
+    print(f"🔄 Max concurrent requests: {MAX_CONCURRENT_REQUESTS}")
+
+    # Process dataset
+    confirmed = await process_dataset_async()
+
+    # Save data
+    num_shards = save_confirmed_data_parallel(confirmed)
+
+    # Upload to HF (skipped when nothing was confirmed)
+    if num_shards:
+        await upload_to_hf(num_shards, len(confirmed))
+
+    end_time = time.time()
+    print(f"\n⏱️ Total processing time: {end_time - start_time:.2f} seconds")
+    print(f"📈 Processing rate: {len(ds) / (end_time - start_time):.2f} samples/second")
+
+if __name__ == "__main__":
+    # 'spawn' avoids fork-related deadlocks with threaded libraries; note that
+    # it re-imports this module in every worker process.
+    mp.set_start_method('spawn', force=True)
+
+    # Load the dataset with audio decoding (done here, not at import time;
+    # see the note next to `ds` above)
+    print("Loading dataset...")
+    ds = load_dataset(
+        "Ashegh-Sad-Warrior/Persian_Common_Voice_17_0",
+        split="validated",
+        streaming=False
+    ).cast_column("audio", Audio(sampling_rate=16000))
+
+    # Run the async main function
+    asyncio.run(main())
\ No newline at end of file
+""" + +import psutil +import time +import matplotlib.pyplot as plt +import numpy as np +from datetime import datetime +import threading +import json +import os + +class PerformanceMonitor: + def __init__(self, log_file="performance_log.json"): + self.log_file = log_file + self.monitoring = False + self.data = { + 'timestamps': [], + 'cpu_percent': [], + 'memory_percent': [], + 'cpu_count': [], + 'load_average': [], + 'network_io': [], + 'disk_io': [] + } + + def start_monitoring(self): + """Start monitoring in a separate thread""" + self.monitoring = True + self.monitor_thread = threading.Thread(target=self._monitor_loop) + self.monitor_thread.daemon = True + self.monitor_thread.start() + print("๐Ÿš€ Performance monitoring started...") + + def stop_monitoring(self): + """Stop monitoring""" + self.monitoring = False + if hasattr(self, 'monitor_thread'): + self.monitor_thread.join() + print("โน๏ธ Performance monitoring stopped.") + + def _monitor_loop(self): + """Main monitoring loop""" + while self.monitoring: + try: + # CPU usage + cpu_percent = psutil.cpu_percent(interval=1, percpu=True) + cpu_avg = np.mean(cpu_percent) + + # Memory usage + memory = psutil.virtual_memory() + + # Load average + load_avg = psutil.getloadavg() + + # Network I/O + net_io = psutil.net_io_counters() + + # Disk I/O + disk_io = psutil.disk_io_counters() + + # Store data + timestamp = datetime.now().isoformat() + self.data['timestamps'].append(timestamp) + self.data['cpu_percent'].append(cpu_percent) + self.data['memory_percent'].append(memory.percent) + self.data['cpu_count'].append(len(cpu_percent)) + self.data['load_average'].append(load_avg) + self.data['network_io'].append({ + 'bytes_sent': net_io.bytes_sent, + 'bytes_recv': net_io.bytes_recv + }) + self.data['disk_io'].append({ + 'read_bytes': disk_io.read_bytes, + 'write_bytes': disk_io.write_bytes + }) + + # Print current stats + print(f"\r๐Ÿ“Š CPU: {cpu_avg:.1f}% | Memory: {memory.percent:.1f}% | Load: {load_avg[0]:.2f}", end='') + + except Exception as e: + print(f"\nโŒ Monitoring error: {e}") + + def save_data(self): + """Save monitoring data to file""" + with open(self.log_file, 'w') as f: + json.dump(self.data, f, indent=2) + print(f"\n๐Ÿ’พ Performance data saved to {self.log_file}") + + def plot_performance(self): + """Create performance plots""" + if not self.data['timestamps']: + print("โŒ No data to plot") + return + + # Convert timestamps to relative time + start_time = datetime.fromisoformat(self.data['timestamps'][0]) + relative_times = [(datetime.fromisoformat(ts) - start_time).total_seconds() + for ts in self.data['timestamps']] + + # Create subplots + fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10)) + + # CPU usage + cpu_data = np.array(self.data['cpu_percent']) + ax1.plot(relative_times, np.mean(cpu_data, axis=1), label='Average CPU %') + ax1.fill_between(relative_times, np.min(cpu_data, axis=1), np.max(cpu_data, axis=1), alpha=0.3) + ax1.set_title('CPU Utilization') + ax1.set_ylabel('CPU %') + ax1.grid(True) + ax1.legend() + + # Memory usage + ax2.plot(relative_times, self.data['memory_percent'], label='Memory %') + ax2.set_title('Memory Utilization') + ax2.set_ylabel('Memory %') + ax2.grid(True) + ax2.legend() + + # Load average + load_data = np.array(self.data['load_average']) + ax3.plot(relative_times, load_data[:, 0], label='1min') + ax3.plot(relative_times, load_data[:, 1], label='5min') + ax3.plot(relative_times, load_data[:, 2], label='15min') + ax3.set_title('System Load Average') + ax3.set_ylabel('Load') + 
+
+def main():
+    """Main function"""
+    print("🔍 Performance Monitor for 192-core system")
+    print("Press Ctrl+C to stop monitoring and generate report")
+
+    monitor = PerformanceMonitor()
+
+    try:
+        monitor.start_monitoring()
+
+        # Keep running until interrupted
+        while True:
+            time.sleep(1)
+
+    except KeyboardInterrupt:
+        print("\n\n⏹️ Stopping monitoring...")
+        monitor.stop_monitoring()
+
+        # Generate report
+        monitor.save_data()
+        monitor.plot_performance()
+        monitor.print_summary()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/vosk/test_files/requirements_optimized.txt b/vosk/test_files/requirements_optimized.txt
new file mode 100644
index 0000000..76c956f
--- /dev/null
+++ b/vosk/test_files/requirements_optimized.txt
@@ -0,0 +1,27 @@
+# Core dependencies
+datasets>=2.14.0
+soundfile>=0.12.1
+requests>=2.31.0
+tqdm>=4.65.0
+pandas>=2.0.0
+pyarrow>=12.0.0
+numpy>=1.24.0
+huggingface_hub>=0.16.0
+
+# Async and concurrent processing
+aiohttp>=3.8.0
+asyncio-throttle>=1.0.0
+
+# Performance monitoring
+psutil>=5.9.0
+matplotlib>=3.7.0
+
+# Vosk for transcription
+vosk>=0.3.45
+
+# Flask for API (if using Flask version)
+flask>=2.3.0
+
+# Additional optimizations
+uvloop>=0.17.0  # Faster event loop for asyncio
+orjson>=3.9.0  # Faster JSON processing
\ No newline at end of file
diff --git a/vosk/test_files/run_optimized_192cores.sh b/vosk/test_files/run_optimized_192cores.sh
new file mode 100755
index 0000000..f4f2498
--- /dev/null
+++ b/vosk/test_files/run_optimized_192cores.sh
@@ -0,0 +1,100 @@
+#!/bin/bash
+
+# Optimized setup script for 192-core processing
+# This script configures the system and runs the optimized processing pipeline
+
+set -e
+
+echo "🚀 Setting up optimized processing for 192 cores..."
+
+# System optimizations
+echo "⚙️ Configuring system for high-performance processing..."
+
+# Increase file descriptor limits (may be capped by the hard limit;
+# don't abort under `set -e` if it fails)
+echo "* Setting file descriptor limits..."
+ulimit -n 65536 || echo "  (could not raise file descriptor limit)"
+
+# Set process priority (raising priority needs root; keep going without it)
+echo "* Setting process priority..."
+renice -n -10 $$ 2>/dev/null || echo "  (renice needs elevated privileges; continuing)"
+
+# Configure CPU governor for performance
+echo "* Configuring CPU governor..."
+if command -v cpupower &> /dev/null; then
+    sudo cpupower frequency-set -g performance
+fi
+
+# Set environment variables for optimal performance
+export PYTHONUNBUFFERED=1
+export PYTHONOPTIMIZE=2
+export OMP_NUM_THREADS=192
+export MKL_NUM_THREADS=192
+export OPENBLAS_NUM_THREADS=192
+export VECLIB_MAXIMUM_THREADS=192
+export NUMEXPR_NUM_THREADS=192
+
+# Install optimized dependencies
+echo "📦 Installing optimized dependencies..."
+pip install -r requirements_optimized.txt
+
+# Check if Vosk service is running
+echo "🔍 Checking Vosk service status..."
+if ! curl -s http://localhost:5000/ > /dev/null; then
+    echo "⚠️ Vosk service not running. Starting optimized service..."
+
+    # Start the optimized Vosk service from its own directory, then return
+    # here so the relative paths below keep working
+    pushd ../vosk_service > /dev/null
+    export USE_ASYNC=true
+    python app_optimized.py &
+    VOSK_PID=$!
+    popd > /dev/null
+    echo "✅ Vosk service started with PID: $VOSK_PID"
+
+    # Wait for service to be ready
+    echo "⏳ Waiting for service to be ready..."
+    for i in {1..30}; do
+        if curl -s http://localhost:5000/ > /dev/null; then
+            echo "✅ Service is ready!"
+            break
+        fi
+        sleep 1
+    done
+else
+    echo "✅ Vosk service is already running"
+fi
+
+# Start performance monitoring in background
+echo "📊 Starting performance monitoring..."
+python monitor_performance.py &
+MONITOR_PID=$!
+echo "✅ Performance monitor started with PID: $MONITOR_PID"
+
+# Function to cleanup on exit
+cleanup() {
+    echo "🧹 Cleaning up..."
+    if [ ! -z "$VOSK_PID" ]; then
+        kill $VOSK_PID 2>/dev/null || true
+    fi
+    if [ ! -z "$MONITOR_PID" ]; then
+        kill $MONITOR_PID 2>/dev/null || true
+    fi
+    echo "✅ Cleanup complete"
+}
+
+# Set trap to cleanup on script exit
+trap cleanup EXIT
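+
+# Optional (illustrative, multi-socket machines only): bind the run to NUMA
+# nodes before launching, e.g.
+#   numactl --interleave=all python batch_confirm_hf_optimized.py
+# See OPTIMIZATION_GUIDE.md for details.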
+echo "๐Ÿ“ˆ Check performance_plot.png for detailed performance analysis" +echo "๐Ÿ“Š Check performance_log.json for raw performance data" \ No newline at end of file diff --git a/vosk/vosk_service/app_optimized.py b/vosk/vosk_service/app_optimized.py new file mode 100644 index 0000000..f2ff3ac --- /dev/null +++ b/vosk/vosk_service/app_optimized.py @@ -0,0 +1,271 @@ +from flask import Flask, request, jsonify +from vosk import Model, KaldiRecognizer +import soundfile as sf +import io +import os +import json +import numpy as np +from multiprocessing import Process, Queue, Pool, cpu_count +import difflib +import asyncio +import aiohttp +from aiohttp import web +import logging +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +import time + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Configuration for high-performance processing +NUM_WORKERS = 192 # Use all available cores +BATCH_SIZE = 32 +MAX_CONCURRENT_PROCESSES = 48 + +MODEL_PATH = "/app/model" + +# Global model instance (shared across processes) +model = None + +def load_model(): + """Load the Vosk model""" + global model + print(f"Checking for model at: {MODEL_PATH}") + if os.path.exists(MODEL_PATH): + print(f"Model directory exists at {MODEL_PATH}") + print(f"Contents: {os.listdir(MODEL_PATH)}") + try: + model = Model(MODEL_PATH) + print("Model loaded successfully!") + return model + except Exception as e: + print(f"Error loading model: {e}") + raise RuntimeError(f"Failed to load Vosk model: {e}") + else: + print(f"Model directory not found at {MODEL_PATH}") + raise RuntimeError(f"Vosk model not found at {MODEL_PATH}. Please download and mount a model.") + +def similarity(a, b): + """Calculate similarity between two strings""" + return difflib.SequenceMatcher(None, a, b).ratio() + +def confirm_voice_process(args): + """Process a single audio file in a separate process""" + audio_bytes, reference_text, samplerate = args + + try: + data, _ = sf.read(io.BytesIO(audio_bytes)) + if len(data.shape) > 1: + data = data[:, 0] + if data.dtype != np.int16: + data = (data * 32767).astype(np.int16) + + # Create recognizer in this process + local_model = Model(MODEL_PATH) + recognizer = KaldiRecognizer(local_model, samplerate) + recognizer.AcceptWaveform(data.tobytes()) + result = recognizer.Result() + text = json.loads(result).get('text', '') + sim = similarity(text, reference_text) + + return { + 'transcription': text, + 'similarity': sim, + 'confirmed': sim > 0.2 + } + except Exception as e: + logger.error(f"Error processing audio: {e}") + return { + 'transcription': '', + 'similarity': 0.0, + 'confirmed': False + } + +def process_batch_parallel(audio_files, references): + """Process a batch of audio files using parallel processing""" + # Prepare data for parallel processing + samplerates = [] + for audio_bytes in audio_files: + data, samplerate = sf.read(io.BytesIO(audio_bytes)) + samplerates.append(samplerate) + + # Prepare arguments for parallel processing + process_args = [ + (audio_bytes, reference_text, samplerate) + for audio_bytes, reference_text, samplerate in zip(audio_files, references, samplerates) + ] + + # Use ProcessPoolExecutor for parallel processing + with ProcessPoolExecutor(max_workers=MAX_CONCURRENT_PROCESSES) as executor: + results = list(executor.map(confirm_voice_process, process_args)) + + return results + +# Flask app for backward compatibility +app = Flask(__name__) + +@app.route('/', 
+
+# Flask app for backward compatibility
+app = Flask(__name__)
+
+@app.route('/', methods=['GET'])
+def health_check():
+    return jsonify({'status': 'ok', 'service': 'vosk-transcription-api', 'model': 'persian'})
+
+@app.route('/batch_confirm', methods=['POST'])
+def batch_confirm():
+    """Handle batch confirmation requests"""
+    start_time = time.time()
+
+    # Parse request
+    references = request.form.get('references')
+    if not references:
+        return jsonify({'error': 'Missing references'}), 400
+    try:
+        references = json.loads(references)
+    except Exception:
+        return jsonify({'error': 'Invalid references JSON'}), 400
+
+    # Get audio files
+    audio_files = []
+    for i in range(len(references)):
+        audio_file = request.files.get(f'audio{i}')
+        if not audio_file:
+            return jsonify({'error': f'Missing audio file audio{i}'}), 400
+        audio_files.append(audio_file.read())
+
+    # Process batch in parallel
+    results = process_batch_parallel(audio_files, references)
+
+    processing_time = time.time() - start_time
+    logger.info(f"Processed batch of {len(results)} files in {processing_time:.2f}s")
+
+    return jsonify({'results': results})
+
+@app.route('/transcribe', methods=['POST'])
+def transcribe():
+    """Handle single transcription request"""
+    if 'audio' not in request.files:
+        return jsonify({'error': 'No audio file provided'}), 400
+
+    audio_file = request.files['audio']
+    audio_bytes = audio_file.read()
+
+    try:
+        data, samplerate = sf.read(io.BytesIO(audio_bytes))
+        if len(data.shape) > 1:
+            data = data[:, 0]
+        if data.dtype != np.int16:
+            data = (data * 32767).astype(np.int16)
+
+        recognizer = KaldiRecognizer(model, samplerate)
+        recognizer.AcceptWaveform(data.tobytes())
+        result = recognizer.Result()
+        text = json.loads(result).get('text', '')
+
+        return jsonify({'transcription': text})
+    except Exception as e:
+        logger.error(f"Error in transcription: {e}")
+        return jsonify({'error': str(e)}), 500
+
+# Async version using aiohttp for better performance
+async def async_batch_confirm(request):
+    """Async version of batch confirmation"""
+    start_time = time.time()
+
+    # Parse multipart data
+    data = await request.post()
+
+    # Get references
+    references_text = data.get('references')
+    if not references_text:
+        return web.json_response({'error': 'Missing references'}, status=400)
+
+    try:
+        references = json.loads(references_text)
+    except Exception:
+        return web.json_response({'error': 'Invalid references JSON'}, status=400)
+
+    # Get audio files. aiohttp's FileField wraps a plain synchronous file
+    # object, so it is read with .file.read(), not awaited.
+    audio_files = []
+    for i in range(len(references)):
+        audio_file = data.get(f'audio{i}')
+        if not audio_file:
+            return web.json_response({'error': f'Missing audio file audio{i}'}, status=400)
+
+        audio_files.append(audio_file.file.read())
+
+    # Process in a thread pool to avoid blocking the event loop
+    loop = asyncio.get_running_loop()
+    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_PROCESSES) as executor:
+        results = await loop.run_in_executor(
+            executor,
+            process_batch_parallel,
+            audio_files,
+            references
+        )
+
+    processing_time = time.time() - start_time
+    logger.info(f"Async processed batch of {len(results)} files in {processing_time:.2f}s")
+
+    return web.json_response({'results': results})
+
+async def async_transcribe(request):
+    """Async version of single transcription"""
+    form = await request.post()
+
+    if 'audio' not in form:
+        return web.json_response({'error': 'No audio file provided'}, status=400)
+
+    # FileField wraps a synchronous file object (see note above)
+    audio_bytes = form['audio'].file.read()
+
+    try:
+        data, samplerate = sf.read(io.BytesIO(audio_bytes))
+        if len(data.shape) > 1:
+            data = data[:, 0]
+        if data.dtype != np.int16:
+            data = (data * 32767).astype(np.int16)
+
+        recognizer = KaldiRecognizer(model, samplerate)
+        recognizer.AcceptWaveform(data.tobytes())
+        result = recognizer.Result()
+        text = json.loads(result).get('text', '')
+
+        return web.json_response({'transcription': text})
+    except Exception as e:
+        logger.error(f"Error in async transcription: {e}")
+        return web.json_response({'error': str(e)}, status=500)
+
+async def health_check_async(request):
+    """Async health check"""
+    return web.json_response({
+        'status': 'ok',
+        'service': 'vosk-transcription-api-async',
+        'model': 'persian',
+        'workers': MAX_CONCURRENT_PROCESSES
+    })
+
+def create_async_app():
+    """Create async aiohttp app"""
+    app = web.Application()
+
+    # Add routes
+    app.router.add_get('/', health_check_async)
+    app.router.add_post('/batch_confirm', async_batch_confirm)
+    app.router.add_post('/transcribe', async_transcribe)
+
+    return app
+
+if __name__ == '__main__':
+    # Load model
+    load_model()
+
+    # Choose between Flask and aiohttp based on environment
+    use_async = os.getenv('USE_ASYNC', 'false').lower() == 'true'
+
+    if use_async:
+        # Run async version
+        app = create_async_app()
+        web.run_app(app, host='0.0.0.0', port=5000)
+    else:
+        # Run Flask version. Werkzeug's dev server refuses to be both
+        # threaded and multi-process, so only threaded mode is enabled.
+        app.run(host='0.0.0.0', port=5000, threaded=True)
\ No newline at end of file