-
Notifications
You must be signed in to change notification settings - Fork 383
Open
Description
I am using google api oauth authentication, and it is easier to have these helper functions to login
"""
OAuth2 authentication helper for Gmail
"""
import os
import json
import base64
import urllib.parse
import urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
class OAuthHandler(BaseHTTPRequestHandler):
"""HTTP handler to capture OAuth callback"""
def do_GET(self):
"""Handle the OAuth callback"""
query_components = parse_qs(urlparse(self.path).query)
if 'code' in query_components:
self.server.auth_code = query_components['code'][0]
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b'<html><body><h1>Authentication successful!</h1><p>You can close this window now.</p></body></html>')
elif 'error' in query_components:
self.server.auth_code = None
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b'<html><body><h1>Authentication failed!</h1><p>You can close this window now.</p></body></html>')
def log_message(self, format, *args):
"""Suppress log messages"""
pass
def get_oauth_credentials(credentials_file='credentials.json', token_file='token.json'):
"""
Get OAuth2 access token for Gmail API
Args:
credentials_file: Path to the OAuth2 client credentials JSON file
token_file: Path to store/retrieve the access token
Returns:
dict: Token information including access_token, refresh_token, etc.
"""
# Check if we have a saved token
if os.path.exists(token_file):
with open(token_file, 'r') as f:
token_data = json.load(f)
print("Using saved access token")
return token_data
# Load client credentials
if not os.path.exists(credentials_file):
raise FileNotFoundError(f"Credentials file not found: {credentials_file}")
with open(credentials_file, 'r') as f:
creds = json.load(f)
client_config = creds.get('installed') or creds.get('web')
if not client_config:
raise ValueError("Invalid credentials file format")
client_id = client_config['client_id']
client_secret = client_config['client_secret']
redirect_uri = 'http://localhost:8080'
# Scopes for Gmail IMAP access
scopes = [
'https://mail.google.com/'
]
# Build authorization URL
auth_params = {
'client_id': client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'scope': ' '.join(scopes),
'access_type': 'offline',
'prompt': 'consent'
}
auth_url = client_config['auth_uri'] + '?' + urllib.parse.urlencode(auth_params)
print("\n" + "="*60)
print("GMAIL OAUTH2 AUTHENTICATION")
print("="*60)
print("\nPlease visit this URL to authorize the application:")
print(f"\n{auth_url}\n")
print("Waiting for authorization...")
print("="*60 + "\n")
# Start local server to receive callback
server = HTTPServer(('localhost', 8080), OAuthHandler)
server.auth_code = None
server.handle_request()
if not server.auth_code:
raise Exception("Authorization failed or was cancelled")
print("Authorization code received, exchanging for access token...")
# Exchange authorization code for access token
token_params = {
'code': server.auth_code,
'client_id': client_id,
'client_secret': client_secret,
'redirect_uri': redirect_uri,
'grant_type': 'authorization_code'
}
token_data = urllib.parse.urlencode(token_params).encode('utf-8')
token_request = urllib.request.Request(
client_config['token_uri'],
data=token_data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
try:
with urllib.request.urlopen(token_request) as response:
token_response = json.loads(response.read().decode('utf-8'))
except urllib.error.HTTPError as e:
error_body = e.read().decode('utf-8')
raise Exception(f"Token exchange failed: {error_body}")
# Save token for future use
with open(token_file, 'w') as f:
json.dump(token_response, f, indent=2)
print("Access token obtained and saved!")
return token_response
def refresh_access_token(credentials_file='credentials.json', token_file='token.json'):
"""
Refresh an expired access token using the refresh token
Args:
credentials_file: Path to the OAuth2 client credentials JSON file
token_file: Path to the stored token file
Returns:
dict: Updated token information
"""
if not os.path.exists(token_file):
raise FileNotFoundError(f"Token file not found: {token_file}")
with open(token_file, 'r') as f:
token_data = json.load(f)
if 'refresh_token' not in token_data:
raise ValueError("No refresh token available. Please re-authenticate.")
with open(credentials_file, 'r') as f:
creds = json.load(f)
client_config = creds.get('installed') or creds.get('web')
client_id = client_config['client_id']
client_secret = client_config['client_secret']
# Request new access token
refresh_params = {
'client_id': client_id,
'client_secret': client_secret,
'refresh_token': token_data['refresh_token'],
'grant_type': 'refresh_token'
}
refresh_data = urllib.parse.urlencode(refresh_params).encode('utf-8')
refresh_request = urllib.request.Request(
client_config['token_uri'],
data=refresh_data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
try:
with urllib.request.urlopen(refresh_request) as response:
refresh_response = json.loads(response.read().decode('utf-8'))
except urllib.error.HTTPError as e:
error_body = e.read().decode('utf-8')
raise Exception(f"Token refresh failed: {error_body}")
# Update token data (keep refresh_token if not provided in response)
token_data['access_token'] = refresh_response['access_token']
if 'expires_in' in refresh_response:
token_data['expires_in'] = refresh_response['expires_in']
# Save updated token
with open(token_file, 'w') as f:
json.dump(token_data, f, indent=2)
print("Access token refreshed!")
return token_data
Metadata
Metadata
Assignees
Labels
No labels