-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauthorization.py
More file actions
112 lines (96 loc) · 5.06 KB
/
authorization.py
File metadata and controls
112 lines (96 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# This Python file uses the following encoding: utf-8
import sys, logging, requests, time
from urllib.error import URLError, HTTPError
from PyQt5.QtCore import QThread
from PyQt5.QtCore import pyqtSignal as Signal
oidcConfig = {
'client_id': '17974ba8-746b-4e50-bcc8-6a087ac0e685',
#'client_id': '5b4f8edf-a319-4405-8705-58b2380c1fc1',
'scope': 'idp openid profile offline_access citiobs.secd.eu#create',
'oidc_configuration': 'https://authenix.eu/.well-known/openid-configuration'
}
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger()
class Authorization(QThread):
signalSetAuthLink = Signal(str)
signalDisplayError = Signal(str)
signalAuthApproval = Signal(dict)
baseUrl = None
openidConfiguration = None
def __init__(self):
super().__init__()
def getOpenIdConfig(self):
logger.debug(f"getOpenIdConfig({self.baseUrl})")
try:
url = oidcConfig['oidc_configuration']
with requests.get(url) as response:
self.openidConfiguration = response.json()
logger.debug(f"OIDC configuration: {self.openidConfiguration}")
except HTTPError as e:
logger.debug(f"HTTP error: {e.code}")
self.signalDisplayError.emit(f"HTTP error: {e.code}")
except URLError as e:
logger.debug(f"URL error: {e.reason}")
self.signalDisplayError.emit(f"HTTP error: {e.reason}")
except ValueError as e:
logger.debug(f"Value error: {e}")
self.signalDisplayError.emit(f"HTTP error: {e}")
def getAuthorizationCode(self):
logger.debug("running device authorization")
logger.info('Obtaining OAuth2 Device Flow user code')
with requests.post(self.openidConfiguration['device_authorization_endpoint'],
data = {'client_id': oidcConfig['client_id'],
'scope': oidcConfig['scope']}) as device_response:
if device_response.status_code == 429:
logger.error(device_response.headers)
retry = 10
if 'Retry-After' in device_response.headers:
retry = device_response.headers['Retry-After']
logger.error(device_response.text)
self.signalDisplayError.emit(f"Please retry in {retry} seconds...")
self.terminate()
elif device_response.status_code != 200:
logger.error(device_response.text)
self.signalDisplayError.emit("Error fetching Authorization Code")
self.terminate()
else:
self.user_code = device_response.json()
self.user_code['expires'] = int(time.time()) + self.user_code['expires_in']
logger.debug(self.user_code)
self.signalSetAuthLink.emit(self.user_code['verification_uri_complete'])
def pollForAuthorization(self):
logger.debug("polling for authorization...")
# calculate retry attempts based on deplay and code expiration
retries = 48 # 48 * 5s -> 240s or 4min
for ix in range(1, retries):
ix += 1
with requests.post(self.openidConfiguration['token_endpoint'],
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'scope': oidcConfig['scope'],
'client_id': oidcConfig['client_id'],
'device_code': self.user_code['device_code']}) as token_response:
logger.debug(f"device_token response status code: {token_response.status_code}")
logger.debug(token_response.text)
if token_response.status_code == 200:
self.token_data = token_response.json()
self.token_data['refresh_token_expires_in'] = 30 * 24* 60 * 60 #30 days
now = int(time.time())
self.token_data['access_token_expires'] = self.token_data['expires_in'] + now
self.token_data['refresh_token_expires'] = self.token_data['refresh_token_expires_in'] + now
logger.debug(f'token_data: {self.token_data}')
self.signalAuthApproval.emit(self.token_data)
return
elif token_response.status_code == 400:
interval = self.user_code['interval'] if 'interval' in self.user_code else 5
time.sleep(interval)
elif token_response.status_code == 429:
interval = self.user_code['interval'] if 'interval' in self.user_code else 5
time.sleep(2*interval)
else:
logger.debug(f"unknown device_token response status code: {token_response.status_code}")
def run(self):
self.getOpenIdConfig()
self.getAuthorizationCode()
self.pollForAuthorization()
logger.debug("User inactive")
self.signalDisplayError.emit("Authorization timeout. Please retry...")