|
| 1 | +import torch |
| 2 | +import torch.nn as nn |
| 3 | +from attackerModels.ANN import simpleDenseModel |
| 4 | + |
| 5 | +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| 6 | +# os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:512" |
| 7 | + |
| 8 | + |
| 9 | +class LSTM_ANN_Model(nn.Module): |
| 10 | + def __init__( |
| 11 | + self, |
| 12 | + vocab_size, |
| 13 | + embedding_dim, |
| 14 | + pad_idx, |
| 15 | + lstm_hidden_size, |
| 16 | + lstm_num_layers, |
| 17 | + lstm_bidirectional, |
| 18 | + ann_output_size, |
| 19 | + num_ann_layers, |
| 20 | + ann_numFirst, |
| 21 | + ): |
| 22 | + super(LSTM_ANN_Model, self).__init__() |
| 23 | + |
| 24 | + # Embedding layer |
| 25 | + self.embed = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx) |
| 26 | + |
| 27 | + # LSTM layer with dropout |
| 28 | + self.lstm = nn.LSTM( |
| 29 | + input_size=embedding_dim, |
| 30 | + hidden_size=lstm_hidden_size, |
| 31 | + num_layers=lstm_num_layers, |
| 32 | + bidirectional=lstm_bidirectional, |
| 33 | + batch_first=True, |
| 34 | + dropout=0.3, |
| 35 | + ) |
| 36 | + |
| 37 | + self.ann = simpleDenseModel( |
| 38 | + input_dims=lstm_hidden_size * 2 if lstm_bidirectional else lstm_hidden_size, |
| 39 | + output_dims=ann_output_size, |
| 40 | + num_layers=num_ann_layers, |
| 41 | + numFirst=ann_numFirst, |
| 42 | + ) |
| 43 | + |
| 44 | + self.lastAct = nn.Sigmoid() |
| 45 | + if ann_output_size > 1: |
| 46 | + self.lastAct = nn.Softmax() |
| 47 | + |
| 48 | + def forward(self, x): |
| 49 | + |
| 50 | + x = x.to(device) |
| 51 | + |
| 52 | + # Embedding |
| 53 | + x = self.embed(x) |
| 54 | + assert ( |
| 55 | + len(x.shape) == 3 |
| 56 | + ), f"Expected input shape [batch_size, seq_len], but got {x.shape}" |
| 57 | + |
| 58 | + # LSTM |
| 59 | + lstm_out, _ = self.lstm(x) |
| 60 | + lstm_out = lstm_out[:, -1, :] # Take the last hidden state |
| 61 | + |
| 62 | + # ANN |
| 63 | + ann_out = self.ann(lstm_out) |
| 64 | + |
| 65 | + # Output layer and log-softmax |
| 66 | + ann_out = self.lastAct(ann_out) |
| 67 | + return ann_out |
| 68 | + |
| 69 | + |
| 70 | +class RNN_ANN_Model(nn.Module): |
| 71 | + def __init__( |
| 72 | + self, |
| 73 | + vocab_size, |
| 74 | + embedding_dim, |
| 75 | + pad_idx, |
| 76 | + rnn_hidden_size, |
| 77 | + rnn_num_layers, |
| 78 | + rnn_bidirectional, |
| 79 | + ann_output_size, |
| 80 | + num_ann_layers, |
| 81 | + ann_numFirst, |
| 82 | + ): |
| 83 | + super(RNN_ANN_Model, self).__init__() |
| 84 | + |
| 85 | + # Embedding layer |
| 86 | + self.embed = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx) |
| 87 | + |
| 88 | + # Simple RNN layer with dropout |
| 89 | + self.rnn = nn.RNN( |
| 90 | + input_size=embedding_dim, |
| 91 | + hidden_size=rnn_hidden_size, |
| 92 | + num_layers=rnn_num_layers, |
| 93 | + bidirectional=rnn_bidirectional, |
| 94 | + batch_first=True, |
| 95 | + dropout=0.3, |
| 96 | + ) |
| 97 | + |
| 98 | + # ANN layer after RNN |
| 99 | + self.ann = simpleDenseModel( |
| 100 | + input_dims=rnn_hidden_size * 2 if rnn_bidirectional else rnn_hidden_size, |
| 101 | + output_dims=ann_output_size, |
| 102 | + num_layers=num_ann_layers, |
| 103 | + numFirst=ann_numFirst, |
| 104 | + ) |
| 105 | + |
| 106 | + # Activation function |
| 107 | + self.lastAct = nn.Sigmoid() |
| 108 | + if ann_output_size > 1: |
| 109 | + self.lastAct = nn.Softmax() |
| 110 | + |
| 111 | + def forward(self, x): |
| 112 | + x = x.to(device) |
| 113 | + |
| 114 | + # Embedding |
| 115 | + x = self.embed(x) |
| 116 | + assert ( |
| 117 | + len(x.shape) == 3 |
| 118 | + ), f"Expected input shape [batch_size, seq_len], but got {x.shape}" |
| 119 | + |
| 120 | + # RNN |
| 121 | + rnn_out, _ = self.rnn(x) |
| 122 | + rnn_out = rnn_out[:, -1, :] # Take the last hidden state |
| 123 | + |
| 124 | + # ANN |
| 125 | + ann_out = self.ann(rnn_out) |
| 126 | + |
| 127 | + # Final activation |
| 128 | + ann_out = self.lastAct(ann_out) |
| 129 | + return ann_out |
| 130 | + |
| 131 | + |
| 132 | +class SimpleTransformer(nn.Module): |
| 133 | + def __init__(self, vocab_size, embedding_dim=64, nhead=2, num_layers=1, max_len=1000, num_classes=2, post_activation=None): |
| 134 | + super().__init__() |
| 135 | + self.embedding = nn.Embedding(vocab_size, embedding_dim) |
| 136 | + self.pos_encoding = nn.Parameter(torch.zeros(1, max_len, embedding_dim)) |
| 137 | + |
| 138 | + encoder_layer = nn.TransformerEncoderLayer( |
| 139 | + d_model=embedding_dim, |
| 140 | + nhead=nhead, |
| 141 | + dim_feedforward=128, |
| 142 | + activation="gelu", |
| 143 | + batch_first=True |
| 144 | + ) |
| 145 | + self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) |
| 146 | + self.fc = nn.Linear(embedding_dim, num_classes) |
| 147 | + self.post_activation = None |
| 148 | + if post_activation: |
| 149 | + if (post_activation == "sigmoid"): |
| 150 | + self.post_activation = torch.nn.Sigmoid() |
| 151 | + elif (post_activation == "softmax"): |
| 152 | + self.post_activation = torch.nn.SoftMax() |
| 153 | + |
| 154 | + def forward(self, x): |
| 155 | + seq_len = x.size(1) |
| 156 | + x = self.embedding(x) + self.pos_encoding[:, :seq_len, :] |
| 157 | + x = self.encoder(x) |
| 158 | + x = x[:, 0, :] # use first token ([CLS]-like) |
| 159 | + x = self.fc(x) |
| 160 | + if self.post_activation: |
| 161 | + x = self.post_activation(x) |
| 162 | + return x |
| 163 | + |
| 164 | + |
| 165 | +if __name__ == "__main__": |
| 166 | + |
| 167 | + import argparse |
| 168 | + |
| 169 | + def get_parser(): |
| 170 | + """ |
| 171 | + CLI parser for training parameters and file paths. |
| 172 | + """ |
| 173 | + parser = argparse.ArgumentParser(description="Train and Save LSTM + ANN Model") |
| 174 | + parser.add_argument( |
| 175 | + "--vocab_size", type=int, required=True, help="Size of the vocabulary" |
| 176 | + ) |
| 177 | + parser.add_argument( |
| 178 | + "--embedding_dim", type=int, default=100, help="Embedding dimension" |
| 179 | + ) |
| 180 | + parser.add_argument( |
| 181 | + "--pad_idx", type=int, default=0, help="Padding index for embeddings" |
| 182 | + ) |
| 183 | + parser.add_argument( |
| 184 | + "--lstm_hidden_size", type=int, default=128, help="Hidden size of the LSTM" |
| 185 | + ) |
| 186 | + parser.add_argument( |
| 187 | + "--lstm_num_layers", type=int, default=2, help="Number of LSTM layers" |
| 188 | + ) |
| 189 | + parser.add_argument( |
| 190 | + "--lstm_bidirectional", action="store_true", help="Use bidirectional LSTM" |
| 191 | + ) |
| 192 | + parser.add_argument( |
| 193 | + "--output_size", type=int, required=True, help="Output size of ANN" |
| 194 | + ) |
| 195 | + parser.add_argument( |
| 196 | + "--num_ann_layers", type=int, default=3, help="Number of layers in ANN" |
| 197 | + ) |
| 198 | + parser.add_argument( |
| 199 | + "--ann_numFirst", |
| 200 | + type=int, |
| 201 | + default=32, |
| 202 | + help="Number of units in the first ANN layer", |
| 203 | + ) |
| 204 | + parser.add_argument( |
| 205 | + "--save_model_path", required=True, help="Path to save the trained model" |
| 206 | + ) |
| 207 | + return parser |
| 208 | + |
| 209 | + |
| 210 | + def main(args): |
| 211 | + # Define LSTM + ANN model |
| 212 | + if args.model_type == "LSTM_ANN": |
| 213 | + model = LSTM_ANN_Model( |
| 214 | + vocab_size=args.vocab_size, |
| 215 | + embedding_dim=args.embedding_dim, |
| 216 | + pad_idx=args.pad_idx, |
| 217 | + lstm_hidden_size=args.lstm_hidden_size, |
| 218 | + lstm_num_layers=args.lstm_num_layers, |
| 219 | + lstm_bidirectional=args.lstm_bidirectional, |
| 220 | + ann_output_size=args.ann_output_size, |
| 221 | + num_ann_layers=args.num_ann_layers, |
| 222 | + ann_numFirst=args.ann_numFirst, |
| 223 | + ).to(device) |
| 224 | + |
| 225 | + else: |
| 226 | + model = RNN_ANN_Model( |
| 227 | + vocab_size=args.vocab_size, |
| 228 | + embedding_dim=args.embedding_dim, |
| 229 | + pad_idx=args.pad_idx, |
| 230 | + lstm_hidden_size=args.lstm_hidden_size, |
| 231 | + lstm_num_layers=args.lstm_num_layers, |
| 232 | + lstm_bidirectional=args.lstm_bidirectional, |
| 233 | + output_size=args.output_size, |
| 234 | + ).to(device) |
| 235 | + |
| 236 | + # Save model |
| 237 | + torch.save(model.state_dict(), args.save_model_path) |
| 238 | + print(f"Model saved to {args.save_model_path}") |
| 239 | + |
| 240 | + parser = get_parser() |
| 241 | + args = parser.parse_args() |
| 242 | + main(args) |
0 commit comments