-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcesres_graphcodebert_model.py
More file actions
1440 lines (1185 loc) · 67.3 KB
/
cesres_graphcodebert_model.py
File metadata and controls
1440 lines (1185 loc) · 67.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# ==================================================================
#
# CESReS classification model program - Main program
#
# Author: Guillaume Steveny
# Year: 2023 -- 2024
#
# The development of this code was made by following the
# official AllenNLP (the library we used) tutorial.
# Every method or function that is inspired from it, before
# we adapted these for our task, is referenced as "inspired by".
# We added all the parameters handling and the adaptation of the
# model (using pre-trained model, the classification head, the
# command line predictions, the GUI connection and the
# documentation).
#
# Tutorial URL:
# https://guide.allennlp.org/training-and-prediction#4
#
# ==================================================================
from __future__ import annotations
import json
import warnings
import yaml
import os
import tempfile
import asyncio
from asyncio import StreamWriter, StreamReader
from typing import Iterable, Dict, Tuple, List, Any, Sequence
import numpy as np
import torch
import transformers
from transformers import AutoModel
from allennlp.common import JsonDict
from allennlp.data import DatasetReader, Instance, Vocabulary, DataLoader
from allennlp.data.data_loaders import SimpleDataLoader
from allennlp.data.fields import LabelField, ArrayField, MultiLabelField, TensorField
from allennlp.data.tokenizers import PretrainedTransformerTokenizer
from allennlp.data.token_indexers import PretrainedTransformerIndexer
from allennlp.interpret.saliency_interpreters import IntegratedGradient
from allennlp.models import Model
from allennlp.modules.seq2vec_encoders.bert_pooler import BertPooler
from allennlp.predictors import Predictor
from allennlp.training import Trainer, GradientDescentTrainer
from allennlp.training.metrics import CategoricalAccuracy, F1MultiLabelMeasure
from allennlp.training.optimizers import HuggingfaceAdamWOptimizer
from allennlp.training.util import evaluate
from interpret_captum import CaptumInterpreter
from transform_code_to_df import convert_code_to_features, show_features, input_from_features, get_code_tokens
from collections import OrderedDict
# NOTE: you can add codecarbon with
# from codecarbon import track_emissions, OfflineEmissionsTracker
# ======================================================================================================================
# Types of encoders the user can specify in the configuration file.
# Each entry is a factory taking (arg, kwargs) from the configuration and returning the encoder:
#   - "cls_label": returns the select_cls_embedding function itself (arg and kwargs are ignored)
#   - "bert_pooler": builds an AllenNLP BertPooler from the given positional/keyword arguments
encoders = {
    "cls_label": lambda arg, kwargs: select_cls_embedding,
    "bert_pooler": lambda arg, kwargs: BertPooler(*arg, **kwargs)
}
# Types of accuracy the user can specify in the configuration file (arg and kwargs are ignored)
accuracies = {
    "categorical_accuracy": lambda arg, kwargs: CategoricalAccuracy()
}
# Type of possible activation functions (arg and kwargs are ignored; a fresh module is built per call)
activations = {
    "gelu": lambda arg, kwargs: torch.nn.GELU(),
    "leaky_relu": lambda arg, kwargs: torch.nn.LeakyReLU(),
    "relu": lambda arg, kwargs: torch.nn.ReLU()
}
def construct_sequential_head(number: int, hidden_sizes: int | list[int], num_labels: int, activation=None, norm=False):
    """
    Construct a sequential classification head with multiple dense layers.

    Args:
        number: the number of dense layers to use before the classification layer
        hidden_sizes: an integer or list of integers. If this value is a single integer, it is supposed to be the
                      number of hidden units for each of the layers. If this is a list of integers, this should be
                      composed of ('number' + 1) values to specify a number of hidden units for each layer.
        num_labels: the number of labels for the last dense (classification) layer
        activation: dict describing which activation function to use between the layers; it is passed to
                    create_parameter and must contain at least a "name" key (default = {"name": "gelu"})
        norm: whether to use Batch Normalization after the activation function

    Returns:
        a torch.nn.Sequential block composed of `number` dense layers + 1 classification layer

    Raises:
        AssertionError: if hidden_sizes is a list whose length is not `number` + 1
    """
    order = []
    # Default activation between the dense layers is GELU
    if activation is None:
        activation = {"name": "gelu"}
    act_name = activation['name']
    if isinstance(hidden_sizes, list):
        # Layer i maps hidden_sizes[i] -> hidden_sizes[i+1], so the list needs `number` + 1 entries.
        # BUGFIX: the original check was `len(hidden_sizes) + 1 == number`, which contradicted the
        # docstring/error message and would have caused an IndexError below for any list it accepted.
        assert len(hidden_sizes) == number + 1, "You specified different hidden sizes but the number of layers is " \
                                               f"not coherent. Number of layers: {number}, number of values: " \
                                               f"{len(hidden_sizes)}. So you should have given {number + 1} values " \
                                               f"to the configuration."
        for i in range(number):
            order.append((f"classification_{i+1}", torch.nn.Linear(hidden_sizes[i], hidden_sizes[i+1])))
            order.append((f"{act_name}_{i+1}", create_parameter("activation", **activation)))
            if norm:
                order.append((f"norm_{i+1}", torch.nn.BatchNorm1d(hidden_sizes[i+1])))
        # The classification layer maps the last hidden size to the label count
        order += [("classification_output", torch.nn.Linear(hidden_sizes[-1], num_labels))]
    else:
        # A single integer: every hidden dense layer keeps the same width
        hidden_size = hidden_sizes
        for i in range(number):
            order.append((f"classification_{i+1}", torch.nn.Linear(hidden_size, hidden_size)))
            order.append((f"{act_name}_{i+1}", create_parameter("activation", **activation)))
            if norm:
                order.append((f"norm_{i+1}", torch.nn.BatchNorm1d(hidden_size)))
        order += [("classification_output", torch.nn.Linear(hidden_size, num_labels))]
    return torch.nn.Sequential(OrderedDict(order))
