-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathapi_trading.py
More file actions
3444 lines (3019 loc) · 135 KB
/
api_trading.py
File metadata and controls
3444 lines (3019 loc) · 135 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Almanac API Interactive Trading Client
This script provides an interactive trading session with the Almanac API.
It allows you to:
- Generate Polymarket API credentials
- Initiate trading sessions and place orders
- Fetch positions summary
- Link/unlink Bittensor UID to Almanac account
- Manage multiple credential sets (wallet accounts)
Credential Sets:
Supports multiple wallet accounts via prefixed environment variables (e.g., WALLET1_EOA_WALLET_ADDRESS).
Each set can have its own wallet address, private key, proxy funder, and Polymarket API credentials.
Switch between sets at runtime - sessions are automatically cleared when switching accounts.
Requirements:
- Python 3.10+
- Almanac account (setup at https://almanac.market)
- EOA wallet private key for signing transactions
- Optional: Polymarket API credentials (can be generated via this script)
Python dependencies:
- requests
- dotenv
- tabulate
- py-clob-client
- eth-account
- bittensor
pip install requests dotenv py-clob-client eth-account bittensor tabulate
"""
import os
import json
import math
import urllib.parse
from pathlib import Path
import requests
from dotenv import load_dotenv
from py_clob_client.client import ClobClient
from eth_account import Account
from eth_account.messages import encode_defunct
from eth_account.messages import encode_typed_data as EIP712_ENCODE # type: ignore
import time
import secrets
import bittensor as bt
from datetime import datetime
from tabulate import tabulate
from constants import VOLUME_FEE, PRICE_BUFFER_ADJUSTMENT
# Base URL for Almanac API requests (the commented line below is a local-dev alternative).
ALMANAC_API_URL = "https://api.almanac.market/api"
#ALMANAC_API_URL = "http://localhost:3001/api"
# Polymarket CLOB host; the price and fee-rate helpers in this file build their URLs from it.
POLYMARKET_CLOB_HOST = "https://clob.polymarket.com"
POLYGON_CHAIN_ID = 137
# EIP-712 domain contract for Polymarket CTF Exchange
EIP712_DOMAIN_CONTRACT = "0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E"
# NOTE(review): presumably the EIP-712 domain contract used for neg-risk markets — confirm against the signing code.
EIP712_DOMAIN_NEGRISK_CONTRACT = "0xC5d563A36AE78145C45a50134d48A1215220f80a"
# Env file scanned for credential sets; the path is relative to the current working directory.
ENV_PATH = Path("api_trading.env")
# Default number of positions to fetch per page
DEFAULT_POSITIONS_LIMIT = 25
# Debug mode: if True, injects a static non-eligible market into search results for testing
DEBUG = False
# Static non-eligible market for testing (will be rejected by backend API)
DEBUG_STATIC_MARKET = {
    "id": "680904",
    "question": "Will Bill Ackman say \"Communist\" or \"Communism\" during the X Space event on November 18?",
    "conditionId": "0xaaac5595aecf8ba003fdb425c1697e9ac2e528aae492c052b781486c453e5ffe",
    "slug": "will-bill-ackman-say-communist-or-communism-during-the-x-space-event-on-november-18",
    "title": "Will Bill Ackman say \"Communist\" or \"Communism\" during the X Space event on November 18?",
    "outcomes": ["Yes", "No"],
    "outcome_prices": [0.155, 0.845],
    "clob_token_ids": [
        "39362723615320203601565062388169914485014370896130401193658074341957733150044",
        "104010206690696361697404185514804406127715733374741772559130541144838358362611"
    ],
    "active": True,
    "closed": False,
    "restricted": True,
}
# Mutable module-level session state; select_credential_set() resets these when the account changes.
CURRENT_SESSION = None
SELECTED_MARKET = None
SELECTED_CREDENTIAL_SET = None  # Stores the name of the selected credential set (None = default)
CREDENTIAL_SETS = {}  # Dictionary of available credential sets
def _detect_credential_sets():
    """
    Discover the credential sets declared in the env file at ENV_PATH.

    A set is either unprefixed (reported as "default") or prefixed, e.g.
    WALLET1_EOA_WALLET_ADDRESS belongs to the set "WALLET1". A set is only
    reported when all three EOA_* keys have non-empty values; the three
    POLYMARKET_* keys are always included and may be empty strings.

    The file is also loaded into the process environment via load_dotenv
    (override=True) before it is parsed.

    Returns:
        dict: Mapping of set name -> {credential key: value}. Empty when the
        env file does not exist. If parsing raises, a warning is printed and
        whatever was collected so far is returned.
    """
    detected = {}
    if not ENV_PATH.exists():
        return detected

    load_dotenv(dotenv_path=str(ENV_PATH), override=True)

    # Keys that must carry a value vs. keys that may be blank (fillable later).
    mandatory_keys = ("EOA_WALLET_ADDRESS", "EOA_WALLET_PK", "EOA_PROXY_FUNDER")
    blank_ok_keys = ("POLYMARKET_API_KEY", "POLYMARKET_API_SECRET", "POLYMARKET_API_PASSPHRASE")
    known_keys = mandatory_keys + blank_ok_keys

    def _qualified(prefix, base_key):
        # The default set uses the bare key; named sets use "<prefix>_<key>".
        return f"{prefix}_{base_key}" if prefix else base_key

    try:
        with open(ENV_PATH, 'r') as handle:
            raw_lines = handle.readlines()

        # Step 1: parse KEY=VALUE lines, skipping blanks, comments and lines without '='.
        file_vars = {}
        for raw in raw_lines:
            entry = raw.strip()
            if not entry or entry.startswith('#') or '=' not in entry:
                continue
            name, _, raw_value = entry.partition('=')
            file_vars[name.strip()] = raw_value.strip().strip('"').strip("'")

        # Step 2: collect every prefix that appears in front of a known credential key.
        prefixes = set()
        for name in file_vars:
            for base_key in known_keys:
                suffix = f"_{base_key}"
                if name == base_key:
                    prefixes.add("")
                elif name.endswith(suffix):
                    prefixes.add(name[:-len(suffix)])

        # Step 3: assemble one credential dict per prefix that has all mandatory values.
        for prefix in prefixes:
            bundle = {}
            for base_key in mandatory_keys:
                found = file_vars.get(_qualified(prefix, base_key), "").strip()
                if not found:
                    break
                bundle[base_key] = found
            else:
                for base_key in blank_ok_keys:
                    bundle[base_key] = file_vars.get(_qualified(prefix, base_key), "").strip()
                detected[prefix if prefix else "default"] = bundle
    except Exception as exc:
        print(f"Warning: Could not detect credential sets: {exc}")

    return detected
def _get_credential(key: str) -> str | None:
    """
    Look up one credential value.

    Resolution order: the currently selected credential set, then the
    "default" set, then the process environment. A key that is present in a
    set wins even when its value is an empty string.

    Args:
        key: The credential key (e.g., "EOA_WALLET_ADDRESS")

    Returns:
        The credential value, or None when no source provides it
    """
    for set_name in (SELECTED_CREDENTIAL_SET, "default"):
        if not set_name or set_name not in CREDENTIAL_SETS:
            continue
        bucket = CREDENTIAL_SETS[set_name]
        if key in bucket:
            return bucket[key]

    # Nothing in the detected sets: last resort is the environment itself.
    return os.getenv(key)
def select_credential_set():
    """
    Interactively choose the active credential set.

    Re-scans the env file via _detect_credential_sets(), lists the sets found
    (sorted by name) plus a Cancel entry, and reads the choice from stdin.
    Picking a set different from the current one clears CURRENT_SESSION and
    SELECTED_MARKET. The same reset happens when no sets exist or when the
    previously selected set has disappeared from the file.
    """
    global SELECTED_CREDENTIAL_SET, CREDENTIAL_SETS, CURRENT_SESSION, SELECTED_MARKET

    CREDENTIAL_SETS = _detect_credential_sets()

    if not CREDENTIAL_SETS:
        print("\nNo credential sets found in the environment file.")
        print("Please configure credentials in api_trading.env")
        SELECTED_CREDENTIAL_SET = None
        CURRENT_SESSION = None
        SELECTED_MARKET = None
        return

    # A stale selection (set removed from the file) invalidates the session.
    if SELECTED_CREDENTIAL_SET and SELECTED_CREDENTIAL_SET not in CREDENTIAL_SETS:
        print(f"\n⚠ Previously selected credential set '{SELECTED_CREDENTIAL_SET}' no longer exists.")
        print("Clearing session and resetting selection...")
        SELECTED_CREDENTIAL_SET = None
        CURRENT_SESSION = None
        SELECTED_MARKET = None

    names = sorted(CREDENTIAL_SETS)
    cancel_option = len(names) + 1

    print("\nAvailable credential sets:")
    for position, name in enumerate(names, start=1):
        suffix = " (current)" if name == SELECTED_CREDENTIAL_SET else ""
        print(f" {position}) {name}{suffix}")
    print(f" {cancel_option}) Cancel")

    raw_choice = input(f"\nSelect credential set (1-{cancel_option}): ").strip()
    try:
        picked = int(raw_choice)
    except ValueError:
        print("Invalid input. Please enter a number.")
        return

    if picked == cancel_option:
        print("Cancelled.")
        return
    if not 1 <= picked <= len(names):
        print("Invalid choice.")
        return

    chosen = names[picked - 1]
    if chosen != SELECTED_CREDENTIAL_SET:
        # Sessions belong to one account, so switching accounts drops them.
        if CURRENT_SESSION:
            print("\n⚠ Clearing existing trading session (switching accounts)...")
        CURRENT_SESSION = None
        SELECTED_MARKET = None

    SELECTED_CREDENTIAL_SET = chosen
    print(f"\n✓ Selected credential set: {SELECTED_CREDENTIAL_SET}")

    # Echo the wallet address so the user can confirm the right account is active.
    wallet = _get_credential("EOA_WALLET_ADDRESS")
    if wallet:
        print(f" Wallet Address: {wallet}")
def _format_price(value):
try:
f = float(value)
# Clip to [0,1] range for probabilities if out-of-range but close
if -0.05 <= f <= 1.05:
f = min(max(f, 0.0), 1.0)
return f"{f:.2f}"
except Exception:
return "-"
def _format_game_start_time(game_start_time):
"""
Format game start time to display like "Dec 5, 2025 7p EST"
Args:
game_start_time: ISO format datetime string or timestamp
Returns:
Formatted string like "Dec 5, 2025 7p EST" or None if parsing fails
"""
if not game_start_time:
return None
try:
# Try parsing as ISO format string
if isinstance(game_start_time, str):
# Handle ISO format with or without timezone
if 'T' in game_start_time:
dt = datetime.fromisoformat(game_start_time.replace('Z', '+00:00'))
else:
# Try timestamp
dt = datetime.fromtimestamp(int(game_start_time))
elif isinstance(game_start_time, (int, float)):
# Assume timestamp
dt = datetime.fromtimestamp(game_start_time)
else:
return None
# Format: "Dec 5, 2025 7p EST"
# Get month abbreviation, day, year
month_abbr = dt.strftime("%b")
day = dt.day
year = dt.year
# Format hour (12-hour format, no leading zero)
hour = dt.hour
if hour == 0:
hour_str = "12"
period = "a"
elif hour < 12:
hour_str = str(hour)
period = "a"
elif hour == 12:
hour_str = "12"
period = "p"
else:
hour_str = str(hour - 12)
period = "p"
# Get timezone abbreviation (try to get EST/EDT, etc.)
# For simplicity, we'll use the timezone offset or default to EST
tz_str = "EST" # Default
try:
if dt.tzinfo:
offset = dt.utcoffset().total_seconds() / 3600
if offset == -5:
tz_str = "EST"
elif offset == -4:
tz_str = "EDT"
elif offset == -8:
tz_str = "PST"
elif offset == -7:
tz_str = "PDT"
else:
tz_str = dt.strftime("%Z") or "EST"
except Exception:
pass
return f"{month_abbr} {day}, {year} {hour_str}{period} {tz_str}"
except Exception:
return None
def _extract_outcomes_summary(market: dict) -> str:
"""
Try to extract a concise outcomes summary like:
'Yes 0.41 | No 0.59' or 'A 0.10 | B 0.20 | C 0.70'
Supports a few common shapes defensively.
"""
# Preferred: 'outcomes' list with aligned 'outcome_prices' list
outcomes = market.get("outcomes")
outcome_prices = market.get("outcome_prices")
if isinstance(outcomes, list) and isinstance(outcome_prices, list) and len(outcomes) == len(outcome_prices):
parts = []
for name, price in zip(outcomes, outcome_prices):
name_str = name if isinstance(name, str) else (name.get("name") if isinstance(name, dict) else str(name))
parts.append(f"{name_str} {_format_price(price)}")
if parts:
return " | ".join(parts[:6])
return "" # no concise summary available
def _normalize_search_results(payload) -> list:
"""
Accepts either a list of markets or common envelope shapes and returns a list.
Handles: {results: [...]}, {data: [...]}, {markets: [...]}, {items: [...]}
Falls back to [] if nothing recognized.
"""
if isinstance(payload, list):
return payload
if isinstance(payload, dict):
for key in ("results", "data", "markets", "items"):
val = payload.get(key)
if isinstance(val, list):
return val
# Some APIs wrap under {success, data: {...}} with inner list keys
data = payload.get("data")
if isinstance(data, dict):
for key in ("results", "markets", "items", "list"):
val = data.get(key)
if isinstance(val, list):
return val
return []
def _extract_event_list(payload_list: list) -> list:
"""
From a generic list returned by search, prefer items that look like Events (contain 'markets').
If none contain 'markets', return the original list and treat items as markets directly.
"""
if not isinstance(payload_list, list):
return []
has_event_shape = any(isinstance(it, dict) and isinstance(it.get("markets"), list) for it in payload_list)
return payload_list if has_event_shape else payload_list
def _display_markets_for_event(event: dict) -> list:
"""
Print markets for a single event with concise outcomes summary in a table format.
Returns the list of markets for further selection.
"""
markets = event.get("markets") or []
if not isinstance(markets, list) or not markets:
print("No markets found for this event.")
return []
# Sort markets: Moneyline first, then by volume (big to small)
def _sort_key(m):
# Check if it's a Moneyline market
sports_market_type = m.get("sports_market_type") or m.get("sportsMarketType") or ""
is_moneyline = sports_market_type.lower() == "moneyline"
# Get volume (try multiple field names)
volume = m.get("volume") or m.get("totalVolume") or m.get("total_volume") or 0
try:
volume = float(volume)
except (ValueError, TypeError):
volume = 0.0
# Return tuple: (is_moneyline, -volume)
# is_moneyline: True=0 (sorts first), False=1 (sorts later)
# -volume: negative so bigger volumes sort first
return (0 if is_moneyline else 1, -volume)
markets = sorted(markets, key=_sort_key)
# Find maximum number of outcomes across all markets to determine column count
max_outcomes = 0
for m in markets:
outcomes = m.get("outcomes") or []
if isinstance(outcomes, list):
max_outcomes = max(max_outcomes, len(outcomes))
# Build table rows
table_rows = []
for idx, m in enumerate(markets, start=1):
title = m.get("title") or m.get("question") or m.get("name") or "Untitled"
market_id = m.get("id") or m.get("marketId") or m.get("_id") or "unknown"
# Extract outcomes and prices for this market
outcomes = m.get("outcomes") or []
outcome_prices = m.get("outcome_prices") or []
# Build row: #, Market, then each outcome (name + price), then Market ID
row = [
str(idx),
_truncate_text(title, 50)
]
# Add outcome name and price for each outcome column
if isinstance(outcomes, list) and isinstance(outcome_prices, list) and len(outcomes) == len(outcome_prices):
for outcome, price in zip(outcomes, outcome_prices):
outcome_name = outcome if isinstance(outcome, str) else (outcome.get("name") if isinstance(outcome, dict) else str(outcome))
formatted_price = _format_price(price)
row.append(f"{outcome_name} {formatted_price}")
else:
# Fallback: try to extract outcomes from other formats
for i in range(len(outcomes) if isinstance(outcomes, list) else 0):
outcome = outcomes[i]
outcome_name = outcome if isinstance(outcome, str) else (outcome.get("name") if isinstance(outcome, dict) else str(outcome))
price = outcome_prices[i] if isinstance(outcome_prices, list) and i < len(outcome_prices) else None
if price is not None:
formatted_price = _format_price(price)
row.append(f"{outcome_name} {formatted_price}")
else:
row.append(outcome_name if outcome_name else "-")
# Pad with "-" if market has fewer outcomes than max
while len(row) < 2 + max_outcomes:
row.append("-")
# Add Volume column (round to nearest dollar, no dollar sign)
volume = m.get("volume") or m.get("totalVolume") or m.get("total_volume") or 0
try:
volume_float = float(volume)
volume_rounded = round(volume_float)
row.append(str(volume_rounded))
except (ValueError, TypeError):
row.append("-")
# Add Liquidity column (round to nearest dollar, no dollar sign)
liquidity = m.get("liquidity") or m.get("totalLiquidity") or m.get("total_liquidity") or 0
try:
liquidity_float = float(liquidity)
liquidity_rounded = round(liquidity_float)
row.append(str(liquidity_rounded))
except (ValueError, TypeError):
row.append("-")
# Add Type column (sports_market_type if it exists, replace _ with spaces)
sports_market_type = m.get("sports_market_type") or m.get("sportsMarketType") or "-"
if sports_market_type != "-":
sports_market_type = sports_market_type.replace("_", " ")
row.append(sports_market_type)
# Add Market ID as last column
row.append(market_id)
table_rows.append(row)
# Define table headers: #, Market, then Outcome 1, Outcome 2, etc., then Volume, Liquidity, Type, then Market ID
outcome_headers = [f"Outcome {i+1}" for i in range(max_outcomes)]
headers = ["#", "Market"] + outcome_headers + ["Volume", "Liquidity", "Type", "Market ID"]
# Print table
print(f"\nMarkets ({len(markets)} found):")
print(tabulate(table_rows, headers=headers, tablefmt="grid", stralign="left"))
return markets
def fetch_clob_prices(token_ids: list) -> dict | None:
"""
Fetch latest prices from the CLOB API for given token IDs.
Args:
token_ids: List of token ID strings
Returns:
Dictionary mapping token IDs to price data:
{
"token_id_1": {"BUY": "0.45", "SELL": "0.44"},
"token_id_2": {"BUY": "0.52", "SELL": "0.51"}
}
Returns None if the request fails.
"""
if not token_ids:
return None
try:
# Build BookParams array with both BUY and SELL side for each token
book_params = []
for token_id in token_ids:
book_params.append({"token_id": token_id, "side": "BUY"})
book_params.append({"token_id": token_id, "side": "SELL"})
# Make POST request to CLOB API
response = requests.post(
f"{POLYMARKET_CLOB_HOST}/prices",
json=book_params,
headers={"Content-Type": "application/json"},
timeout=10
)
if response.status_code != 200:
print(f"Warning: Failed to fetch CLOB prices (status {response.status_code})")
return None
prices_data = response.json()
return prices_data
except Exception as exc:
print(f"Warning: Error fetching CLOB prices: {exc}")
return None
def _update_all_markets_prices_from_clob(markets: list) -> list:
"""
Batch fetch latest prices from CLOB API for all markets and update their outcome_prices.
Stores original prices in _original_outcome_prices for fallback.
Args:
markets: List of market dictionaries with clob_token_ids
Returns:
List of updated market dictionaries with fresh outcome_prices
"""
if not isinstance(markets, list) or not markets:
return markets
# Collect all token IDs from all markets
all_token_ids = []
market_token_map = {} # Map token_id -> list of (market_idx, outcome_idx) tuples
for market_idx, market in enumerate(markets):
clob_token_ids = market.get("clob_token_ids")
if isinstance(clob_token_ids, list) and clob_token_ids:
# Store original prices for fallback
original_prices = market.get("outcome_prices")
if original_prices:
market["_original_outcome_prices"] = original_prices.copy() if isinstance(original_prices, list) else original_prices
# Track which market/outcome each token belongs to
for outcome_idx, token_id in enumerate(clob_token_ids):
token_id_str = str(token_id)
if token_id_str not in market_token_map:
market_token_map[token_id_str] = []
all_token_ids.append(token_id)
market_token_map[token_id_str].append((market_idx, outcome_idx))
# If no token IDs found, return markets unchanged
if not all_token_ids:
return markets
# Batch fetch all prices from CLOB API
prices_data = fetch_clob_prices(all_token_ids)
if not prices_data:
return markets
# Update all markets with fetched prices
for market_idx, market in enumerate(markets):
clob_token_ids = market.get("clob_token_ids")
if not isinstance(clob_token_ids, list) or not clob_token_ids:
continue
updated_prices = []
for token_id in clob_token_ids:
# Try multiple formats for token ID lookup
token_prices = None
token_id_str = str(token_id)
if token_id_str in prices_data:
token_prices = prices_data[token_id_str]
else:
try:
token_id_num = int(token_id)
if str(token_id_num) in prices_data:
token_prices = prices_data[str(token_id_num)]
except (ValueError, TypeError):
pass
if token_prices:
# Use mid price (average of BUY and SELL) or BUY price as fallback
buy_price = token_prices.get("BUY")
sell_price = token_prices.get("SELL")
if buy_price and sell_price:
try:
buy_float = float(buy_price)
sell_float = float(sell_price)
mid_price = (buy_float + sell_float) / 2.0
updated_prices.append(mid_price)
except (ValueError, TypeError):
try:
updated_prices.append(float(buy_price))
except (ValueError, TypeError):
updated_prices.append(None)
elif buy_price:
try:
updated_prices.append(float(buy_price))
except (ValueError, TypeError):
updated_prices.append(None)
else:
updated_prices.append(None)
else:
updated_prices.append(None)
# Update market with fresh prices (only if we got valid prices)
if updated_prices and any(p is not None for p in updated_prices):
market["outcome_prices"] = updated_prices
# Store raw CLOB price data for this market's tokens
market_clob_prices = {}
for token_id in clob_token_ids:
token_id_str = str(token_id)
if token_id_str in prices_data:
market_clob_prices[token_id_str] = prices_data[token_id_str]
else:
try:
token_id_num = int(token_id)
if str(token_id_num) in prices_data:
market_clob_prices[str(token_id_num)] = prices_data[str(token_id_num)]
except (ValueError, TypeError):
pass
if market_clob_prices:
market["_clob_prices"] = market_clob_prices
return markets
def fetch_fee_rate_bps(token_id: str | None, timeout: float = 3.0) -> int:
"""
Match frontend useFeeRate: GET /fee-rate?token_id=... -> { "base_fee": int }.
Returns basis points; 0 on failure / missing token (same as feeRateBps ?? 0).
"""
if not token_id:
return 0
try:
q = urllib.parse.urlencode({"token_id": str(token_id)})
r = requests.get(f"{POLYMARKET_CLOB_HOST}/fee-rate?{q}", timeout=timeout)
r.raise_for_status()
data = r.json()
if isinstance(data, dict) and isinstance(data.get("base_fee"), (int, float)):
return int(data["base_fee"])
except Exception:
pass
return 0
def _update_market_prices_from_clob(market: dict) -> dict:
    """
    Refresh one market's outcome_prices from the CLOB API.

    Single-market convenience wrapper: delegates to
    _update_all_markets_prices_from_clob with a one-element list. Use the
    batch function directly when refreshing several markets.

    Args:
        market: Market dictionary with clob_token_ids

    Returns:
        The first element of the batch result, or the market passed in when
        the batch result is empty
    """
    refreshed = _update_all_markets_prices_from_clob([market])
    if refreshed:
        return refreshed[0]
    return market
def _display_outcomes_and_choose(market: dict):
"""
Show detailed outcomes for a market (if available) and let the user pick one.
Uses CLOB prices if available, falls back to original prices if CLOB fetch failed.
Returns a tuple (chosen_outcome_name, chosen_outcome_price, chosen_token_id).
"""
outcomes = market.get("outcomes")
outcome_prices = market.get("outcome_prices") # These should be CLOB prices if fetched successfully
original_prices = market.get("_original_outcome_prices") # Original prices from API (fallback)
clob_token_ids = market.get("clob_token_ids")
clob_prices = market.get("_clob_prices") # Raw CLOB price data
# Check if CLOB fetch was successful
clob_fetch_successful = clob_prices is not None
# Normalize into list of dicts {name, price?, tokenId?} with index alignment
normalized = []
if isinstance(outcomes, list):
for idx, o in enumerate(outcomes):
name = o if isinstance(o, str) else (o.get("name") if isinstance(o, dict) else str(o))
price = None
token_id = None
# Get current price (prefer CLOB prices if fetch was successful)
if clob_fetch_successful and isinstance(outcome_prices, list) and idx < len(outcome_prices):
price = outcome_prices[idx]
# Fallback to original prices if CLOB fetch failed
if price is None and isinstance(original_prices, list) and idx < len(original_prices):
price = original_prices[idx]
if isinstance(clob_token_ids, list) and idx < len(clob_token_ids):
token_id = clob_token_ids[idx]
# Fallback to other price sources if still no price
if price is None and isinstance(o, dict):
price = (
o.get("price")
or o.get("lastPrice")
or o.get("midPrice")
or o.get("probability")
or o.get("p")
)
normalized.append({"name": name or "?", "price": price, "tokenId": token_id})
elif isinstance(outcomes, dict):
# If dict, best-effort alignment by iteration order
for idx, (name, maybe_price) in enumerate(outcomes.items()):
price = maybe_price
token_id = None
# Get current price (prefer CLOB prices if fetch was successful)
if clob_fetch_successful and isinstance(outcome_prices, list) and idx < len(outcome_prices):
price = outcome_prices[idx]
# Fallback to original prices if CLOB fetch failed
if price is None and isinstance(original_prices, list) and idx < len(original_prices):
price = original_prices[idx]
if isinstance(clob_token_ids, list) and idx < len(clob_token_ids):
token_id = clob_token_ids[idx]
# If we still don't have a price, use maybe_price from dict
if price is None:
price = maybe_price
normalized.append({"name": str(name), "price": price, "tokenId": token_id})
else:
# Fallback binary representation
yes_price = market.get("yesPrice") or market.get("yes")
no_price = market.get("noPrice") or market.get("no")
# Use CLOB prices if fetch was successful
if clob_fetch_successful and isinstance(outcome_prices, list) and len(outcome_prices) >= 2:
yes_price = outcome_prices[0]
no_price = outcome_prices[1]
# Fallback to original prices if CLOB fetch failed
elif isinstance(original_prices, list) and len(original_prices) >= 2:
yes_price = original_prices[0]
no_price = original_prices[1]
if yes_price is not None or no_price is not None:
normalized = [
{"name": "Yes", "price": yes_price, "tokenId": None},
{"name": "No", "price": no_price, "tokenId": None},
]
if not normalized:
print("No explicit outcomes provided by API; proceeding without outcome selection.")
return (None, None, None)
print("\nOutcomes:")
for idx, o in enumerate(normalized, start=1):
price_str = _format_price(o.get('price'))
print(f" {idx}) {o['name']} {price_str}")
sel = input("\nSelect outcome to trade (or Enter to cancel): ").strip()
if not sel:
return None # Signal cancellation
try:
sel_idx = int(sel)
except ValueError:
print("Invalid selection; cancelling.")
return None # Signal cancellation
if sel_idx < 1 or sel_idx > len(normalized):
print("Selection out of range; cancelling.")
return None # Signal cancellation
chosen = normalized[sel_idx - 1]
return (chosen.get("name"), chosen.get("price"), chosen.get("tokenId"))
def _place_order_now(market: dict, chosen_outcome_name: str | None = None, chosen_token_id: str | None = None, available_shares: float | None = None):
"""
Inline order placement flow; prompts for side/size/price, shows summary, and submits with confirmation.
User can type 'c' at any prompt to cancel.
Args:
market: Market dictionary
chosen_outcome_name: Pre-selected outcome name
chosen_token_id: Pre-selected token ID
available_shares: Available shares for sell orders (to validate against)
"""
global CURRENT_SESSION
if not CURRENT_SESSION:
print("No active trading session. Create a session first in the Trading Menu.")
return
market_id = market.get("id") or market.get("marketId") or market.get("_id")
if not market_id:
print("Selected market missing id.")
return
market_title = market.get("title") or market.get("question") or market.get("name") or "Unknown Market"
# Extract neg_risk field (supports both snake_case and camelCase)
neg_risk = market.get("neg_risk") or market.get("negRisk") or False
if isinstance(neg_risk, str):
neg_risk = neg_risk.lower() in ("true", "1", "yes")
neg_risk = bool(neg_risk)
print("\nPlace order (type 'c' or 'cancel' at any prompt to cancel):")
# Side input with cancel option
side_input = input("Side (buy/b/sell/s/cancel/c) [buy]: ").strip().lower()
if side_input in ("c", "cancel"):
print("Order cancelled.")
return
side = side_input or "buy"
# Map aliases to full names
if side in ("b", "buy"):
side = "buy"
elif side in ("s", "sell"):
side = "sell"
else:
print("Side must be 'buy', 'b', 'sell', or 's'.")
return
# Size and price input with cancel option
# For sell orders: check available shares, no $5 minimum
# For buy orders: check $5 minimum
while True:
size_prompt = f"Size (quantity/cancel/c) [{available_shares:.2f}]: " if (side == "sell" and available_shares) else "Size (quantity/cancel/c) [1]: "
size_str = input(size_prompt).strip()
if size_str.lower() in ("c", "cancel"):
print("Order cancelled.")
return
size_str = size_str or (f"{available_shares:.2f}" if (side == "sell" and available_shares) else "1")
price_str = input("Price (0-1/cancel/c) [0.01]: ").strip()
if price_str.lower() in ("c", "cancel"):
print("Order cancelled.")
return
price_str = price_str or "0.01"
try:
size = float(size_str)
price = float(price_str)
except ValueError:
print("Invalid size or price. Try again.")
continue
if size <= 0 or price <= 0 or price > 1:
print("Size must be > 0 and price must be in (0, 1]. Try again.")
continue
# For sell orders: cap at exact position size from API (never round available_shares for the order)
if side == "sell" and available_shares is not None:
raw_available = float(available_shares)
requested = size
size = min(size, raw_available)
if requested > raw_available and round(requested, 2) != round(raw_available, 2):
print(
f"Sell size capped to your position: {raw_available:.2f} shares "
f"(requested {requested:.2f})."
)
# For buy orders: check $5 minimum
if side == "buy":
notional = size * price
if notional < 5.0:
print(f"Order notional ${notional:.2f} is below the $5 minimum. Please increase size and/or price.")
continue
break
# SELL: CLOB EIP-712 uses round(size, 2) in micro-units; floor to 0.01-share grid so we never exceed balance
if side == "sell":
size = _normalize_sell_size_for_clob(size)
if size <= 0:
print("Sell size is below 0.01 shares after aligning to the exchange grid. Nothing to sell.")
return
# Order type input with cancel option
print("\nOrder Type:")
print(" GTC - Good Till Canceled: Limit order remains active until filled or cancelled")
print(" FOK - Fill Or Kill: Market Order must be filled immediately or it's cancelled")
print(" FAK - Fill And Kill: Market Order will be filled immediately with what is available and the rest cancelled")
order_type_input = input("Order type (gtc/fok/fak/cancel/c) [gtc]: ").strip().upper()
if order_type_input in ("C", "CANCEL"):
print("Order cancelled.")
return
order_type = order_type_input or "GTC"
if order_type not in ("GTC", "FOK", "FAK"):
print("Order type must be 'GTC', 'FOK', or 'FAK'. Using default 'GTC'.")
order_type = "GTC"
side_upper = "BUY" if side == "buy" else "SELL"
# Apply the same price adjustment logic that place_order() uses. Only for Market Orders (FOK, FAK).
# This ensures the order summary shows the actual values that will be sent to the API
if order_type in ("FOK", "FAK"):
if side_upper == "BUY":
adjusted_price = price + PRICE_BUFFER_ADJUSTMENT
price_note = f"(+${PRICE_BUFFER_ADJUSTMENT} buffer for {side_upper} orders)"
else: # SELL
adjusted_price = max(0.01, price - PRICE_BUFFER_ADJUSTMENT)
price_note = f"(-${PRICE_BUFFER_ADJUSTMENT} buffer for {side_upper} orders)"
else:
adjusted_price = price
price_note = ""
# Calculate with adjusted price (what will actually be sent to API)
notional = size * adjusted_price
if side_upper == "BUY":
fee = notional * VOLUME_FEE
total_with_fee = notional + fee
else:
total_with_fee = notional
# Display summary with actual values
print("\n" + "="*60)
print("Order Summary:")
print("="*60)
print(f"Market: {market_title}")
if chosen_outcome_name:
print(f"Outcome: {chosen_outcome_name}")
print(f"Neg Risk: {neg_risk}")
print(f"Side: {side_upper}")
print(f"Order Type: {order_type}")
print(f"Size: {size}")
if adjusted_price != price:
print(f"Requested Price: {price}")
print(f"Adjusted Price: {adjusted_price} {price_note}")
else:
print(f"Price: {price}")
print(f"Subtotal: ${notional:.2f}")
if side_upper == "BUY":
print(f"Platform Fee ({VOLUME_FEE*100:.1f}%): ${fee:.2f}")
print(f"Total: ${total_with_fee:.2f}")
print("="*60)
# Final confirmation
confirm = input("\nSubmit this order? (y/n): ").strip().lower()
if confirm not in ("y", "yes"):
print("Order cancelled.")
return
# Call unified order poster
place_order(
market_id=market_id,
side_upper=side_upper,
size=size,
price=price,