From 561e8b519c40e8b1125679c827a89b91c9d13038 Mon Sep 17 00:00:00 2001 From: Alireza Date: Sat, 2 Aug 2025 18:33:31 +0330 Subject: [PATCH] Enhance batch_confirm_hf_optimized.py to ensure torchcodec is installed before loading the dataset, and update requirements_optimized.txt to include torchcodec. Modify run_optimized_192cores_no_root.sh to install additional audio dependencies and test audio imports. --- vosk/test_files/batch_confirm_hf_optimized.py | 10 + vosk/test_files/batch_confirm_hf_simple.py | 318 ++++++++++++++++++ vosk/test_files/fix_torchcodec.sh | 31 ++ vosk/test_files/install_and_run.sh | 33 ++ vosk/test_files/requirements_optimized.txt | 1 + vosk/test_files/run_now.sh | 34 ++ .../run_optimized_192cores_no_root.sh | 10 + vosk/test_files/test_audio_deps.py | 42 +++ 8 files changed, 479 insertions(+) create mode 100644 vosk/test_files/batch_confirm_hf_simple.py create mode 100644 vosk/test_files/fix_torchcodec.sh create mode 100644 vosk/test_files/install_and_run.sh create mode 100644 vosk/test_files/run_now.sh create mode 100644 vosk/test_files/test_audio_deps.py diff --git a/vosk/test_files/batch_confirm_hf_optimized.py b/vosk/test_files/batch_confirm_hf_optimized.py index ffe99b3..9600c18 100644 --- a/vosk/test_files/batch_confirm_hf_optimized.py +++ b/vosk/test_files/batch_confirm_hf_optimized.py @@ -27,6 +27,16 @@ BATCH_SIZE = 32 # Increased batch size for better throughput MAX_CONCURRENT_REQUESTS = 48 # 192/4 for optimal concurrency CHUNK_SIZE = 1000 # Process data in chunks to manage memory +# Ensure torchcodec is installed before loading dataset +try: + import torchcodec +except ImportError: + print("Installing torchcodec...") + import subprocess + import sys + subprocess.check_call([sys.executable, "-m", "pip", "install", "torchcodec>=0.1.0"]) + import torchcodec + # Load the dataset with audio decoding print("Loading dataset...") ds = load_dataset( diff --git a/vosk/test_files/batch_confirm_hf_simple.py b/vosk/test_files/batch_confirm_hf_simple.py new file mode 100644 index 0000000..7110b3b --- /dev/null +++ b/vosk/test_files/batch_confirm_hf_simple.py @@ -0,0 +1,318 @@ +import asyncio +import aiohttp +import multiprocessing as mp +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +import soundfile as sf +import requests +import os +from tqdm import tqdm +import pandas as pd +import json +import pyarrow as pa +import pyarrow.parquet as pq +import numpy as np +from huggingface_hub import HfApi, create_repo +from datasets import load_dataset, Audio, Dataset +import time +from functools import partial +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Configuration for 192 cores +NUM_CORES = 192 +BATCH_SIZE = 32 # Increased batch size for better throughput +MAX_CONCURRENT_REQUESTS = 48 # 192/4 for optimal concurrency +CHUNK_SIZE = 1000 # Process data in chunks to manage memory + +# Load the dataset without audio decoding first +print("Loading dataset...") +ds = load_dataset( + "Ashegh-Sad-Warrior/Persian_Common_Voice_17_0", + split="validated", + streaming=False +) + +# Now cast to audio after loading +print("Casting to audio...") +ds = ds.cast_column("audio", Audio(sampling_rate=16000)) + +output_dir = "confirmed_dataset" +os.makedirs(output_dir, exist_ok=True) + +API_URL = "http://localhost:5000/batch_confirm" + +# Hugging Face configuration +HF_DATASET_NAME = "dpr2000/persian-cv17-confirmed" +HF_PRIVATE = True + +def 
save_flac(audio_array, path):
+    """Save audio array as FLAC file"""
+    sf.write(path, audio_array, 16000, format="FLAC")
+
+def process_audio_chunk(audio_data):
+    """Process a single audio item - designed for multiprocessing"""
+    audio, sentence = audio_data
+    # `audio` is a dict with an "array" key; hash the raw samples for a unique temp name
+    flac_path = f"temp_{hash(audio['array'].tobytes())}.flac"
+    save_flac(audio["array"], flac_path)
+    return {
+        'flac_path': flac_path,
+        'sentence': sentence,
+        'audio_array': audio["array"]
+    }
+
+async def send_batch_request(session, batch_data, batch_id):
+    """Send a single batch request asynchronously"""
+    # aiohttp has no requests-style `files=` argument; build a multipart
+    # form with aiohttp.FormData and pass it as `data=` instead
+    form = aiohttp.FormData()
+    file_handles = []
+    references = []
+    temp_flacs = []
+    audio_arrays = []
+
+    for j, item in enumerate(batch_data):
+        fh = open(item['flac_path'], "rb")
+        file_handles.append(fh)
+        form.add_field(f"audio{j}", fh, filename=os.path.basename(item['flac_path']))
+        references.append(item['sentence'])
+        temp_flacs.append(item['flac_path'])
+        audio_arrays.append(item['audio_array'])
+
+    form.add_field("references", json.dumps(references))
+
+    try:
+        async with session.post(API_URL, data=form, timeout=aiohttp.ClientTimeout(total=120)) as response:
+            if response.status == 200:
+                resp_json = await response.json()
+                if "results" in resp_json:
+                    results = resp_json["results"]
+                else:
+                    logger.warning(f"Batch {batch_id} failed: 'results' key missing")
+                    results = [None] * len(references)
+            else:
+                logger.error(f"Batch {batch_id} failed: HTTP {response.status}")
+                results = [None] * len(references)
+    except Exception as e:
+        logger.error(f"Batch {batch_id} failed: {e}")
+        results = [None] * len(references)
+    finally:
+        # Close file handles and clean up temporary FLAC files
+        for fh in file_handles:
+            fh.close()
+        for flac_path in temp_flacs:
+            try:
+                os.remove(flac_path)
+            except OSError:
+                pass
+
+    # Process results
+    confirmed_items = []
+    for j, result in enumerate(results):
+        if result and result.get("confirmed"):
+            confirmed_items.append({
+                "audio": audio_arrays[j],
+                "transcription": references[j]
+            })
+
+    return confirmed_items
+
+async def process_dataset_async():
+    """Main async processing function"""
+    confirmed = []
+
+    # Prepare all audio data first using multiprocessing
+    print("Preparing audio data with multiprocessing...")
+    audio_data = [(ds[i]["audio"], ds[i]["sentence"]) for i in range(len(ds))]
+
+    # Use ProcessPoolExecutor for CPU-intensive audio processing
+    with ProcessPoolExecutor(max_workers=NUM_CORES) as executor:
+        processed_audio = list(tqdm(
+            executor.map(process_audio_chunk, audio_data),
+            total=len(audio_data),
+            desc="Processing audio files"
+        ))
+
+    # Create batches
+    batches = []
+    for i in range(0, len(processed_audio), BATCH_SIZE):
+        batch = processed_audio[i:i+BATCH_SIZE]
+        batches.append((batch, i // BATCH_SIZE))
+
+    print(f"Processing {len(batches)} batches with {MAX_CONCURRENT_REQUESTS} concurrent requests...")
+
+    # Process batches asynchronously
+    async with aiohttp.ClientSession(
+        connector=aiohttp.TCPConnector(limit=MAX_CONCURRENT_REQUESTS),
+        timeout=aiohttp.ClientTimeout(total=300)
+    ) as session:
+        tasks = []
+        for batch_data, batch_id in batches:
+            task = send_batch_request(session, batch_data, batch_id)
+            tasks.append(task)
+
+        # Process in chunks to avoid overwhelming the system
+        chunk_size = MAX_CONCURRENT_REQUESTS
+        for i in range(0, len(tasks), chunk_size):
+            chunk_tasks = tasks[i:i+chunk_size]
+            results = await asyncio.gather(*chunk_tasks, return_exceptions=True)
+
+            for result in results:
+                if isinstance(result, Exception):
+                    logger.error(f"Task failed: {result}")
+                else:
+                    confirmed.extend(result)
+
+            print(f"Processed {min(i+chunk_size, len(tasks))}/{len(tasks)} batches, confirmed: {len(confirmed)}")
+
+    return confirmed
+
+def save_confirmed_data_parallel(confirmed):
+    """Save confirmed data using parallel processing"""
+    if not confirmed:
+        print("❌ No confirmed samples to save")
+        return 0
+
+    print(f"\n🔄 Saving {len(confirmed)} confirmed samples...")
+
+    def extract_minimal(example):
+        """Convert audio to int16 format"""
+        audio_float32 = np.array(example["audio"], dtype=np.float32)
+        audio_float32 = np.clip(audio_float32, -1.0, 1.0)
+        audio_int16 = (audio_float32 * 32767).astype(np.int16)
+        return {
+            "audio": audio_int16.tobytes(),
+            "text": example["transcription"]
+        }
+
+    # Create dataset from confirmed samples
+    confirmed_dataset = Dataset.from_list(confirmed)
+    confirmed_dataset = confirmed_dataset.map(
+        extract_minimal,
+        remove_columns=confirmed_dataset.column_names,
+        num_proc=NUM_CORES  # Use all cores for dataset processing
+    )
+
+    # Optimize sharding for parallel writing
+    num_shards = min(50, len(confirmed))  # More shards for better parallelization
+    shard_size = len(confirmed_dataset) // num_shards + 1
+
+    def write_shard(shard_info):
+        """Write a single shard to Parquet"""
+        i, start, end = shard_info
+        if start >= len(confirmed_dataset):
+            return None
+
+        shard = confirmed_dataset.select(range(start, end))
+        table = pa.Table.from_pandas(shard.to_pandas())
+
+        shard_path = os.path.join(output_dir, f"confirmed_shard_{i:03}.parquet")
+
+        pq.write_table(
+            table,
+            shard_path,
+            compression="zstd",
+            compression_level=22,
+            use_dictionary=True,
+            version="2.6"
+        )
+
+        return f"Shard {i+1}: {len(shard)} samples saved to {shard_path}"
+
+    # Prepare shard information
+    shard_info = []
+    for i in range(num_shards):
+        start = i * shard_size
+        end = min(len(confirmed_dataset), (i + 1) * shard_size)
+        shard_info.append((i, start, end))
+
+    # Write shards in parallel. Threads (not processes) are used here:
+    # the nested write_shard function cannot be pickled under the 'spawn'
+    # start method, and Parquet writing is largely I/O-bound anyway.
+    print(f"Writing {num_shards} shards in parallel...")
+    with ThreadPoolExecutor(max_workers=NUM_CORES) as executor:
+        results = list(tqdm(
+            executor.map(write_shard, shard_info),
+            total=len(shard_info),
+            desc="Writing shards"
+        ))
+
+    # Print results
+    for result in results:
+        if result:
+            print(f"🔹 {result}")
+
+    print(f"\n✅ All confirmed data saved in {num_shards} shards in `{output_dir}/`")
+
+    return num_shards
+
+async def upload_to_hf(confirmed, num_shards):
+    """Upload to Hugging Face Hub"""
+    print(f"\n🚀 Pushing dataset to Hugging Face Hub as '{HF_DATASET_NAME}'...")
+    try:
+        api = HfApi(token=os.getenv("HF_TOKEN"))
+
+        # Create repository
+        try:
+            create_repo(
+                repo_id=HF_DATASET_NAME,
+                repo_type="dataset",
+                private=HF_PRIVATE,
+                exist_ok=True
+            )
+            print(f"✅ Repository '{HF_DATASET_NAME}' created/verified")
+        except Exception as e:
+            print(f"⚠️ Repository creation failed: {e}")
+            return
+
+        # Create dataset info
+        dataset_info = {
+            "dataset_name": HF_DATASET_NAME,
+            "description": "Persian Common Voice confirmed samples for Whisper fine-tuning",
+            "total_samples": len(confirmed),
+            "num_shards": num_shards,
+            "audio_format": "int16 PCM, 16kHz",
+            "columns": ["audio", "text"],
+            "source_dataset": "Ashegh-Sad-Warrior/Persian_Common_Voice_17_0",
+            "processing": "Vosk API batch confirmation (optimized for 192 cores)"
+        }
+
+        info_path = os.path.join(output_dir, "dataset_info.json")
+        with open(info_path, 'w', encoding='utf-8') as f:
+            json.dump(dataset_info, f, indent=2, ensure_ascii=False)
+
+        # Upload folder
+        api.upload_folder(
+            folder_path=output_dir,
+            repo_id=HF_DATASET_NAME,
+            repo_type="dataset",
+        )
+
+        print(f"🎉 Dataset successfully pushed to: https://huggingface.co/datasets/{HF_DATASET_NAME}")
https://huggingface.co/datasets/{HF_DATASET_NAME}") + + except Exception as e: + print(f"โŒ Failed to push to Hugging Face: {e}") + +async def main(): + """Main function""" + start_time = time.time() + + print(f"๐Ÿš€ Starting optimized processing with {NUM_CORES} cores") + print(f"๐Ÿ“Š Dataset size: {len(ds)} samples") + print(f"โš™๏ธ Batch size: {BATCH_SIZE}") + print(f"๐Ÿ”„ Max concurrent requests: {MAX_CONCURRENT_REQUESTS}") + + # Process dataset + confirmed = await process_dataset_async() + + # Save data + num_shards = save_confirmed_data_parallel(confirmed) + + # Upload to HF + await upload_to_hf(num_shards) + + end_time = time.time() + print(f"\nโฑ๏ธ Total processing time: {end_time - start_time:.2f} seconds") + print(f"๐Ÿ“ˆ Processing rate: {len(ds) / (end_time - start_time):.2f} samples/second") + +if __name__ == "__main__": + # Set multiprocessing start method for better performance + mp.set_start_method('spawn', force=True) + + # Run the async main function + asyncio.run(main()) \ No newline at end of file diff --git a/vosk/test_files/fix_torchcodec.sh b/vosk/test_files/fix_torchcodec.sh new file mode 100644 index 0000000..fc70766 --- /dev/null +++ b/vosk/test_files/fix_torchcodec.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +# Fix torchcodec dependency issue + +echo "๐Ÿ”ง Fixing torchcodec dependency..." + +# Activate virtual environment if it exists +if [ -d ".venv" ]; then + echo "๐Ÿ”ง Activating virtual environment..." + source .venv/bin/activate +fi + +# Install torchcodec +echo "๐Ÿ“ฆ Installing torchcodec..." +uv pip install torchcodec>=0.1.0 + +# Also install other audio-related dependencies +echo "๐Ÿ“ฆ Installing additional audio dependencies..." +uv pip install librosa>=0.10.0 +uv pip install ffmpeg-python>=0.2.0 + +# Test the import +echo "๐Ÿงช Testing audio imports..." +python test_audio_deps.py + +if [ $? -eq 0 ]; then + echo "๐ŸŽฏ torchcodec dependency fixed!" + echo "๐Ÿ’ก You can now run the optimized processing script." +else + echo "โŒ Failed to install torchcodec. Please check your system." +fi \ No newline at end of file diff --git a/vosk/test_files/install_and_run.sh b/vosk/test_files/install_and_run.sh new file mode 100644 index 0000000..2e17d31 --- /dev/null +++ b/vosk/test_files/install_and_run.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# Install torchcodec and run optimized processing + +echo "๐Ÿ”ง Installing torchcodec and running optimized processing..." + +# Activate virtual environment if it exists +if [ -d ".venv" ]; then + echo "๐Ÿ”ง Activating virtual environment..." + source .venv/bin/activate +fi + +# Install torchcodec first +echo "๐Ÿ“ฆ Installing torchcodec..." +uv pip install torchcodec>=0.1.0 + +# Also install other audio dependencies +echo "๐Ÿ“ฆ Installing additional audio dependencies..." +uv pip install librosa>=0.10.0 +uv pip install ffmpeg-python>=0.2.0 + +# Test the installation +echo "๐Ÿงช Testing torchcodec installation..." +python -c "import torchcodec; print('torchcodec installed successfully')" + +if [ $? -ne 0 ]; then + echo "โŒ Failed to install torchcodec. Trying alternative installation..." + uv pip install --force-reinstall torchcodec>=0.1.0 +fi + +# Run the optimized processing +echo "๐Ÿš€ Running optimized processing..." 
+python batch_confirm_hf_optimized.py
\ No newline at end of file
diff --git a/vosk/test_files/requirements_optimized.txt b/vosk/test_files/requirements_optimized.txt
index 18953ad..3d44dad 100644
--- a/vosk/test_files/requirements_optimized.txt
+++ b/vosk/test_files/requirements_optimized.txt
@@ -7,6 +7,7 @@ pandas>=2.0.0
 pyarrow>=12.0.0
 numpy>=1.24.0
 huggingface_hub>=0.16.0
+torchcodec>=0.1.0
 
 # Async and concurrent processing
 aiohttp>=3.8.0
diff --git a/vosk/test_files/run_now.sh b/vosk/test_files/run_now.sh
new file mode 100644
index 0000000..a925441
--- /dev/null
+++ b/vosk/test_files/run_now.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+# One-command solution for 192-core optimized processing
+
+echo "🚀 Setting up 192-core optimized processing..."
+
+# Make scripts executable
+chmod +x run_optimized_192cores_no_root.sh
+chmod +x fix_torchcodec.sh
+
+# Check if uv is available
+if ! command -v uv &> /dev/null; then
+    echo "❌ uv not found. Installing uv..."
+    curl -LsSf https://astral.sh/uv/install.sh | sh
+    source ~/.cargo/env
+fi
+
+# Create virtual environment if it doesn't exist
+if [ ! -d ".venv" ]; then
+    echo "🔧 Creating virtual environment..."
+    uv venv
+fi
+
+# Activate virtual environment
+echo "🔧 Activating virtual environment..."
+source .venv/bin/activate
+
+# Fix torchcodec dependency
+echo "🔧 Fixing audio dependencies..."
+./fix_torchcodec.sh
+
+# Run the optimized processing
+echo "🚀 Running optimized processing..."
+./run_optimized_192cores_no_root.sh
\ No newline at end of file
diff --git a/vosk/test_files/run_optimized_192cores_no_root.sh b/vosk/test_files/run_optimized_192cores_no_root.sh
index 62e2c9c..e5d331a 100644
--- a/vosk/test_files/run_optimized_192cores_no_root.sh
+++ b/vosk/test_files/run_optimized_192cores_no_root.sh
@@ -70,6 +70,16 @@ source .venv/bin/activate
 echo "📦 Installing dependencies..."
 uv pip install -r requirements_optimized.txt
 
+# Install additional audio dependencies (quoted so `>=` is not parsed as a redirect)
+echo "📦 Installing audio dependencies..."
+uv pip install "torchcodec>=0.1.0"
+uv pip install "librosa>=0.10.0"
+uv pip install "ffmpeg-python>=0.2.0"
+
+# Test audio imports
+echo "🧪 Testing audio imports..."
+python test_audio_deps.py
+
 # Check if Vosk service is running
 echo "🔍 Checking Vosk service status..."
 if ! curl -s http://localhost:5000/ > /dev/null; then
diff --git a/vosk/test_files/test_audio_deps.py b/vosk/test_files/test_audio_deps.py
new file mode 100644
index 0000000..6764a85
--- /dev/null
+++ b/vosk/test_files/test_audio_deps.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python3
+"""
+Test script for audio dependencies
+"""
+
+def test_audio_dependencies():
+    """Test if all audio dependencies are installed"""
+    try:
+        import torchcodec
+        print("torchcodec: OK")
+    except ImportError as e:
+        print(f"torchcodec: FAILED - {e}")
+        return False
+
+    try:
+        import datasets
+        print("datasets: OK")
+    except ImportError as e:
+        print(f"datasets: FAILED - {e}")
+        return False
+
+    try:
+        import soundfile
+        print("soundfile: OK")
+    except ImportError as e:
+        print(f"soundfile: FAILED - {e}")
+        return False
+
+    try:
+        import librosa
+        print("librosa: OK")
+    except ImportError as e:
+        print(f"librosa: FAILED - {e}")
+        return False
+
+    print("All audio dependencies installed successfully")
+    return True
+
+if __name__ == "__main__":
+    success = test_audio_dependencies()
+    if not success:
+        exit(1)
\ No newline at end of file