# Types of classifiers the user can specify in the configuration file:
#   - "simple": a single dense layer built from the given positional/keyword arguments
#   - "mult_dense": a multi-layer head built by construct_sequential_head
classifiers = {
    "simple": lambda arg, kwargs: torch.nn.Linear(*arg, **kwargs),
    "mult_dense": lambda arg, kwargs: construct_sequential_head(*arg, **kwargs)
}
# Type of losses that are usable in the configuration
# (the factories ignore arg/kwargs and return the functional-form loss directly)
losses = {
    "cross_entropy": lambda arg, kwargs: torch.nn.functional.cross_entropy,
    "multilabel_soft_margin_loss": lambda arg, kwargs: torch.nn.functional.multilabel_soft_margin_loss
}
# Types of parameters to be dynamically chosen by the user with its configuration file.
# Maps a parameter category name (as accepted by create_parameter) to the dict of factories for that category.
param_types = {
    "encoder": encoders,
    "classification_head": classifiers,
    "accuracy": accuracies,
    "activation": activations,
    "loss": losses
}
class WrongParameter(Exception):
    """
    Exception raised when a configuration value provided by the user is not recognized.
    """
    def __init__(self, value, listing):
        # Build the explanatory message from the offending value and the list of accepted options
        message = f"Wrong parameter {value}.\nShould be one in the {listing} list."
        super().__init__(message)
def create_parameter(param_type, name, arg=None, kwargs=None):
    """
    Function to create a specific parameter from the configuration parameters.

    Args:
        param_type: a str corresponding to the parameter type (encoder, classification_head, accuracy,
                    activation or loss)
        name: a str corresponding to the name of the chosen value of the parameter
        arg: a list of arguments to be given to this parameter (ordered; default = [])
        kwargs: a dict of keyword arguments to be given to this parameter (default = {})

    Returns:
        the constructed parameter for the model

    Raises:
        WrongParameter: if param_type is not a known category or name is not a known value in that category
    """
    # Avoid the shared-mutable-default pitfall: build fresh containers on every call
    if arg is None:
        arg = []
    if kwargs is None:
        kwargs = {}
    # If the parameter is not a recognized type, raise exception
    if param_type not in param_types:
        raise WrongParameter(param_type, list(param_types.keys()))
    # If the selected value does not exist for this parameter
    # BUGFIX: report the offending *name*; the original raised WrongParameter(param_type, ...) here,
    # which produced a misleading error message.
    if name not in param_types[param_type]:
        raise WrongParameter(name, list(param_types[param_type].keys()))
    # Return the parameter we could construct
    return param_types[param_type][name](arg, kwargs)
