hat.drivers.cdt

Chrome DevTools Protocol

 1"""Chrome DevTools Protocol"""
 2
 3from hat.drivers.cdt.browser import Browser
 4from hat.drivers.cdt.connection import (SessionId,
 5                                        EventName,
 6                                        EventCb,
 7                                        connect,
 8                                        Connection)
 9from hat.drivers.cdt.page import Page
10from hat.drivers.cdt.runtime import Runtime
11from hat.drivers.cdt.target import (TargetId,
12                                    TargetInfo,
13                                    getTargetInfos,
14                                    createTarget,
15                                    Session,
16                                    Target)
17
18
19__all__ = ['Browser',
20           'SessionId',
21           'EventName',
22           'EventCb',
23           'connect',
24           'Connection',
25           'Page',
26           'Runtime',
27           'TargetId',
28           'TargetInfo',
29           'getTargetInfos',
30           'createTarget',
31           'Session',
32           'Target']
class Browser:
 5class Browser:
 6
 7    def __init__(self, conn: Connection):
 8        self._conn = conn
 9
10    @property
11    def conn(self) -> Connection:
12        return self._conn
13
14    async def close(self):
15        await self._conn.call('Browser.close')
Browser(conn: Connection)
7    def __init__(self, conn: Connection):
8        self._conn = conn
conn: Connection
10    @property
11    def conn(self) -> Connection:
12        return self._conn
async def close(self):
14    async def close(self):
15        await self._conn.call('Browser.close')
SessionId = <class 'int'>
EventName = <class 'str'>
EventCb = typing.Callable[[str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], NoneType]
async def connect(host: str, port: int) -> Connection:
22async def connect(host: str,
23                  port: int
24                  ) -> 'Connection':
25    session = aiohttp.ClientSession()
26
27    try:
28        res = await session.get(f'http://{host}:{port}/json/version')
29        res = await res.json()
30        addr = res['webSocketDebuggerUrl']
31        ws = await session.ws_connect(addr, max_msg_size=0)
32
33    except BaseException:
34        await aio.uncancellable(session.close())
35        raise
36
37    conn = Connection()
38    conn._ws = ws
39    conn._ws_session = session
40    conn._async_group = aio.Group()
41    conn._event_cbs = util.CallbackRegistry()
42    conn._next_ids = itertools.count(1)
43    conn._result_futures = {}
44
45    conn._async_group.spawn(aio.call_on_cancel, conn._on_close)
46    conn._async_group.spawn(conn._receive_loop)
47
48    return conn
class Connection(hat.aio.group.Resource):
 51class Connection(aio.Resource):
 52
 53    @property
 54    def async_group(self) -> aio.Group:
 55        return self._async_group
 56
 57    def register_event_cb(self,
 58                          cb: EventCb
 59                          ) -> util.RegisterCallbackHandle:
 60        return self._event_cbs.register(cb)
 61
 62    async def call(self,
 63                   method: str,
 64                   params: json.Data = {},
 65                   session_id: SessionId | None = None
 66                   ) -> json.Data:
 67        if not self.is_open:
 68            raise ConnectionError()
 69
 70        msg = {'id': next(self._next_ids),
 71               'method': method,
 72               'params': params}
 73        if session_id is not None:
 74            msg['sessionId'] = session_id
 75
 76        future = asyncio.Future()
 77        self._result_futures[msg['id']] = future
 78
 79        try:
 80            await self._ws.send_json(msg)
 81            return await future
 82
 83        finally:
 84            del self._result_futures[msg['id']]
 85
 86    async def _on_close(self):
 87        for future in self._result_futures.values():
 88            if not future.done():
 89                future.set_exception(ConnectionError())
 90
 91        await self._ws.close()
 92        await self._ws_session.close()
 93
 94    async def _receive_loop(self):
 95        try:
 96            while True:
 97                msg_ws = await self._ws.receive()
 98                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
 99                    break
100                if msg_ws.type != aiohttp.WSMsgType.TEXT:
101                    raise Exception('unsupported message type')
102
103                msg = json.decode(msg_ws.data)
104
105                if 'id' in msg:
106                    future = self._result_futures.get(msg['id'])
107                    if future and not future.done():
108                        if 'result' in msg:
109                            future.set_result(msg['result'])
110
111                        else:
112                            future.set_exception(
113                                Exception(msg['error']['message']
114                                          if 'error' in msg
115                                          else 'unknown response'))
116
117                else:
118                    self._event_cbs.notify(msg['method'], msg['params'])
119
120        except Exception as e:
121            mlog.error("receive loop error: %s", e, exc_info=e)
122
123        finally:
124            self.close()

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
53    @property
54    def async_group(self) -> aio.Group:
55        return self._async_group

