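# Fine-tune a Whisper checkpoint for Persian (Farsi) speech recognition on the
# Common Voice 17.0 "validated" split: transcripts are normalized with Parsivar,
# the Whisper tokenizer is extended with a word-level Persian vocabulary, and
# training is run with the Hugging Face Seq2SeqTrainer.
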
from datasets import load_dataset, DatasetDict

common_voice = DatasetDict()

# 20% of the "validated" split is used for training and the next 3% for evaluation.
common_voice["train"] = load_dataset("Ashegh-Sad-Warrior/Persian_Common_Voice_17_0", split="validated[:20%]")
common_voice["test"] = load_dataset("Ashegh-Sad-Warrior/Persian_Common_Voice_17_0", split="validated[20%:23%]")

print(common_voice)

# Drop metadata columns; only "audio" and "sentence" are needed from here on.
common_voice = common_voice.remove_columns(
    ['client_id', 'path', 'up_votes', 'down_votes', 'age', 'gender', 'accent', 'locale', 'segment', 'variant']
)

print(common_voice)

from transformers import WhisperFeatureExtractor

# Feature extractor: turns raw audio into log-Mel spectrogram input features.
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-small")

from transformers import WhisperTokenizer

# Tokenizer configured for Persian ("fa") transcription.
tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", language="fa", task="transcribe")

from transformers import WhisperProcessor

# Processor bundles the feature extractor and tokenizer into a single object.
processor = WhisperProcessor.from_pretrained("openai/whisper-small", language="fa", task="transcribe")

from parsivar import Normalizer, Tokenizer, SpellCheck

import numpy as np
import tqdm

# First pass: build a word-level vocabulary from the transcripts after Parsivar
# normalization and spell correction. (Note: the vocabulary is rebuilt below
# without spell correction, and that second result is the one actually used.)
normalizer = Normalizer()
spell_checker = SpellCheck()
word_tokenizer = Tokenizer()

vocab = np.array([])

for sentence in tqdm.tqdm(common_voice["train"]["sentence"]):
    sentence = spell_checker.spell_corrector(normalizer.normalize(sentence))
    vocab = np.append(vocab, word_tokenizer.tokenize_words(sentence), axis=0)

for sentence in tqdm.tqdm(common_voice["test"]["sentence"]):
    sentence = spell_checker.spell_corrector(normalizer.normalize(sentence))
    vocab = np.append(vocab, word_tokenizer.tokenize_words(sentence), axis=0)

vocab = np.unique(vocab)
print(vocab, vocab.shape)

# Second pass: rebuild the vocabulary with normalization only (no spell
# correction). This overwrites the vocabulary from the first pass and is what
# gets added to the tokenizer below.
normalizer = Normalizer()
word_tokenizer = Tokenizer()

vocab = np.array([])

for sentence in tqdm.tqdm(common_voice["train"]["sentence"]):
    sentence = normalizer.normalize(sentence)
    vocab = np.append(vocab, word_tokenizer.tokenize_words(sentence), axis=0)

for sentence in tqdm.tqdm(common_voice["test"]["sentence"]):
    sentence = normalizer.normalize(sentence)
    vocab = np.append(vocab, word_tokenizer.tokenize_words(sentence), axis=0)

vocab = np.unique(vocab)
print(vocab, vocab.shape)

# Extend the Whisper tokenizer with the Persian word-level vocabulary and point
# the processor at the extended tokenizer.
tokenizer.add_tokens(list(vocab))
processor.tokenizer = tokenizer

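# Illustrative sanity check (not part of the original pipeline): run one
# normalized transcript through the extended tokenizer and decode it back.
sample = normalizer.normalize(common_voice["train"]["sentence"][0])
sample_ids = tokenizer(sample).input_ids
print(len(sample_ids), tokenizer.decode(sample_ids, skip_special_tokens=True))
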
print(common_voice["train"][0])

from datasets import Audio

# Common Voice audio is 48 kHz; cast the column so it is resampled (lazily, on
# access) to the 16 kHz expected by Whisper.
common_voice = common_voice.cast_column("audio", Audio(sampling_rate=16000))

print(common_voice["train"][0])

def prepare_dataset(batch):
    # load and resample the audio data from 48 kHz to 16 kHz
    audio = batch["audio"]

    # compute log-Mel input features from the audio array
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # encode the target text to label ids (the raw transcript is used here;
    # the normalization above was only for building the vocabulary)
    batch["labels"] = tokenizer(batch["sentence"]).input_ids
    return batch


common_voice = common_voice.map(prepare_dataset, remove_columns=common_voice.column_names["train"])

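# Illustrative check (not part of the original script): each example should now
# hold an (80, 3000) log-Mel spectrogram and a list of label token ids.
print(np.array(common_voice["train"][0]["input_features"]).shape, len(common_voice["train"][0]["labels"]))
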
from transformers import WhisperForConditionalGeneration

# NOTE: the feature extractor/tokenizer/processor above were loaded from
# "openai/whisper-small" while the model here is "openai/whisper-tiny". The two
# checkpoints share the same multilingual tokenizer and 80-bin feature
# extractor, so this works, but use matching checkpoint names if you want the
# pipeline to be consistent.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny")

# Configure generation for Persian transcription and clear the legacy
# forced_decoder_ids in favour of the generation-config settings.
model.generation_config.language = "fa"
model.generation_config.task = "transcribe"
model.generation_config.forced_decoder_ids = None

import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have different lengths and need
        # different padding methods; treat the audio inputs first by simply
        # returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences and pad them to the longest in the batch
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 so padded positions are ignored by the loss
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if a bos/decoder-start token was prepended during tokenization, cut it
        # here, since it is added again during training
        if (labels[:, 0] == self.decoder_start_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch


data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

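# Illustrative usage (not part of the training pipeline): collate two prepared
# examples to see the padded tensors the Trainer will receive.
sample_batch = data_collator([common_voice["train"][0], common_voice["train"][1]])
print(sample_batch["input_features"].shape, sample_batch["labels"].shape)
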
import evaluate

# Word error rate (WER) is used as the evaluation metric.
metric = evaluate.load("wer")


def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id so the labels can be decoded
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


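# Illustrative toy call of the metric on its own (placeholder strings, not from
# the dataset); identical prediction and reference give a WER of 0.0.
print(100 * metric.compute(predictions=["سلام دنیا"], references=["سلام دنیا"]))
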
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-tiny-fa",  # change to a repo name of your choice
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-6,
    warmup_steps=500,
    max_steps=4000,
    gradient_checkpointing=True,
    fp16=True,
    eval_strategy="steps",
    per_device_eval_batch_size=4,
    predict_with_generate=True,
    generation_max_length=448,
    save_steps=1000,
    eval_steps=1000,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
    push_to_hub=False,
)

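# With per_device_train_batch_size=4 and gradient_accumulation_steps=4, the
# effective batch size is 4 * 4 = 16 examples per device per optimizer step.
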
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=common_voice["train"],
    eval_dataset=common_voice["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # recent transformers versions prefer processing_class=processor here;
    # tokenizer= still works but is deprecated
    tokenizer=processor.feature_extractor,
)


# Resize the token embeddings so the model matches the tokenizer extended with
# the Persian vocabulary above, then start training.
model.resize_token_embeddings(len(processor.tokenizer))

trainer.train()
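
# Optional follow-up (not in the original script): persist the fine-tuned model
# and the processor so they can be reloaded for inference with
# WhisperForConditionalGeneration.from_pretrained / WhisperProcessor.from_pretrained.
trainer.save_model(training_args.output_dir)
processor.save_pretrained(training_args.output_dir)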