# ======================================================================================================================
# This class was written by using the AllenNLP official tutorial as inspiration.
# The tutorial can be found at: https://guide.allennlp.org/training-and-prediction#4
# The source code of this tutorial is: https://github.com/allenai/allennlp-guide/blob/master/quick_start/predict.py
# We modified the instance retrieving and tensor generation.
class CodeReader(DatasetReader):
    """
    Class representing a Dataset Reader able to recover the code snippets inside a txt file and the label associated
    with each of these.

    Attributes:
        snippet_splitter: a string representing the "code" used to split the different examples in the input file
        label_splitter: a string representing the "code" used to split a snippet and the label associated with it
        multi_labels: a string representing the splitter used to distinguish the different labels associated
                      with a code when performing a multi_label classification (None implies single label)
        part_graph: Sequence[int] containing information about the number of tokens to have in the code part and in the
                    graph part of the input
        tokenizer: PretrainedTransformerTokenizer representing the tokenizer used to transform a code snippet
                   into a Token sequence to put inside the embedder
        indexer: PretrainedTransformerIndexer representing the indexer associated with the tokenizer specified
        debug: bool representing the fact to show the features that are created by this object

    Args:
        huggingface_model: str representing the name of the pretrained model from huggingface
                           you want to use to tokenize the input data (should be a feature-extraction model and
                           the same that the embedder you want to use)
                           (default = GraphCodeBERT-py model from Enoch)
        snippet_splitter: str representing the "code" used to split the different examples in the input file
                          (default = "\\n$$$\\n")
        label_splitter: str representing the "code" used to split a snippet and the label associated with it
                        (default = " $x$ ")
        multi_labels: str representing the separator used when multiple labels are associated with each code.
                      If this parameter is unspecified, the classification is supposed single class
                      (default = None)
        part_graph: Sequence[int] composed of maximum two numbers and corresponding to the number of tokens to
                    keep for each part of the input. The first number is the code token count while the second
                    is the number of tokens in the graph (dfg) part of the input. This number can be zero if
                    the model is CodeBERT.
                    (default = (256, 256))
        compiled_language: str to indicate the path to the compiled library containing the parsing information for
                           tree-sitter when creating the tokenized instances. For windows, the file should have a
                           dll extension (without putting the extension in the path). On linux, this file should
                           have a so extension (here specified in the parameter).
                           (default = './my-language.so')
        kwargs_tokenizer: Dict containing the additional args you want to put inside the tokenizer
                          (default = {max_length: 512})
        kwargs_indexer: Dict containing the additional args you want to put inside the indexer
                        (default = {})
        debug: bool representing the fact to show the features that are created by this object
    """
    __slots__ = ["tokenizer", "indexer", "snippet_splitter", "label_splitter", "multi_labels", "part_graph",
                 "compiled_language", "debug"]

    def __init__(self,
                 huggingface_model: str = "Enoch/graphcodebert-py",
                 snippet_splitter: str = "\n$$$\n",
                 label_splitter: str = " $x$ ",
                 multi_labels: str | None = None,
                 part_graph: Sequence[int] = (256, 256),
                 compiled_language: str = "./my-language.so",
                 kwargs_tokenizer: Dict[str, Any] | None = None,
                 kwargs_indexer: Dict[str, Any] | None = None,
                 debug: bool = False):
        super().__init__()
        # Set a default value for the additional parameters of the tokenizer and the indexer
        # (None sentinels avoid the shared mutable-default pitfall)
        if kwargs_tokenizer is None:
            kwargs_tokenizer = {"max_length": 512}
        if kwargs_indexer is None:
            kwargs_indexer = {}
        # Adding information to parse the entries in the input file
        self.snippet_splitter = snippet_splitter
        self.label_splitter = label_splitter
        self.multi_labels = multi_labels
        # Adding information about the part used after tokenization
        # (only the first two values are meaningful: code-token count and graph-token count)
        self.part_graph = part_graph[:2]
        # Add the information about the compiled language
        self.compiled_language = compiled_language
        # Creating the attributes of the object
        self.tokenizer = PretrainedTransformerTokenizer(huggingface_model, **kwargs_tokenizer)
        self.indexer = {'tokens': PretrainedTransformerIndexer(huggingface_model, **kwargs_indexer)}
        # Save the debug variable
        self.debug = debug

    # -------------------------------
    # Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
    def text_to_instance(self, text: str, label: str | None = None) -> Instance:
        """
        Method to transform a string entry (text) into an AllenNLP Instance possibly associated with a label.

        Args:
            text: str representing the text to be tokenized and on which the Instance should be created.
            label: str representing a possible label to be associated with the text entry.

        Returns:
            an Instance object containing the text tokenized and the label.
        """
        # Tokenizing the entry (code + data-flow-graph features for GraphCodeBERT)
        features = convert_code_to_features(text, self.tokenizer.tokenizer, *self.part_graph,
                                            language_library=self.compiled_language)
        if self.debug:
            show_features(features)
        # Get the input for the model
        ids, position, mask = input_from_features(features, *self.part_graph)
        # Transform it in Field for AllenNLP library
        ids_field = ArrayField(ids)
        mask_field = ArrayField(mask)
        position_field = ArrayField(position)
        # Create the fields for the instance (the text tokenized and indexed)
        fields: dict[str, TensorField | LabelField | MultiLabelField]
        fields = {'input_ids': ids_field, "mask": mask_field, "positions_ids": position_field}
        # If a label is associated with this code, add a LabelField to the Instance
        if label:
            if self.multi_labels:
                # Multi-label mode: the label string holds several labels joined by the multi_labels separator
                fields['label'] = MultiLabelField(label.split(self.multi_labels))
            else:
                fields['label'] = LabelField(label)
        return Instance(fields)

    # -------------------------------
    def get_features(self, text: str) -> dict[str, list[Any]]:
        """
        Method to transform a text entry into the features generated for the GraphCodeBERT model.

        Args:
            text: str representing the text to be tokenized.

        Returns:
            a dict[str, List[Any]] containing for each feature name, the value of this feature.
        """
        return convert_code_to_features(text, self.tokenizer.tokenizer, *self.part_graph,
                                        language_library=self.compiled_language)

    # -------------------------------
    # Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
    def _read(self, file_path: str) -> Iterable[Instance]:
        # Inherited method from the base DatasetReader class
        # Read the provided file
        with open(file_path, "r") as file:
            text = file.read()
        # We split the entry according to the specified code
        # (the trailing element after the last splitter is dropped)
        examples = text.split(self.snippet_splitter)[:-1]
        # For each create example
        for example in examples:
            # Split the text - label examples
            try:
                text, label = example.strip().split(self.label_splitter)
            # Avoid problems with empty codes
            except ValueError:
                continue
            # Yield a created Instance for this example
            yield self.text_to_instance(text, label)
