-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient_upload.py
More file actions
105 lines (90 loc) · 3.53 KB
/
client_upload.py
File metadata and controls
105 lines (90 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
Fortress Developer CLI - Artifact Upload Tool
Simulates the "Supply Chain Source" for secure artifact uploads.
"""
import sys
import base64
import hashlib
import requests
from pathlib import Path
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
# Server configuration
# Base URL of the upload server; main() POSTs the artifact payload to
# f"{SERVER_URL}/upload".
# NOTE(review): this is plain HTTP on localhost, so the Base64 file content
# and signature travel unencrypted -- confirm an HTTPS URL is used for any
# non-local deployment.
SERVER_URL = "http://localhost:8000"
def calculate_file_hash(file_path: str) -> str:
    """Return the hex-encoded SHA-256 digest of the file at *file_path*.

    The file is consumed in 4096-byte reads, so it never has to fit in
    memory all at once.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:
                # An empty read means end of file.
                break
            digest.update(block)
    return digest.hexdigest()
def read_file_as_base64(file_path: str) -> str:
    """Return the entire content of *file_path* as a Base64 text string.

    The whole file is read into memory before encoding.
    """
    with open(file_path, "rb") as stream:
        raw_bytes = stream.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def load_private_key(key_path: str, password=None):
    """Load a PEM-encoded private key from a file.

    Args:
        key_path: Path to the PEM file containing the private key.
        password: Passphrase protecting the key, as ``bytes``; a ``str`` is
            encoded as UTF-8 for convenience. Leave as ``None`` (the
            default) for an unencrypted key, which matches the previous
            behaviour of this function.

    Returns:
        The key object returned by
        ``serialization.load_pem_private_key``.

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError, TypeError: Propagated from ``load_pem_private_key``
            (per the cryptography docs: undecodable PEM data, or a
            password/encryption mismatch).
    """
    if isinstance(password, str):
        # load_pem_private_key only accepts bytes-like passphrases.
        password = password.encode("utf-8")
    with open(key_path, "rb") as key_file:
        pem_data = key_file.read()
    return serialization.load_pem_private_key(pem_data, password=password)
def sign_hash(private_key, hash_bytes: bytes) -> str:
    """Sign *hash_bytes* with RSA-PSS and return the signature as Base64 text.

    The signature uses SHA-256 both as the message hash and inside the
    MGF1 mask generation function, with the maximum PSS salt length.
    """
    pss_padding = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    raw_signature = private_key.sign(hash_bytes, pss_padding, hashes.SHA256())
    encoded_signature = base64.b64encode(raw_signature)
    return encoded_signature.decode("utf-8")
def main():
    """Main CLI entry point.

    Usage: ``python client_upload.py <file_path> <private_key_path> [username]``

    Hashes the file, Base64-encodes it, signs the hex digest with the given
    private key, and POSTs the result as JSON to ``{SERVER_URL}/upload``.

    Exits with status 1 when arguments are missing, an input file does not
    exist, signing fails, the request cannot be completed, or the server
    answers with a status other than 200.
    """
    if len(sys.argv) < 3:
        print("Usage: python client_upload.py <file_path> <private_key_path> [username]")
        print("Example: python client_upload.py ./app-v1.exe ./keys/dev_private.pem developer")
        sys.exit(1)

    file_path = sys.argv[1]
    private_key_path = sys.argv[2]
    # Default to 'developer' if not provided for backward compatibility
    username = sys.argv[3] if len(sys.argv) > 3 else "developer"

    # Validate paths. is_file() (rather than exists()) also rejects
    # directories, which would otherwise crash later inside open().
    if not Path(file_path).is_file():
        print(f"Error: File not found: {file_path}")
        sys.exit(1)
    if not Path(private_key_path).is_file():
        print(f"Error: Private key not found: {private_key_path}")
        sys.exit(1)

    # Step 1: Calculate file hash
    print("[1/4] Calculating SHA-256 hash...")
    file_hash = calculate_file_hash(file_path)
    print(f" Hash: {file_hash}")

    # Step 2: Read and encode file
    print("[2/4] Encoding file to Base64...")
    file_b64 = read_file_as_base64(file_path)
    print(f" Size: {len(file_b64)} bytes (encoded)")

    # Step 3: Sign the hash
    print("[3/4] Signing hash with private key...")
    try:
        private_key = load_private_key(private_key_path)
        # The signed message is the hex digest text encoded to bytes, not
        # the raw 32-byte digest.
        # NOTE(review): presumably the server verifies the same bytes --
        # confirm before changing.
        signature_b64 = sign_hash(private_key, file_hash.encode())
    except (ValueError, TypeError) as exc:
        # Malformed/encrypted PEM data or an unsuitable key type.
        print(f" Error: Could not sign with private key: {exc}")
        sys.exit(1)
    print(" Signature generated successfully")

    # Step 4: Upload to server
    print("[4/4] Uploading to server...")
    payload = {
        "filename": Path(file_path).name,
        "username": username,
        "file_b64": file_b64,
        "signature": signature_b64,
        "hash": file_hash,
    }
    # Without a timeout requests waits indefinitely on an unresponsive
    # server. The value bounds the connect phase and each wait for data,
    # not the total transfer time.
    upload_timeout = 30  # seconds
    try:
        response = requests.post(
            f"{SERVER_URL}/upload", json=payload, timeout=upload_timeout
        )
    except requests.exceptions.ConnectionError:
        print(f" Error: Could not connect to server at {SERVER_URL}")
        sys.exit(1)
    except requests.exceptions.RequestException as exc:
        # Timeouts and any other transport-level failure.
        print(f" Error: Upload request failed: {exc}")
        sys.exit(1)

    if response.status_code == 200:
        print(" Upload successful!")
        try:
            body = response.json()
        except ValueError:
            # A 200 response with a non-JSON body: show the raw text.
            body = response.text
        print(f" Response: {body}")
    else:
        print(f" Upload failed: {response.status_code}")
        print(f" Response: {response.text}")
        # Signal the failure to calling scripts/CI, like the other error paths.
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()