avanza.avanza_socket

  1import asyncio
  2import logging
  3import json
  4
  5import websockets.legacy.client as websockets
  6
  7from typing import Any, Callable, Sequence
  8
  9from .constants import ChannelType
 10
 11WEBSOCKET_URL = "wss://www.avanza.se/_push/cometd"
 12
 13logger = logging.getLogger("avanza_socket")
 14
 15
 16class AvanzaSocket:
 17    def __init__(self, push_subscription_id, cookies):
 18        self._socket = None
 19        self._client_id = None
 20        self._message_count = 1
 21        self._push_subscription_id = push_subscription_id
 22        self._connected = False
 23        self._subscriptions = {}
 24        self._cookies = cookies
 25
 26    async def init(self):
 27        asyncio.ensure_future(self.__create_socket())
 28        await self.__wait_for_websocket_to_be_connected()
 29
 30    async def __wait_for_websocket_to_be_connected(self):
 31        timeout_count = 40
 32        timeout_value = 0.250
 33
 34        # Waits for a maximum of 10 seconds for the connection to be complete
 35        for _ in range(0, timeout_count):
 36            if self._connected:
 37                return
 38            await asyncio.sleep(timeout_value)
 39
 40        raise TimeoutError(
 41            "\
 42            We weren't able to connect \
 43            to the websocket within the expected timeframe \
 44        "
 45        )
 46
 47    async def __create_socket(self):
 48        async with websockets.connect(
 49            WEBSOCKET_URL, extra_headers={"Cookie": self._cookies}
 50        ) as self._socket:
 51            await self.__send_handshake_message()
 52            await self.__socket_message_handler()
 53
 54    async def __send_handshake_message(self):
 55        await self.__send(
 56            {
 57                "advice": {"timeout": 60000, "interval": 0},
 58                "channel": "/meta/handshake",
 59                "ext": {"subscriptionId": self._push_subscription_id},
 60                "minimumVersion": "1.0",
 61                "supportedConnectionTypes": [
 62                    "websocket",
 63                    "long-polling",
 64                    "callback-polling",
 65                ],
 66                "version": "1.0",
 67            }
 68        )
 69
 70    async def __send_connect_message(self):
 71        await self.__send(
 72            {
 73                "channel": "/meta/connect",
 74                "clientId": self._client_id,
 75                "connectionType": "websocket",
 76                "id": self._message_count,
 77            }
 78        )
 79
 80    async def __socket_subscribe(
 81        self, subscription_string, callback: Callable[[str, dict], Any]
 82    ):
 83        self._subscriptions[subscription_string] = {"callback": callback}
 84
 85        await self.__send(
 86            {
 87                "channel": "/meta/subscribe",
 88                "clientId": self._client_id,
 89                "subscription": subscription_string,
 90            }
 91        )
 92
 93    async def __send(self, message):
 94        wrapped_message = [{**message, "id": str(self._message_count)}]
 95
 96        logger.info(f"Outgoing message: {wrapped_message}")
 97
 98        await self._socket.send(json.dumps(wrapped_message))
 99