# ======================================================================================================================
def select_cls_embedding(embeddings: torch.Tensor) -> torch.Tensor:
    """
    Recover the CLS token embedding from a batch of token embeddings.

    The CLS token is assumed to be the first token (position 0) of every sequence in the batch.

    Args:
        embeddings: a Tensor of dim [batch_size, tokens, embedding_size] where each component corresponds to the
                    embedding of a particular token in the input (and this for each component of the batch).

    Returns:
        a Tensor of dim [batch_size, embedding_size] holding, for each batch element, the embedding of its
        CLS token (the token axis is dropped by the integer indexing, not kept as a singleton dimension).
    """
    cls_position = 0
    return embeddings[:, cls_position, :]
# ======================================================================================================================
# This class was written by using the AllenNLP official tutorial as inspiration.
# The tutorial can be found at: https://guide.allennlp.org/training-and-prediction#4
# The source code of this tutorial is: https://github.com/allenai/allennlp-guide/blob/master/quick_start/predict.py
# We modified the architecture and its initialization.
class ClassificationEmbedderModel(Model):
    """
    Class representing a model to classify input codes according to a particular pretrained embedder.

    Attributes:
        embedder: PreTrainedTransformerEmbedder constructed from a model of the huggingface library which allows
                  to transform an input tokenized code into a Tensor of dimension [batch_size, tokens, embedding_size]
                  (default = GraphCodeBERT-py model from Enoch)
        encoder: Callable[[Tensor], Tensor] allowing to transform the output of the embedder into a single embedding
                 for each component of the current batch
                 (default = bert_pooler)
        classifier: Linear layer for classifying according to the embedding
        accuracy: Metric to compute the score of the classification. This argument could be omitted (set to None)
                  and nothing will be computed for this
                  (default = None)
        loss: Callable[[Tensor, Tensor], Tensor] allowing to compute the loss of the classification
              (default = torch.nn.functional.cross_entropy)
        prob_activation: Callable[[Tensor], Tensor] representing the last activation used to compute the probabilities
                         as output of the classification. The attribute is determined by the multi_label parameter.
                         (default = torch.nn.Softmax(dim=1))
        multi_label: bool specifying if the classification is a multi label one, implying to replace the softmax
                     by a sigmoid at the last layer
                     (default = False)
        f1: F1Measure computing the precision, recall and f1 score during training
            (default : the labels are transformed in a multi_label classification but where only one label is selected)
        i: int representing the number of times the forward method is called (used to throttle debug output)
        debug: bool to indicate if part of the forward call should be shown to the user

    Args:
        voc: AllenNLP Vocabulary object constructed from the tokenizer selected inside the huggingface platform
        labels: Tuple[str, ...] containing the different labels that could be associated with each of the code snippet
                (default = ("success", "failed"))
        huggingface_model: str representing the name of the pretrained model from huggingface you want to use to embed
                           the input tokenized data (should be a feature-extraction model and the same that the
                           tokenizer and indexer you used to read the input data)
                           (default = Enoch/graphcodebert-py)
        kwargs_embedder: Dict containing the additional args you want to put inside the embedder
                         (default = {})
        embedding_size: int representing the dimension of the embedding created by the model
                        (default = 768)
        encoder: Dict corresponding to the keyword arguments used to create the encoder thanks to the create_parameter
                 function
                 (default = create a BertPooler encoder)
        classification_head: Dict corresponding to the keyword arguments used to create the classification head thanks
                             to the create_parameter function
                             (default = create a simple dense layer (embedding size to num_labels))
        accuracy: Dict corresponding to the keyword arguments used to create the accuracy thanks to the create_parameter
                  function
                  (default = do not use this type of metric (e.g. multi-label architecture))
        loss: Dict containing the information about the loss that should be used to train the model
              (default = CrossEntropyLoss)
        multi_label: bool specifying if the classification is a multi label one, implying to replace the softmax
                     by a sigmoid at the last layer
                     (default = False)
        debug: bool to indicate if part of the forward call should be shown to the user
    """
    # NOTE(review): __slots__ is effectively informational here — torch.nn.Module subclasses keep an
    # inherited __dict__, so instances are not actually slot-restricted.
    __slots__ = ["embedder", "encoder", "classifier", "accuracy", "loss", "prob_activation", "multi_label", "i", "f1",
                 "debug"]

    def __init__(self,
                 voc: Vocabulary,
                 labels: Tuple[str, ...] = ("success", "failed"),
                 huggingface_model: str = "Enoch/graphcodebert-py",
                 kwargs_embedder: Dict[str, Any] = None,
                 embedding_size: int = 768,
                 encoder: Dict[str, Any] = None,
                 classification_head: Dict[str, Any] = None,
                 accuracy: Dict[str, Any] = None,
                 loss: Dict[str, Any] = None,
                 multi_label: bool = False,
                 debug: bool = False
                 ):
        # Labels token supposed in the corresponding namespace for the classification process
        # But we check if it is indeed the case
        labels_voc = voc.get_token_to_index_vocabulary("labels")
        for label in labels:
            if label not in labels_voc:
                voc.add_token_to_namespace(label, "labels")
        # Print the vocabulary for the user
        print(voc)
        # Init the model with the current vocabulary
        super().__init__(voc)
        # ~~~~~~~~~~~~~~~~~~~~
        # Ensure having the default additional parameters to the embedder
        if kwargs_embedder is None:
            kwargs_embedder = {}
        # Creates the embedder for this model
        self.embedder = AutoModel.from_pretrained(huggingface_model,
                                                  **(kwargs_embedder.get("huggingface_parameters", {})))
        # Freeze the pretrained weights unless the configuration explicitly asks for fine-tuning
        if not (kwargs_embedder.get("trainable", False)):
            for param in self.embedder.base_model.parameters():
                param.requires_grad = False
        # ~~~~~~~~~~~~~~~~~~~~
        # If the encoder is not specified, create a BertPooler by default
        if encoder is None:
            encoder = {"name": "bert_pooler", "arg": [huggingface_model], "kwargs": {}}
        # Creates the attribute to encode the code as a single embedding
        self.encoder = create_parameter("encoder", **encoder)
        # ~~~~~~~~~~~~~~~~~~~~
        # Count the labels and creates the linear layer for classifying
        num_labels = len(voc.get_token_to_index_vocabulary("labels"))
        # If the classification head is not specified, create a simple dense layer by default
        if classification_head is None:
            classification_head = {"name": "simple", "arg": [embedding_size, num_labels], "kwargs": {}}
        # Creates the attribute to compute a component for each class
        self.classifier = create_parameter("classification_head", **classification_head)
        # ~~~~~~~~~~~~~~~~~~~~
        # If the accuracy is not specified, we ignore this parameter
        if accuracy is not None:
            # Creates the accuracy metric for the training and evaluation
            self.accuracy = create_parameter("accuracy", **accuracy)
        else:
            self.accuracy = None
        # Creates the metric to follow the precision, recall and f1 score during training (and at the evaluation too)
        self.f1 = F1MultiLabelMeasure(average="micro")
        # ~~~~~~~~~~~~~~~~~~~~
        # Check if the loss is specified, otherwise use the default cross_entropy
        if loss is None:
            loss = {"name": "cross_entropy"}
        # Create the loss function
        self.loss = create_parameter("loss", **loss)
        # ~~~~~~~~~~~~~~~~~~~~
        # Select the type of activation to use according to the type of classification
        self.prob_activation = torch.nn.Softmax(dim=1) if not multi_label else torch.nn.Sigmoid()
        # Save the multi_label parameter (for computing the scores)
        self.multi_label = multi_label
        # ~~~~~~~~~~~~~~~~~~~~
        # Initialize the count of forward calls
        self.i = 0
        # Variable to account for a debugging process
        self.debug = debug

    # -------------------------------
    # Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
    def forward(self,
                input_ids: torch.Tensor,
                mask: torch.Tensor,
                positions_ids: torch.Tensor,
                label: torch.Tensor = None) -> Dict[str, torch.Tensor]:
        """
        Method constructing the network and flowing the data inside it.

        Args:
            input_ids: a Tensor associated with the ids of the tokenized version of the input code.
            mask: a Tensor containing the attention mask to be used inside the embedder.
            positions_ids: a Tensor containing the positional embedding of the input sequence.
            label: a Tensor associated with the input code (None if only predicting without labelled data).

        Returns:
            a Dict of tensors containing the prediction and possibly the loss and accuracy measures
            (if a label is provided).
        """
        # Account for this call
        self.i += 1
        # NOTE(review): `config` is a module-level global read here (device and label settings); it must be
        # initialized before the first forward pass.
        # Gets the generated mask from the tokenizer and the indexer
        mask = mask.bool().to(config["CONFIG"]["device"])
        # Gets the tokens indexed by the reader
        toks = input_ids.long().to(config["CONFIG"]["device"])
        # Gets the positional embeddings to be used
        pos = positions_ids.long().to(config["CONFIG"]["device"])
        # Generates the embedding for the code
        emb = self.embedder(toks, mask, position_ids=pos).last_hidden_state
        # Get the embedding of the total input
        embedded_text = self.encoder(emb)
        # Generates the logits for each batch example
        logits = self.classifier(embedded_text)
        # Probabilities after score output
        probs = self.prob_activation(logits)
        # Debug functionality where the probs and input are shown every 10 calls
        # BUGFIX: the original condition `self.i-1 % 10 == 0` parses as `(self.i - (1 % 10)) == 0`,
        # so it only ever triggered on the very first call; parenthesize to really fire every 10 calls.
        if self.debug and (self.i - 1) % 10 == 0:
            print(input_ids)
            print(probs)
        # Transform the label input to be usable by the F1 metric (if multi label classification)
        if not self.multi_label and label is not None:
            # One-hot encode the single-class labels so F1MultiLabelMeasure can consume them.
            # NOTE(review): the `+2` widens the one-hot beyond the configured label list — presumably to cover
            # extra vocabulary namespace tokens; confirm against the vocabulary construction.
            new_label = torch.zeros((len(label), len(config["MODEL"]["labels"]) + 2))
            for l in range(len(label)):
                new_label[l][label[l]] = 1
            new_label = new_label.to(config["CONFIG"]["device"])
        else:
            new_label = label
        # Puts the probabilities inside the output
        output = {'probs': probs}
        # If a label was provided (training or testing)
        if label is not None:
            # Transform label to correct device
            label = label.to(config["CONFIG"]["device"])
            # Computes the accuracy
            if self.accuracy is not None:
                self.accuracy(logits, label)
            # Computes the f1 score
            self.f1(probs, new_label)
            # Computes the loss associated with the examples (for backpropagation)
            output['loss'] = self.loss(logits, label)
        # Return the generated output for this training step
        return output

    # -------------------------------
    # Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
    def get_metrics(self, reset: bool = False) -> Dict[str, float]:
        # Method inherited from Model
        # Compute the f1 score and adds it to the metric dictionary
        f1_val = self.f1.get_metric(reset)
        # Compute the accuracy if it is possible to do so
        accuracy_metric = {} if self.accuracy is None else {"accuracy": self.accuracy.get_metric(reset)}
        return {**accuracy_metric, **f1_val}
