# Basics
import os
import copy
import numpy as np
# Multiprocessing
from multiprocessing import Manager
from multiprocessing.shared_memory import ShareableList
# PyTorch
from torch import Tensor
from torch.utils.data import Dataset
# Hugging Face and friends
from tokenizers.implementations import ByteLevelBPETokenizer
from transformers import RobertaTokenizer, DataCollatorForLanguageModeling
# Typing
from typing import List, Tuple, Optional


class GermanDataset(Dataset):
    dataset_file: str = "german.txt"

    def __init__(self, isLocalMain: bool, dataset_path: str, vocab_size: int, max_seq_length: int,
                 variable_seq_length: bool = False, limit: Optional[int] = None):
        super().__init__()
        self.isLocalMain = isLocalMain
        self.max_seq_length = max_seq_length
        self.variable_seq_length = variable_seq_length
        self.wandb_config = {
            "Variable Seq Length": variable_seq_length,
            "limit": limit,
        }
        if not os.path.isfile(f"{dataset_path}/{self.dataset_file}"):
            print(f"No '{self.dataset_file}' found, generating dataset...")
            # get all folder names in dataset_path; each folder holds one corpus in a
            # Leipzig-corpora-style layout: <folder>-words.txt and <folder>-sentences.txt
            folders: List[str] = [f for f in os.listdir(dataset_path) if os.path.isdir(f"{dataset_path}/{f}")]
            word_count: int = 0
            words_distinct: set = set()
            with open(f"{dataset_path}/{self.dataset_file}", "w", encoding="utf-8") as file:
                for folder in folders:
                    print(f"Processing dataset '{folder}'...")
                    word_file_path = f"{dataset_path}/{folder}/{folder}-words.txt"
                    sentences_file_path = f"{dataset_path}/{folder}/{folder}-sentences.txt"
                    # read the word-frequency metadata file; each line is "<id>\t<word>\t<frequency>"
                    with open(word_file_path, "r", encoding="utf-8") as word_file:
                        for line in word_file:
                            word_id, word, freq = line.split("\t")
                            words_distinct.add(word)
                            if int(word_id) > 100:  # word ids 1-100 are reserved for special tokens, which are not words
                                word_count += int(freq)
                    # read the sentences file and strip the leading sentence id before each tab
                    with open(sentences_file_path, "r", encoding="utf-8") as sentences_file:
                        for line in sentences_file:
                            file.write(line[line.find('\t') + 1:])
            print(f"Distinct words: {len(words_distinct)}")
            print(f"Total words: {word_count}\n")
            # Tokenizer training
            print("Training tokenizer...")
            tokenizer = ByteLevelBPETokenizer()
            tokenizer.train(files=[f"{dataset_path}/{self.dataset_file}"],
                            show_progress=True, vocab_size=vocab_size, min_frequency=2, special_tokens=[
                                "<s>",     # id = 0
                                "<pad>",   # id = 1
                                "</s>",    # id = 2
                                "<unk>",   # id = 3
                                "<mask>",  # id = 4
                            ])
            tokenizer.save_model(f"{dataset_path}/", "german")
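            # save_model() writes german-vocab.json and german-merges.txt into dataset_path;
            # those are exactly the two files the RobertaTokenizer below loads.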
        # load the pretrained tokenizer
        self.tokenizer = RobertaTokenizer(f"{dataset_path}/german-vocab.json", f"{dataset_path}/german-merges.txt")
        self.dataCollator = DataCollatorForLanguageModeling(tokenizer=self.tokenizer, mlm=True, mlm_probability=0.15)
        if isLocalMain:
            print("Loading dataset on main process only...")
            manager = Manager()  # initializing the Manager before loading the data saves memory, though it is unclear why
            with open(f"{dataset_path}/{self.dataset_file}", "r", encoding="utf-8") as file:
                sentences = file.read().splitlines()
            if limit is not None:
                sentences = sentences[:limit]
            np.random.shuffle(sentences)
            # manager.list() shares the list between DataLoader num_workers processes and prevents
            # a copy-on-access memory leak; this also works when using multiple GPUs
            self.data = manager.list(sentences)
            # store the dataset length in shared memory so the other GPU processes can report it
            self.shared_data = ShareableList([len(self.data)], name='dataset_shared')
        else:
            self.data = []
            # read the dataset length from the shared variable
            self.shared_data = ShareableList(name='dataset_shared')
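
    def cleanup(self):
        # A minimal cleanup sketch, not part of the original file: ShareableList is
        # backed by a SharedMemory block that is not released automatically. Every
        # process should close its handle once training is finished, and the main
        # process should additionally unlink the block.
        self.shared_data.shm.close()
        if self.isLocalMain:
            self.shared_data.shm.unlink()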

    def __len__(self):
        if self.isLocalMain:
            return len(self.data)
        else:
            return self.shared_data[0]

    def __getitem__(self, idx):
        # Do all pre-processing and augmentation here because of multiprocessing with num_workers > 0.
        # copy.deepcopy() is good practice so that augmentations etc. never modify the original data in place.
        return copy.deepcopy(self.data[idx])

    def collate_fn(self, batch: List[str]) -> Tuple[Tensor, Tensor, Tensor]:
        # Do all batch preparation here, not in the model's forward() function.
        batch_tokenized = [self.tokenizer(sentence,
                                          # set padding='max_length' to test the maximum GPU memory usage for the given batch size
                                          padding=False if self.variable_seq_length else 'max_length',
                                          truncation=True,
                                          max_length=self.max_seq_length,
                                          # this parameter only works here; the special tokens mask makes the dataCollator faster
                                          return_special_tokens_mask=True)
                           for sentence in batch]
        batch_masked = self.dataCollator(batch_tokenized, return_tensors="pt")
        return (batch_masked["input_ids"], batch_masked["attention_mask"], batch_masked["labels"])
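

# A minimal usage sketch, not part of the original file. The dataset path and the
# hyperparameters below are placeholder assumptions; the point is that the dataset's
# own collate_fn must be passed to the DataLoader so that tokenization and MLM
# masking run in the worker processes instead of the model's forward().
if __name__ == "__main__":
    from torch.utils.data import DataLoader

    dataset = GermanDataset(isLocalMain=True, dataset_path="./data",  # hypothetical path
                            vocab_size=30_000, max_seq_length=128,
                            variable_seq_length=True, limit=10_000)
    loader = DataLoader(dataset, batch_size=32, shuffle=True,
                        num_workers=2, collate_fn=dataset.collate_fn)
    input_ids, attention_mask, labels = next(iter(loader))
    print(input_ids.shape, attention_mask.shape, labels.shape)
    dataset.cleanup()  # free the shared-memory block (see the cleanup sketch above)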