|
1 | | -# main.py |
| 1 | +# temp_main.py |
2 | 2 | import json |
3 | 3 | import random |
4 | 4 | import string |
| 5 | +import base64 |
5 | 6 |
|
6 | 7 | from flask import Flask, Response |
7 | 8 |
|
|
13 | 14 | HEX = ''.join(sorted(set(string.hexdigits.lower()) - set('x'))) |
14 | 15 | B64 = string.ascii_letters + string.digits + '+/' |
15 | 16 |
|
| 17 | + |
| 18 | +def b64url_nopad(b: bytes) -> str: |
| 19 | + return base64.urlsafe_b64encode(b).rstrip(b"=").decode("ascii") |
| 20 | + |
| 21 | +def fake_jwt(rng: random.Random) -> str: |
| 22 | + # Deterministic header/payload, random-looking signature from rng |
| 23 | + header = {"alg": "HS256", "typ": "JWT"} |
| 24 | + payload = {"sub": stable_id("user", 3100, 8), "ts": FIXED_TIMESTAMP, "aud": "keploy-tests"} |
| 25 | + seg1 = b64url_nopad(json.dumps(header, sort_keys=True).encode()) |
| 26 | + seg2 = b64url_nopad(json.dumps(payload, sort_keys=True).encode()) |
| 27 | + sig = b64url_nopad(bytes(rng.randrange(0, 256) for _ in range(32))) |
| 28 | + return f"{seg1}.{seg2}.{sig}" |
| 29 | + |
| 30 | +def opaque_token(rng: random.Random, n: int = 40) -> str: |
| 31 | + # Matches your Bearer rule class: [A-Za-z0-9._~-]{20,} |
| 32 | + return rand(rng, string.ascii_letters + string.digits + "._~-", n) |
| 33 | + |
| 34 | +def u_escape_qs(s: str) -> str: |
| 35 | + # JSON-style unicode escaping for '=' and '&' |
| 36 | + return s.replace("=", r"\u003d").replace("&", r"\u0026") |
| 37 | + |
| 38 | +def pct_amp(s: str) -> str: |
| 39 | + # Percent-encode just ampersands after the token (your rule handles %26) |
| 40 | + return s.replace("&", "%26") |
| 41 | + |
| 42 | + |
16 | 43 | def rand(rng, charset, n): |
17 | 44 | return ''.join(rng.choice(charset) for _ in range(n)) |
18 | 45 |
|
@@ -405,6 +432,73 @@ def secret3(): |
405 | 432 | def astro(): |
406 | 433 | return Response(ASTRO_JSON, mimetype="application/json") |
407 | 434 |
|
| 435 | +@app.route("/jwtlab", methods=["GET"]) |
| 436 | +def jwtlab(): |
| 437 | + rng = random.Random(424242) # fixed seed => stable output |
| 438 | + j = fake_jwt(rng) |
| 439 | + uid = stable_id("user", 9090, 12) |
| 440 | + |
| 441 | + base = f"https://example.test/api/callback?token={j}&user_uuid={uid}&mode=demo" |
| 442 | + payload = { |
| 443 | + "case": "jwtlab", |
| 444 | + "status": 200, |
| 445 | + "meta": {"endpoint": "jwtlab", "timestamp": FIXED_TIMESTAMP}, |
| 446 | + "examples": { |
| 447 | + "url_raw": base, |
| 448 | + "url_pct_amp": pct_amp(base), |
| 449 | + "json_param": {"token": j}, |
| 450 | + }, |
| 451 | + } |
| 452 | + return make_response(payload) |
| 453 | + |
| 454 | +@app.route("/curlmix", methods=["GET"]) |
| 455 | +def curlmix(): |
| 456 | + commons = generate_common_secrets() |
| 457 | + rng = random.Random(515151) |
| 458 | + |
| 459 | + bearer = opaque_token(rng, 40) |
| 460 | + api_key = commons["openai_api_key"] |
| 461 | + |
| 462 | + # Put the same secrets in regular fields so the redaction mappings exist |
| 463 | + shadow = { |
| 464 | + "bearer_token_shadow": bearer, |
| 465 | + "api_key_shadow": api_key, |
| 466 | + } |
| 467 | + |
| 468 | + curl_raw = ( |
| 469 | + f"curl -s -H 'Authorization: Bearer {bearer}' " |
| 470 | + f"-H 'X-Api-Key: {api_key}' https://api.example.test/v1/things" |
| 471 | + ) |
| 472 | + |
| 473 | + payload = { |
| 474 | + "case": "curlmix", |
| 475 | + "status": 200, |
| 476 | + "meta": {"endpoint": "curlmix", "timestamp": FIXED_TIMESTAMP}, |
| 477 | + "shadow": shadow, |
| 478 | + "curl": curl_raw, |
| 479 | + } |
| 480 | + return make_response(payload) |
| 481 | + |
| 482 | +@app.route("/cdn", methods=["GET"]) |
| 483 | +def cdn(): |
| 484 | + rng = random.Random(616161) |
| 485 | + hmac_hex = rand(rng, HEX, 64) |
| 486 | + |
| 487 | + hdnts_plain = f"hdnts=st=1700000000~exp=1999999999~acl=/*~hmac={hmac_hex}" |
| 488 | + payload = { |
| 489 | + "case": "cdn", |
| 490 | + "status": 200, |
| 491 | + "meta": {"endpoint": "cdn", "timestamp": FIXED_TIMESTAMP}, |
| 492 | + "urls": { |
| 493 | + "akamai_hdnts": f"https://cdn.example.test/asset.m3u8?{hdnts_plain}", |
| 494 | + }, |
| 495 | + "fields": { |
| 496 | + "hdnts_plain": hdnts_plain, |
| 497 | + }, |
| 498 | + } |
| 499 | + return make_response(payload) |
| 500 | + |
| 501 | + |
408 | 502 | @app.route("/health", methods=["GET"]) |
409 | 503 | def health(): |
410 | 504 | return make_response({"status": "ok", "ts": FIXED_TIMESTAMP}) |
|
0 commit comments