# ======================================================================================================================
# Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
def build_code_reader(kwargs_reader: dict[str, Any] = None) -> DatasetReader:
    """
    Create the default dataset reader, optionally overriding its parameters.

    Args:
        kwargs_reader: dict of parameters forwarded to the CodeReader constructor. When omitted, the reader is
                       built with its default configuration; any key given here takes precedence over those
                       defaults.

    Returns:
        a CodeReader with the desired parameters.
    """
    params = {} if kwargs_reader is None else kwargs_reader
    return CodeReader(**params)
# ======================================================================================================================
# Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
def read_data(reader: DatasetReader, train_path: str, validation_path: str) -> tuple[list[Instance], list[Instance]]:
    """
    Recover the examples for the training and validation process of the model.

    Args:
        reader: a DatasetReader able to process the specified files containing the code snippets and the labels.
        train_path: the file path containing all the code examples on which we want to train the model.
        validation_path: the file path on which the model will be evaluated during training.

    Returns:
        the training data and validation data examples as lists of Instance.
    """
    separator = "-"*50
    print(separator)
    print("Reading data")
    # Materialize the (possibly lazy) readers so counts can be reported and the data reused
    training_data = list(reader.read(train_path))
    validation_data = list(reader.read(validation_path))
    print(f"Stats:\n"
          f"\tNumber of training examples: {len(training_data)}\n"
          f"\tNumber of validation examples: {len(validation_data)}")
    print(separator+"\n")
    return training_data, validation_data
