Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ summaries = steam_client.api_call('GET', 'IEconService', 'GetTradeOffersSummary
**get_trade_offers_summary() -> dict**


**get_trade_offers(merge: bool = True) -> dict**
**get_trade_offers(merge: bool = True, get_sent_offers: bool = True, get_received_offers: bool = True, use_webtoken: bool =False, max_retry:int = 5) -> dict**

Fetching trade offers from steam using an API call.
Method is fetching offers with descriptions that satisfy conditions:
Expand All @@ -243,8 +243,13 @@ Method is fetching offers with descriptions that satisfy conditions:
If `merge` is set `True` then offer items are merged from items data and items description into dict where items `id` is key
and descriptions merged with data are value.

**get_trade_offer(trade_offer_id: str, merge: bool = True) -> dict**
`get_sent_offers` and `get_received_offers` control which offers are fetched.

`max_retry` controls how many retries the api call will try.

**get_trade_offer(trade_offer_id: str, merge: bool = True, use_webtoken:bool =False) -> dict**

if `use_webtoken` is True, then request sent will contain access_token instead of api_key

**get_trade_receipt(trade_id: str) -> list**

Expand Down
77 changes: 55 additions & 22 deletions steampy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from decimal import Decimal

import requests
import time

from steampy import guard
from steampy.confirmation import ConfirmationExecutor
Expand Down Expand Up @@ -54,6 +55,7 @@ def __init__(
self.username = username
self._password = password
self.market = SteamMarket(self._session)
self._access_token = None

if login_cookies:
self.set_login_cookies(login_cookies)
Expand All @@ -73,10 +75,8 @@ def set_proxies(self, proxies: dict) -> dict:
def set_login_cookies(self, cookies: dict) -> None:
self._session.cookies.update(cookies)
self.was_login_executed = True

if self.steam_guard is None:
self.steam_guard = {'steamid': str(self.get_steam_id())}

self.market._set_login_executed(self.steam_guard, self._get_session_id())

@login_required
Expand Down Expand Up @@ -110,10 +110,22 @@ def login(self, username: str | None = None, password: str | None = None, steam_
LoginExecutor(self.username, self._password, self.steam_guard['shared_secret'], self._session).login()
self.was_login_executed = True
self.market._set_login_executed(self.steam_guard, self._get_session_id())
self._access_token = self._set_access_token()

def _set_access_token(self) ->str :
steam_login_secure_cookies = [cookie for cookie in self._session.cookies if cookie.name == 'steamLoginSecure']
cookie_value = steam_login_secure_cookies[0].value
decoded_cookie_value = urlparse.unquote(cookie_value)
access_token_parts = decoded_cookie_value.split('||')
if len(access_token_parts) < 2:
print(decoded_cookie_value)
raise ValueError('Access token not found in steamLoginSecure cookie')
access_token = access_token_parts[1]
return access_token

@login_required
def logout(self) -> None:
url = f'{SteamUrl.STORE_URL}/login/logout/'
url = f'{SteamUrl.COMMUNITY_URL}/login/logout/'
data = {'sessionid': self._get_session_id()}
self._session.post(url, data=data)

Expand Down Expand Up @@ -174,27 +186,41 @@ def get_partner_inventory(
return merge_items_with_descriptions_from_inventory(response_dict, game) if merge else response_dict

def _get_session_id(self) -> str:
return self._session.cookies.get_dict()['sessionid']
return self._session.cookies.get_dict(domain="steamcommunity.com", path="/").get('sessionid')

def get_trade_offers_summary(self) -> dict:
params = {'key': self._api_key}
return self.api_call('GET', 'IEconService', 'GetTradeOffersSummary', 'v1', params).json()

def get_trade_offers(self, merge: bool = True) -> dict:
params = {
'key': self._api_key,
'get_sent_offers': 1,
'get_received_offers': 1,
'get_descriptions': 1,
'language': 'english',
'active_only': 1,
'historical_only': 0,
'time_historical_cutoff': '',
}
response = self.api_call('GET', 'IEconService', 'GetTradeOffers', 'v1', params).json()
response = self._filter_non_active_offers(response)

return merge_items_with_descriptions_from_offers(response) if merge else response
def get_trade_offers(self, merge: bool = True, get_sent_offers: bool = True, get_received_offers: bool = True, use_webtoken: bool =False, max_retry:int = 5) -> dict:
params = {'key' if not use_webtoken else 'access_token': self._api_key if not use_webtoken else self._access_token,
'get_sent_offers': int(get_sent_offers),
'get_received_offers': int(get_received_offers),
'get_descriptions': 1,
'language': 'english',
'active_only': 1,
'historical_only': 0,
'time_historical_cutoff': ''}

response = self._try_to_get_trade_offers(params ,max_retry)
if response is None:
raise ApiException('Cannot get proper json from get_trade_offers method')
response_with_active_offers = self._filter_non_active_offers(response)
if merge:
return merge_items_with_descriptions_from_offers(response_with_active_offers)
else:
return response_with_active_offers

def _try_to_get_trade_offers(self, params:dict, max_retry: int) -> dict | None:
response = None
for _ in range(max_retry):
try:
response = self.api_call('GET', 'IEconService', 'GetTradeOffers', 'v1', params).json()
break
except json.decoder.JSONDecodeError:
time.sleep(2)
continue
return response

@staticmethod
def _filter_non_active_offers(offers_response):
Expand All @@ -210,8 +236,15 @@ def _filter_non_active_offers(offers_response):

return offers_response

def get_trade_offer(self, trade_offer_id: str, merge: bool = True) -> dict:
params = {'key': self._api_key, 'tradeofferid': trade_offer_id, 'language': 'english'}
def get_trade_offer(self, trade_offer_id: str, merge: bool = True, use_webtoken:bool =False) -> dict:
params = {
'tradeofferid': trade_offer_id,
'language': 'english'}
if use_webtoken:
params['access_token'] = self._access_token
else:
params['key'] = self._api_key

response = self.api_call('GET', 'IEconService', 'GetTradeOffer', 'v1', params).json()

if merge and 'descriptions' in response['response']:
Expand Down Expand Up @@ -250,7 +283,7 @@ def get_trade_receipt(self, trade_id: str):

@login_required
def accept_trade_offer(self, trade_offer_id: str) -> dict:
trade = self.get_trade_offer(trade_offer_id)
trade = self.get_trade_offer(trade_offer_id, use_webtoken=True)
trade_offer_state = TradeOfferState(trade['response']['offer']['trade_offer_state'])
if trade_offer_state is not TradeOfferState.Active:
raise ApiException(f'Invalid trade offer state: {trade_offer_state.name} ({trade_offer_state.value})')
Expand Down
11 changes: 8 additions & 3 deletions steampy/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ def _send_login_request(self) -> Response:
def set_sessionid_cookies(self):
community_domain = SteamUrl.COMMUNITY_URL[8:]
store_domain = SteamUrl.STORE_URL[8:]
community_cookie_dic = self.session.cookies.get_dict(domain = community_domain)
store_cookie_dic = self.session.cookies.get_dict(domain = store_domain)
community_cookie_dic = self.session.cookies.get_dict(domain=community_domain)
store_cookie_dic = self.session.cookies.get_dict(domain=store_domain)
for name in ('steamLoginSecure', 'sessionid', 'steamRefresh_steam', 'steamCountry'):
cookie = self.session.cookies.get_dict()[name]
if name == "steamLoginSecure":
store_cookie = create_cookie(name, store_cookie_dic[name], store_domain)
else:
store_cookie = create_cookie(name, cookie, store_domain)

if name in ["sessionid", "steamLoginSecure"]:
community_cookie = create_cookie(name, community_cookie_dic[name], community_domain)
else:
community_cookie = create_cookie(name, cookie, community_domain)

self.session.cookies.set(**community_cookie)
self.session.cookies.set(**store_cookie)
self.session.cookies.set(**store_cookie)

def _fetch_rsa_params(self, current_number_of_repetitions: int = 0) -> dict:
self.session.post(SteamUrl.COMMUNITY_URL)
Expand Down
Loading