diff --git a/README.md b/README.md index bf9752c..b8c66f2 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ -# SuperCATs -For more information, check out the paper on [paper link](https://ieeexplore.ieee.org/document/9954872). Also check out project page here [Project Page link].
+# **SuperCATs** : Cost Aggregation with Transformers for Sparse Correspondence +For more information, check out the paper on [[paper link]](https://ieeexplore.ieee.org/document/9954872). Also check out project page here [[Project Page link]](https://ku-cvlab.github.io/SuperCATs/).
*This paper is accepted in ICCE-Asia'22* - + >**Cost Aggregation with Transformers for Sparse Correspondence**

>Abstract : In this work, we introduce a novel network, namely SuperCATs, which aims to find a correspondence field between visually similar images. SuperCATs stands on the shoulder of the recently proposed matching networks, SuperGlue and CATs, taking the merits of both for constructing an integrative framework. Specifically, given keypoints and corresponding descriptors, we first apply attentional aggregation consisting of self- and cross- graph neural network to obtain feature descriptors. Subsequently, we construct a cost volume using the descriptors, which then undergoes a transformer aggregator for cost aggregation. With this approach, we manage to replace the handcrafted module based on solving an optimal transport problem initially included in SuperGlue with a transformer well known for its global receptive fields, making our approach more robust to severe deformations. We conduct experiments to demonstrate the effectiveness of the proposed method, and show that the proposed model is on par with SuperGlue for both indoor and outdoor scenes. 
@@ -15,7 +15,7 @@ Structure of Transformer Aggregator is illustrated below: ![aggregator](fig/aggregator.png) # Training -To train the SuperGlue with default parameters, run the following command: +To train the SuperCATs with default parameters, run the following command: ``` python train.py ``` diff --git a/sjlee_backup/IMC.py b/sjlee_backup/IMC.py deleted file mode 100644 index a7cbe25..0000000 --- a/sjlee_backup/IMC.py +++ /dev/null @@ -1,212 +0,0 @@ - -import os -import sys - -import torch -import torch.nn as nn -import torch.nn.functional as F - -import numpy as np -from functools import partial - -from pydoc import source_synopsis -from sjlee_backup.superglue2 import SuperGlue, normalize_keypoints, arange_like, log_optimal_transport -from sjlee_backup.losssuperglue import loss_superglue - -sys.path.append(os.path.join(os.path.dirname(__file__), 'cats')) -from sjlee.cats.cats import TransformerAggregator ########################################################### - -def dfs_freeze(model): - for name, child in model.named_children(): - for param in child.parameters(): - param.requires_grad = False - - dfs_freeze(child) - -def softmax_with_temperature(x, beta=2., d = 1): - r'''SFNet: Learning Object-aware Semantic Flow (Lee et al.)''' - M, _ = x.max(dim=d, keepdim=True) - x = x - M # subtract maximum value for stability - exp_x = torch.exp(x/beta) - exp_x_sum = exp_x.sum(dim=d, keepdim=True) - return exp_x / exp_x_sum - -# positional embedding 필요한가? 
-# M * N 크기가 다 다른 문제 -class SimpleSuperCATs(SuperGlue): - def __init__(self, - config, - feature_size=32, - feature_proj_dim=128, - depth=4, - num_heads=4, - mlp_ratio=4, - ): - super().__init__(config) - - # freeze superglue's layers - dfs_freeze(self.kenc) - dfs_freeze(self.gnn) - dfs_freeze(self.final_proj) - - self.feature_size = feature_size - self.feature_proj_dim = feature_proj_dim - self.decoder_embed_dim = self.feature_size ** 2 - - self.decoder = TransformerAggregator( - img_size=self.feature_size, embed_dim=self.decoder_embed_dim, depth=depth, num_heads=num_heads, - mlp_ratio=mlp_ratio, qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), - num_hyperpixel=1 - ) - - def forward(self, data): - """Run SuperGlue on a pair of keypoints and descriptors""" - with torch.no_grad(): - - desc0, desc1 = data['descriptors0'], data['descriptors1'] - kpts0, kpts1 = data['keypoints0'], data['keypoints1'] - - - - desc0 = desc0.transpose(0,1) - desc1 = desc1.transpose(0,1) - kpts0 = torch.reshape(kpts0, (1, -1, 2)) - kpts1 = torch.reshape(kpts1, (1, -1, 2)) - - if kpts0.shape[1] == 0 or kpts1.shape[1] == 0: # no keypoints - shape0, shape1 = kpts0.shape[:-1], kpts1.shape[:-1] - return [], { - 'matches0': kpts0.new_full(shape0, -1, dtype=torch.int)[0], - 'matches1': kpts1.new_full(shape1, -1, dtype=torch.int)[0], - 'matching_scores0': kpts0.new_zeros(shape0)[0], - 'matching_scores1': kpts1.new_zeros(shape1)[0], - 'skip_train': True - } - - # Keypoint normalization. - kpts0 = normalize_keypoints(kpts0, data['image0'].shape) - kpts1 = normalize_keypoints(kpts1, data['image1'].shape) - - # Keypoint MLP encoder. - desc0 = desc0 + self.kenc(kpts0, torch.transpose(data['scores0'], 0, 1)) - desc1 = desc1 + self.kenc(kpts1, torch.transpose(data['scores1'], 0, 1)) - - # Multi-layer Transformer network. - desc0, desc1 = self.gnn(desc0, desc1) - - # Final MLP projection. - mdesc0, mdesc1 = self.final_proj(desc0), self.final_proj(desc1) - - # Compute matching descriptor distance. 
- scores = torch.einsum('bdn,bdm->bnm', mdesc0, mdesc1) - scores = scores / self.config['descriptor_dim']**.5 - - #scores[scores>30.] = 30. - #scores[scores<-80.] = -80. - #print(scores.max(), scores.min()) - - b, m, n = scores.shape - max_keypoints = self.feature_size ** 2 - if m + n < max_keypoints *2: - p2d = (0, max_keypoints-n, 0, max_keypoints-m) - scores = F.pad(scores, p2d, 'constant', 0.).type(scores.dtype) - - #print(scores.max(), scores.min()) - scores = self.decoder(scores[:, None, :, :]) - - scores = (softmax_with_temperature(scores)) - - - - #scores = self.decoder(scores[:, None, :, :]) - #print(scores.max(), scores.min()) - scores = scores[:, :m, :n] - #print(scores.max(), scores.min()) - - # Run the optimal transport. - ''' - scores = log_optimal_transport( - scores, self.bin_score, - iters=self.config['sinkhorn_iterations']) - ''' - # Get the matches with score above "match_threshold". - max0, max1 = scores[:, :, :].max(2), scores[:, :, :].max(1) - indices0, indices1 = max0.indices, max1.indices - mutual0 = arange_like(indices0, 1)[None] == indices1.gather(1, indices0) - mutual1 = arange_like(indices1 , 1)[None] == indices0.gather(1, indices1) - zero = scores.new_tensor(0) - mscores0 = torch.where(mutual0, max0.values, zero) - mscores1 = torch.where(mutual1, mscores0.gather(1, indices1), zero) - valid0 = mutual0 & (mscores0 > self.config['match_threshold']) - valid1 = mutual1 & valid0.gather(1, indices1) - indices0 = torch.where(valid0, indices0, indices0.new_tensor(-1)) - indices1 = torch.where(valid1, indices1, indices1.new_tensor(-1)) - - #print(mscores0.min(), mscores0.max()) - #print(mscores0) - - return scores, { - 'matches0': indices0[0], # use -1 for invalid match - 'matches1': indices1[0], # use -1 for invalid match - 'matching_scores0': mscores0[0], - 'matching_scores1': mscores1[0], - 'skip_train': False - } - - -if __name__ == '__main__': - from superpoint import SuperPoint - - config = { - 'superpoint': { - 'nms_radius': 4, - 
'keypoint_threshold': 0.005, - 'max_keypoints': 1024 - }, - 'superglue': { - 'weights': 'outdoor', - 'sinkhorn_iterations': 20, - 'match_threshold':0.2 - } - } - - """ - data = { - 'image0': torch.randn(1, 1, 512, 512), - 'image1': torch.randn(1, 1, 512, 512) - } - - superpoint = SuperPoint(config.get('superpoint', {})) - - output1 = superpoint({'image': data['image0']}) - output2 = superpoint({'image': data['image1']}) - - pred = {} - - pred = {**pred, **{k+'0': v for k, v in output1.items()}} - pred = {**pred, **{k+'1': v for k, v in output2.items()}} - - data = {**data, **pred} - - for k in data: - if isinstance(data[k], (list, tuple)): - data[k] = torch.stack(data[k]) - """ - - pred = { - 'keypoints0' : torch.randn(1, 1, 484, 2), - 'keypoints1' : torch.randn(1, 1, 484, 2), - 'descriptors0' : torch.randn(256, 1, 484), - 'descriptors1' : torch.randn(256, 1, 484), - 'scores0' : torch.randn(484, 1), - 'scores1' : torch.randn(484, 1), - 'image0' : torch.randn(1, 1, 512, 512), - 'image1' : torch.randn(1, 1, 512, 512), - # 'all_matches' : torch.randn(2, 1, 1248) - } - - superglue = SimpleSuperCATs(config.get('superglue', {})) - scores, output = superglue(pred) - - # loss = loss_superglue(scores, pred['all_matches'].permute(1, 2, 0)) - # print(loss) \ No newline at end of file diff --git a/sjlee_backup/IMCsuperglue.py b/sjlee_backup/IMCsuperglue.py deleted file mode 100644 index b7eb471..0000000 --- a/sjlee_backup/IMCsuperglue.py +++ /dev/null @@ -1,192 +0,0 @@ - -import os -import sys - -import torch -import torch.nn as nn -import torch.nn.functional as F - -import numpy as np -from functools import partial - -from pydoc import source_synopsis -from sjlee_backup.superglue2 import SuperGlue, normalize_keypoints, arange_like, log_optimal_transport -from sjlee_backup.losssuperglue import loss_superglue - -sys.path.append(os.path.join(os.path.dirname(__file__), 'cats')) -from cats import TransformerAggregator - -def dfs_freeze(model): - for name, child in 
model.named_children(): - for param in child.parameters(): - param.requires_grad = False - - dfs_freeze(child) - -def softmax_with_temperature(x, beta=2., d = 1): - r'''SFNet: Learning Object-aware Semantic Flow (Lee et al.)''' - M, _ = x.max(dim=d, keepdim=True) - x = x - M # subtract maximum value for stability - exp_x = torch.exp(x/beta) - exp_x_sum = exp_x.sum(dim=d, keepdim=True) - return exp_x / exp_x_sum - -# positional embedding 필요한가? -# M * N 크기가 다 다른 문제 -class SimpleSuperCATs(SuperGlue): - def __init__(self, - config, - feature_size=32, - feature_proj_dim=128, - depth=4, - num_heads=4, - mlp_ratio=4, - ): - super().__init__(config) - - # freeze superglue's layers - dfs_freeze(self.kenc) - dfs_freeze(self.gnn) - dfs_freeze(self.final_proj) - - self.feature_size = feature_size - self.feature_proj_dim = feature_proj_dim - self.decoder_embed_dim = self.feature_size ** 2 - - self.decoder = TransformerAggregator( - img_size=self.feature_size, embed_dim=self.decoder_embed_dim, depth=depth, num_heads=num_heads, - mlp_ratio=mlp_ratio, qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), - num_hyperpixel=1 - ) - - def forward(self, data): - """Run SuperGlue on a pair of keypoints and descriptors""" - with torch.no_grad(): - - desc0, desc1 = data['descriptors0'], data['descriptors1'] - kpts0, kpts1 = data['keypoints0'], data['keypoints1'] - - - - desc0 = desc0.transpose(0,1) - desc1 = desc1.transpose(0,1) - kpts0 = torch.reshape(kpts0, (1, -1, 2)) - kpts1 = torch.reshape(kpts1, (1, -1, 2)) - - if kpts0.shape[1] == 0 or kpts1.shape[1] == 0: # no keypoints - shape0, shape1 = kpts0.shape[:-1], kpts1.shape[:-1] - return [], { - 'matches0': kpts0.new_full(shape0, -1, dtype=torch.int)[0], - 'matches1': kpts1.new_full(shape1, -1, dtype=torch.int)[0], - 'matching_scores0': kpts0.new_zeros(shape0)[0], - 'matching_scores1': kpts1.new_zeros(shape1)[0], - 'skip_train': True - } - - # Keypoint normalization. 
- kpts0 = normalize_keypoints(kpts0, data['image0'].shape) - kpts1 = normalize_keypoints(kpts1, data['image1'].shape) - - # Keypoint MLP encoder. - desc0 = desc0 + self.kenc(kpts0, torch.transpose(data['scores0'], 0, 1)) - desc1 = desc1 + self.kenc(kpts1, torch.transpose(data['scores1'], 0, 1)) - - # Multi-layer Transformer network. - desc0, desc1 = self.gnn(desc0, desc1) - - # Final MLP projection. - mdesc0, mdesc1 = self.final_proj(desc0), self.final_proj(desc1) - - # Compute matching descriptor distance. - scores = torch.einsum('bdn,bdm->bnm', mdesc0, mdesc1) - scores = scores / self.config['descriptor_dim']**.5 - - #print(scores.max(), scores.min()) - - # Run the optimal transport. - - scores = log_optimal_transport( - scores, self.bin_score, - iters=self.config['sinkhorn_iterations']) - - # Get the matches with score above "match_threshold". - max0, max1 = scores[:, :-1, :-1].max(2), scores[:, :-1, :-1].max(1) - indices0, indices1 = max0.indices, max1.indices - mutual0 = arange_like(indices0, 1)[None] == indices1.gather(1, indices0) - mutual1 = arange_like(indices1 , 1)[None] == indices0.gather(1, indices1) - zero = scores.new_tensor(0) - mscores0 = torch.where(mutual0, max0.values.exp(), zero) - mscores1 = torch.where(mutual1, mscores0.gather(1, indices1), zero) - valid0 = mutual0 & (mscores0 > self.config['match_threshold']) - valid1 = mutual1 & valid0.gather(1, indices1) - indices0 = torch.where(valid0, indices0, indices0.new_tensor(-1)) - indices1 = torch.where(valid1, indices1, indices1.new_tensor(-1)) - - #print(mscores0.min(), mscores0.max()) - #print(mscores0) - - return scores, { - 'matches0': indices0[0], # use -1 for invalid match - 'matches1': indices1[0], # use -1 for invalid match - 'matching_scores0': mscores0[0], - 'matching_scores1': mscores1[0], - 'skip_train': False - } - - -if __name__ == '__main__': - from superpoint import SuperPoint - - config = { - 'superpoint': { - 'nms_radius': 4, - 'keypoint_threshold': 0.005, - 'max_keypoints': 1024 
- }, - 'superglue': { - 'weights': 'outdoor', - 'sinkhorn_iterations': 20, - 'match_threshold':0.2 - } - } - - """ - data = { - 'image0': torch.randn(1, 1, 512, 512), - 'image1': torch.randn(1, 1, 512, 512) - } - - superpoint = SuperPoint(config.get('superpoint', {})) - - output1 = superpoint({'image': data['image0']}) - output2 = superpoint({'image': data['image1']}) - - pred = {} - - pred = {**pred, **{k+'0': v for k, v in output1.items()}} - pred = {**pred, **{k+'1': v for k, v in output2.items()}} - - data = {**data, **pred} - - for k in data: - if isinstance(data[k], (list, tuple)): - data[k] = torch.stack(data[k]) - """ - - pred = { - 'keypoints0' : torch.randn(1, 1, 484, 2), - 'keypoints1' : torch.randn(1, 1, 484, 2), - 'descriptors0' : torch.randn(256, 1, 484), - 'descriptors1' : torch.randn(256, 1, 484), - 'scores0' : torch.randn(484, 1), - 'scores1' : torch.randn(484, 1), - 'image0' : torch.randn(1, 1, 512, 512), - 'image1' : torch.randn(1, 1, 512, 512), - # 'all_matches' : torch.randn(2, 1, 1248) - } - - superglue = SimpleSuperCATs(config.get('superglue', {})) - scores, output = superglue(pred) - - # loss = loss_superglue(scores, pred['all_matches'].permute(1, 2, 0)) - # print(loss) \ No newline at end of file diff --git "a/sjlee_backup/IMC\353\202\230\354\244\221\354\227\220.py" "b/sjlee_backup/IMC\353\202\230\354\244\221\354\227\220.py" deleted file mode 100644 index 81129e1..0000000 --- "a/sjlee_backup/IMC\353\202\230\354\244\221\354\227\220.py" +++ /dev/null @@ -1,221 +0,0 @@ - -import os -import sys - -import torch -import torch.nn as nn -import torch.nn.functional as F - -import numpy as np -from functools import partial - -from pydoc import source_synopsis -from sjlee_backup.superglue2 import SuperGlue, normalize_keypoints, arange_like, log_optimal_transport -from sjlee_backup.loss import loss_superglue - -sys.path.append(os.path.join(os.path.dirname(__file__), 'cats')) -from cats import TransformerAggregator - -def dfs_freeze(model): - for name, 
child in model.named_children(): - for param in child.parameters(): - param.requires_grad = False - - dfs_freeze(child) - -def softmax_with_temperature(x, beta=2., d = 1): - r'''SFNet: Learning Object-aware Semantic Flow (Lee et al.)''' - M, _ = x.max(dim=d, keepdim=True) - x = x - M # subtract maximum value for stability - exp_x = torch.exp(x/beta) - exp_x_sum = exp_x.sum(dim=d, keepdim=True) - return exp_x / exp_x_sum - -# positional embedding 필요한가? -# M * N 크기가 다 다른 문제 -class SimpleSuperCATs(SuperGlue): - def __init__(self, - config, - feature_size=32, - feature_proj_dim=128, - depth=4, - num_heads=4, - mlp_ratio=4, - ): - super().__init__(config) - - # freeze superglue's layers - dfs_freeze(self.kenc) - dfs_freeze(self.gnn) - dfs_freeze(self.final_proj) - - self.feature_size = feature_size - self.feature_proj_dim = feature_proj_dim - self.decoder_embed_dim = self.feature_size ** 2 - - self.decoder = TransformerAggregator( - img_size=self.feature_size, embed_dim=self.decoder_embed_dim, depth=depth, num_heads=num_heads, - mlp_ratio=mlp_ratio, qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), - num_hyperpixel=1 - ) - - def forward(self, data): - """Run SuperGlue on a pair of keypoints and descriptors""" - with torch.no_grad(): - - desc0, desc1 = data['descriptors0'], data['descriptors1'] - kpts0, kpts1 = data['keypoints0'], data['keypoints1'] - - - - desc0 = desc0.transpose(0,1) - desc1 = desc1.transpose(0,1) - kpts0 = torch.reshape(kpts0, (1, -1, 2)) - kpts1 = torch.reshape(kpts1, (1, -1, 2)) - - if kpts0.shape[1] == 0 or kpts1.shape[1] == 0: # no keypoints - shape0, shape1 = kpts0.shape[:-1], kpts1.shape[:-1] - return [], { - 'matches0': kpts0.new_full(shape0, -1, dtype=torch.int)[0], - 'matches1': kpts1.new_full(shape1, -1, dtype=torch.int)[0], - 'matching_scores0': kpts0.new_zeros(shape0)[0], - 'matching_scores1': kpts1.new_zeros(shape1)[0], - 'skip_train': True - } - - - - # Keypoint normalization. 
- kpts0 = normalize_keypoints(kpts0, data['image0'].shape) - kpts1 = normalize_keypoints(kpts1, data['image1'].shape) - - # Keypoint MLP encoder. - desc0 = desc0 + self.kenc(kpts0, torch.transpose(data['scores0'], 0, 1)) - desc1 = desc1 + self.kenc(kpts1, torch.transpose(data['scores1'], 0, 1)) - - - # Multi-layer Transformer network. - desc0, desc1 = self.gnn(desc0, desc1) - - # Final MLP projection. - mdesc0, mdesc1 = self.final_proj(desc0), self.final_proj(desc1) - - # Compute matching descriptor distance. - scores = torch.einsum('bdn,bdm->bnm', mdesc0, mdesc1) - scores = scores / self.config['descriptor_dim']**.5 - - - - b, m, n = scores.shape - max_keypoints = self.feature_size ** 2 - if m + n < max_keypoints *2: - p2d = (0, max_keypoints-n, 0, max_keypoints-m) - scores = F.pad(scores, p2d, 'constant', 0.).type(scores.dtype) - - - scores = self.decoder(scores[:, None, :, :]) - scores = scores[:, :m, :n] - - #print(scores) - thr = 80. - scores[scores<-thr] = -thr - scores[scores>thr] = thr - #print(scores) - scores = (softmax_with_temperature(scores)) - - - # Run the optimal transport. - ''' - scores = log_optimal_transport( - scores, self.bin_score, - iters=self.config['sinkhorn_iterations']) - scores[scores<-100.] = -100. - scores = scores[:, :-1, :-1].exp() - ''' - - #print(scores) - - #print(scores.min(), scores.max()) - #print(scores.exp().min(), scores.exp().max()) - - # Get the matches with score above "match_threshold". 
- max0, max1 = scores[:, :, :].max(2), scores[:, :, :].max(1) - indices0, indices1 = max0.indices, max1.indices - mutual0 = arange_like(indices0, 1)[None] == indices1.gather(1, indices0) - mutual1 = arange_like(indices1 , 1)[None] == indices0.gather(1, indices1) - zero = scores.new_tensor(0) - mscores0 = torch.where(mutual0, max0.values, zero) - mscores1 = torch.where(mutual1, mscores0.gather(1, indices1), zero) - valid0 = mutual0 & (mscores0 > self.config['match_threshold']) - valid1 = mutual1 & valid0.gather(1, indices1) - indices0 = torch.where(valid0, indices0, indices0.new_tensor(-1)) - indices1 = torch.where(valid1, indices1, indices1.new_tensor(-1)) - - #print(mscores0.min(), mscores0.max()) - #print(mscores0) - - return scores, { - 'matches0': indices0[0], # use -1 for invalid match - 'matches1': indices1[0], # use -1 for invalid match - 'matching_scores0': mscores0[0], - 'matching_scores1': mscores1[0], - 'skip_train': False - } - - -if __name__ == '__main__': - from superpoint import SuperPoint - - config = { - 'superpoint': { - 'nms_radius': 4, - 'keypoint_threshold': 0.005, - 'max_keypoints': 1024 - }, - 'superglue': { - 'weights': 'outdoor', - 'sinkhorn_iterations': 20, - 'match_threshold':0.2 - } - } - - """ - data = { - 'image0': torch.randn(1, 1, 512, 512), - 'image1': torch.randn(1, 1, 512, 512) - } - - superpoint = SuperPoint(config.get('superpoint', {})) - - output1 = superpoint({'image': data['image0']}) - output2 = superpoint({'image': data['image1']}) - - pred = {} - - pred = {**pred, **{k+'0': v for k, v in output1.items()}} - pred = {**pred, **{k+'1': v for k, v in output2.items()}} - - data = {**data, **pred} - - for k in data: - if isinstance(data[k], (list, tuple)): - data[k] = torch.stack(data[k]) - """ - - pred = { - 'keypoints0' : torch.randn(1, 1, 484, 2), - 'keypoints1' : torch.randn(1, 1, 484, 2), - 'descriptors0' : torch.randn(256, 1, 484), - 'descriptors1' : torch.randn(256, 1, 484), - 'scores0' : torch.randn(484, 1), - 'scores1' 
: torch.randn(484, 1), - 'image0' : torch.randn(1, 1, 512, 512), - 'image1' : torch.randn(1, 1, 512, 512), - # 'all_matches' : torch.randn(2, 1, 1248) - } - - superglue = SimpleSuperCATs(config.get('superglue', {})) - scores, output = superglue(pred) - - # loss = loss_superglue(scores, pred['all_matches'].permute(1, 2, 0)) - # print(loss) \ No newline at end of file diff --git a/sjlee_backup/__pycache__/IMC.cpython-38.pyc b/sjlee_backup/__pycache__/IMC.cpython-38.pyc deleted file mode 100644 index a70c1a1..0000000 Binary files a/sjlee_backup/__pycache__/IMC.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/IMC_backup.cpython-38.pyc b/sjlee_backup/__pycache__/IMC_backup.cpython-38.pyc deleted file mode 100644 index 6d0a596..0000000 Binary files a/sjlee_backup/__pycache__/IMC_backup.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/IMCcopy.cpython-38.pyc b/sjlee_backup/__pycache__/IMCcopy.cpython-38.pyc deleted file mode 100644 index 4adcea9..0000000 Binary files a/sjlee_backup/__pycache__/IMCcopy.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/IMCsuperglue.cpython-38.pyc b/sjlee_backup/__pycache__/IMCsuperglue.cpython-38.pyc deleted file mode 100644 index 1cee602..0000000 Binary files a/sjlee_backup/__pycache__/IMCsuperglue.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/loss.cpython-38.pyc b/sjlee_backup/__pycache__/loss.cpython-38.pyc deleted file mode 100644 index 57d4a2b..0000000 Binary files a/sjlee_backup/__pycache__/loss.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/losssuperglue.cpython-38.pyc b/sjlee_backup/__pycache__/losssuperglue.cpython-38.pyc deleted file mode 100644 index 81b596e..0000000 Binary files a/sjlee_backup/__pycache__/losssuperglue.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/superglue.cpython-38.pyc b/sjlee_backup/__pycache__/superglue.cpython-38.pyc deleted file mode 100644 index 
3acd4d8..0000000 Binary files a/sjlee_backup/__pycache__/superglue.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/superglue2.cpython-38.pyc b/sjlee_backup/__pycache__/superglue2.cpython-38.pyc deleted file mode 100644 index 8dccc4e..0000000 Binary files a/sjlee_backup/__pycache__/superglue2.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/__pycache__/superpoint.cpython-38.pyc b/sjlee_backup/__pycache__/superpoint.cpython-38.pyc deleted file mode 100644 index 262ba3e..0000000 Binary files a/sjlee_backup/__pycache__/superpoint.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/cats/__pycache__/cats.cpython-38.pyc b/sjlee_backup/cats/__pycache__/cats.cpython-38.pyc deleted file mode 100644 index 0ce884b..0000000 Binary files a/sjlee_backup/cats/__pycache__/cats.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/cats/__pycache__/cats.cpython-39.pyc b/sjlee_backup/cats/__pycache__/cats.cpython-39.pyc deleted file mode 100644 index 5711968..0000000 Binary files a/sjlee_backup/cats/__pycache__/cats.cpython-39.pyc and /dev/null differ diff --git a/sjlee_backup/cats/__pycache__/mod.cpython-38.pyc b/sjlee_backup/cats/__pycache__/mod.cpython-38.pyc deleted file mode 100644 index 23b810b..0000000 Binary files a/sjlee_backup/cats/__pycache__/mod.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/cats/cats.py b/sjlee_backup/cats/cats.py deleted file mode 100644 index e85d793..0000000 --- a/sjlee_backup/cats/cats.py +++ /dev/null @@ -1,404 +0,0 @@ -import os -import sys -from operator import add -from functools import reduce, partial - -import torch -import torch.nn as nn -import torch.nn.functional as F -import numpy as np - -import torchvision.models as models - -from feature_backbones import resnet -from mod import FeatureL2Norm, unnormalise_and_convert_mapping_to_flow - -''' -Modified timm library Vision Transformer implementation -https://github.com/rwightman/pytorch-image-models -''' - -# 
================= timm functions START ================= # - -import math -import warnings - -def drop_path(x, drop_prob: float = 0., training: bool = False, scale_by_keep: bool = True): - """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). - This is the same as the DropConnect impl I created for EfficientNet, etc networks, however, - the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper... - See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for - changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use - 'survival rate' as the argument. - """ - if drop_prob == 0. or not training: - return x - keep_prob = 1 - drop_prob - shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2D ConvNets - random_tensor = x.new_empty(shape).bernoulli_(keep_prob) - if keep_prob > 0.0 and scale_by_keep: - random_tensor.div_(keep_prob) - return x * random_tensor - -def _no_grad_trunc_normal_(tensor, mean, std, a, b): - # Cut & paste from PyTorch official master until it's in a few official releases - RW - # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf - def norm_cdf(x): - # Computes standard normal cumulative distribution function - return (1. + math.erf(x / math.sqrt(2.))) / 2. - - if (mean < a - 2 * std) or (mean > b + 2 * std): - warnings.warn("mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " - "The distribution of values may be incorrect.", - stacklevel=2) - - with torch.no_grad(): - # Values are generated by using a truncated uniform distribution and - # then using the inverse CDF for the normal distribution. - # Get upper and lower cdf values - l = norm_cdf((a - mean) / std) - u = norm_cdf((b - mean) / std) - - # Uniformly fill tensor with values from [l, u], then translate to - # [2l-1, 2u-1]. 
- tensor.uniform_(2 * l - 1, 2 * u - 1) - - # Use inverse cdf transform for normal distribution to get truncated - # standard normal - tensor.erfinv_() - - # Transform to proper mean, std - tensor.mul_(std * math.sqrt(2.)) - tensor.add_(mean) - - # Clamp to ensure it's in the proper range - tensor.clamp_(min=a, max=b) - return tensor - - -def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.): - # type: (Tensor, float, float, float, float) -> Tensor - r"""Fills the input Tensor with values drawn from a truncated - normal distribution. The values are effectively drawn from the - normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)` - with values outside :math:`[a, b]` redrawn until they are within - the bounds. The method used for generating the random values works - best when :math:`a \leq \text{mean} \leq b`. - Args: - tensor: an n-dimensional `torch.Tensor` - mean: the mean of the normal distribution - std: the standard deviation of the normal distribution - a: the minimum cutoff value - b: the maximum cutoff value - Examples: - >>> w = torch.empty(3, 5) - >>> nn.init.trunc_normal_(w) - """ - return _no_grad_trunc_normal_(tensor, mean, std, a, b) - -# ================= timm functions END================= # - - - - -class Mlp(nn.Module): - def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.): - super().__init__() - out_features = out_features or in_features - hidden_features = hidden_features or in_features - self.fc1 = nn.Linear(in_features, hidden_features) - self.act = act_layer() - self.fc2 = nn.Linear(hidden_features, out_features) - self.drop = nn.Dropout(drop) - - def forward(self, x): - x = self.fc1(x) - x = self.act(x) - x = self.drop(x) - x = self.fc2(x) - x = self.drop(x) - return x - -class Attention(nn.Module): - def __init__(self, dim, num_heads=8, qkv_bias=False, qk_scale=None, attn_drop=0., proj_drop=0.): - super().__init__() - self.num_heads = num_heads - head_dim = dim // num_heads - # 
NOTE scale factor was wrong in my original version, can set manually to be compat with prev weights - self.scale = qk_scale or head_dim ** -0.5 - - self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias) - self.attn_drop = nn.Dropout(attn_drop) - self.proj = nn.Linear(dim, dim) - self.proj_drop = nn.Dropout(proj_drop) - - def forward(self, x): - B, N, C = x.shape - qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4) - q, k, v = qkv[0], qkv[1], qkv[2] # make torchscript happy (cannot use tensor as tuple) - - attn = (q @ k.transpose(-2, -1)) * self.scale - attn = attn.softmax(dim=-1) - attn = self.attn_drop(attn) - - x = (attn @ v).transpose(1, 2).reshape(B, N, C) - x = self.proj(x) - x = self.proj_drop(x) - return x - - -class MultiscaleBlock(nn.Module): - - def __init__(self, dim, num_heads, mlp_ratio=4., qkv_bias=False, qk_scale=None, drop=0., attn_drop=0., - drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm): - super().__init__() - self.attn = Attention( - dim, num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, proj_drop=drop) - self.attn_multiscale = Attention( - dim, num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, proj_drop=drop) - # NOTE: drop path for stochastic depth, we shall see if this is better than dropout here - self.drop_path = DropPath(drop_path) if drop_path > 0. 
else nn.Identity() - self.norm1 = norm_layer(dim) - self.norm2 = norm_layer(dim) - self.norm3 = norm_layer(dim) - self.norm4 = norm_layer(dim) - mlp_hidden_dim = int(dim * mlp_ratio) - self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop) - self.mlp2 = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop) - - def forward(self, x): - ''' - Multi-level aggregation - ''' - B, N, H, W = x.shape - if N == 1: - x = x.flatten(0, 1) - a = self.norm1(x) - x = x + self.drop_path(self.attn(self.norm1(x))) - x = x + self.drop_path(self.mlp(self.norm2(x))) - return x.view(B, N, H, W) - x = x.flatten(0, 1) - x = x + self.drop_path(self.attn(self.norm1(x))) - x = x + self.drop_path(self.mlp2(self.norm4(x))) - x = x.view(B, N, H, W).transpose(1, 2).flatten(0, 1) - x = x + self.drop_path(self.attn_multiscale(self.norm3(x))) - x = x.view(B, H, N, W).transpose(1, 2).flatten(0, 1) - x = x + self.drop_path(self.mlp(self.norm2(x))) - x = x.view(B, N, H, W) - return x - - -class TransformerAggregator(nn.Module): - def __init__(self, num_hyperpixel, img_size=224, embed_dim=768, depth=12, num_heads=12, mlp_ratio=4., qkv_bias=True, qk_scale=None, - drop_rate=0., attn_drop_rate=0., drop_path_rate=0., norm_layer=None): - super().__init__() - self.img_size = img_size - self.num_features = self.embed_dim = embed_dim # num_features for consistency with other models - norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6) - - self.pos_embed_x = nn.Parameter(torch.zeros(1, num_hyperpixel, 1, img_size, embed_dim // 2)) - self.pos_embed_y = nn.Parameter(torch.zeros(1, num_hyperpixel, img_size, 1, embed_dim // 2)) - self.pos_drop = nn.Dropout(p=drop_rate) - - dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)] # stochastic depth decay rule - self.blocks = nn.Sequential(*[ - MultiscaleBlock( - dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, qk_scale=qk_scale, - drop=drop_rate, 
attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer) - for i in range(depth)]) - - self.proj = nn.Linear(embed_dim, img_size ** 2) - self.norm = norm_layer(embed_dim) - - trunc_normal_(self.pos_embed_x, std=.02) - trunc_normal_(self.pos_embed_y, std=.02) - self.apply(self._init_weights) - - def _init_weights(self, m): - if isinstance(m, nn.Linear): - trunc_normal_(m.weight, std=.02) - if isinstance(m, nn.Linear) and m.bias is not None: - nn.init.constant_(m.bias, 0) - elif isinstance(m, nn.LayerNorm): - nn.init.constant_(m.bias, 0) - nn.init.constant_(m.weight, 1.0) - - def forward(self, corr): - B = corr.shape[0] - x = corr.clone() - - pos_embed = torch.cat((self.pos_embed_x.repeat(1, 1, self.img_size, 1, 1), self.pos_embed_y.repeat(1, 1, 1, self.img_size, 1)), dim=4) - pos_embed = pos_embed.flatten(2, 3) - - x = x.transpose(-1, -2) + pos_embed - x = self.proj(self.blocks(x)).transpose(-1, -2) + corr # swapping the axis for swapping self-attention. - - x = x + pos_embed - x = self.proj(self.blocks(x)) + corr - - return x.mean(1) - - -class FeatureExtractionHyperPixel(nn.Module): - def __init__(self, hyperpixel_ids, feature_size, freeze=True): - super().__init__() - self.backbone = resnet.resnet101(pretrained=True) - self.feature_size = feature_size - if freeze: - for param in self.backbone.parameters(): - param.requires_grad = False - nbottlenecks = [3, 4, 23, 3] - self.bottleneck_ids = reduce(add, list(map(lambda x: list(range(x)), nbottlenecks))) - self.layer_ids = reduce(add, [[i + 1] * x for i, x in enumerate(nbottlenecks)]) - self.hyperpixel_ids = hyperpixel_ids - - - def forward(self, img): - r"""Extract desired a list of intermediate features""" - - feats = [] - - # Layer 0 - feat = self.backbone.conv1.forward(img) - feat = self.backbone.bn1.forward(feat) - feat = self.backbone.relu.forward(feat) - feat = self.backbone.maxpool.forward(feat) - if 0 in self.hyperpixel_ids: - feats.append(feat.clone()) - - # Layer 1-4 - for hid, (bid, lid) in 
enumerate(zip(self.bottleneck_ids, self.layer_ids)): - res = feat - feat = self.backbone.__getattr__('layer%d' % lid)[bid].conv1.forward(feat) - feat = self.backbone.__getattr__('layer%d' % lid)[bid].bn1.forward(feat) - feat = self.backbone.__getattr__('layer%d' % lid)[bid].relu.forward(feat) - feat = self.backbone.__getattr__('layer%d' % lid)[bid].conv2.forward(feat) - feat = self.backbone.__getattr__('layer%d' % lid)[bid].bn2.forward(feat) - feat = self.backbone.__getattr__('layer%d' % lid)[bid].relu.forward(feat) - feat = self.backbone.__getattr__('layer%d' % lid)[bid].conv3.forward(feat) - feat = self.backbone.__getattr__('layer%d' % lid)[bid].bn3.forward(feat) - - if bid == 0: - res = self.backbone.__getattr__('layer%d' % lid)[bid].downsample.forward(res) - - feat += res - - if hid + 1 in self.hyperpixel_ids: - feats.append(feat.clone()) - #if hid + 1 == max(self.hyperpixel_ids): - # break - feat = self.backbone.__getattr__('layer%d' % lid)[bid].relu.forward(feat) - - # Up-sample & concatenate features to construct a hyperimage - - """ - for idx, feat in enumerate(feats): - feats[idx] = F.interpolate(feat, self.feature_size, None, 'bilinear', True) - """ - - return feats - - -class CATs(nn.Module): - def __init__(self, - feature_size=16, - feature_proj_dim=128, - depth=4, - num_heads=6, - mlp_ratio=4, - hyperpixel_ids=[0,8,20,21,26,28,29,30], - freeze=True): - super().__init__() - self.feature_size = feature_size - self.feature_proj_dim = feature_proj_dim - self.decoder_embed_dim = self.feature_size ** 2 + self.feature_proj_dim - - channels = [64] + [256] * 3 + [512] * 4 + [1024] * 23 + [2048] * 3 - - self.feature_extraction = FeatureExtractionHyperPixel(hyperpixel_ids, feature_size, freeze) - self.proj = nn.ModuleList([ - nn.Linear(channels[i], self.feature_proj_dim) for i in hyperpixel_ids - ]) - - self.decoder = TransformerAggregator( - img_size=self.feature_size, embed_dim=self.decoder_embed_dim, depth=depth, num_heads=num_heads, - mlp_ratio=mlp_ratio, 
qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), - num_hyperpixel=len(hyperpixel_ids)) - - self.l2norm = FeatureL2Norm() - - self.x_normal = np.linspace(-1,1,self.feature_size) - self.x_normal = nn.Parameter(torch.tensor(self.x_normal, dtype=torch.float, requires_grad=False)) - self.y_normal = np.linspace(-1,1,self.feature_size) - self.y_normal = nn.Parameter(torch.tensor(self.y_normal, dtype=torch.float, requires_grad=False)) - - def softmax_with_temperature(self, x, beta, d = 1): - r'''SFNet: Learning Object-aware Semantic Flow (Lee et al.)''' - M, _ = x.max(dim=d, keepdim=True) - x = x - M # subtract maximum value for stability - exp_x = torch.exp(x/beta) - exp_x_sum = exp_x.sum(dim=d, keepdim=True) - return exp_x / exp_x_sum - - def soft_argmax(self, corr, beta=0.02): - r'''SFNet: Learning Object-aware Semantic Flow (Lee et al.)''' - b,_,h,w = corr.size() - - corr = self.softmax_with_temperature(corr, beta=beta, d=1) - corr = corr.view(-1,h,w,h,w) # (target hxw) x (source hxw) - - grid_x = corr.sum(dim=1, keepdim=False) # marginalize to x-coord. - x_normal = self.x_normal.expand(b,w) - x_normal = x_normal.view(b,w,1,1) - grid_x = (grid_x*x_normal).sum(dim=1, keepdim=True) # b x 1 x h x w - - grid_y = corr.sum(dim=2, keepdim=False) # marginalize to y-coord. - y_normal = self.y_normal.expand(b,h) - y_normal = y_normal.view(b,h,1,1) - grid_y = (grid_y*y_normal).sum(dim=1, keepdim=True) # b x 1 x h x w - return grid_x, grid_y - - def mutual_nn_filter(self, correlation_matrix): - r"""Mutual nearest neighbor filtering (Rocco et al. 
NeurIPS'18)""" - corr_src_max = torch.max(correlation_matrix, dim=3, keepdim=True)[0] - corr_trg_max = torch.max(correlation_matrix, dim=2, keepdim=True)[0] - corr_src_max[corr_src_max == 0] += 1e-30 - corr_trg_max[corr_trg_max == 0] += 1e-30 - - corr_src = correlation_matrix / corr_src_max - corr_trg = correlation_matrix / corr_trg_max - - return correlation_matrix * (corr_src * corr_trg) - - def corr(self, src, trg): - return src.flatten(2).transpose(-1, -2) @ trg.flatten(2) - - def forward(self, target, source): - B, _, H, W = target.size() - - src_feats = self.feature_extraction(source) - tgt_feats = self.feature_extraction(target) - - corrs = [] - src_feats_proj = [] - tgt_feats_proj = [] - for i, (src, tgt) in enumerate(zip(src_feats, tgt_feats)): - corr = self.corr(self.l2norm(src), self.l2norm(tgt)) - corrs.append(corr) - src_feats_proj.append(self.proj[i](src.flatten(2).transpose(-1, -2))) - tgt_feats_proj.append(self.proj[i](tgt.flatten(2).transpose(-1, -2))) - - src_feats = torch.stack(src_feats_proj, dim=1) - tgt_feats = torch.stack(tgt_feats_proj, dim=1) - corr = torch.stack(corrs, dim=1) - - corr = self.mutual_nn_filter(corr) - - refined_corr = self.decoder(corr, src_feats, tgt_feats) - - grid_x, grid_y = self.soft_argmax(refined_corr.view(B, -1, self.feature_size, self.feature_size)) - - flow = torch.cat((grid_x, grid_y), dim=1) - flow = unnormalise_and_convert_mapping_to_flow(flow) - - return flow diff --git a/sjlee_backup/cats/feature_backbones/__pycache__/resnet.cpython-38.pyc b/sjlee_backup/cats/feature_backbones/__pycache__/resnet.cpython-38.pyc deleted file mode 100644 index 29a0c6e..0000000 Binary files a/sjlee_backup/cats/feature_backbones/__pycache__/resnet.cpython-38.pyc and /dev/null differ diff --git a/sjlee_backup/cats/feature_backbones/resnet.py b/sjlee_backup/cats/feature_backbones/resnet.py deleted file mode 100644 index 2c94e68..0000000 --- a/sjlee_backup/cats/feature_backbones/resnet.py +++ /dev/null @@ -1,342 +0,0 @@ -import torch 
-import torch.nn as nn -#from .utils import load_state_dict_from_url -try: - from torch.hub import load_state_dict_from_url -except ImportError: - from torch.utils.model_zoo import load_url as load_state_dict_from_url - - -__all__ = ['ResNet', 'resnet18', 'resnet34', 'resnet50', 'resnet101', - 'resnet152', 'resnext50_32x4d', 'resnext101_32x8d', - 'wide_resnet50_2', 'wide_resnet101_2'] - - -model_urls = { - 'resnet18': 'https://download.pytorch.org/models/resnet18-5c106cde.pth', - 'resnet34': 'https://download.pytorch.org/models/resnet34-333f7ec4.pth', - 'resnet50': 'https://download.pytorch.org/models/resnet50-19c8e357.pth', - 'resnet101': 'https://download.pytorch.org/models/resnet101-5d3b4d8f.pth', - 'resnet152': 'https://download.pytorch.org/models/resnet152-b121ed2d.pth', - 'resnext50_32x4d': 'https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pth', - 'resnext101_32x8d': 'https://download.pytorch.org/models/resnext101_32x8d-8ba56ff5.pth', - 'wide_resnet50_2': 'https://download.pytorch.org/models/wide_resnet50_2-95faca4d.pth', - 'wide_resnet101_2': 'https://download.pytorch.org/models/wide_resnet101_2-32ee1156.pth', -} - - -def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1): - """3x3 convolution with padding""" - return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, - padding=dilation, groups=groups, bias=False, dilation=dilation) - - -def conv1x1(in_planes, out_planes, stride=1): - """1x1 convolution""" - return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) - - -class BasicBlock(nn.Module): - expansion = 1 - __constants__ = ['downsample'] - - def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, - base_width=64, dilation=1, norm_layer=None): - super(BasicBlock, self).__init__() - if norm_layer is None: - norm_layer = nn.BatchNorm2d - if groups != 1 or base_width != 64: - raise ValueError('BasicBlock only supports groups=1 and base_width=64') - if dilation > 1: - raise 
NotImplementedError("Dilation > 1 not supported in BasicBlock") - # Both self.conv1 and self.downsample layers downsample the input when stride != 1 - self.conv1 = conv3x3(inplanes, planes, stride) - self.bn1 = norm_layer(planes) - self.relu = nn.ReLU(inplace=True) - self.conv2 = conv3x3(planes, planes) - self.bn2 = norm_layer(planes) - self.downsample = downsample - self.stride = stride - - def forward(self, x): - identity = x - - out = self.conv1(x) - out = self.bn1(out) - out = self.relu(out) - - out = self.conv2(out) - out = self.bn2(out) - - if self.downsample is not None: - identity = self.downsample(x) - - out += identity - out = self.relu(out) - - return out - - -class Bottleneck(nn.Module): - expansion = 4 - __constants__ = ['downsample'] - - def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, - base_width=64, dilation=1, norm_layer=None): - super(Bottleneck, self).__init__() - if norm_layer is None: - norm_layer = nn.BatchNorm2d - width = int(planes * (base_width / 64.)) * groups - # Both self.conv2 and self.downsample layers downsample the input when stride != 1 - self.conv1 = conv1x1(inplanes, width) - self.bn1 = norm_layer(width) - self.conv2 = conv3x3(width, width, stride, groups, dilation) - self.bn2 = norm_layer(width) - self.conv3 = conv1x1(width, planes * self.expansion) - self.bn3 = norm_layer(planes * self.expansion) - self.relu = nn.ReLU(inplace=True) - self.downsample = downsample - self.stride = stride - - def forward(self, x): - identity = x - - out = self.conv1(x) - out = self.bn1(out) - out = self.relu(out) - - out = self.conv2(out) - out = self.bn2(out) - out = self.relu(out) - - out = self.conv3(out) - out = self.bn3(out) - - if self.downsample is not None: - identity = self.downsample(x) - - out += identity - out = self.relu(out) - - return out - - -class ResNet(nn.Module): - - def __init__(self, block, layers, num_classes=1000, zero_init_residual=False, - groups=1, width_per_group=64, 
replace_stride_with_dilation=None, - norm_layer=None): - super(ResNet, self).__init__() - if norm_layer is None: - norm_layer = nn.BatchNorm2d - self._norm_layer = norm_layer - - self.inplanes = 64 - self.dilation = 1 - if replace_stride_with_dilation is None: - # each element in the tuple indicates if we should replace - # the 2x2 stride with a dilated convolution instead - replace_stride_with_dilation = [False, False, False] - if len(replace_stride_with_dilation) != 3: - raise ValueError("replace_stride_with_dilation should be None " - "or a 3-element tuple, got {}".format(replace_stride_with_dilation)) - self.groups = groups - self.base_width = width_per_group - self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, - bias=False) - self.bn1 = norm_layer(self.inplanes) - self.relu = nn.ReLU(inplace=True) - self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) - self.layer1 = self._make_layer(block, 64, layers[0]) - self.layer2 = self._make_layer(block, 128, layers[1], stride=2, - dilate=replace_stride_with_dilation[0]) - self.layer3 = self._make_layer(block, 256, layers[2], stride=2, - dilate=replace_stride_with_dilation[1]) - self.layer4 = self._make_layer(block, 512, layers[3], stride=2, - dilate=replace_stride_with_dilation[2]) - self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) - self.fc = nn.Linear(512 * block.expansion, num_classes) - - for m in self.modules(): - if isinstance(m, nn.Conv2d): - nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') - elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): - nn.init.constant_(m.weight, 1) - nn.init.constant_(m.bias, 0) - - # Zero-initialize the last BN in each residual branch, - # so that the residual branch starts with zeros, and each residual block behaves like an identity. 
- # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677 - if zero_init_residual: - for m in self.modules(): - if isinstance(m, Bottleneck): - nn.init.constant_(m.bn3.weight, 0) - elif isinstance(m, BasicBlock): - nn.init.constant_(m.bn2.weight, 0) - - def _make_layer(self, block, planes, blocks, stride=1, dilate=False): - norm_layer = self._norm_layer - downsample = None - previous_dilation = self.dilation - if dilate: - self.dilation *= stride - stride = 1 - if stride != 1 or self.inplanes != planes * block.expansion: - downsample = nn.Sequential( - conv1x1(self.inplanes, planes * block.expansion, stride), - norm_layer(planes * block.expansion), - ) - - layers = [] - layers.append(block(self.inplanes, planes, stride, downsample, self.groups, - self.base_width, previous_dilation, norm_layer)) - self.inplanes = planes * block.expansion - for _ in range(1, blocks): - layers.append(block(self.inplanes, planes, groups=self.groups, - base_width=self.base_width, dilation=self.dilation, - norm_layer=norm_layer)) - - return nn.Sequential(*layers) - - def _forward(self, x): - x = self.conv1(x) - print(x.shape) - x = self.bn1(x) - x = self.relu(x) - x = self.maxpool(x) - - x = self.layer1(x) - x = self.layer2(x) - x = self.layer3(x) - x = self.layer4(x) - - x = self.avgpool(x) - x = torch.flatten(x, 1) - x = self.fc(x) - - return x - - # Allow for accessing forward method in a inherited class - forward = _forward - - -def _resnet(arch, block, layers, pretrained, progress, **kwargs): - model = ResNet(block, layers, **kwargs) - if pretrained: - state_dict = load_state_dict_from_url(model_urls[arch], - progress=progress) - model.load_state_dict(state_dict) - return model - - -def resnet18(pretrained=False, progress=True, **kwargs): - r"""ResNet-18 model from - `"Deep Residual Learning for Image Recognition" `_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the 
download to stderr - """ - return _resnet('resnet18', BasicBlock, [2, 2, 2, 2], pretrained, progress, - **kwargs) - - -def resnet34(pretrained=False, progress=True, **kwargs): - r"""ResNet-34 model from - `"Deep Residual Learning for Image Recognition" `_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the download to stderr - """ - return _resnet('resnet34', BasicBlock, [3, 4, 6, 3], pretrained, progress, - **kwargs) - - -def resnet50(pretrained=False, progress=True, **kwargs): - r"""ResNet-50 model from - `"Deep Residual Learning for Image Recognition" `_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the download to stderr - """ - return _resnet('resnet50', Bottleneck, [3, 4, 6, 3], pretrained, progress, - **kwargs) - - -def resnet101(pretrained=False, progress=True, **kwargs): - r"""ResNet-101 model from - `"Deep Residual Learning for Image Recognition" `_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the download to stderr - """ - return _resnet('resnet101', Bottleneck, [3, 4, 23, 3], pretrained, progress, - **kwargs) - - -def resnet152(pretrained=False, progress=True, **kwargs): - r"""ResNet-152 model from - `"Deep Residual Learning for Image Recognition" `_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the download to stderr - """ - return _resnet('resnet152', Bottleneck, [3, 8, 36, 3], pretrained, progress, - **kwargs) - - -def resnext50_32x4d(pretrained=False, progress=True, **kwargs): - r"""ResNeXt-50 32x4d model from - `"Aggregated Residual Transformation for Deep Neural Networks" `_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of 
the download to stderr - """ - kwargs['groups'] = 32 - kwargs['width_per_group'] = 4 - return _resnet('resnext50_32x4d', Bottleneck, [3, 4, 6, 3], - pretrained, progress, **kwargs) - - -def resnext101_32x8d(pretrained=False, progress=True, **kwargs): - r"""ResNeXt-101 32x8d model from - `"Aggregated Residual Transformation for Deep Neural Networks" `_ - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the download to stderr - """ - kwargs['groups'] = 32 - kwargs['width_per_group'] = 8 - return _resnet('resnext101_32x8d', Bottleneck, [3, 4, 23, 3], - pretrained, progress, **kwargs) - - -def wide_resnet50_2(pretrained=False, progress=True, **kwargs): - r"""Wide ResNet-50-2 model from - `"Wide Residual Networks" `_ - The model is the same as ResNet except for the bottleneck number of channels - which is twice larger in every block. The number of channels in outer 1x1 - convolutions is the same, e.g. last block in ResNet-50 has 2048-512-2048 - channels, and in Wide ResNet-50-2 has 2048-1024-2048. - Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the download to stderr - """ - kwargs['width_per_group'] = 64 * 2 - return _resnet('wide_resnet50_2', Bottleneck, [3, 4, 6, 3], - pretrained, progress, **kwargs) - - -def wide_resnet101_2(pretrained=False, progress=True, **kwargs): - r"""Wide ResNet-101-2 model from - `"Wide Residual Networks" `_ - The model is the same as ResNet except for the bottleneck number of channels - which is twice larger in every block. The number of channels in outer 1x1 - convolutions is the same, e.g. last block in ResNet-50 has 2048-512-2048 - channels, and in Wide ResNet-50-2 has 2048-1024-2048. 
- Args: - pretrained (bool): If True, returns a model pre-trained on ImageNet - progress (bool): If True, displays a progress bar of the download to stderr - """ - kwargs['width_per_group'] = 64 * 2 - return _resnet('wide_resnet101_2', Bottleneck, [3, 4, 23, 3], - pretrained, progress, **kwargs) \ No newline at end of file diff --git a/sjlee_backup/cats/mod.py b/sjlee_backup/cats/mod.py deleted file mode 100644 index 7ce21fa..0000000 --- a/sjlee_backup/cats/mod.py +++ /dev/null @@ -1,213 +0,0 @@ -import torch -import torch.nn as nn -import numpy as np -from torch.autograd import Variable - -r''' -Copy-pasted from GLU-Net -https://github.com/PruneTruong/GLU-Net -''' - - -def conv(in_planes, out_planes, kernel_size=3, stride=1, padding=1, dilation=1, batch_norm=False): - if batch_norm: - return nn.Sequential( - nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride, - padding=padding, dilation=dilation, bias=True), - nn.BatchNorm2d(out_planes), - nn.LeakyReLU(0.1, inplace=True)) - else: - return nn.Sequential( - nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride, - padding=padding, dilation=dilation, bias=True), - nn.LeakyReLU(0.1)) - - -def predict_flow(in_planes): - return nn.Conv2d(in_planes,2,kernel_size=3,stride=1,padding=1,bias=True) - - -def deconv(in_planes, out_planes, kernel_size=4, stride=2, padding=1): - return nn.ConvTranspose2d(in_planes, out_planes, kernel_size, stride, padding, bias=True) - - -def unnormalise_and_convert_mapping_to_flow(map): - # here map is normalised to -1;1 - # we put it back to 0,W-1, then convert it to flow - B, C, H, W = map.size() - mapping = torch.zeros_like(map) - # mesh grid - mapping[:,0,:,:] = (map[:, 0, :, :].float().clone() + 1) * (W - 1) / 2.0 # unormalise - mapping[:,1,:,:] = (map[:, 1, :, :].float().clone() + 1) * (H - 1) / 2.0 # unormalise - - xx = torch.arange(0, W).view(1,-1).repeat(H,1) - yy = torch.arange(0, H).view(-1,1).repeat(1,W) - xx = xx.view(1,1,H,W).repeat(B,1,1,1) - 
yy = yy.view(1,1,H,W).repeat(B,1,1,1) - grid = torch.cat((xx,yy),1).float() - - if mapping.is_cuda: - grid = grid.cuda() - flow = mapping - grid - return flow - - -class CorrelationVolume(nn.Module): - """ - Implementation by Ignacio Rocco - paper: https://arxiv.org/abs/1703.05593 - project: https://github.com/ignacio-rocco/cnngeometric_pytorch - """ - - def __init__(self): - super(CorrelationVolume, self).__init__() - - def forward(self, feature_A, feature_B): - b, c, h, w = feature_A.size() - - # reshape features for matrix multiplication - feature_A = feature_A.transpose(2, 3).contiguous().view(b, c, h * w) # shape (b,c,h*w) - feature_B = feature_B.view(b, c, h * w).transpose(1, 2) # shape (b,h*w,c) - feature_mul = torch.bmm(feature_B, feature_A) # shape (b,h*w,h*w) - correlation_tensor = feature_mul.view(b, h, w, h * w).transpose(2, 3).transpose(1, 2) - return correlation_tensor # shape (b,h*w,h,w) - - -class FeatureL2Norm(nn.Module): - """ - Implementation by Ignacio Rocco - paper: https://arxiv.org/abs/1703.05593 - project: https://github.com/ignacio-rocco/cnngeometric_pytorch - """ - def __init__(self): - super(FeatureL2Norm, self).__init__() - - def forward(self, feature, dim=1): - epsilon = 1e-6 - norm = torch.pow(torch.sum(torch.pow(feature, 2), dim) + epsilon, 0.5).unsqueeze(dim).expand_as(feature) - return torch.div(feature, norm) - - -class OpticalFlowEstimator(nn.Module): - - def __init__(self, in_channels, batch_norm): - super(OpticalFlowEstimator, self).__init__() - - dd = np.cumsum([128,128,96,64,32]) - self.conv_0 = conv(in_channels, 128, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_1 = conv(in_channels + dd[0], 128, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_2 = conv(in_channels + dd[1], 96, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_3 = conv(in_channels + dd[2], 64, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_4 = conv(in_channels + dd[3], 32, kernel_size=3, stride=1, 
batch_norm=batch_norm) - self.predict_flow = predict_flow(in_channels + dd[4]) - - def forward(self, x): - # dense net connection - x = torch.cat((self.conv_0(x), x),1) - x = torch.cat((self.conv_1(x), x),1) - x = torch.cat((self.conv_2(x), x),1) - x = torch.cat((self.conv_3(x), x),1) - x = torch.cat((self.conv_4(x), x),1) - flow = self.predict_flow(x) - return x, flow - - -class OpticalFlowEstimatorNoDenseConnection(nn.Module): - - def __init__(self, in_channels, batch_norm): - super(OpticalFlowEstimatorNoDenseConnection, self).__init__() - self.conv_0 = conv(in_channels, 128, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_1 = conv(128, 128, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_2 = conv(128, 96, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_3 = conv(96, 64, kernel_size=3, stride=1, batch_norm=batch_norm) - self.conv_4 = conv(64, 32, kernel_size=3, stride=1, batch_norm=batch_norm) - self.predict_flow = predict_flow(32) - - def forward(self, x): - x = self.conv_4(self.conv_3(self.conv_2(self.conv_1(self.conv_0(x))))) - flow = self.predict_flow(x) - return x, flow - - -# extracted from DGCNet -def conv_blck(in_channels, out_channels, kernel_size=3, - stride=1, padding=1, dilation=1, bn=False): - if bn: - return nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size, - stride, padding, dilation), - nn.BatchNorm2d(out_channels), - nn.ReLU(inplace=True)) - else: - return nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size, - stride, padding, dilation), - nn.ReLU(inplace=True)) - - -def conv_head(in_channels): - return nn.Conv2d(in_channels, 2, kernel_size=3, padding=1) - - -class CorrespondenceMapBase(nn.Module): - def __init__(self, in_channels, bn=False): - super().__init__() - - def forward(self, x1, x2=None, x3=None): - x = x1 - # concatenating dimensions - if (x2 is not None) and (x3 is None): - x = torch.cat((x1, x2), 1) - elif (x2 is None) and (x3 is not None): - x = torch.cat((x1, x3), 1) - 
elif (x2 is not None) and (x3 is not None): - x = torch.cat((x1, x2, x3), 1) - - return x - - -class CMDTop(CorrespondenceMapBase): - def __init__(self, in_channels, bn=False): - super().__init__(in_channels, bn) - chan = [128, 128, 96, 64, 32] - self.conv0 = conv_blck(in_channels, chan[0], bn=bn) - self.conv1 = conv_blck(chan[0], chan[1], bn=bn) - self.conv2 = conv_blck(chan[1], chan[2], bn=bn) - self.conv3 = conv_blck(chan[2], chan[3], bn=bn) - self.conv4 = conv_blck(chan[3], chan[4], bn=bn) - self.final = conv_head(chan[-1]) - - def forward(self, x1, x2=None, x3=None): - x = super().forward(x1, x2, x3) - x = self.conv4(self.conv3(self.conv2(self.conv1(self.conv0(x))))) - return self.final(x) - - -def warp(x, flo): - """ - warp an image/tensor (im2) back to im1, according to the optical flow - x: [B, C, H, W] (im2) - flo: [B, 2, H, W] flow - """ - B, C, H, W = x.size() - # mesh grid - xx = torch.arange(0, W).view(1, -1).repeat(H, 1) - yy = torch.arange(0, H).view(-1, 1).repeat(1, W) - xx = xx.view(1, 1, H, W).repeat(B, 1, 1, 1) - yy = yy.view(1, 1, H, W).repeat(B, 1, 1, 1) - grid = torch.cat((xx, yy), 1).float() - - if x.is_cuda: - grid = grid.cuda() - vgrid = grid + flo - # makes a mapping out of the flow - - # scale grid to [-1,1] - vgrid[:, 0, :, :] = 2.0 * vgrid[:, 0, :, :].clone() / max(W - 1, 1) - 1.0 - vgrid[:, 1, :, :] = 2.0 * vgrid[:, 1, :, :].clone() / max(H - 1, 1) - 1.0 - - vgrid = vgrid.permute(0, 2, 3, 1) - - if float(torch.__version__[:3]) >= 1.3: - output = nn.functional.grid_sample(x, vgrid, align_corners=True) - else: - output = nn.functional.grid_sample(x, vgrid) - return output \ No newline at end of file diff --git a/sjlee_backup/loss.py b/sjlee_backup/loss.py deleted file mode 100644 index 6edbb37..0000000 --- a/sjlee_backup/loss.py +++ /dev/null @@ -1,19 +0,0 @@ -import torch - -def loss_superglue(scores, all_matches): - # check if indexed correctly - loss = [] - loss.append(torch.tensor(0.).cuda()) - for i in range(len(all_matches[0])): - 
x = all_matches[0][i][0] - y = all_matches[0][i][1] - if x>=len(scores[0]) or y>=len(scores[0][0]):continue - - loss.append(-torch.log( scores[0][x][y] )) # check batch size == 1 ? - # for p0 in unmatched0: - # loss += -torch.log(scores[0][p0][-1]) - # for p1 in unmatched1: - # loss += -torch.log(scores[0][-1][p1]) - loss_mean = torch.mean(torch.stack(loss)) - loss_mean = torch.reshape(loss_mean, (1, -1)) - return loss_mean[0] diff --git a/sjlee_backup/losssuperglue.py b/sjlee_backup/losssuperglue.py deleted file mode 100644 index cecca4e..0000000 --- a/sjlee_backup/losssuperglue.py +++ /dev/null @@ -1,19 +0,0 @@ -import torch - -def loss_superglue(scores, all_matches): - # check if indexed correctly - loss = [] - loss.append(torch.tensor(0.).cuda()) - for i in range(len(all_matches[0])): - x = all_matches[0][i][0] - y = all_matches[0][i][1] - - if x>=len(scores[0]) or y>=len(scores[0][0]):continue - loss.append(-torch.log( scores[0][x][y] )) # check batch size == 1 ? - # for p0 in unmatched0: - # loss += -torch.log(scores[0][p0][-1]) - # for p1 in unmatched1: - # loss += -torch.log(scores[0][-1][p1]) - loss_mean = torch.mean(torch.stack(loss)) - loss_mean = torch.reshape(loss_mean, (1, -1)) - return loss_mean[0] diff --git a/sjlee_backup/superglue.py b/sjlee_backup/superglue.py deleted file mode 100644 index 6837d47..0000000 --- a/sjlee_backup/superglue.py +++ /dev/null @@ -1,359 +0,0 @@ -# %BANNER_BEGIN% -# --------------------------------------------------------------------- -# %COPYRIGHT_BEGIN% -# -# Magic Leap, Inc. ("COMPANY") CONFIDENTIAL -# -# Unpublished Copyright (c) 2020 -# Magic Leap, Inc., All Rights Reserved. -# -# NOTICE: All information contained herein is, and remains the property -# of COMPANY. The intellectual and technical concepts contained herein -# are proprietary to COMPANY and may be covered by U.S. and Foreign -# Patents, patents in process, and are protected by trade secret or -# copyright law. 
Dissemination of this information or reproduction of -# this material is strictly forbidden unless prior written permission is -# obtained from COMPANY. Access to the source code contained herein is -# hereby forbidden to anyone except current COMPANY employees, managers -# or contractors who have executed Confidentiality and Non-disclosure -# agreements explicitly covering such access. -# -# The copyright notice above does not evidence any actual or intended -# publication or disclosure of this source code, which includes -# information that is confidential and/or proprietary, and is a trade -# secret, of COMPANY. ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, -# PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS -# SOURCE CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF COMPANY IS -# STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND -# INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE -# CODE AND/OR RELATED INFORMATION DOES NOT CONVEY OR IMPLY ANY RIGHTS -# TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, -# USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. 
-# -# %COPYRIGHT_END% -# ---------------------------------------------------------------------- -# %AUTHORS_BEGIN% -# -# Originating Authors: Paul-Edouard Sarlin -# -# %AUTHORS_END% -# --------------------------------------------------------------------*/ -# %BANNER_END% - -from copy import deepcopy -from pathlib import Path -import torch -from torch import nn - - -def MLP(channels: list, do_bn=True): - """ Multi-layer perceptron """ - n = len(channels) - layers = [] - for i in range(1, n): - layers.append( - nn.Conv1d(channels[i - 1], channels[i], kernel_size=1, bias=True)) - if i < (n-1): - if do_bn: - # layers.append(nn.BatchNorm1d(channels[i])) - layers.append(nn.InstanceNorm1d(channels[i])) - layers.append(nn.ReLU()) - return nn.Sequential(*layers) - - -def normalize_keypoints(kpts, image_shape): - """ Normalize keypoints locations based on image image_shape""" - _, _, height, width = image_shape - one = kpts.new_tensor(1) - size = torch.stack([one*width, one*height])[None] - center = size / 2 - scaling = size.max(1, keepdim=True).values * 0.7 - return (kpts - center[:, None, :]) / scaling[:, None, :] - - -class KeypointEncoder(nn.Module): - """ Joint encoding of visual appearance and location using MLPs""" - def __init__(self, feature_dim, layers): - super().__init__() - self.encoder = MLP([3] + layers + [feature_dim]) - nn.init.constant_(self.encoder[-1].bias, 0.0) - - def forward(self, kpts, scores): - inputs = [kpts.transpose(1, 2), scores.unsqueeze(1)] - return self.encoder(torch.cat(inputs, dim=1)) - - -def attention(query, key, value): - dim = query.shape[1] - scores = torch.einsum('bdhn,bdhm->bhnm', query, key) / dim**.5 - prob = torch.nn.functional.softmax(scores, dim=-1) - return torch.einsum('bhnm,bdhm->bdhn', prob, value), prob - - -class MultiHeadedAttention(nn.Module): - """ Multi-head attention to increase model expressivitiy """ - def __init__(self, num_heads: int, d_model: int): - super().__init__() - assert d_model % num_heads == 0 - self.dim 
= d_model // num_heads - self.num_heads = num_heads - self.merge = nn.Conv1d(d_model, d_model, kernel_size=1) - self.proj = nn.ModuleList([deepcopy(self.merge) for _ in range(3)]) - - def forward(self, query, key, value): - batch_dim = query.size(0) - query, key, value = [l(x).view(batch_dim, self.dim, self.num_heads, -1) - for l, x in zip(self.proj, (query, key, value))] - x, prob = attention(query, key, value) - self.prob.append(prob) - return self.merge(x.contiguous().view(batch_dim, self.dim*self.num_heads, -1)) - - -class AttentionalPropagation(nn.Module): - def __init__(self, feature_dim: int, num_heads: int): - super().__init__() - self.attn = MultiHeadedAttention(num_heads, feature_dim) - self.mlp = MLP([feature_dim*2, feature_dim*2, feature_dim]) - nn.init.constant_(self.mlp[-1].bias, 0.0) - - def forward(self, x, source): - message = self.attn(x, source, source) - return self.mlp(torch.cat([x, message], dim=1)) - - -class AttentionalGNN(nn.Module): - def __init__(self, feature_dim: int, layer_names: list): - super().__init__() - self.layers = nn.ModuleList([ - AttentionalPropagation(feature_dim, 4) - for _ in range(len(layer_names))]) - self.names = layer_names - - def forward(self, desc0, desc1): - for layer, name in zip(self.layers, self.names): - layer.attn.prob = [] - if name == 'cross': - src0, src1 = desc1, desc0 - else: # if name == 'self': - src0, src1 = desc0, desc1 - delta0, delta1 = layer(desc0, src0), layer(desc1, src1) - desc0, desc1 = (desc0 + delta0), (desc1 + delta1) - return desc0, desc1 - - -def log_sinkhorn_iterations(Z, log_mu, log_nu, iters: int): - """ Perform Sinkhorn Normalization in Log-space for stability""" - u, v = torch.zeros_like(log_mu), torch.zeros_like(log_nu) - for _ in range(iters): - u = log_mu - torch.logsumexp(Z + v.unsqueeze(1), dim=2) - v = log_nu - torch.logsumexp(Z + u.unsqueeze(2), dim=1) - return Z + u.unsqueeze(2) + v.unsqueeze(1) - - -def log_optimal_transport(scores, alpha, iters: int): - """ Perform 
Differentiable Optimal Transport in Log-space for stability""" - b, m, n = scores.shape - one = scores.new_tensor(1) - ms, ns = (m*one).to(scores), (n*one).to(scores) - - bins0 = alpha.expand(b, m, 1) - bins1 = alpha.expand(b, 1, n) - alpha = alpha.expand(b, 1, 1) - - couplings = torch.cat([torch.cat([scores, bins0], -1), - torch.cat([bins1, alpha], -1)], 1) - - norm = - (ms + ns).log() - log_mu = torch.cat([norm.expand(m), ns.log()[None] + norm]) - log_nu = torch.cat([norm.expand(n), ms.log()[None] + norm]) - log_mu, log_nu = log_mu[None].expand(b, -1), log_nu[None].expand(b, -1) - - Z = log_sinkhorn_iterations(couplings, log_mu, log_nu, iters) - Z = Z - norm # multiply probabilities by M+N - return Z - - -def arange_like(x, dim: int): - return x.new_ones(x.shape[dim]).cumsum(0) - 1 # traceable in 1.1 - - -class SuperGlue(nn.Module): - """SuperGlue feature matching middle-end - Given two sets of keypoints and locations, we determine the - correspondences by: - 1. Keypoint Encoding (normalization + visual feature and location fusion) - 2. Graph Neural Network with multiple self and cross-attention layers - 3. Final projection layer - 4. Optimal Transport Layer (a differentiable Hungarian matching algorithm) - 5. Thresholding matrix based on mutual exclusivity and a match_threshold - The correspondence ids use -1 to indicate non-matching points. - Paul-Edouard Sarlin, Daniel DeTone, Tomasz Malisiewicz, and Andrew - Rabinovich. SuperGlue: Learning Feature Matching with Graph Neural - Networks. In CVPR, 2020. 
https://arxiv.org/abs/1911.11763 - """ - default_config = { - 'descriptor_dim': 256, - 'weights': 'indoor', - 'keypoint_encoder': [32, 64, 128, 256], - 'GNN_layers': ['self', 'cross'] * 9, - 'sinkhorn_iterations': 100, - 'match_threshold': 0.2, - } - - def __init__(self, config): - super().__init__() - self.config = {**self.default_config, **config} - - self.kenc = KeypointEncoder( - self.config['descriptor_dim'], self.config['keypoint_encoder']) - - self.gnn = AttentionalGNN( - self.config['descriptor_dim'], self.config['GNN_layers']) - - self.final_proj = nn.Conv1d( - self.config['descriptor_dim'], self.config['descriptor_dim'], - kernel_size=1, bias=True) - - bin_score = torch.nn.Parameter(torch.tensor(1.)) - self.register_parameter('bin_score', bin_score) - - # assert self.config['weights'] in ['indoor', 'outdoor'] - # path = Path(__file__).parent - # path = path / 'weights/superglue_{}.pth'.format(self.config['weights']) - # self.load_state_dict(torch.load(path)) - # print('Loaded SuperGlue model (\"{}\" weights)'.format( - # self.config['weights'])) - - def forward(self, data): - """Run SuperGlue on a pair of keypoints and descriptors""" - desc0, desc1 = data['descriptors0'], data['descriptors1'] - kpts0, kpts1 = data['keypoints0'], data['keypoints1'] - - """ - desc0 = desc0.transpose(0,1) - desc1 = desc1.transpose(0,1) - kpts0 = torch.reshape(kpts0, (1, -1, 2)) - kpts1 = torch.reshape(kpts1, (1, -1, 2)) - """ - - if kpts0.shape[1] == 0 or kpts1.shape[1] == 0: # no keypoints - shape0, shape1 = kpts0.shape[:-1], kpts1.shape[:-1] - return { - 'matches0': kpts0.new_full(shape0, -1, dtype=torch.int)[0], - 'matches1': kpts1.new_full(shape1, -1, dtype=torch.int)[0], - 'matching_scores0': kpts0.new_zeros(shape0)[0], - 'matching_scores1': kpts1.new_zeros(shape1)[0], - 'skip_train': True - } - - """ - file_name = data['file_name'] - all_matches = data['all_matches'].permute(1,2,0) # shape=torch.Size([1, 87, 2]) - """ - - # Keypoint normalization. 
- kpts0 = normalize_keypoints(kpts0, data['image0'].shape) - kpts1 = normalize_keypoints(kpts1, data['image1'].shape) - - # Keypoint MLP encoder. - """ - desc0 = desc0 + self.kenc(kpts0, torch.transpose(data['scores0'], 0, 1)) - desc1 = desc1 + self.kenc(kpts1, torch.transpose(data['scores1'], 0, 1)) - """ - desc0 = desc0 + self.kenc(kpts0, data['scores0']) - desc1 = desc1 + self.kenc(kpts1, data['scores1']) - - # Multi-layer Transformer network. - desc0, desc1 = self.gnn(desc0, desc1) - - # Final MLP projection. - mdesc0, mdesc1 = self.final_proj(desc0), self.final_proj(desc1) - - # Compute matching descriptor distance. - scores = torch.einsum('bdn,bdm->bnm', mdesc0, mdesc1) - scores = scores / self.config['descriptor_dim']**.5 - - # Run the optimal transport. - scores = log_optimal_transport( - scores, self.bin_score, - iters=self.config['sinkhorn_iterations']) - - # Get the matches with score above "match_threshold". - max0, max1 = scores[:, :-1, :-1].max(2), scores[:, :-1, :-1].max(1) - indices0, indices1 = max0.indices, max1.indices - mutual0 = arange_like(indices0, 1)[None] == indices1.gather(1, indices0) - mutual1 = arange_like(indices1, 1)[None] == indices0.gather(1, indices1) - zero = scores.new_tensor(0) - mscores0 = torch.where(mutual0, max0.values.exp(), zero) - mscores1 = torch.where(mutual1, mscores0.gather(1, indices1), zero) - valid0 = mutual0 & (mscores0 > self.config['match_threshold']) - valid1 = mutual1 & valid0.gather(1, indices1) - indices0 = torch.where(valid0, indices0, indices0.new_tensor(-1)) - indices1 = torch.where(valid1, indices1, indices1.new_tensor(-1)) - - """ - # check if indexed correctly - loss = [] - for i in range(len(all_matches[0])): - x = all_matches[0][i][0] - y = all_matches[0][i][1] - loss.append(-torch.log( scores[0][x][y].exp() )) # check batch size == 1 ? 
- # for p0 in unmatched0: - # loss += -torch.log(scores[0][p0][-1]) - # for p1 in unmatched1: - # loss += -torch.log(scores[0][-1][p1]) - loss_mean = torch.mean(torch.stack(loss)) - loss_mean = torch.reshape(loss_mean, (1, -1)) - """ - - return { - 'matches0': indices0[0], # use -1 for invalid match - 'matches1': indices1[0], # use -1 for invalid match - 'matching_scores0': mscores0[0], - 'matching_scores1': mscores1[0], - # 'loss': loss_mean[0], - 'skip_train': False - } - - # scores big value or small value means confidence? log can't take neg value - -if __name__ == '__main__': - from superpoint import SuperPoint - - config = { - 'superpoint': { - 'nms_radius': 4, - 'keypoint_threshold': 0.005, - 'max_keypoints': -1 - }, - 'superglue': { - 'weights': 'indoor', - 'sinkhorn_iterations': 20, - 'match_threshold':0.2, - } - } - - data = { - 'image0': torch.randn(1, 1, 512, 512), - 'image1': torch.randn(1, 1, 512, 512) - } - - superpoint = SuperPoint(config.get('superpoint', {})) - - output1 = superpoint({'image': data['image0']}) - output2 = superpoint({'image': data['image1']}) - - pred = {} - - pred = {**pred, **{k+'0': v for k, v in output1.items()}} - pred = {**pred, **{k+'1': v for k, v in output2.items()}} - - data = {**data, **pred} - - for k in data: - if isinstance(data[k], (list, tuple)): - data[k] = torch.stack(data[k]) - - print(data['descriptors0'].shape) - superglue = SuperGlue(config.get('superglue', {})) - superglue(data) \ No newline at end of file diff --git a/sjlee_backup/superglue2.py b/sjlee_backup/superglue2.py deleted file mode 100644 index 5bd4028..0000000 --- a/sjlee_backup/superglue2.py +++ /dev/null @@ -1,326 +0,0 @@ - -# %BANNER_BEGIN% -# --------------------------------------------------------------------- -# %COPYRIGHT_BEGIN% -# -# Magic Leap, Inc. ("COMPANY") CONFIDENTIAL -# -# Unpublished Copyright (c) 2020 -# Magic Leap, Inc., All Rights Reserved. 
-# -# NOTICE: All information contained herein is, and remains the property -# of COMPANY. The intellectual and technical concepts contained herein -# are proprietary to COMPANY and may be covered by U.S. and Foreign -# Patents, patents in process, and are protected by trade secret or -# copyright law. Dissemination of this information or reproduction of -# this material is strictly forbidden unless prior written permission is -# obtained from COMPANY. Access to the source code contained herein is -# hereby forbidden to anyone except current COMPANY employees, managers -# or contractors who have executed Confidentiality and Non-disclosure -# agreements explicitly covering such access. -# -# The copyright notice above does not evidence any actual or intended -# publication or disclosure of this source code, which includes -# information that is confidential and/or proprietary, and is a trade -# secret, of COMPANY. ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, -# PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS -# SOURCE CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF COMPANY IS -# STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND -# INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE -# CODE AND/OR RELATED INFORMATION DOES NOT CONVEY OR IMPLY ANY RIGHTS -# TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, -# USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. 
-# -# %COPYRIGHT_END% -# ---------------------------------------------------------------------- -# %AUTHORS_BEGIN% -# -# Originating Authors: Paul-Edouard Sarlin -# -# %AUTHORS_END% -# --------------------------------------------------------------------*/ -# %BANNER_END% - -from copy import deepcopy -from pathlib import Path -from typing import List, Tuple - -import torch -from torch import nn - - -def MLP(channels: List[int], do_bn: bool = True) -> nn.Module: - """ Multi-layer perceptron """ - n = len(channels) - layers = [] - for i in range(1, n): - layers.append( - nn.Conv1d(channels[i - 1], channels[i], kernel_size=1, bias=True)) - if i < (n-1): - if do_bn: - layers.append(nn.BatchNorm1d(channels[i])) - layers.append(nn.ReLU()) - return nn.Sequential(*layers) - - -def normalize_keypoints(kpts, image_shape): - """ Normalize keypoints locations based on image image_shape""" - _, _, height, width = image_shape - one = kpts.new_tensor(1) - size = torch.stack([one*width, one*height])[None] - center = size / 2 - scaling = size.max(1, keepdim=True).values * 0.7 - return (kpts - center[:, None, :]) / scaling[:, None, :] - - -class KeypointEncoder(nn.Module): - """ Joint encoding of visual appearance and location using MLPs""" - def __init__(self, feature_dim: int, layers: List[int]) -> None: - super().__init__() - self.encoder = MLP([3] + layers + [feature_dim]) - nn.init.constant_(self.encoder[-1].bias, 0.0) - - def forward(self, kpts, scores): - inputs = [kpts.transpose(1, 2), scores.unsqueeze(1)] - return self.encoder(torch.cat(inputs, dim=1)) - - -def attention(query: torch.Tensor, key: torch.Tensor, value: torch.Tensor) -> Tuple[torch.Tensor,torch.Tensor]: - dim = query.shape[1] - scores = torch.einsum('bdhn,bdhm->bhnm', query, key) / dim**.5 - prob = torch.nn.functional.softmax(scores, dim=-1) - return torch.einsum('bhnm,bdhm->bdhn', prob, value), prob - - -class MultiHeadedAttention(nn.Module): - """ Multi-head attention to increase model expressivitiy """ - def 
__init__(self, num_heads: int, d_model: int): - super().__init__() - assert d_model % num_heads == 0 - self.dim = d_model // num_heads - self.num_heads = num_heads - self.merge = nn.Conv1d(d_model, d_model, kernel_size=1) - self.proj = nn.ModuleList([deepcopy(self.merge) for _ in range(3)]) - - def forward(self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor) -> torch.Tensor: - batch_dim = query.size(0) - query, key, value = [l(x).view(batch_dim, self.dim, self.num_heads, -1) - for l, x in zip(self.proj, (query, key, value))] - x, _ = attention(query, key, value) - return self.merge(x.contiguous().view(batch_dim, self.dim*self.num_heads, -1)) - - -class AttentionalPropagation(nn.Module): - def __init__(self, feature_dim: int, num_heads: int): - super().__init__() - self.attn = MultiHeadedAttention(num_heads, feature_dim) - self.mlp = MLP([feature_dim*2, feature_dim*2, feature_dim]) - nn.init.constant_(self.mlp[-1].bias, 0.0) - - def forward(self, x: torch.Tensor, source: torch.Tensor) -> torch.Tensor: - message = self.attn(x, source, source) - return self.mlp(torch.cat([x, message], dim=1)) - - -class AttentionalGNN(nn.Module): - def __init__(self, feature_dim: int, layer_names: List[str]) -> None: - super().__init__() - self.layers = nn.ModuleList([ - AttentionalPropagation(feature_dim, 4) - for _ in range(len(layer_names))]) - self.names = layer_names - - def forward(self, desc0: torch.Tensor, desc1: torch.Tensor) -> Tuple[torch.Tensor,torch.Tensor]: - for layer, name in zip(self.layers, self.names): - if name == 'cross': - src0, src1 = desc1, desc0 - else: # if name == 'self': - src0, src1 = desc0, desc1 - delta0, delta1 = layer(desc0, src0), layer(desc1, src1) - desc0, desc1 = (desc0 + delta0), (desc1 + delta1) - return desc0, desc1 - - -def log_sinkhorn_iterations(Z: torch.Tensor, log_mu: torch.Tensor, log_nu: torch.Tensor, iters: int) -> torch.Tensor: - """ Perform Sinkhorn Normalization in Log-space for stability""" - u, v = 
torch.zeros_like(log_mu), torch.zeros_like(log_nu) - for _ in range(iters): - u = log_mu - torch.logsumexp(Z + v.unsqueeze(1), dim=2) - v = log_nu - torch.logsumexp(Z + u.unsqueeze(2), dim=1) - - return Z + u.unsqueeze(2) + v.unsqueeze(1) - - -def log_optimal_transport(scores: torch.Tensor, alpha: torch.Tensor, iters: int) -> torch.Tensor: - """ Perform Differentiable Optimal Transport in Log-space for stability""" - b, m, n = scores.shape - one = scores.new_tensor(1) - ms, ns = (m*one).to(scores), (n*one).to(scores) - - bins0 = alpha.expand(b, m, 1) - bins1 = alpha.expand(b, 1, n) - alpha = alpha.expand(b, 1, 1) - - couplings = torch.cat([torch.cat([scores, bins0], -1), - torch.cat([bins1, alpha], -1)], 1) - - norm = - (ms + ns).log() - log_mu = torch.cat([norm.expand(m), ns.log()[None] + norm]) - log_nu = torch.cat([norm.expand(n), ms.log()[None] + norm]) - log_mu, log_nu = log_mu[None].expand(b, -1), log_nu[None].expand(b, -1) - - Z = log_sinkhorn_iterations(couplings, log_mu, log_nu, iters) - Z = Z - norm # multiply probabilities by M+N - return Z - - -def arange_like(x, dim: int): - return x.new_ones(x.shape[dim]).cumsum(0) - 1 # traceable in 1.1 - - -class SuperGlue(nn.Module): - """SuperGlue feature matching middle-end - Given two sets of keypoints and locations, we determine the - correspondences by: - 1. Keypoint Encoding (normalization + visual feature and location fusion) - 2. Graph Neural Network with multiple self and cross-attention layers - 3. Final projection layer - 4. Optimal Transport Layer (a differentiable Hungarian matching algorithm) - 5. Thresholding matrix based on mutual exclusivity and a match_threshold - The correspondence ids use -1 to indicate non-matching points. - Paul-Edouard Sarlin, Daniel DeTone, Tomasz Malisiewicz, and Andrew - Rabinovich. SuperGlue: Learning Feature Matching with Graph Neural - Networks. In CVPR, 2020. 
https://arxiv.org/abs/1911.11763 - """ - default_config = { - 'descriptor_dim': 256, - 'weights': 'indoor', - 'keypoint_encoder': [32, 64, 128, 256], - 'GNN_layers': ['self', 'cross'] * 9, - 'sinkhorn_iterations': 100, - 'match_threshold': 0.2, - } - - def __init__(self, config): - super().__init__() - self.config = {**self.default_config, **config} - - self.kenc = KeypointEncoder( - self.config['descriptor_dim'], self.config['keypoint_encoder']) - - self.gnn = AttentionalGNN( - feature_dim=self.config['descriptor_dim'], layer_names=self.config['GNN_layers']) - - self.final_proj = nn.Conv1d( - self.config['descriptor_dim'], self.config['descriptor_dim'], - kernel_size=1, bias=True) - - bin_score = torch.nn.Parameter(torch.tensor(1.)) - self.register_parameter('bin_score', bin_score) - - assert self.config['weights'] in ['indoor', 'outdoor'] - path = Path(__file__).parent - path = path / 'weights/superglue_{}.pth'.format(self.config['weights']) - self.load_state_dict(torch.load(str(path))) - print('Loaded SuperGlue model (\"{}\" weights)'.format( - self.config['weights'])) - - def forward(self, data): - """Run SuperGlue on a pair of keypoints and descriptors""" - desc0, desc1 = data['descriptors0'], data['descriptors1'] - kpts0, kpts1 = data['keypoints0'], data['keypoints1'] - - if kpts0.shape[1] == 0 or kpts1.shape[1] == 0: # no keypoints - shape0, shape1 = kpts0.shape[:-1], kpts1.shape[:-1] - return { - 'matches0': kpts0.new_full(shape0, -1, dtype=torch.int), - 'matches1': kpts1.new_full(shape1, -1, dtype=torch.int), - 'matching_scores0': kpts0.new_zeros(shape0), - 'matching_scores1': kpts1.new_zeros(shape1), - } - - # Keypoint normalization. - kpts0 = normalize_keypoints(kpts0, data['image0'].shape) - kpts1 = normalize_keypoints(kpts1, data['image1'].shape) - - # Keypoint MLP encoder. - desc0 = desc0 + self.kenc(kpts0, data['scores0']) - desc1 = desc1 + self.kenc(kpts1, data['scores1']) - - # Multi-layer Transformer network. 
- desc0, desc1 = self.gnn(desc0, desc1) - - # Final MLP projection. - mdesc0, mdesc1 = self.final_proj(desc0), self.final_proj(desc1) - - # Compute matching descriptor distance. - scores = torch.einsum('bdn,bdm->bnm', mdesc0, mdesc1) - scores = scores / self.config['descriptor_dim']**.5 - - print(scores.shape) - - # Run the optimal transport. - scores = log_optimal_transport( - scores, self.bin_score, - iters=self.config['sinkhorn_iterations']) - - # Get the matches with score above "match_threshold". - max0, max1 = scores[:, :-1, :-1].max(2), scores[:, :-1, :-1].max(1) - indices0, indices1 = max0.indices, max1.indices - mutual0 = arange_like(indices0, 1)[None] == indices1.gather(1, indices0) - mutual1 = arange_like(indices1, 1)[None] == indices0.gather(1, indices1) - zero = scores.new_tensor(0) - mscores0 = torch.where(mutual0, max0.values.exp(), zero) - mscores1 = torch.where(mutual1, mscores0.gather(1, indices1), zero) - valid0 = mutual0 & (mscores0 > self.config['match_threshold']) - valid1 = mutual1 & valid0.gather(1, indices1) - indices0 = torch.where(valid0, indices0, indices0.new_tensor(-1)) - indices1 = torch.where(valid1, indices1, indices1.new_tensor(-1)) - - print(scores.shape) - return { - 'matches0': indices0, # use -1 for invalid match - 'matches1': indices1, # use -1 for invalid match - 'matching_scores0': mscores0, - 'matching_scores1': mscores1, - } - -if __name__ == '__main__': - from superpoint import SuperPoint - - config = { - 'superpoint': { - 'nms_radius': 4, - 'keypoint_threshold': 0.005, - 'max_keypoints': -1 - }, - 'superglue': { - 'weights': 'indoor', - 'sinkhorn_iterations': 20, - 'match_threshold':0.2, - } - } - - data = { - 'image0': torch.randn(1, 1, 512, 512), - 'image1': torch.randn(1, 1, 512, 512) - } - - superpoint = SuperPoint(config.get('superpoint', {})) - - output1 = superpoint({'image': data['image0']}) - output2 = superpoint({'image': data['image1']}) - - pred = {} - - pred = {**pred, **{k+'0': v for k, v in 
output1.items()}} - pred = {**pred, **{k+'1': v for k, v in output2.items()}} - - data = {**data, **pred} - - for k in data: - if isinstance(data[k], (list, tuple)): - data[k] = torch.stack(data[k]) - - superglue = SuperGlue(config.get('superglue', {})) - output = superglue(data) diff --git a/sjlee_backup/superpoint.py b/sjlee_backup/superpoint.py deleted file mode 100644 index 14a07fd..0000000 --- a/sjlee_backup/superpoint.py +++ /dev/null @@ -1,222 +0,0 @@ -# %BANNER_BEGIN% -# --------------------------------------------------------------------- -# %COPYRIGHT_BEGIN% -# -# Magic Leap, Inc. ("COMPANY") CONFIDENTIAL -# -# Unpublished Copyright (c) 2020 -# Magic Leap, Inc., All Rights Reserved. -# -# NOTICE: All information contained herein is, and remains the property -# of COMPANY. The intellectual and technical concepts contained herein -# are proprietary to COMPANY and may be covered by U.S. and Foreign -# Patents, patents in process, and are protected by trade secret or -# copyright law. Dissemination of this information or reproduction of -# this material is strictly forbidden unless prior written permission is -# obtained from COMPANY. Access to the source code contained herein is -# hereby forbidden to anyone except current COMPANY employees, managers -# or contractors who have executed Confidentiality and Non-disclosure -# agreements explicitly covering such access. -# -# The copyright notice above does not evidence any actual or intended -# publication or disclosure of this source code, which includes -# information that is confidential and/or proprietary, and is a trade -# secret, of COMPANY. ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, -# PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS -# SOURCE CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF COMPANY IS -# STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND -# INTERNATIONAL TREATIES. 
THE RECEIPT OR POSSESSION OF THIS SOURCE -# CODE AND/OR RELATED INFORMATION DOES NOT CONVEY OR IMPLY ANY RIGHTS -# TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, -# USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. -# -# %COPYRIGHT_END% -# ---------------------------------------------------------------------- -# %AUTHORS_BEGIN% -# -# Originating Authors: Paul-Edouard Sarlin -# -# %AUTHORS_END% -# --------------------------------------------------------------------*/ -# %BANNER_END% - -from pathlib import Path -import torch -from torch import nn - -def simple_nms(scores, nms_radius: int): - """ Fast Non-maximum suppression to remove nearby points """ - assert(nms_radius >= 0) - - def max_pool(x): - return torch.nn.functional.max_pool2d( - x, kernel_size=nms_radius*2+1, stride=1, padding=nms_radius) - - zeros = torch.zeros_like(scores) - max_mask = scores == max_pool(scores) - for _ in range(2): - supp_mask = max_pool(max_mask.float()) > 0 - supp_scores = torch.where(supp_mask, zeros, scores) - new_max_mask = supp_scores == max_pool(supp_scores) - max_mask = max_mask | (new_max_mask & (~supp_mask)) - return torch.where(max_mask, scores, zeros) - - -def remove_borders(keypoints, scores, border: int, height: int, width: int): - """ Removes keypoints too close to the border """ - mask_h = (keypoints[:, 0] >= border) & (keypoints[:, 0] < (height - border)) - mask_w = (keypoints[:, 1] >= border) & (keypoints[:, 1] < (width - border)) - mask = mask_h & mask_w - return keypoints[mask], scores[mask] - - -def top_k_keypoints(keypoints, scores, k: int): - if k >= len(keypoints): - return keypoints, scores - scores, indices = torch.topk(scores, k, dim=0) - return keypoints[indices], scores - - -def sample_descriptors(keypoints, descriptors, s: int = 8): - """ Interpolate descriptors at keypoint locations """ - b, c, h, w = descriptors.shape - keypoints = keypoints - s / 2 + 0.5 - keypoints /= torch.tensor([(w*s - s/2 - 0.5), (h*s - s/2 - 
0.5)], - ).to(keypoints)[None] - keypoints = keypoints*2 - 1 # normalize to (-1, 1) - args = {'align_corners': True} if int(torch.__version__[2]) > 2 else {} - descriptors = torch.nn.functional.grid_sample( - descriptors, keypoints.view(b, 1, -1, 2), mode='bilinear', **args) - descriptors = torch.nn.functional.normalize( - descriptors.reshape(b, c, -1), p=2, dim=1) - return descriptors - - -class SuperPoint(nn.Module): - """SuperPoint Convolutional Detector and Descriptor - SuperPoint: Self-Supervised Interest Point Detection and - Description. Daniel DeTone, Tomasz Malisiewicz, and Andrew - Rabinovich. In CVPRW, 2019. https://arxiv.org/abs/1712.07629 - """ - default_config = { - 'descriptor_dim': 256, - 'nms_radius': 4, - 'keypoint_threshold': 0.005, - 'max_keypoints': -1, - 'remove_borders': 4, - } - - def __init__(self, config): - super().__init__() - self.config = {**self.default_config, **config} - - self.relu = nn.ReLU(inplace=True) - self.pool = nn.MaxPool2d(kernel_size=2, stride=2) - c1, c2, c3, c4, c5 = 64, 64, 128, 128, 256 - - self.conv1a = nn.Conv2d(1, c1, kernel_size=3, stride=1, padding=1) - self.conv1b = nn.Conv2d(c1, c1, kernel_size=3, stride=1, padding=1) - self.conv2a = nn.Conv2d(c1, c2, kernel_size=3, stride=1, padding=1) - self.conv2b = nn.Conv2d(c2, c2, kernel_size=3, stride=1, padding=1) - self.conv3a = nn.Conv2d(c2, c3, kernel_size=3, stride=1, padding=1) - self.conv3b = nn.Conv2d(c3, c3, kernel_size=3, stride=1, padding=1) - self.conv4a = nn.Conv2d(c3, c4, kernel_size=3, stride=1, padding=1) - self.conv4b = nn.Conv2d(c4, c4, kernel_size=3, stride=1, padding=1) - - self.convPa = nn.Conv2d(c4, c5, kernel_size=3, stride=1, padding=1) - self.convPb = nn.Conv2d(c5, 65, kernel_size=1, stride=1, padding=0) - - self.convDa = nn.Conv2d(c4, c5, kernel_size=3, stride=1, padding=1) - self.convDb = nn.Conv2d( - c5, self.config['descriptor_dim'], - kernel_size=1, stride=1, padding=0) - - path = Path(__file__).parent / 'weights/superpoint_v1.pth' - 
self.load_state_dict(torch.load(str(path))) - - mk = self.config['max_keypoints'] - if mk == 0 or mk < -1: - raise ValueError('\"max_keypoints\" must be positive or \"-1\"') - - print('Loaded SuperPoint model') - - def forward(self, data): - """ Compute keypoints, scores, descriptors for image """ - # Shared Encoder - x = self.relu(self.conv1a(data['image'])) - x = self.relu(self.conv1b(x)) - x = self.pool(x) - x = self.relu(self.conv2a(x)) - x = self.relu(self.conv2b(x)) - x = self.pool(x) - x = self.relu(self.conv3a(x)) - x = self.relu(self.conv3b(x)) - x = self.pool(x) - x = self.relu(self.conv4a(x)) - x = self.relu(self.conv4b(x)) - - # Compute the dense keypoint scores - cPa = self.relu(self.convPa(x)) - scores = self.convPb(cPa) - scores = torch.nn.functional.softmax(scores, 1)[:, :-1] - b, _, h, w = scores.shape - scores = scores.permute(0, 2, 3, 1).reshape(b, h, w, 8, 8) - scores = scores.permute(0, 1, 3, 2, 4).reshape(b, h*8, w*8) - scores = simple_nms(scores, self.config['nms_radius']) - - # Extract keypoints - keypoints = [ - torch.nonzero(s > self.config['keypoint_threshold']) - for s in scores] - scores = [s[tuple(k.t())] for s, k in zip(scores, keypoints)] - - # Discard keypoints near the image borders - keypoints, scores = list(zip(*[ - remove_borders(k, s, self.config['remove_borders'], h*8, w*8) - for k, s in zip(keypoints, scores)])) - - # Keep the k keypoints with highest score - if self.config['max_keypoints'] >= 0: - keypoints, scores = list(zip(*[ - top_k_keypoints(k, s, self.config['max_keypoints']) - for k, s in zip(keypoints, scores)])) - - # Convert (h, w) to (x, y) - keypoints = [torch.flip(k, [1]).float() for k in keypoints] - - # Compute the dense descriptors - cDa = self.relu(self.convDa(x)) - descriptors = self.convDb(cDa) - descriptors = torch.nn.functional.normalize(descriptors, p=2, dim=1) - - # Extract descriptors - descriptors = [sample_descriptors(k[None], d[None], 8)[0] - for k, d in zip(keypoints, descriptors)] - - return { - 
'keypoints': keypoints, - 'scores': scores, - 'descriptors': descriptors, - } - -if __name__ == '__main__': - config = { - 'superpoint': { - 'nms_radius': 4, - 'keypoint_threshold': 0.005, - 'max_keypoints': -1 - }, - 'superglue': { - 'weights': 'indoor', - 'sinkhorn_iterations': 20, - 'match_threshold':0.2, - } - } - - test_img = torch.randn(1, 1, 512, 512) - data = {'image': test_img} - - superpoint = SuperPoint(config.get('superpoint', {})) - output = superpoint(data) - - print(output['keypoints'][0].shape, output['descriptors'][0].shape) \ No newline at end of file diff --git a/sjlee_backup/train_pseudo.py b/sjlee_backup/train_pseudo.py deleted file mode 100644 index 3c09bf4..0000000 --- a/sjlee_backup/train_pseudo.py +++ /dev/null @@ -1,41 +0,0 @@ - -""" -1. config 아래와 같이 설정 -2. weights은 상황에 맞게 indoor, outdoor 설정해주어야 함 -config = { - 'superpoint': { - 'nms_radius': 4, - 'keypoint_threshold': 0.005, - 'max_keypoints': 1024 - }, - 'superglue': { - 'weights': 'outdoor', - 'sinkhorn_iterations': 20, - 'match_threshold':0.2 - } - } -""" - -""" -# start training -for epoch in range(1, opt.epoch+1): - epoch_loss = 0 - superglue.double().train() - for i, pred in enumerate(train_loader): - for k in pred: - if k != 'file_name' and k!='image0' and k!='image1': - if type(pred[k]) == torch.Tensor: - pred[k] = Variable(pred[k].cuda()) - else: - pred[k] = Variable(torch.stack(pred[k]).cuda()) - - # =========== new code =============== # - scores, data = superglue(pred) - loss = loss_superglue(scores, data['all_matches'].permute(1, 2, 0)) - - for k, v in pred.items(): - pred[k] = v[0] - pred = {**pred, **data, **{'loss', loss}} - - # ... keep going -""" \ No newline at end of file