hat.drivers.cdt
Chrome DevTools Protocol
1"""Chrome DevTools Protocol""" 2 3from hat.drivers.cdt.browser import Browser 4from hat.drivers.cdt.connection import (SessionId, 5 EventName, 6 EventCb, 7 connect, 8 Connection) 9from hat.drivers.cdt.page import Page 10from hat.drivers.cdt.runtime import Runtime 11from hat.drivers.cdt.target import (TargetId, 12 TargetInfo, 13 getTargetInfos, 14 createTarget, 15 Session, 16 Target) 17 18 19__all__ = ['Browser', 20 'SessionId', 21 'EventName', 22 'EventCb', 23 'connect', 24 'Connection', 25 'Page', 26 'Runtime', 27 'TargetId', 28 'TargetInfo', 29 'getTargetInfos', 30 'createTarget', 31 'Session', 32 'Target']
class
Browser:
SessionId =
<class 'int'>
EventName =
<class 'str'>
EventCb =
typing.Callable[[str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], NoneType]
async def connect(host: str,
                  port: int
                  ) -> 'Connection':
    """Connect to a Chrome DevTools Protocol endpoint.

    The WebSocket address is taken from the ``webSocketDebuggerUrl`` entry of
    the JSON document served at ``http://{host}:{port}/json/version``. A
    WebSocket is then opened to that address with ``max_msg_size=0``.

    If the lookup or the WebSocket handshake fails for any reason
    (cancellation included), the HTTP client session is closed and the
    original exception is re-raised.

    Args:
        host: host name or address of the remote debugging HTTP endpoint
        port: port of the remote debugging HTTP endpoint

    Returns:
        connection with its on-close cleanup and receive loop already spawned

    """
    http_session = aiohttp.ClientSession()

    try:
        response = await http_session.get(
            f'http://{host}:{port}/json/version')
        version = await response.json()
        ws = await http_session.ws_connect(version['webSocketDebuggerUrl'],
                                           max_msg_size=0)

    except BaseException:
        # presumably keeps the close from being interrupted by cancellation
        await aio.uncancellable(http_session.close())
        raise

    # Connection state is assigned here rather than in a constructor
    conn = Connection()
    conn._async_group = aio.Group()
    conn._ws_session = http_session
    conn._ws = ws
    conn._event_cbs = util.CallbackRegistry()
    conn._result_futures = {}
    conn._next_ids = itertools.count(1)

    # cleanup is registered before the receive loop is started
    conn._async_group.spawn(aio.call_on_cancel, conn._on_close)
    conn._async_group.spawn(conn._receive_loop)

    return conn
