avanza.avanza_socket
1import asyncio 2import logging 3import json 4 5import websockets.legacy.client as websockets 6 7from typing import Any, Callable, Sequence 8 9from .constants import ChannelType 10 11WEBSOCKET_URL = "wss://www.avanza.se/_push/cometd" 12 13logger = logging.getLogger("avanza_socket") 14 15 16class AvanzaSocket: 17 def __init__(self, push_subscription_id, cookies): 18 self._socket = None 19 self._client_id = None 20 self._message_count = 1 21 self._push_subscription_id = push_subscription_id 22 self._connected = False 23 self._subscriptions = {} 24 self._cookies = cookies 25 26 async def init(self): 27 asyncio.ensure_future(self.__create_socket()) 28 await self.__wait_for_websocket_to_be_connected() 29 30 async def __wait_for_websocket_to_be_connected(self): 31 timeout_count = 40 32 timeout_value = 0.250 33 34 # Waits for a maximum of 10 seconds for the connection to be complete 35 for _ in range(0, timeout_count): 36 if self._connected: 37 return 38 await asyncio.sleep(timeout_value) 39 40 raise TimeoutError( 41 "\ 42 We weren't able to connect \ 43 to the websocket within the expected timeframe \ 44 " 45 ) 46 47 async def __create_socket(self): 48 async with websockets.connect( 49 WEBSOCKET_URL, extra_headers={"Cookie": self._cookies} 50 ) as self._socket: 51 await self.__send_handshake_message() 52 await self.__socket_message_handler() 53 54 async def __send_handshake_message(self): 55 await self.__send( 56 { 57 "advice": {"timeout": 60000, "interval": 0}, 58 "channel": "/meta/handshake", 59 "ext": {"subscriptionId": self._push_subscription_id}, 60 "minimumVersion": "1.0", 61 "supportedConnectionTypes": [ 62 "websocket", 63 "long-polling", 64 "callback-polling", 65 ], 66 "version": "1.0", 67 } 68 ) 69 70 async def __send_connect_message(self): 71 await self.__send( 72 { 73 "channel": "/meta/connect", 74 "clientId": self._client_id, 75 "connectionType": "websocket", 76 "id": self._message_count, 77 } 78 ) 79 80 async def __socket_subscribe( 81 self, subscription_string, callback: Callable[[str, dict], Any] 82 ): 83 self._subscriptions[subscription_string] = {"callback": callback} 84 85 await self.__send( 86 { 87 "channel": "/meta/subscribe", 88 "clientId": self._client_id, 89 "subscription": subscription_string, 90 } 91 ) 92 93 async def __send(self, message): 94 wrapped_message = [{**message, "id": str(self._message_count)}] 95 96 logger.info(f"Outgoing message: {wrapped_message}") 97 98 await self._socket.send(json.dumps(wrapped_message)) 99 100 self._message_count = self._message_count + 1 101 102 async def __handshake(self, message: dict): 103 if message.get("successful", False): 104 self._client_id = message.get("clientId") 105 await self.__send( 106 { 107 "advice": {"timeout": 0}, 108 "channel": "/meta/connect", 109 "clientId": self._client_id, 110 "connectionType": "websocket", 111 } 112 ) 113 return 114 115 advice = message.get("advice") 116 if advice and advice.get("reconnect") == "handshake": 117 await self.__send_handshake_message() 118 119 async def __connect(self, message: dict): 120 successful = message.get("successful", False) 121 advice = message.get("advice", {}) 122 reconnect = advice.get("reconnect") == "retry" 123 interval = advice.get("interval") 124 125 connect_successful = successful and ( 126 not advice or (reconnect and interval >= 0) 127 ) 128 129 if connect_successful: 130 await self.__send( 131 { 132 "channel": "/meta/connect", 133 "clientId": self._client_id, 134 "connectionType": "websocket", 135 } 136 ) 137 138 if not self._connected: 139 self._connected = True 140 await self.__resubscribe_existing_subscriptions() 141 142 elif self._client_id: 143 await self.__send_connect_message() 144 145 async def __resubscribe_existing_subscriptions(self): 146 for key, value in self._subscriptions.items(): 147 if value.get("client_id") != self._client_id: 148 await self.__socket_subscribe(key, value["callback"]) 149 150 async def __disconnect(self, message): 151 await self.__send_handshake_message() 152 153 async def __register_subscription(self, message): 154 subscription = message.get("subscription") 155 if subscription is None: 156 raise ValueError("No subscription channel found on subscription message") 157 158 self._subscriptions[subscription]["client_id"] = self._client_id 159 160 async def __socket_message_handler(self): 161 message_action = { 162 "/meta/disconnect": self.__disconnect, 163 "/meta/handshake": self.__handshake, 164 "/meta/connect": self.__connect, 165 "/meta/subscribe": self.__register_subscription, 166 } 167 168 async for message in self._socket: 169 message = json.loads(message)[0] 170 message_channel = message.get("channel") 171 error = message.get("error") 172 173 logger.info(f"Incoming message: {message}") 174 175 if error: 176 logger.error(error) 177 178 action = message_action.get(message_channel) 179 # Use user subscribed action 180 if action is None: 181 callback = self._subscriptions[message_channel]["callback"] 182 if asyncio.iscoroutinefunction(callback): 183 await callback(message) 184 else: 185 callback(message) 186 else: 187 await action(message) 188 189 async def subscribe_to_id( 190 self, channel: ChannelType, id: str, callback: Callable[[str, dict], Any] 191 ): 192 return await self.subscribe_to_ids(channel, [id], callback) 193 194 async def subscribe_to_ids( 195 self, 196 channel: ChannelType, 197 ids: Sequence[str], 198 callback: Callable[[str, dict], Any], 199 ): 200 valid_channels_for_multiple_ids = [ 201 ChannelType.ORDERS, 202 ChannelType.DEALS, 203 ChannelType.POSITIONS, 204 ] 205 206 if len(ids) > 1 and channel not in valid_channels_for_multiple_ids: 207 raise ValueError( 208 f"Multiple ids is not supported for channels other than {valid_channels_for_multiple_ids}" 209 ) 210 211 subscription_string = f'/{channel.value}/{",".join(ids)}' 212 await self.__socket_subscribe(subscription_string, callback)
WEBSOCKET_URL =
'wss://wwwavanza.avanza.se/_push/cometd'
logger =
<Logger avanza_socket (WARNING)>
class
AvanzaSocket:
17class AvanzaSocket: 18 def __init__(self, push_subscription_id, cookies): 19 self._socket = None 20 self._client_id = None 21 self._message_count = 1 22 self._push_subscription_id = push_subscription_id 23 self._connected = False 24 self._subscriptions = {} 25 self._cookies = cookies 26 27 async def init(self): 28 asyncio.ensure_future(self.__create_socket()) 29 await self.__wait_for_websocket_to_be_connected() 30 31 async def __wait_for_websocket_to_be_connected(self): 32 timeout_count = 40 33 timeout_value = 0.250 34 35 # Waits for a maximum of 10 seconds for the connection to be complete 36 for _ in range(0, timeout_count): 37 if self._connected: 38 return 39 await asyncio.sleep(timeout_value) 40 41 raise TimeoutError( 42 "\ 43 We weren't able to connect \ 44 to the websocket within the expected timeframe \ 45 " 46 ) 47 48 async def __create_socket(self): 49 async with websockets.connect( 50 WEBSOCKET_URL, extra_headers={"Cookie": self._cookies} 51 ) as self._socket: 52 await self.__send_handshake_message() 53 await self.__socket_message_handler() 54 55 async def __send_handshake_message(self): 56 await self.__send( 57 { 58 "advice": {"timeout": 60000, "interval": 0}, 59 "channel": "/meta/handshake", 60 "ext": {"subscriptionId": self._push_subscription_id}, 61 "minimumVersion": "1.0", 62 "supportedConnectionTypes": [ 63 "websocket", 64 "long-polling", 65 "callback-polling", 66 ], 67 "version": "1.0", 68 } 69 ) 70 71 async def __send_connect_message(self): 72 await self.__send( 73 { 74 "channel": "/meta/connect", 75 "clientId": self._client_id, 76 "connectionType": "websocket", 77 "id": self._message_count, 78 } 79 ) 80 81 async def __socket_subscribe( 82 self, subscription_string, callback: Callable[[str, dict], Any] 83 ): 84 self._subscriptions[subscription_string] = {"callback": callback} 85 86 await self.__send( 87 { 88 "channel": "/meta/subscribe", 89 "clientId": self._client_id, 90 "subscription": subscription_string, 91 } 92 ) 93 94 async def __send(self, message): 95 wrapped_message = [{**message, "id": str(self._message_count)}] 96 97 logger.info(f"Outgoing message: {wrapped_message}") 98 99 await self._socket.send(json.dumps(wrapped_message)) 100 101 self._message_count = self._message_count + 1 102 103 async def __handshake(self, message: dict): 104 if message.get("successful", False): 105 self._client_id = message.get("clientId") 106 await self.__send( 107 { 108 "advice": {"timeout": 0}, 109 "channel": "/meta/connect", 110 "clientId": self._client_id, 111 "connectionType": "websocket", 112 } 113 ) 114 return 115 116 advice = message.get("advice") 117 if advice and advice.get("reconnect") == "handshake": 118 await self.__send_handshake_message() 119 120 async def __connect(self, message: dict): 121 successful = message.get("successful", False) 122 advice = message.get("advice", {}) 123 reconnect = advice.get("reconnect") == "retry" 124 interval = advice.get("interval") 125 126 connect_successful = successful and ( 127 not advice or (reconnect and interval >= 0) 128 ) 129 130 if connect_successful: 131 await self.__send( 132 { 133 "channel": "/meta/connect", 134 "clientId": self._client_id, 135 "connectionType": "websocket", 136 } 137 ) 138 139 if not self._connected: 140 self._connected = True 141 await self.__resubscribe_existing_subscriptions() 142 143 elif self._client_id: 144 await self.__send_connect_message() 145 146 async def __resubscribe_existing_subscriptions(self): 147 for key, value in self._subscriptions.items(): 148 if value.get("client_id") != self._client_id: 149 await self.__socket_subscribe(key, value["callback"]) 150 151 async def __disconnect(self, message): 152 await self.__send_handshake_message() 153 154 async def __register_subscription(self, message): 155 subscription = message.get("subscription") 156 if subscription is None: 157 raise ValueError("No subscription channel found on subscription message") 158 159 self._subscriptions[subscription]["client_id"] = self._client_id 160 161 async def __socket_message_handler(self): 162 message_action = { 163 "/meta/disconnect": self.__disconnect, 164 "/meta/handshake": self.__handshake, 165 "/meta/connect": self.__connect, 166 "/meta/subscribe": self.__register_subscription, 167 } 168 169 async for message in self._socket: 170 message = json.loads(message)[0] 171 message_channel = message.get("channel") 172 error = message.get("error") 173 174 logger.info(f"Incoming message: {message}") 175 176 if error: 177 logger.error(error) 178 179 action = message_action.get(message_channel) 180 # Use user subscribed action 181 if action is None: 182 callback = self._subscriptions[message_channel]["callback"] 183 if asyncio.iscoroutinefunction(callback): 184 await callback(message) 185 else: 186 callback(message) 187 else: 188 await action(message) 189 190 async def subscribe_to_id( 191 self, channel: ChannelType, id: str, callback: Callable[[str, dict], Any] 192 ): 193 return await self.subscribe_to_ids(channel, [id], callback) 194 195 async def subscribe_to_ids( 196 self, 197 channel: ChannelType, 198 ids: Sequence[str], 199 callback: Callable[[str, dict], Any], 200 ): 201 valid_channels_for_multiple_ids = [ 202 ChannelType.ORDERS, 203 ChannelType.DEALS, 204 ChannelType.POSITIONS, 205 ] 206 207 if len(ids) > 1 and channel not in valid_channels_for_multiple_ids: 208 raise ValueError( 209 f"Multiple ids is not supported for channels other than {valid_channels_for_multiple_ids}" 210 ) 211 212 subscription_string = f'/{channel.value}/{",".join(ids)}' 213 await self.__socket_subscribe(subscription_string, callback)
async def
subscribe_to_id( self, channel: avanza.constants.ChannelType, id: str, callback: Callable[[str, dict], Any]):
async def
subscribe_to_ids( self, channel: avanza.constants.ChannelType, ids: Sequence[str], callback: Callable[[str, dict], Any]):
195 async def subscribe_to_ids( 196 self, 197 channel: ChannelType, 198 ids: Sequence[str], 199 callback: Callable[[str, dict], Any], 200 ): 201 valid_channels_for_multiple_ids = [ 202 ChannelType.ORDERS, 203 ChannelType.DEALS, 204 ChannelType.POSITIONS, 205 ] 206 207 if len(ids) > 1 and channel not in valid_channels_for_multiple_ids: 208 raise ValueError( 209 f"Multiple ids is not supported for channels other than {valid_channels_for_multiple_ids}" 210 ) 211 212 subscription_string = f'/{channel.value}/{",".join(ids)}' 213 await self.__socket_subscribe(subscription_string, callback)