-
Notifications
You must be signed in to change notification settings - Fork 2
/
xAPIConnector.py
executable file
·374 lines (288 loc) · 11.4 KB
/
xAPIConnector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
import json
import socket
import logging
import time
import ssl
from threading import Thread
import sys
# set to true on debug environment only
DEBUG = True
# default connection properites
DEFAULT_XAPI_ADDRESS = 'xapi.xtb.com'
DEFAULT_XAPI_PORT = 5124
DEFUALT_XAPI_STREAMING_PORT = 5125
# wrapper name and version
WRAPPER_NAME = 'python'
WRAPPER_VERSION = '2.5.0'
# API inter-command timeout (in ms)
API_SEND_TIMEOUT = 100
# max connection tries
API_MAX_CONN_TRIES = 3
# logger properties
logger = logging.getLogger("jsonSocket")
FORMAT = '[%(asctime)-15s][%(funcName)s:%(lineno)d] %(message)s'
logging.basicConfig(format=FORMAT)
if DEBUG:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.CRITICAL)
class TransactionSide(object):
BUY = 0
SELL = 1
BUY_LIMIT = 2
SELL_LIMIT = 3
BUY_STOP = 4
SELL_STOP = 5
class TransactionType(object):
ORDER_OPEN = 0
ORDER_CLOSE = 2
ORDER_MODIFY = 3
ORDER_DELETE = 4
class JsonSocket(object):
def __init__(self, address, port, encrypt=False):
self._ssl = encrypt
if self._ssl != True:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket = ssl.wrap_socket(sock)
self.conn = self.socket
self._timeout = None
self._address = address
self._port = port
self._decoder = json.JSONDecoder()
self._receivedData = ''
def connect(self):
for i in range(API_MAX_CONN_TRIES):
try:
self.socket.connect((self.address, self.port))
except socket.error as msg:
logger.error("SockThread Error: %s" % msg)
time.sleep(0.25)
continue
logger.info("Socket connected")
return True
return False
def _sendObj(self, obj):
msg = json.dumps(obj)
self._waitingSend(msg)
def _waitingSend(self, msg):
if self.socket:
sent = 0
msg = msg.encode('utf-8')
while sent < len(msg):
sent += self.conn.send(msg[sent:])
logger.info('Sent: ' + str(msg))
time.sleep(API_SEND_TIMEOUT/1000)
def _read(self, bytesSize=4096):
if not self.socket:
raise RuntimeError("socket connection broken")
while True:
char = self.conn.recv(bytesSize).decode()
self._receivedData += char
try:
(resp, size) = self._decoder.raw_decode(self._receivedData)
if size == len(self._receivedData):
self._receivedData = ''
break
elif size < len(self._receivedData):
self._receivedData = self._receivedData[size:].strip()
break
except ValueError as e:
continue
logger.info('Received: ' + str(resp))
return resp
def _readObj(self):
msg = self._read()
return msg
def close(self):
logger.debug("Closing socket")
self._closeSocket()
if self.socket is not self.conn:
logger.debug("Closing connection socket")
self._closeConnection()
def _closeSocket(self):
self.socket.close()
def _closeConnection(self):
self.conn.close()
def _get_timeout(self):
return self._timeout
def _set_timeout(self, timeout):
self._timeout = timeout
self.socket.settimeout(timeout)
def _get_address(self):
return self._address
def _set_address(self, address):
pass
def _get_port(self):
return self._port
def _set_port(self, port):
pass
def _get_encrypt(self):
return self._ssl
def _set_encrypt(self, encrypt):
pass
timeout = property(_get_timeout, _set_timeout,
doc='Get/set the socket timeout')
address = property(_get_address, _set_address,
doc='read only property socket address')
port = property(_get_port, _set_port, doc='read only property socket port')
encrypt = property(_get_encrypt, _set_encrypt,
doc='read only property socket port')
class APIClient(JsonSocket):
def __init__(self, address=DEFAULT_XAPI_ADDRESS, port=DEFAULT_XAPI_PORT, encrypt=True):
super(APIClient, self).__init__(address, port, encrypt)
if (not self.connect()):
raise Exception("Cannot connect to " + address + ":" +
str(port) + " after " + str(API_MAX_CONN_TRIES) + " retries")
def readCredentials(self, file):
try:
f = open(file)
data = json.load(f)
f.close()
return data['userId'], data['password']
except (FileNotFoundError, TypeError) as e:
print("Missing json file!")
sys.exit(1)
def execute(self, dictionary):
self._sendObj(dictionary)
return self._readObj()
def disconnect(self):
self.close()
def commandExecute(self, commandName, arguments=None):
return self.execute(baseCommand(commandName, arguments))
class APIStreamClient(JsonSocket):
def __init__(self, address=DEFAULT_XAPI_ADDRESS, port=DEFUALT_XAPI_STREAMING_PORT, encrypt=True, ssId=None,
tickFun=None, tradeFun=None, balanceFun=None, tradeStatusFun=None, profitFun=None, newsFun=None):
super(APIStreamClient, self).__init__(address, port, encrypt)
self._ssId = ssId
self._tickFun = tickFun
self._tradeFun = tradeFun
self._balanceFun = balanceFun
self._tradeStatusFun = tradeStatusFun
self._profitFun = profitFun
self._newsFun = newsFun
if (not self.connect()):
raise Exception("Cannot connect to streaming on " + address + ":" +
str(port) + " after " + str(API_MAX_CONN_TRIES) + " retries")
self._running = True
self._t = Thread(target=self._readStream, args=())
self._t.setDaemon(True)
self._t.start()
def _readStream(self):
while (self._running):
msg = self._readObj()
logger.info("Stream received: " + str(msg))
if (msg["command"] == 'tickPrices'):
self._tickFun(msg)
elif (msg["command"] == 'trade'):
self._tradeFun(msg)
elif (msg["command"] == "balance"):
self._balanceFun(msg)
elif (msg["command"] == "tradeStatus"):
self._tradeStatusFun(msg)
elif (msg["command"] == "profit"):
self._profitFun(msg)
elif (msg["command"] == "news"):
self._newsFun(msg)
def disconnect(self):
self._running = False
self._t.join()
self.close()
def execute(self, dictionary):
self._sendObj(dictionary)
def subscribePrice(self, symbol):
self.execute(dict(command='getTickPrices',
symbol=symbol, streamSessionId=self._ssId))
def subscribePrices(self, symbols):
for symbolX in symbols:
self.subscribePrice(symbolX)
def subscribeTrades(self):
self.execute(dict(command='getTrades', streamSessionId=self._ssId))
def subscribeBalance(self):
self.execute(dict(command='getBalance', streamSessionId=self._ssId))
def subscribeTradeStatus(self):
self.execute(dict(command='getTradeStatus',
streamSessionId=self._ssId))
def subscribeProfits(self):
self.execute(dict(command='getProfits', streamSessionId=self._ssId))
def subscribeNews(self):
self.execute(dict(command='getNews', streamSessionId=self._ssId))
def unsubscribePrice(self, symbol):
self.execute(dict(command='stopTickPrices',
symbol=symbol, streamSessionId=self._ssId))
def unsubscribePrices(self, symbols):
for symbolX in symbols:
self.unsubscribePrice(symbolX)
def unsubscribeTrades(self):
self.execute(dict(command='stopTrades', streamSessionId=self._ssId))
def unsubscribeBalance(self):
self.execute(dict(command='stopBalance', streamSessionId=self._ssId))
def unsubscribeTradeStatus(self):
self.execute(dict(command='stopTradeStatus',
streamSessionId=self._ssId))
def unsubscribeProfits(self):
self.execute(dict(command='stopProfits', streamSessionId=self._ssId))
def unsubscribeNews(self):
self.execute(dict(command='stopNews', streamSessionId=self._ssId))
# Command templates
def baseCommand(commandName, arguments=None):
if arguments == None:
arguments = dict()
return dict([('command', commandName), ('arguments', arguments)])
def loginCommand(userId, password, appName=''):
return baseCommand('login', dict(userId=userId, password=password, appName=appName))
# example function for processing ticks from Streaming socket
def procTickExample(msg):
print("TICK: ", msg)
# example function for processing trades from Streaming socket
def procTradeExample(msg):
print("TRADE: ", msg)
# example function for processing trades from Streaming socket
def procBalanceExample(msg):
print("BALANCE: ", msg)
# example function for processing trades from Streaming socket
def procTradeStatusExample(msg):
print("TRADE STATUS: ", msg)
# example function for processing trades from Streaming socket
def procProfitExample(msg):
print("PROFIT: ", msg)
# example function for processing news from Streaming socket
def procNewsExample(msg):
print("NEWS: ", msg)
def main():
# create & connect to RR socket
client = APIClient()
# enter your login credentials in my_credentials.json file
userId, password = client.readCredentials('my_credentials.json')
# connect to RR socket, login
loginResponse = client.execute(
loginCommand(userId=userId, password=password))
logger.info(str(loginResponse))
# check if user logged in correctly
if (loginResponse['status'] == False):
print('Login failed. Error code: {0}'.format(
loginResponse['errorCode']))
return
# get ssId from login response
ssid = loginResponse['streamSessionId']
# second method of invoking commands
resp = client.commandExecute('getAllSymbols')
# create & connect to Streaming socket with given ssID
# and functions for processing ticks, trades, profit and tradeStatus
sclient = APIStreamClient(ssId=ssid, tickFun=procTickExample, tradeFun=procTradeExample,
profitFun=procProfitExample, tradeStatusFun=procTradeStatusExample)
# subscribe for trades
sclient.subscribeTrades()
# subscribe for prices
sclient.subscribePrices(['EURUSD', 'EURGBP', 'EURJPY'])
# subscribe for profits
sclient.subscribeProfits()
# this is an example, make it run for 5 seconds
time.sleep(5)
# gracefully close streaming socket
sclient.disconnect()
# gracefully close RR socket
client.disconnect()
if __name__ == "__main__":
main()