# ======================================================================================================================
# Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
def build_data_loaders(
        train_data: list[Instance],
        validation_data: list[Instance],
        train_batch_size: int = 8,
        validation_batch_size: int = 8
) -> tuple[DataLoader, DataLoader]:
    """
    Create the DataLoaders used during the training and validation process.

    Args:
        train_data: a List of Instance corresponding to the entries found in the training file.
        validation_data: a List of Instance corresponding to the entries found in the validation file.
        train_batch_size: an integer for the size of the training batches (default = 8).
        validation_batch_size: an integer for the size of the validation batches (default = 8).

    Returns:
        the constructed DataLoaders (training loader first, validation loader second).
    """
    # NOTE: SimpleDataLoader is used here; a multiprocessing equivalent could be more efficient.
    # Only the training batches are shuffled; validation keeps the file order for reproducible evaluation.
    loaders = (
        SimpleDataLoader(train_data, train_batch_size, shuffle=True),
        SimpleDataLoader(validation_data, validation_batch_size, shuffle=False),
    )
    return loaders
# ======================================================================================================================
# Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
def build_trainer(
    model: Model,
    serialization_dir: str,
    train_loader: DataLoader,
    validation_loader: DataLoader,
    num_epochs: int = 1,
    validation_metric: str = "-loss",
    patience: None | int = None,
    learning_rate: float = 1e-5,
    kwargs_optimizer: dict[str, Any] | None = None
) -> Trainer:
    """
    Function used to build the trainer used to improve the specified model with the training steps.
    By default, the trainer is a GradientDescentTrainer with optimizer set to HuggingfaceAdamWOptimizer for 1 epoch.
    Args:
        model: the AllenNLP Model you want to train.
        serialization_dir: the path of the directory used to store the weights and results.
        train_loader: a DataLoader containing the training examples.
        validation_loader: a DataLoader containing the validation examples.
        num_epochs: an int corresponding to the number of epochs you want to perform.
                    (default=1)
        validation_metric: a str representing the criterion used to save the best model. A leading "-" means the
                           metric is minimized (e.g. "-loss"), a leading "+" that it is maximized (e.g. "+fscore").
                           (default="-loss")
        patience: an integer to specify if early stopping should be activated (not specifying it disables this
                  functionality), this means that, if the model does not improve its performance on the validation
                  dataset for this number of epochs, then the model stops the training and saves its best epoch.
                  (default : not activated)
        learning_rate: the learning rate used to learn the weights in the model. This should be a floating point
                       number greater than 0. This is given to HuggingfaceAdamWOptimizer as parameter lr.
                       (default=1e-5)
        kwargs_optimizer: dict containing additional args to be given to the init method of HuggingfaceAdamWOptimizer.
                          (default : no additional parameters)
    Returns:
        the constructed GradientDescentTrainer.
    """
    # Logs for the user
    print(50 * "-")
    print("Building the trainer")
    # Only the parameters with requires_grad=True are handed to the optimizer
    parameters = [(n, p) for n, p in model.named_parameters() if p.requires_grad]
    print(f"Parameters: {len(parameters)}")
    print("\t" + "\n\t".join(f"{i}: {x[0]}" for i, x in enumerate(parameters)))
    # Default the optimizer kwargs here rather than in the signature (avoids a mutable default argument)
    if kwargs_optimizer is None:
        kwargs_optimizer = {}
    # Creates the optimizer
    # NOTE: We could add the optimizer inside the configuration parameters.
    optimizer = HuggingfaceAdamWOptimizer(
        parameters,
        lr=learning_rate,
        **kwargs_optimizer
    )
    # Creates the trainer
    trainer = GradientDescentTrainer(
        model=model,
        serialization_dir=serialization_dir,
        data_loader=train_loader,
        validation_metric=validation_metric,
        validation_data_loader=validation_loader,
        num_epochs=num_epochs,
        optimizer=optimizer,
        patience=patience
    )
    print((50 * "-") + "\n")
    return trainer