100        self._message_count = self._message_count + 1
101
102    async def __handshake(self, message: dict):
103        if message.get("successful", False):
104            self._client_id = message.get("clientId")
105            await self.__send(
106                {
107                    "advice": {"timeout": 0},
108                    "channel": "/meta/connect",
109                    "clientId": self._client_id,
110                    "connectionType": "websocket",
111                }
112            )
113            return
114
115        advice = message.get("advice")
116        if advice and advice.get("reconnect") == "handshake":
117            await self.__send_handshake_message()
118
119    async def __connect(self, message: dict):
120        successful = message.get("successful", False)
121        advice = message.get("advice", {})
122        reconnect = advice.get("reconnect") == "retry"
123        interval = advice.get("interval")
124
125        connect_successful = successful and (
126            not advice or (reconnect and interval >= 0)
127        )
128
129        if connect_successful:
130            await self.__send(
131                {
132                    "channel": "/meta/connect",
133                    "clientId": self._client_id,
134                    "connectionType": "websocket",
135                }
136            )
137
138            if not self._connected:
139                self._connected = True
140                await self.__resubscribe_existing_subscriptions()
141
142        elif self._client_id:
143            await self.__send_connect_message()
144
145    async def __resubscribe_existing_subscriptions(self):
146        for key, value in self._subscriptions.items():
147            if value.get("client_id") != self._client_id:
148                await self.__socket_subscribe(key, value["callback"])
149
150    async def __disconnect(self, message):
151        await self.__send_handshake_message()
152
153    async def __register_subscription(self, message):
154        subscription = message.get("subscription")
155        if subscription is None:
156            raise ValueError("No subscription channel found on subscription message")
157
158        self._subscriptions[subscription]["client_id"] = self._client_id
159
160    async def __socket_message_handler(self):
161        message_action = {
162            "/meta/disconnect": self.__disconnect,
163            "/meta/handshake": self.__handshake,
164            "/meta/connect": self.__connect,
165            "/meta/subscribe": self.__register_subscription,
166        }
167
168        async for message in self._socket:
169            message = json.loads(message)[0]
170            message_channel = message.get("channel")
171            error = message.get("error")
172
173            logger.info(f"Incoming message: {message}")
174
175            if error:
176                logger.error(error)
177
178            action = message_action.get(message_channel)
179            # Use user subscribed action
180            if action is None:
181                callback = self._subscriptions[message_channel]["callback"]
182                if asyncio.iscoroutinefunction(callback):
183                    await callback(message)
184                else:
185                    callback(message)
186            else:
187                await action(message)
188
189    async def subscribe_to_id(
190        self, channel: ChannelType, id: str, callback: Callable[[str, dict], Any]
191    ):
192        return await self.subscribe_to_ids(channel, [id], callback)
193
194    async def subscribe_to_ids(
195        self,
196        channel: ChannelType,
197        ids: Sequence[str],
198        callback: Callable[[str, dict], Any],
199    ):
200        valid_channels_for_multiple_ids = [
201            ChannelType.ORDERS,
202            ChannelType.DEALS,
203            ChannelType.POSITIONS,
204        ]
205
206        if len(ids) > 1 and channel not in valid_channels_for_multiple_ids:
207            raise ValueError(
208                f"Multiple ids is not supported for channels other than {valid_channels_for_multiple_ids}"
209            )
210
211        subscription_string = f'/{channel.value}/{",".join(ids)}'
212        await self.__socket_subscribe(subscription_string, callback)
WEBSOCKET_URL = 'wss://wwwavanza.avanza.se/_push/cometd'
logger = <Logger avanza_socket (WARNING)>
class AvanzaSocket:
 17class AvanzaSocket:
 18    def __init__(self, push_subscription_id, cookies):
 19        self._socket = None
 20        self._client_id = None
 21        self._message_count = 1
 22        self._push_subscription_id = push_subscription_id
 23        self._connected = False
 24        self._subscriptions = {}
 25        self._cookies = cookies
 26
 27    async def init(self):
 28        asyncio.ensure_future(self.__create_socket())
 29        await self.__wait_for_websocket_to_be_connected()
 30
 31    async def __wait_for_websocket_to_be_connected(self):
 32        timeout_count = 40
 33        timeout_value = 0.250
 34
 35        # Waits for a maximum of 10 seconds for the connection to be complete
 36        for _ in range(0, timeout_count):
 37            if self._connected:
 38                return
 39            await asyncio.sleep(timeout_value)
 40
 41        raise TimeoutError(
 42            "\
 43            We weren't able to connect \
 44            to the websocket within the expected timeframe \
 45        "
 46        )
 47
 48    async def __create_socket(self):
 49        async with websockets.connect(
 50            WEBSOCKET_URL, extra_headers={"Cookie": self._cookies}
 51        ) as self._socket:
 52            await self.__send_handshake_message()
 53            await self.__socket_message_handler()
 54
 55    async def __send_handshake_message(self):
 56        await self.__send(
 57            {
 58                "advice": {"timeout": 60000, "interval": 0},
 59                "channel": "/meta/handshake",
 60                "ext": {"subscriptionId": self._push_subscription_id},
 61                "minimumVersion": "1.0",
 62                "supportedConnectionTypes": [
 63                    "websocket",
 64                    "long-polling",
 65                    "callback-polling",
 66                ],
 67                "version": "1.0",
 68            }
 69        )
 70
 71    async def __send_connect_message(self):
 72        await self.__send(
 73            {
 74                "channel": "/meta/connect",
 75                "clientId": self._client_id,
 76                "connectionType": "websocket",
 77                "id": self._message_count,
 78            }
 79        )
 80
 81    async def __socket_subscribe(
 82        self, subscription_string, callback: Callable[[str, dict], Any]
 83    ):
 84        self._subscriptions[subscription_string] = {"callback": callback}
 85
 86        await self.__send(
 87            {
 88                "channel": "/meta/subscribe",
 89                "clientId": self._client_id,
 90                "subscription": subscription_string,
 91            }
 92        )
 93
 94    async def __send(self, message):
 95        wrapped_message = [{**message, "id": str(self._message_count)}]
 96
 97        logger.info(f"Outgoing message: {wrapped_message}")
 98
 99        await self._socket.send(json.dumps(wrapped_message))
