-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbinanceAPI.py
More file actions
66 lines (58 loc) · 2.28 KB
/
binanceAPI.py
File metadata and controls
66 lines (58 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import hmac
import time
import hashlib
import requests
import json
from urllib.parse import urlencode
import socket
old_getaddrinfo = socket.getaddrinfo
def new_getaddrinfo(*args, **kwargs):
responses = old_getaddrinfo(*args, **kwargs)
return [response
for response in responses
if response[0] == socket.AF_INET]
socket.getaddrinfo = new_getaddrinfo
class dAPI:
def __init__(self, APIKEY, APISECRET, BASE_URL = 'https://dapi.binance.com'):
self.KEY = APIKEY
self.SECRET = APISECRET
self.BASE_URL = BASE_URL
def hashing(self, query_string):
return hmac.new(self.SECRET.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
def get_timestamp(self):
return int(time.time() * 1000)
def dispatch_request(self, http_method):
session = requests.Session()
session.headers.update({
'Content-Type': 'application/json;charset=utf-8',
'X-MBX-APIKEY': self.KEY
})
return {
'GET': session.get,
'DELETE': session.delete,
'PUT': session.put,
'POST': session.post,
}.get(http_method, 'GET')
# used for sending request requires the signature
def send_signed_request(self, http_method, url_path, payload={}):
query_string = urlencode(payload)
# replace single quote to double quote
query_string = query_string.replace('%27', '%22')
if query_string:
query_string = "{}×tamp={}".format(query_string, self.get_timestamp())
else:
query_string = 'timestamp={}'.format(self.get_timestamp())
url = self.BASE_URL + url_path + '?' + query_string + '&signature=' + self.hashing(query_string)
print("{} {}".format(http_method, url))
params = {'url': url, 'params': {}}
response = self.dispatch_request(http_method)(**params)
return response.json()
# used for sending public data request
def send_public_request(self, url_path, payload={}):
query_string = urlencode(payload, True)
url = self.BASE_URL + url_path
if query_string:
url = url + '?' + query_string
print("{}".format(url))
response = self.dispatch_request('GET')(url=url)
return response.json()