# ======================================================================================================================
# Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
def build_model(
    voc: Vocabulary,
    kwargs_model: dict[str, Any] | None = None
) -> Model:
    """
    Function to create the default model with the parameters set to all the default values.
    Args:
        voc: AllenNLP Vocabulary object constructed from the tokenizer selected inside the huggingface platform.
        kwargs_model: the dict corresponding to the additional parameters you might want to add to the constructed
                      Model (default = no additional parameters).
    Returns:
        the constructed Model.
    """
    print(50 * "-")
    print("Building the model")
    # Default the model kwargs here rather than in the signature (avoids a mutable default argument);
    # when the caller supplies extra parameters they are echoed for traceability.
    if kwargs_model is None:
        kwargs_model = {}
    else:
        print("Additional parameters")
        print(kwargs_model)
    print((50 * "-") + "\n")
    return ClassificationEmbedderModel(voc, **kwargs_model)
# ======================================================================================================================
# Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
def train_model(
    train_path: str,
    validation_path: str,
    voc: Vocabulary,
    serialization_dir: str | None = None,
    cuda: bool = True,
    epochs: int = 1
) -> tuple[Model, DatasetReader]:
    """
    Function allowing to start the training process of the default model.
    Args:
        train_path: a str containing the path referencing the training txt file.
        validation_path: a str containing the path referencing the validation txt file.
        voc: a AllenNLP Vocabulary object created upon the pretrained model from huggingface.
        serialization_dir: a str representing the directory in which the results should be stored
                           (default = a temporary directory is created to store these results).
        cuda: a boolean indicating if the model should be considered to run on GPU or not.
        epochs: an int representing the number of epochs to perform to train the model.
    Raises:
        ValueError: when batch normalization is enabled and the training set holds a single example.
    Returns:
        the trained Model and the DatasetReader associated with the default parameters after training.
    """
    # Get the dataset reader with the default parameters
    dataset_reader = build_code_reader(config["READER"])
    # Gets the train and validation datasets
    train_data, validation_data = read_data(dataset_reader, train_path, validation_path)
    # Batch normalization cannot process a batch of size 1: if the last batch would be a singleton,
    # drop one training example (or fail early when that is the only example).
    if config["MODEL"].get("classification_head", {}).get("kwargs", {}).get("norm", False):
        batch_size = config["CONFIG"]["batch_size"]
        train_size = len(train_data)
        if train_size % batch_size == 1:
            # The code should be tolerant to misconfiguration
            if train_size == 1:
                raise ValueError("Your training dataset is only composed of a single instance which will lead "
                                 "to error when training the model with normalization as you specified it in the "
                                 "configuration. Ensure this is normal to only have one single training example with "
                                 "such parameters.")
            # Warn the user his/her training dataset has changed
            warnings.warn("The training batch size you are using paired with BatchNormalization will raise an error "
                          f"during training. The training dataset size becomes: {train_size - 1}.")
            train_data = train_data[:-1]
    # Construct the default model
    model = build_model(voc, config["MODEL"])
    # Move the model on the requested device before the loaders are built
    model.to('cuda' if cuda else 'cpu')
    # Converts the data into dataloaders + index with the vocabulary
    train_loader, validation_loader = build_data_loaders(train_data, validation_data, config["CONFIG"]["batch_size"],
                                                         config["CONFIG"]["validation_batch_size"])
    train_loader.index_with(voc)
    validation_loader.index_with(voc)
    if serialization_dir is None:
        # We keep it as in the AllenNLP tutorial to ensure a serialization dir to exist
        with tempfile.TemporaryDirectory() as tmp_dir:
            _run_training(model, tmp_dir, train_loader, validation_loader, epochs)
    else:
        _run_training(model, serialization_dir, train_loader, validation_loader, epochs)
    # Return the trained model and the dataset_reader
    return model, dataset_reader


def _run_training(
    model: Model,
    directory: str,
    train_loader: DataLoader,
    validation_loader: DataLoader,
    epochs: int
) -> None:
    # Single-purpose helper for train_model: build a trainer bound to `directory` and run the training loop.
    trainer = build_trainer(model, directory, train_loader, validation_loader, epochs, **config["TRAINER"])
    print("Starting training")
    trainer.train()
    print("Finished training\n")