Group controlling resource's lifetime.

def register_event_cb( self, cb: Callable[[str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], NoneType]) -> hat.util.callback.RegisterCallbackHandle:
57    def register_event_cb(self,
58                          cb: EventCb
59                          ) -> util.RegisterCallbackHandle:
60        return self._event_cbs.register(cb)
async def call( self, method: str, params: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')] = {}, session_id: int | None = None) -> None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]:
62    async def call(self,
63                   method: str,
64                   params: json.Data = {},
65                   session_id: SessionId | None = None
66                   ) -> json.Data:
67        if not self.is_open:
68            raise ConnectionError()
69
70        msg = {'id': next(self._next_ids),
71               'method': method,
72               'params': params}
73        if session_id is not None:
74            msg['sessionId'] = session_id
75
76        future = asyncio.Future()
77        self._result_futures[msg['id']] = future
78
79        try:
80            await self._ws.send_json(msg)
81            return await future
82
83        finally:
84            del self._result_futures[msg['id']]
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class Page:
 5class Page:
 6
 7    def __init__(self, session: Session):
 8        self._session = session
 9
10    @property
11    def session(self):
12        return self._session
13
14    async def enable(self):
15        await self._session.call('Page.enable')
16
17    async def disable(self):
18        await self._session.call('Page.disable')
19
20    async def navigate(self, url: str):
21        await self._session.call('Page.navigate', {'url': url})
22
23    async def reload(self, ignore_cache: bool = False):
24        await self._session.call('Page.reload', {'ignoreCache': ignore_cache})
Page(session: Session)
7    def __init__(self, session: Session):
8        self._session = session
session
10    @property
11    def session(self):
12        return self._session
async def enable(self):
14    async def enable(self):
15        await self._session.call('Page.enable')
async def disable(self):
17    async def disable(self):
18        await self._session.call('Page.disable')
async def navigate(self, url: str):
20    async def navigate(self, url: str):
21        await self._session.call('Page.navigate', {'url': url})
async def reload(self, ignore_cache: bool = False):
23    async def reload(self, ignore_cache: bool = False):
24        await self._session.call('Page.reload', {'ignoreCache': ignore_cache})
class Runtime:
19class Runtime:
20
21    def __init__(self, session: Session):
22        self._session = session
23
24    @property
25    def session(self):
26        return self._session12
27
28    async def enable(self):
29        await self._session.call('Runtime.enable')
30
31    async def disable(self):
32        await self._session.call('Runtime.disable')
33
34    async def evaluate(self,
35                       expression: str,
36                       await_promise: bool = False
37                       ) -> RemoteObject:
38        res = await self._session.call('Runtime.evaluate',
39                                       {'expression': expression,
40                                        'awaitPromise': await_promise})
41        if 'exceptionDetails' in res:
42            raise Exception(res['exceptionDetails']['text'])
43        return RemoteObject(type=res['result']['type'],
44                            subtype=res['result'].get('subtype'),
45                            value=res['result'].get('value'),
46                            id=res['result'].get('objectId'))
Runtime(session: Session)
21    def __init__(self, session: Session):
22        self._session = session
session
24    @property
25    def session(self):
26        return self._session12
async def enable(self):
28    async def enable(self):
29        await self._session.call('Runtime.enable')
async def disable(self):
31    async def disable(self):
32        await self._session.call('Runtime.disable')
async def evaluate( self, expression: str, await_promise: bool = False) -> hat.drivers.cdt.runtime.RemoteObject:
34    async def evaluate(self,
35                       expression: str,
36                       await_promise: bool = False
37                       ) -> RemoteObject:
38        res = await self._session.call('Runtime.evaluate',
39                                       {'expression': expression,
40                                        'awaitPromise': await_promise})
41        if 'exceptionDetails' in res:
42            raise Exception(res['exceptionDetails']['text'])
43        return RemoteObject(type=res['result']['type'],
44                            subtype=res['result'].get('subtype'),
45                            value=res['result'].get('value'),
46                            id=res['result'].get('objectId'))
TargetId = <class 'str'>
class TargetInfo(typing.NamedTuple):
12class TargetInfo(typing.NamedTuple):
13    target_id: TargetId
14    type: str
15    title: str
16    url: str
17    attached: bool

TargetInfo(target_id, type, title, url, attached)

TargetInfo(target_id: str, type: str, title: str, url: str, attached: bool)

Create new instance of TargetInfo(target_id, type, title, url, attached)

target_id: str

Alias for field number 0

type: str

Alias for field number 1

title: str

Alias for field number 2

url: str

Alias for field number 3

attached: bool

Alias for field number 4

Inherited Members
builtins.tuple
index
count
async def getTargetInfos( conn: Connection) -> list[TargetInfo]:
20async def getTargetInfos(conn: Connection) -> list[TargetInfo]:
21    res = await conn.call('Target.getTargets')
22    return [TargetInfo(target_id=i['targetId'],
23                       type=i['type'],
24                       title=i['title'],
25                       url=i['url'],
26                       attached=i['attached'])
27            for i in res['targetInfos']]
async def createTarget( conn: Connection, url: str = '', width: int | None = None, height: int | None = None, new_window: bool = False, background: bool = False) -> Target:
30async def createTarget(conn: Connection,
31                       url: str = '',
32                       width: int | None = None,
33                       height: int | None = None,
34                       new_window: bool = False,
35                       background: bool = False
36                       ) -> 'Target':
37    req = {'url': url,
38           'new_window': new_window,
39           'background': background}
40    if width is not None:
41        req['width'] = width
42    if height is not None:
43        req['height'] = height
44    res = await conn.call('Target.createTarget', req)
45    return Target(conn, res['targetId'])
class Session:
48class Session:
49
50    def __init__(self,
51                 conn: Connection,
52                 session_id: SessionId):
53        self._conn = conn
54        self._session_id = session_id
55
56    @property
57    def conn(self) -> Connection:
58        return self._conn
59
60    @property
61    def session_id(self) -> SessionId:
62        return self._session_id
63
64    async def call(self,
65                   method: str,
66                   params: json.Data = {}
67                   ) -> json.Data:
68        return await self._conn.call(method, params, self._session_id)
69
70    async def detach(self):
71        await self._conn.call('Target.detachFromTarget',
72                              {'sessionId': self._session_id})
Session(conn: Connection, session_id: int)
50    def __init__(self,
51                 conn: Connection,
52                 session_id: SessionId):
53        self._conn = conn
54        self._session_id = session_id
conn: Connection
56    @property
57    def conn(self) -> Connection:
58        return self._conn
session_id: int
60    @property
61    def session_id(self) -> SessionId:
62        return self._session_id
async def call( self, method: str, params: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')] = {}) -> None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]:
64    async def call(self,
65                   method: str,
66                   params: json.Data = {}
67                   ) -> json.Data:
68        return await self._conn.call(method, params, self._session_id)
async def detach(self):
70    async def detach(self):
71        await self._conn.call('Target.detachFromTarget',
72                              {'sessionId': self._session_id})
class Target:
 75class Target:
 76
 77    def __init__(self,
 78                 conn: Connection,
 79                 target_id: TargetId):
 80        self._conn = conn
 81        self._target_id = target_id
 82
 83    @property
 84    def conn(self) -> Connection:
 85        return self._conn
 86
 87    @property
 88    def target_id(self):
 89        return self._target_id
 90
 91    async def activate(self):
 92        await self._conn.call('Target.activateTarget',
 93                              {'targetId': self._target_id})
 94
 95    async def attach(self) -> Session:
 96        res = await self._conn.call('Target.attachToTarget',
 97                                    {'targetId': self._target_id,
 98                                     'flatten': True})
 99        return Session(self._conn, res['sessionId'])
100
101    async def close(self):
102        await self._conn.call('Target.closeTarget',
103                              {'targetId': self._target_id})
Target(conn: Connection, target_id: str)
77    def __init__(self,
78                 conn: Connection,
79                 target_id: TargetId):
80        self._conn = conn
81        self._target_id = target_id
conn: Connection
83    @property
84    def conn(self) -> Connection:
85        return self._conn
target_id
87    @property
88    def target_id(self):
89        return self._target_id
async def activate(self):
91    async def activate(self):
92        await self._conn.call('Target.activateTarget',
93                              {'targetId': self._target_id})
async def attach(self) -> Session:
95    async def attach(self) -> Session:
96        res = await self._conn.call('Target.attachToTarget',
97                                    {'targetId': self._target_id,
98                                     'flatten': True})
99        return Session(self._conn, res['sessionId'])
async def close(self):
101    async def close(self):
102        await self._conn.call('Target.closeTarget',
103                              {'targetId': self._target_id})