-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathutils.py
More file actions
240 lines (182 loc) · 6.88 KB
/
utils.py
File metadata and controls
240 lines (182 loc) · 6.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# coding=utf-8
# Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. team.
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Load QuAC dataset. """
from __future__ import absolute_import, division, print_function
import json
import logging
import math
import collections
import linecache
import numpy as np
from io import open
from tqdm import tqdm
import torch
# Required by XLNet evaluation method to compute optimal threshold (see write_predictions_extended() method)
# from utils_squad_evaluate import find_all_best_thresh_v2, make_qid_to_has_ans, get_raw_scores
logger = logging.getLogger(__name__)
from typing import Dict, List, Optional
try:
import marisa_trie
except ModuleNotFoundError:
pass
class Trie(object):
    """Prefix tree (trie) over token-id sequences.

    Used for constrained decoding: ``get(prefix)`` returns the token ids
    that may legally follow ``prefix`` among the stored sequences. A second
    trie can be chained after this one via :meth:`append`.
    """

    def __init__(self, sequences: Optional[List[List[int]]] = None):
        """Build a trie from ``sequences`` (each a list of token ids).

        ``None`` (the default) builds an empty trie. A mutable default
        argument (``[]``) is deliberately avoided here.
        """
        self.trie_dict = {}  # nested dicts: token id -> child sub-trie dict
        self.len = 0  # number of sequences stored
        if sequences:
            for sequence in sequences:
                Trie._add_to_trie(sequence, self.trie_dict)
                self.len += 1
        # Optional trie consulted after this one; joined at bos_token_id.
        self.append_trie = None
        self.bos_token_id = None

    def append(self, trie, bos_token_id):
        """Chain ``trie`` after this one.

        When a lookup exhausts this trie, ``bos_token_id`` is replaced by
        the root candidates of ``trie`` (see ``_get_from_trie``).
        """
        self.append_trie = trie
        self.bos_token_id = bos_token_id

    def add(self, sequence: List[int]):
        """Insert a single token-id sequence into the trie."""
        Trie._add_to_trie(sequence, self.trie_dict)
        self.len += 1

    def get(self, prefix_sequence: List[int]):
        """Return the list of token ids that may follow ``prefix_sequence``."""
        return Trie._get_from_trie(
            prefix_sequence, self.trie_dict, self.append_trie, self.bos_token_id
        )

    @staticmethod
    def load_from_dict(trie_dict):
        """Rebuild a :class:`Trie` from a previously exported ``trie_dict``."""
        trie = Trie()
        trie.trie_dict = trie_dict
        # Recount by traversing all leaf paths, since the dict alone
        # does not record the sequence count.
        trie.len = sum(1 for _ in trie)
        return trie

    @staticmethod
    def _add_to_trie(sequence: List[int], trie_dict: Dict):
        # Recursively descend one token at a time, creating nodes as needed.
        if sequence:
            if sequence[0] not in trie_dict:
                trie_dict[sequence[0]] = {}
            Trie._add_to_trie(sequence[1:], trie_dict[sequence[0]])

    @staticmethod
    def _get_from_trie(
        prefix_sequence: List[int],
        trie_dict: Dict,
        append_trie=None,
        bos_token_id: Optional[int] = None,
    ):
        if len(prefix_sequence) == 0:
            # Prefix fully consumed: candidates are this node's children,
            # with bos_token_id swapped for the appended trie's roots.
            output = list(trie_dict.keys())
            if append_trie and bos_token_id in output:
                output.remove(bos_token_id)
                output += list(append_trie.trie_dict.keys())
            return output
        elif prefix_sequence[0] in trie_dict:
            return Trie._get_from_trie(
                prefix_sequence[1:],
                trie_dict[prefix_sequence[0]],
                append_trie,
                bos_token_id,
            )
        else:
            # Prefix walked off this trie: fall back to the appended trie,
            # if one was chained; otherwise no continuation exists.
            if append_trie:
                return append_trie.get(prefix_sequence)
            else:
                return []

    def __iter__(self):
        """Yield every stored sequence (root-to-leaf path) as a token-id list."""
        def _traverse(prefix_sequence, trie_dict):
            if trie_dict:
                for next_token in trie_dict:
                    yield from _traverse(
                        prefix_sequence + [next_token], trie_dict[next_token]
                    )
            else:
                yield prefix_sequence
        return _traverse([], self.trie_dict)

    def __len__(self):
        return self.len

    def __getitem__(self, value):
        # Indexing is an alias for get(), so constrained-decoding callers
        # can use either trie[prefix] or trie.get(prefix).
        return self.get(value)
class MarisaTrie(object):
    """Memory-efficient trie over token-id sequences backed by ``marisa_trie``.

    Each token id is mapped to a single unicode character so that sequences
    can be stored as strings in the underlying marisa trie. Ids >= 55000 are
    shifted up to 65000+ — presumably to skip the UTF-16 surrogate range
    (0xD800-0xDFFF), which cannot be encoded; TODO confirm.
    """

    def __init__(
        self,
        sequences: Optional[List[List[int]]] = None,
        cache_fist_branch=True,  # [sic] typo kept: renaming would break callers
        max_token_id=256001,
    ):
        """Build the trie.

        Args:
            sequences: token-id sequences to store (``None`` -> empty; a
                mutable default ``[]`` is deliberately avoided).
            cache_fist_branch: cache the first two decoding levels as plain
                lists; requires all sequences to share the same first token.
            max_token_id: size of the token-id -> character mapping.
        """
        if sequences is None:
            sequences = []
        # id -> char table; ids >= 55000 are remapped to chr(65000+)
        self.int2char = [chr(i) for i in range(min(max_token_id, 55000))] + (
            [chr(i) for i in range(65000, max_token_id + 10000)]
            if max_token_id >= 55000
            else []
        )
        self.char2int = {self.int2char[i]: i for i in range(max_token_id)}
        self.cache_fist_branch = cache_fist_branch
        if self.cache_fist_branch:
            # All sequences must share one root token (e.g. a BOS id).
            self.zero_iter = list({sequence[0] for sequence in sequences})
            assert len(self.zero_iter) == 1
            self.first_iter = list({sequence[1] for sequence in sequences})
        self.trie = marisa_trie.Trie(
            "".join([self.int2char[i] for i in sequence]) for sequence in sequences
        )

    def get(self, prefix_sequence: List[int]):
        """Return the token ids that may follow ``prefix_sequence``."""
        if self.cache_fist_branch and len(prefix_sequence) == 0:
            return self.zero_iter
        elif (
            self.cache_fist_branch
            and len(prefix_sequence) == 1
            and self.zero_iter == prefix_sequence
        ):
            return self.first_iter
        else:
            # Translate the prefix to its string key and collect the distinct
            # next characters of all stored keys extending it.
            key = "".join([self.int2char[i] for i in prefix_sequence])
            return list(
                {
                    self.char2int[e[len(key)]]
                    for e in self.trie.keys(key)
                    if len(e) > len(key)
                }
            )

    def __iter__(self):
        """Yield every stored sequence as a token-id list."""
        for sequence in self.trie.iterkeys():
            yield [self.char2int[e] for e in sequence]

    def __len__(self):
        return len(self.trie)

    def __getitem__(self, value):
        # Indexing is an alias for get(), mirroring Trie.__getitem__.
        return self.get(value)
class DummyTrieMention(object):
    """Trie stand-in that always offers the same fixed set of candidates."""

    def __init__(self, return_values):
        # The candidate list handed back for every query, whatever the prefix.
        self._return_values = return_values

    def get(self, indices=None):
        """Return the fixed candidate list; ``indices`` is ignored."""
        return self._return_values
class DummyTrieEntity(object):
    """Trie stand-in for the entity span of constrained generation.

    Depending on how deep into the entity the decoder is, it returns the
    end-of-mention marker, the start-of-entity marker, the generic candidate
    set, or EOS once the end-of-entity marker has been consumed.
    """

    def __init__(self, return_values, codes):
        # Candidates exclude the structural marker tokens themselves.
        markers = {
            codes[name]
            for name in (
                "start_mention_token",
                "end_mention_token",
                "start_entity_token",
            )
        }
        self._return_values = list(set(return_values) - markers)
        self._codes = codes

    def get(self, indices, depth=0):
        """Return the allowed token(s) after ``indices``, ``depth`` tokens in."""
        if not indices:
            if depth == 0:
                return self._codes["end_mention_token"]
            if depth == 1:
                return self._codes["start_entity_token"]
            return self._return_values
        if len(indices) == 1 and indices[0] == self._codes["end_entity_token"]:
            return self._codes["EOS"]
        # Consume one token and recurse with the depth counter advanced.
        return self.get(indices[1:], depth=depth + 1)