-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoauth_handler.py
More file actions
132 lines (108 loc) · 4.45 KB
/
oauth_handler.py
File metadata and controls
132 lines (108 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
Google OAuth Authentication Handler
Provides OAuth login/logout functionality for Wikipedia API
"""
from __future__ import annotations
from google_auth_oauthlib.flow import Flow
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests
import os
import secrets
from typing import Optional, Tuple
from fastapi import HTTPException
from datetime import datetime, timedelta
# OAuth Configuration
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "")
OAUTH_REDIRECT_URI = os.environ.get("OAUTH_REDIRECT_URI", "https://wiki.built-simple.ai/auth/google/callback")
APP_URL = os.environ.get("APP_URL", "https://wiki.built-simple.ai")
# Session storage (in-memory, replace with Redis for production)
oauth_sessions = {}
class OAuthHandler:
"""Handle Google OAuth authentication"""
def __init__(self):
if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET:
raise ValueError("Google OAuth credentials not configured")
self.client_config = {
"web": {
"client_id": GOOGLE_CLIENT_ID,
"client_secret": GOOGLE_CLIENT_SECRET,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [OAUTH_REDIRECT_URI]
}
}
self.scopes = [
'openid',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile'
]
def create_authorization_url(self) -> tuple[str, str]:
"""
Create authorization URL for Google OAuth.
Returns: (authorization_url, state)
"""
flow = Flow.from_client_config(
self.client_config,
scopes=self.scopes,
redirect_uri=OAUTH_REDIRECT_URI
)
authorization_url, state = flow.authorization_url(
access_type='offline',
include_granted_scopes='true',
prompt='select_account'
)
# Store state in session for CSRF protection
oauth_sessions[state] = {
'created_at': datetime.now(),
'flow': flow
}
return authorization_url, state
def handle_callback(self, state: str, code: str, error: Optional[str] = None) -> dict:
"""
Handle OAuth callback from Google.
Returns: user_info dict with email, name, picture
"""
if error:
raise HTTPException(status_code=400, detail=f"OAuth error: {error}")
if not state or state not in oauth_sessions:
raise HTTPException(status_code=400, detail="Invalid or expired state")
session = oauth_sessions.pop(state)
# Check session age (expire after 10 minutes)
if datetime.now() - session['created_at'] > timedelta(minutes=10):
raise HTTPException(status_code=400, detail="OAuth session expired")
flow = session['flow']
try:
# Exchange authorization code for tokens
flow.fetch_token(code=code)
credentials = flow.credentials
# Verify ID token and extract user info
idinfo = id_token.verify_oauth2_token(
credentials.id_token,
google_requests.Request(),
GOOGLE_CLIENT_ID
)
# Extract user information
user_info = {
'email': idinfo.get('email'),
'name': idinfo.get('name'),
'picture': idinfo.get('picture'),
'google_id': idinfo.get('sub'),
'email_verified': idinfo.get('email_verified', False)
}
return user_info
except ValueError as e:
raise HTTPException(status_code=400, detail=f"Token verification failed: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"OAuth error: {str(e)}")
def cleanup_expired_sessions(self):
"""Remove expired OAuth sessions (older than 10 minutes)"""
now = datetime.now()
expired = [
state for state, session in oauth_sessions.items()
if now - session['created_at'] > timedelta(minutes=10)
]
for state in expired:
del oauth_sessions[state]
# Global OAuth handler instance
oauth_handler = OAuthHandler() if GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET else None