1 Класс Wrapper с aiohttp

aiohttp.ClientSession может быть использован в качестве родителя для пользовательского класса WebSocket.

импорт asyncio из aiohttp import класс ClientSession EchoWebSocket (ClientSession): URL = "wss: //echo.websocket.org" def __init __ (self): super () .__ init __ () self.websocket = нет async def connect (self): "" "Подключиться к WebSocket." "" Self.websocket = await self.ws_connect (self.URL) async def send (self, message): "" "Отправить сообщение в WebSocket." "" Assert self.websocket Нет, "Вы должны подключиться первым!" self.websocket.send_str (message) print ("Отправлено:", сообщение) async def receive (self): "" "Получить одно сообщение из WebSocket." "" Утвердить, что self.websocket не None, "Сначала необходимо подключиться !» return (await self.websocket.receive ()). data async def read (self): "" "Чтение сообщений из WebSocket." "" Утверждение, что self.websocket не является None, "Сначала вы должны подключиться!" в то время как self.websocket.receive (): message = await self.receive () print ("Received:", message), если message == "Echo 9!": прервать async def send (websocket): для n в диапазоне (10 ): await websocket.send ("Echo {}!". format (n)) await asyncio.sleep (1) loop = asyncio.get_event_loop () с EchoWebSocket () в качестве websocket: loop.run_until_complete (websocket.connect ()) tasks = (send (websocket), websocket.read ()) loop.run_until_complete (asyncio.wait (tasks)) loop.close ()