# ======================================================================================================================
# This class was written by using the AllenNLP official tutorial as inspiration.
# The tutorial can be found at: https://guide.allennlp.org/training-and-prediction#4
# The source code of this tutorial is: https://github.com/allenai/allennlp-guide/blob/master/quick_start/predict.py
# We modified the label handling
class CodeClassifierPredictor(Predictor):
    """
    Predict the labels to be associated with the code snippets.
    """

    # Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
    def predict(self, sentence: str) -> JsonDict:
        # Wrap the raw code snippet into the json structure expected by predict_json
        return self.predict_json({"sentence": sentence})

    # Inspired by the official AllenNLP tutorial: https://guide.allennlp.org/training-and-prediction#4
    def _json_to_instance(self, json_dict: JsonDict) -> Instance:
        # Method inherited from the super class: turn the json entry into an Instance via the dataset reader
        sentence = json_dict["sentence"]
        return self._dataset_reader.text_to_instance(sentence)

    def predictions_to_labeled_instances(
        self, instance: Instance, outputs: dict[str, np.ndarray]
    ) -> list[Instance]:
        # Method inspired by text_classifier predictor in AllenNLP library
        # It transforms an Instance into a new Instance with a label attached, which
        # allows the interpretability components of the system to be plugged in.
        new_instance = instance.duplicate()
        if config["MODEL"].get("multi_label", False):
            # Multi-label setting: keep every class whose probability exceeds 0.5
            label = [i for i, v in enumerate(outputs["probs"]) if v > 0.5]
            new_instance.add_field("label", MultiLabelField(label, skip_indexing=True, num_labels=len(outputs["probs"])))
        else:
            # Single-label setting: keep the arg-max class
            label = np.argmax(outputs["probs"])
            new_instance.add_field("label", LabelField(int(label), skip_indexing=True))
        return [new_instance]

    def get_interpretable_text_field_embedder(self) -> torch.nn.Module:
        # Method to get the part of the model which is responsible for the embedding generation
        # Here it is the last layer of the embedder which is the BertPooler
        # If we use another encoder, this would be another component (not implemented)
        # This follows the behaviour of the function we override
        for module in self._model.modules():
            if isinstance(module, transformers.models.roberta.modeling_roberta.RobertaModel):
                return module
        # BUGFIX: corrected the grammar of the error message ("Did not found" -> "Did not find")
        raise ValueError("Did not find a RobertaModel, are you sure to have a model using this layer. "
                         "If this is not the case, please modify / override this method with your current "
                         "architecture.")
# ======================================================================================================================
class CodeClassifierInterpreter:
    """
    Class allowing to construct a Saliency Interpreter which gives the tokens that should be the ones the more
    probable to be the reason of such label associated with the current code.
    Attributes:
        predictor: a Predictor object we can use for labelling new entries
        dataset_reader: a CodeReader object to transform new entries to Instances
        interpreter: the Saliency Map interpreter used
    Args:
        predictor: a Predictor object we can use for labelling new entries
        dataset_reader: a CodeReader object to transform new entries to Instances
    """
    __slots__ = ["predictor", "dataset_reader", "interpreter"]

    def __init__(self, predictor: Predictor, dataset_reader: CodeReader):
        self.predictor = predictor
        self.dataset_reader = dataset_reader
        # The saliency maps are computed with IntegratedGradient over the predictor's model
        self.interpreter = IntegratedGradient(predictor)

    def interpret_json(self, inputs: dict[str, str]) -> dict[str, dict[str, Sequence[float]]]:
        """
        Get an inputs formatted as a json and send it to the Saliency Map interpreter.
        Args:
            inputs: an entry formatted as a json
        Returns:
            the score associated with each entry
        """
        return self.interpreter.saliency_interpret_from_json(inputs)

    def interpret(self, sentence: str, limit: int = 5) -> list[tuple[int, tuple[str, float]]]:
        """
        Compute the most salient input tokens for the given code snippet.
        Args:
            sentence: the code snippet to interpret.
            limit: the maximum number of entries returned (default = 5).
        Returns:
            a list of (token_position, (token, saliency_score)) tuples sorted by decreasing score.
        """
        # BUGFIX: use the reader stored on the instance instead of an undefined module-level
        # `dataset_reader` global (the original line raised NameError / read an unrelated global).
        features: dict[str, str] = self.dataset_reader.get_features(sentence)
        saliency = self.interpret_json({"sentence": sentence})
        scores = saliency["instance_1"]["grad_input_1"]
        # '\u0120' is presumably the BPE leading-space marker of the tokenizer; replaced for readability
        tokens = [token.replace('\u0120', '_') for token in features["input_tokens"]]
        positioned = list(enumerate(zip(tokens, scores)))
        # Keep only the `limit` highest-scoring tokens, preserving their original positions
        return sorted(positioned, key=lambda item: item[1][1], reverse=True)[:limit]
# ======================================================================================================================
async def handle_msg(
msg: bytes,
reader: StreamReader,
writer: StreamWriter,
predictor: CodeClassifierPredictor,