import asyncio
import aiohttp
import json
import logging
import multiprocessing as mp
import os
import time
import uuid
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import soundfile as sf
from datasets import load_dataset, Audio, Dataset
from huggingface_hub import HfApi
from tqdm import tqdm

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration for a 192-core machine
NUM_CORES = 192
BATCH_SIZE = 32                # Samples per confirmation request
MAX_CONCURRENT_REQUESTS = 48   # 192 / 4 concurrent HTTP requests
CHUNK_SIZE = 1000              # Reserved for chunked processing (currently unused)

# Load the dataset (audio is decoded lazily, on access)
print("Loading dataset...")
ds = load_dataset(
    "Ashegh-Sad-Warrior/Persian_Common_Voice_17_0",
    split="validated",
    streaming=False,
)

# Resample the audio column to 16 kHz on access
print("Casting to audio...")
ds = ds.cast_column("audio", Audio(sampling_rate=16000))

output_dir = "confirmed_dataset"
os.makedirs(output_dir, exist_ok=True)

API_URL = "http://localhost:5000/batch_confirm"

# Hugging Face configuration
HF_DATASET_NAME = "dpr2000/persian-cv17-confirmed"
HF_PRIVATE = True


def save_flac(audio_array, path):
    """Save an audio array as a 16 kHz FLAC file."""
    sf.write(path, audio_array, 16000, format="FLAC")


def process_audio_chunk(audio_data):
    """Write one decoded audio item to a temporary FLAC file (runs in a worker process)."""
    audio, sentence = audio_data
    # Random name so concurrent workers never collide on the temp file
    flac_path = f"temp_{uuid.uuid4().hex}.flac"
    save_flac(audio["array"], flac_path)
    return {
        "flac_path": flac_path,
        "sentence": sentence,
        "audio_array": audio["array"],
    }


async def send_batch_request(session, batch_data, batch_id):
    """Send a single batch to the confirmation API asynchronously."""
    references = []
    temp_flacs = []
    audio_arrays = []
    file_handles = []

    # aiohttp has no `files=` argument (that is a `requests` feature);
    # multipart uploads go through aiohttp.FormData instead.
    form = aiohttp.FormData()
    for j, item in enumerate(batch_data):
        f = open(item["flac_path"], "rb")
        file_handles.append(f)
        form.add_field(
            f"audio{j}",
            f,
            filename=os.path.basename(item["flac_path"]),
            content_type="audio/flac",
        )
        references.append(item["sentence"])
        temp_flacs.append(item["flac_path"])
        audio_arrays.append(item["audio_array"])
    form.add_field("references", json.dumps(references))

    try:
        async with session.post(
            API_URL, data=form, timeout=aiohttp.ClientTimeout(total=120)
        ) as response:
            if response.status == 200:
                resp_json = await response.json()
                if "results" in resp_json:
                    results = resp_json["results"]
                else:
                    logger.warning(f"Batch {batch_id} failed: 'results' key missing")
                    results = [None] * len(references)
            else:
                logger.error(f"Batch {batch_id} failed: HTTP {response.status}")
                results = [None] * len(references)
    except Exception as e:
        logger.error(f"Batch {batch_id} failed: {e}")
        results = [None] * len(references)
    finally:
        # Close handles and remove the temporary FLAC files
        for f in file_handles:
            f.close()
        for flac_path in temp_flacs:
            try:
                os.remove(flac_path)
            except OSError:
                pass

    # Keep only the samples the API confirmed
    confirmed_items = []
    for j, result in enumerate(results):
        if result and result.get("confirmed"):
            confirmed_items.append({
                "audio": audio_arrays[j],
                "transcription": references[j],
            })
    return confirmed_items
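
# ---------------------------------------------------------------------------
# Contract sketch (assumption, not the real service): send_batch_request()
# expects the endpoint at API_URL to accept a multipart/form-data POST with
# fields "audio0".."audioN" (FLAC files) plus "references" (a JSON-encoded
# list of transcripts in the same order), and to reply with
# {"results": [{"confirmed": <bool>, ...}, ...]}, one entry per file.
# A minimal Flask handler matching that shape could look like the following
# (hypothetical; the actual Vosk-based confirmation logic is not shown here):
#
#   import json
#   from flask import Flask, request, jsonify
#
#   app = Flask(__name__)
#
#   @app.route("/batch_confirm", methods=["POST"])
#   def batch_confirm():
#       references = json.loads(request.form["references"])
#       results = []
#       for j, reference in enumerate(references):
#           flac = request.files[f"audio{j}"]
#           # ... transcribe `flac` and compare against `reference` here ...
#           results.append({"confirmed": True})  # placeholder decision
#       return jsonify({"results": results})
# ---------------------------------------------------------------------------
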
async def process_dataset_async():
    """Prepare audio, send confirmation batches, and collect confirmed samples."""
    confirmed = []

    # Prepare all audio data first using multiprocessing.
    # NOTE: this decodes every clip up front and keeps it in memory.
    print("Preparing audio data with multiprocessing...")
    audio_data = [(ds[i]["audio"], ds[i]["sentence"]) for i in range(len(ds))]

    # Use ProcessPoolExecutor for the CPU-intensive FLAC encoding
    with ProcessPoolExecutor(max_workers=NUM_CORES) as executor:
        processed_audio = list(tqdm(
            # A larger chunksize reduces inter-process overhead for many small tasks
            executor.map(process_audio_chunk, audio_data, chunksize=16),
            total=len(audio_data),
            desc="Processing audio files",
        ))

    # Create batches
    batches = []
    for i in range(0, len(processed_audio), BATCH_SIZE):
        batch = processed_audio[i:i + BATCH_SIZE]
        batches.append((batch, i // BATCH_SIZE))

    print(f"Processing {len(batches)} batches with {MAX_CONCURRENT_REQUESTS} concurrent requests...")

    # Process batches asynchronously
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=MAX_CONCURRENT_REQUESTS),
        timeout=aiohttp.ClientTimeout(total=300),
    ) as session:
        tasks = []
        for batch_data, batch_id in batches:
            tasks.append(send_batch_request(session, batch_data, batch_id))

        # Await in chunks so no more than MAX_CONCURRENT_REQUESTS are in flight
        chunk_size = MAX_CONCURRENT_REQUESTS
        for i in range(0, len(tasks), chunk_size):
            chunk_tasks = tasks[i:i + chunk_size]
            results = await asyncio.gather(*chunk_tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    logger.error(f"Task failed: {result}")
                else:
                    confirmed.extend(result)
            print(f"Processed {min(i + chunk_size, len(tasks))}/{len(tasks)} batches, confirmed: {len(confirmed)}")

    return confirmed


def save_confirmed_data_parallel(confirmed):
    """Save confirmed samples as compressed Parquet shards; returns the shard count."""
    if not confirmed:
        print("āŒ No confirmed samples to save")
        return 0

    print(f"\nšŸ”„ Saving {len(confirmed)} confirmed samples...")

    def extract_minimal(example):
        """Convert float audio to raw int16 PCM bytes and keep only audio + text."""
        audio_float32 = np.array(example["audio"], dtype=np.float32)
        audio_float32 = np.clip(audio_float32, -1.0, 1.0)
        audio_int16 = (audio_float32 * 32767).astype(np.int16)
        return {
            "audio": audio_int16.tobytes(),
            "text": example["transcription"],
        }

    # Create dataset from confirmed samples
    confirmed_dataset = Dataset.from_list(confirmed)
    confirmed_dataset = confirmed_dataset.map(
        extract_minimal,
        remove_columns=confirmed_dataset.column_names,
        num_proc=min(NUM_CORES, len(confirmed_dataset)),  # Use all cores for dataset processing
    )

    # Shard the output so the files can be written concurrently
    num_shards = min(50, len(confirmed))
    shard_size = len(confirmed_dataset) // num_shards + 1

    def write_shard(shard_info):
        """Write a single shard to Parquet."""
        i, start, end = shard_info
        if start >= len(confirmed_dataset):
            return None
        shard = confirmed_dataset.select(range(start, end))
        table = pa.Table.from_pandas(shard.to_pandas())
        shard_path = os.path.join(output_dir, f"confirmed_shard_{i:03}.parquet")
        pq.write_table(
            table,
            shard_path,
            compression="zstd",
            compression_level=22,
            use_dictionary=True,
            version="2.6",
        )
        return f"Shard {i + 1}: {len(shard)} samples saved to {shard_path}"

    # Prepare shard boundaries
    shard_info = []
    for i in range(num_shards):
        start = i * shard_size
        end = min(len(confirmed_dataset), (i + 1) * shard_size)
        shard_info.append((i, start, end))

    # Write shards in parallel. A thread pool is used here because write_shard
    # is a closure over confirmed_dataset and cannot be pickled for a process
    # pool; Arrow's Parquet writer releases the GIL during compression, so
    # threads still overlap most of the work.
    print(f"Writing {num_shards} shards in parallel...")
    with ThreadPoolExecutor(max_workers=min(NUM_CORES, num_shards)) as executor:
        results = list(tqdm(
            executor.map(write_shard, shard_info),
            total=len(shard_info),
            desc="Writing shards",
        ))

    for result in results:
        if result:
            print(f"šŸ”¹ {result}")

    print(f"\nāœ… All confirmed data saved in {num_shards} shards in `{output_dir}/`")
    return num_shards
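
# ---------------------------------------------------------------------------
# Read-back sketch (assumption about the consumer side, not used by this
# script): each shard stores raw int16 PCM bytes at 16 kHz in the "audio"
# column and the transcript in "text", so a training pipeline could decode a
# row roughly like this:
#
#   import numpy as np
#   import pyarrow.parquet as pq
#
#   table = pq.read_table("confirmed_dataset/confirmed_shard_000.parquet")
#   row = table.to_pylist()[0]
#   audio_int16 = np.frombuffer(row["audio"], dtype=np.int16)
#   audio = audio_int16.astype(np.float32) / 32767.0  # back to [-1.0, 1.0]
#   # `audio` is 16 kHz mono float32, ready for a Whisper feature extractor
# ---------------------------------------------------------------------------
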
async def upload_to_hf(num_shards, num_samples):
    """Upload the output folder to the Hugging Face Hub."""
    print(f"\nšŸš€ Pushing dataset to Hugging Face Hub as '{HF_DATASET_NAME}'...")
    try:
        api = HfApi(token=os.getenv("HF_TOKEN"))

        # Create (or reuse) the dataset repository
        try:
            api.create_repo(
                repo_id=HF_DATASET_NAME,
                repo_type="dataset",
                private=HF_PRIVATE,
                exist_ok=True,
            )
            print(f"āœ… Repository '{HF_DATASET_NAME}' created/verified")
        except Exception as e:
            print(f"āš ļø Repository creation failed: {e}")
            return

        # Write a small metadata file next to the shards
        dataset_info = {
            "dataset_name": HF_DATASET_NAME,
            "description": "Persian Common Voice confirmed samples for Whisper fine-tuning",
            "total_samples": num_samples,
            "num_shards": num_shards,
            "audio_format": "int16 PCM, 16kHz",
            "columns": ["audio", "text"],
            "source_dataset": "Ashegh-Sad-Warrior/Persian_Common_Voice_17_0",
            "processing": "Vosk API batch confirmation (optimized for 192 cores)",
        }
        info_path = os.path.join(output_dir, "dataset_info.json")
        with open(info_path, "w", encoding="utf-8") as f:
            json.dump(dataset_info, f, indent=2, ensure_ascii=False)

        # Upload the whole folder (shards + metadata)
        api.upload_folder(
            folder_path=output_dir,
            repo_id=HF_DATASET_NAME,
            repo_type="dataset",
        )
        print(f"šŸŽ‰ Dataset successfully pushed to: https://huggingface.co/datasets/{HF_DATASET_NAME}")
    except Exception as e:
        print(f"āŒ Failed to push to Hugging Face: {e}")


async def main():
    """Run the full confirm -> save -> upload pipeline."""
    start_time = time.time()
    print(f"šŸš€ Starting optimized processing with {NUM_CORES} cores")
    print(f"šŸ“Š Dataset size: {len(ds)} samples")
    print(f"āš™ļø Batch size: {BATCH_SIZE}")
    print(f"šŸ”„ Max concurrent requests: {MAX_CONCURRENT_REQUESTS}")

    # Confirm samples against the API
    confirmed = await process_dataset_async()

    # Save confirmed samples to Parquet shards
    num_shards = save_confirmed_data_parallel(confirmed)

    # Upload to the Hub (skip when nothing was confirmed)
    if num_shards:
        await upload_to_hf(num_shards, len(confirmed))

    end_time = time.time()
    print(f"\nā±ļø Total processing time: {end_time - start_time:.2f} seconds")
    print(f"šŸ“ˆ Processing rate: {len(ds) / (end_time - start_time):.2f} samples/second")


if __name__ == "__main__":
    # 'spawn' gives each worker a fresh interpreter (safer than fork alongside
    # threads and an asyncio loop); note that spawned workers re-import this
    # module, so the module-level load_dataset() call runs again in each worker
    # from the local HF cache.
    mp.set_start_method('spawn', force=True)
    # Run the async pipeline
    asyncio.run(main())
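
# ---------------------------------------------------------------------------
# Usage sketch (assumptions: the confirmation service is already listening at
# http://localhost:5000/batch_confirm and a Hub token with write access is
# available; the filename below is illustrative):
#
#   HF_TOKEN=hf_xxx python confirm_persian_cv17.py
# ---------------------------------------------------------------------------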