class
Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Chrome DevTools Protocol connection over a single WebSocket.

    Instances are populated by `connect`, which assigns the private
    attributes used here (``_ws``, ``_ws_session``, ``_async_group``,
    ``_event_cbs``, ``_next_ids`` and ``_result_futures``).

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group assigned to this connection."""
        return self._async_group

    def register_event_cb(self,
                          cb: EventCb
                          ) -> util.RegisterCallbackHandle:
        """Register a callback for received messages that carry no ``id``.

        The callback is notified with the message's ``method`` and
        ``params``.

        """
        return self._event_cbs.register(cb)

    async def call(self,
                   method: str,
                   params: json.Data = {},
                   session_id: SessionId | None = None
                   ) -> json.Data:
        """Send a command and wait for the matching response.

        Args:
            method: command name
            params: command parameters (the shared default is never mutated)
            session_id: when not ``None``, sent as the message's ``sessionId``

        Returns:
            content of the response's ``result`` entry

        Raises:
            ConnectionError: connection is not open, or it is closed while
                the response is still pending
            Exception: response has no ``result`` entry; the message is the
                response's error message, or ``'unknown response'``

        """
        if not self.is_open:
            raise ConnectionError()

        msg = {'id': next(self._next_ids),
               'method': method,
               'params': params}
        if session_id is not None:
            msg['sessionId'] = session_id

        # bound to the loop running this coroutine
        future = asyncio.get_running_loop().create_future()
        self._result_futures[msg['id']] = future

        try:
            await self._ws.send_json(msg)
            return await future

        finally:
            # drop the pending entry on success, failure and cancellation
            del self._result_futures[msg['id']]

    async def _on_close(self):
        """Fail pending calls and release the WebSocket and HTTP session."""
        for future in self._result_futures.values():
            if not future.done():
                future.set_exception(ConnectionError())

        try:
            await self._ws.close()

        finally:
            # the HTTP session is closed even if closing the WebSocket fails
            await self._ws_session.close()

    async def _receive_loop(self):
        """Dispatch received messages until the WebSocket closes or fails.

        Messages with ``id`` resolve the pending future of that id; all other
        messages are forwarded to the registered event callbacks. The
        connection is closed when the loop ends for any reason.

        """
        try:
            while True:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception('unsupported message type')

                msg = json.decode(msg_ws.data)

                if 'id' in msg:
                    future = self._result_futures.get(msg['id'])
                    # responses without a waiting caller are ignored
                    if future and not future.done():
                        if 'result' in msg:
                            future.set_result(msg['result'])

                        else:
                            future.set_exception(
                                Exception(msg['error']['message']
                                          if 'error' in msg
                                          else 'unknown response'))

                else:
                    self._event_cbs.notify(msg['method'], msg['params'])

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
Resource with lifetime control based on `Group`.
def
register_event_cb( self, cb: Callable[[str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], NoneType]) -> hat.util.callback.RegisterCallbackHandle:
async def
call( self, method: str, params: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')] = {}, session_id: int | None = None) -> None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]:
async def call(self,
               method: str,
               params: json.Data = {},
               session_id: SessionId | None = None
               ) -> json.Data:
    """Send a command and wait for the matching response.

    Args:
        method: command name
        params: command parameters
        session_id: when not ``None``, sent as the message's ``sessionId``

    Returns:
        value the pending future is resolved with

    Raises:
        ConnectionError: connection is not open

    """
    if not self.is_open:
        raise ConnectionError()

    msg_id = next(self._next_ids)
    msg = {'id': msg_id,
           'method': method,
           'params': params}
    if session_id is not None:
        msg['sessionId'] = session_id

    # registered before sending so the response always finds its future
    result_future = asyncio.Future()
    self._result_futures[msg_id] = result_future

    try:
        await self._ws.send_json(msg)
        result = await result_future

    finally:
        # pending entry is removed whether the call succeeded or not
        del self._result_futures[msg_id]

    return result
class
Page:
class Page:
    """Issues ``Page.*`` commands through the given session."""

    def __init__(self, session: Session):
        self._session = session

    @property
    def session(self):
        """Session passed to the constructor."""
        return self._session

    async def enable(self):
        """Send ``Page.enable``."""
        method = 'Page.enable'
        await self._session.call(method)

    async def disable(self):
        """Send ``Page.disable``."""
        method = 'Page.disable'
        await self._session.call(method)

    async def navigate(self, url: str):
        """Send ``Page.navigate`` with `url` as the ``url`` parameter."""
        params = {'url': url}
        await self._session.call('Page.navigate', params)

    async def reload(self, ignore_cache: bool = False):
        """Send ``Page.reload`` with `ignore_cache` as ``ignoreCache``."""
        params = {'ignoreCache': ignore_cache}
        await self._session.call('Page.reload', params)
class
Runtime:
class Runtime:
    """Issues ``Runtime.*`` commands through the given session."""

    def __init__(self, session: Session):
        self._session = session

    @property
    def session(self):
        """Session passed to the constructor."""
        # must read the attribute assigned in __init__
        return self._session

    async def enable(self):
        """Send ``Runtime.enable``."""
        await self._session.call('Runtime.enable')

    async def disable(self):
        """Send ``Runtime.disable``."""
        await self._session.call('Runtime.disable')

    async def evaluate(self,
                       expression: str,
                       await_promise: bool = False
                       ) -> RemoteObject:
        """Send ``Runtime.evaluate`` and wrap the response.

        Args:
            expression: sent as the ``expression`` parameter
            await_promise: sent as the ``awaitPromise`` parameter

        Returns:
            remote object built from the response's ``result`` entry
            (``type`` is required; ``subtype``, ``value`` and ``objectId``
            default to ``None`` when absent)

        Raises:
            Exception: response contains ``exceptionDetails``; the message is
                its ``text`` entry

        """
        res = await self._session.call('Runtime.evaluate',
                                       {'expression': expression,
                                        'awaitPromise': await_promise})
        if 'exceptionDetails' in res:
            raise Exception(res['exceptionDetails']['text'])
        return RemoteObject(type=res['result']['type'],
                            subtype=res['result'].get('subtype'),
                            value=res['result'].get('value'),
                            id=res['result'].get('objectId'))
async def
evaluate( self, expression: str, await_promise: bool = False) -> hat.drivers.cdt.runtime.RemoteObject:
async def evaluate(self,
                   expression: str,
                   await_promise: bool = False
                   ) -> RemoteObject:
    """Send ``Runtime.evaluate`` and wrap the response.

    Args:
        expression: sent as the ``expression`` parameter
        await_promise: sent as the ``awaitPromise`` parameter

    Returns:
        remote object built from the response's ``result`` entry

    Raises:
        Exception: response contains ``exceptionDetails``; the message is
            its ``text`` entry

    """
    params = {'expression': expression,
              'awaitPromise': await_promise}
    res = await self._session.call('Runtime.evaluate', params)

    if 'exceptionDetails' in res:
        raise Exception(res['exceptionDetails']['text'])

    # only ``type`` is mandatory, remaining entries fall back to None
    remote = res['result']
    return RemoteObject(type=remote['type'],
                        subtype=remote.get('subtype'),
                        value=remote.get('value'),
                        id=remote.get('objectId'))
TargetId =
<class 'str'>
class
TargetInfo(typing.NamedTuple):
class TargetInfo(typing.NamedTuple):
    """Immutable description of a single target.

    NOTE(review): the field names suggest a one-to-one mapping onto the
    protocol's ``Target.TargetInfo`` object - confirm against the code that
    builds instances (not visible here).

    """
    # field order is part of the tuple interface - do not reorder
    target_id: TargetId
    type: str
    title: str
    url: str
    attached: bool
TargetInfo(target_id, type, title, url, attached)
async def
createTarget( conn: Connection, url: str = '', width: int | None = None, height: int | None = None, new_window: bool = False, background: bool = False) -> Target:
async def createTarget(conn: Connection,
                       url: str = '',
                       width: int | None = None,
                       height: int | None = None,
                       new_window: bool = False,
                       background: bool = False
                       ) -> 'Target':
    """Send ``Target.createTarget`` and wrap the created target.

    Args:
        conn: connection used to send the command
        url: sent as the ``url`` parameter
        width: sent as ``width`` only when not ``None``
        height: sent as ``height`` only when not ``None``
        new_window: sent as the ``newWindow`` parameter
        background: sent as the ``background`` parameter

    Returns:
        target bound to `conn` and the ``targetId`` from the response

    """
    # parameter names follow the protocol's camelCase spelling
    req = {'url': url,
           'newWindow': new_window,
           'background': background}
    if width is not None:
        req['width'] = width
    if height is not None:
        req['height'] = height
    res = await conn.call('Target.createTarget', req)
    return Target(conn, res['targetId'])
class
Session:
class Session:
    """Pairs a connection with a session id for issuing commands."""

    def __init__(self,
                 conn: Connection,
                 session_id: SessionId):
        self._session_id = session_id
        self._conn = conn

    @property
    def conn(self) -> Connection:
        """Connection passed to the constructor."""
        return self._conn

    @property
    def session_id(self) -> SessionId:
        """Session id passed to the constructor."""
        return self._session_id

    async def call(self,
                   method: str,
                   params: json.Data = {}
                   ) -> json.Data:
        """Forward a command to the connection, tagged with this session id.

        Returns:
            whatever the connection's ``call`` returns

        """
        result = await self._conn.call(method, params, self._session_id)
        return result

    async def detach(self):
        """Send ``Target.detachFromTarget`` for this session id.

        The command is sent on the connection itself, without a session id.

        """
        params = {'sessionId': self._session_id}
        await self._conn.call('Target.detachFromTarget', params)
Session(conn: Connection, session_id: int)
class
Target:
class Target:
    """Pairs a connection with a target id for issuing ``Target.*`` commands."""

    def __init__(self,
                 conn: Connection,
                 target_id: TargetId):
        self._target_id = target_id
        self._conn = conn

    @property
    def conn(self) -> Connection:
        """Connection passed to the constructor."""
        return self._conn

    @property
    def target_id(self):
        """Target id passed to the constructor."""
        return self._target_id

    async def activate(self):
        """Send ``Target.activateTarget`` for this target id."""
        params = {'targetId': self._target_id}
        await self._conn.call('Target.activateTarget', params)

    async def attach(self) -> Session:
        """Send ``Target.attachToTarget`` (with ``flatten`` set) for this
        target id.

        Returns:
            session bound to the same connection and the ``sessionId`` from
            the response

        """
        params = {'targetId': self._target_id,
                  'flatten': True}
        res = await self._conn.call('Target.attachToTarget', params)
        return Session(self._conn, res['sessionId'])

    async def close(self):
        """Send ``Target.closeTarget`` for this target id."""
        params = {'targetId': self._target_id}
        await self._conn.call('Target.closeTarget', params)
Target(conn: Connection, target_id: str)