100
101        self._message_count = self._message_count + 1
102
103    async def __handshake(self, message: dict):
104        if message.get("successful", False):
105            self._client_id = message.get("clientId")
106            await self.__send(
107                {
108                    "advice": {"timeout": 0},
109                    "channel": "/meta/connect",
110                    "clientId": self._client_id,
111                    "connectionType": "websocket",
112                }
113            )
114            return
115
116        advice = message.get("advice")
117        if advice and advice.get("reconnect") == "handshake":
118            await self.__send_handshake_message()
119
120    async def __connect(self, message: dict):
121        successful = message.get("successful", False)
122        advice = message.get("advice", {})
123        reconnect = advice.get("reconnect") == "retry"
124        interval = advice.get("interval")
125
126        connect_successful = successful and (
127            not advice or (reconnect and interval >= 0)
128        )
129
130        if connect_successful:
131            await self.__send(
132                {
133                    "channel": "/meta/connect",
134                    "clientId": self._client_id,
135                    "connectionType": "websocket",
136                }
137            )
138
139            if not self._connected:
140                self._connected = True
141                await self.__resubscribe_existing_subscriptions()
142
143        elif self._client_id:
144            await self.__send_connect_message()
145
146    async def __resubscribe_existing_subscriptions(self):
147        for key, value in self._subscriptions.items():
148            if value.get("client_id") != self._client_id:
149                await self.__socket_subscribe(key, value["callback"])
150
151    async def __disconnect(self, message):
152        await self.__send_handshake_message()
153
154    async def __register_subscription(self, message):
155        subscription = message.get("subscription")
156        if subscription is None:
157            raise ValueError("No subscription channel found on subscription message")
158
159        self._subscriptions[subscription]["client_id"] = self._client_id
160
161    async def __socket_message_handler(self):
162        message_action = {
163            "/meta/disconnect": self.__disconnect,
164            "/meta/handshake": self.__handshake,
165            "/meta/connect": self.__connect,
166            "/meta/subscribe": self.__register_subscription,
167        }
168
169        async for message in self._socket:
170            message = json.loads(message)[0]
171            message_channel = message.get("channel")
172            error = message.get("error")
173
174            logger.info(f"Incoming message: {message}")
175
176            if error:
177                logger.error(error)
178
179            action = message_action.get(message_channel)
180            # Use user subscribed action
181            if action is None:
182                callback = self._subscriptions[message_channel]["callback"]
183                if asyncio.iscoroutinefunction(callback):
184                    await callback(message)
185                else:
186                    callback(message)
187            else:
188                await action(message)
189
190    async def subscribe_to_id(
191        self, channel: ChannelType, id: str, callback: Callable[[str, dict], Any]
192    ):
193        return await self.subscribe_to_ids(channel, [id], callback)
194
195    async def subscribe_to_ids(
196        self,
197        channel: ChannelType,
198        ids: Sequence[str],
199        callback: Callable[[str, dict], Any],
200    ):
201        valid_channels_for_multiple_ids = [
202            ChannelType.ORDERS,
203            ChannelType.DEALS,
204            ChannelType.POSITIONS,
205        ]
206
207        if len(ids) > 1 and channel not in valid_channels_for_multiple_ids:
208            raise ValueError(
209                f"Multiple ids is not supported for channels other than {valid_channels_for_multiple_ids}"
210            )
211
212        subscription_string = f'/{channel.value}/{",".join(ids)}'
213        await self.__socket_subscribe(subscription_string, callback)
AvanzaSocket(push_subscription_id, cookies)
18    def __init__(self, push_subscription_id, cookies):
19        self._socket = None
20        self._client_id = None
21        self._message_count = 1
22        self._push_subscription_id = push_subscription_id
23        self._connected = False
24        self._subscriptions = {}
25        self._cookies = cookies
async def init(self):
27    async def init(self):
28        asyncio.ensure_future(self.__create_socket())
29        await self.__wait_for_websocket_to_be_connected()
async def subscribe_to_id( self, channel: avanza.constants.ChannelType, id: str, callback: Callable[[str, dict], Any]):
190    async def subscribe_to_id(
191        self, channel: ChannelType, id: str, callback: Callable[[str, dict], Any]
192    ):
193        return await self.subscribe_to_ids(channel, [id], callback)
async def subscribe_to_ids( self, channel: avanza.constants.ChannelType, ids: Sequence[str], callback: Callable[[str, dict], Any]):
195    async def subscribe_to_ids(
196        self,
197        channel: ChannelType,
198        ids: Sequence[str],
199        callback: Callable[[str, dict], Any],
200    ):
201        valid_channels_for_multiple_ids = [
202            ChannelType.ORDERS,
203            ChannelType.DEALS,
204            ChannelType.POSITIONS,
205        ]
206
207        if len(ids) > 1 and channel not in valid_channels_for_multiple_ids:
208            raise ValueError(
209                f"Multiple ids is not supported for channels other than {valid_channels_for_multiple_ids}"
210            )
211
212        subscription_string = f'/{channel.value}/{",".join(ids)}'
213        await self.__socket_subscribe(subscription_string, callback)