Skip to content

WebSocket server

WebSocket server.

Source code in pycrdt_websocket/websocket_server.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class WebsocketServer:
    """WebSocket server.

    Keeps a registry of rooms indexed by name and serves each client in the room
    whose name is the path of the client's WebSocket. Rooms are started in the
    server's task group, so the server must be running (through
    `async with websocket_server:` or `websocket_server.start()`) before rooms
    can be created or clients served.
    """

    auto_clean_rooms: bool
    rooms: dict[str, YRoom]
    _started: Event | None
    _starting: bool
    _task_group: TaskGroup | None

    def __init__(
        self, rooms_ready: bool = True, auto_clean_rooms: bool = True, log: Logger | None = None
    ) -> None:
        """Initialize the object.

        The WebsocketServer instance should preferably be used as an async context manager:
        ```py
        async with websocket_server:
            ...
        ```
        However, a lower-level API can also be used:
        ```py
        task = asyncio.create_task(websocket_server.start())
        await websocket_server.started.wait()
        ...
        websocket_server.stop()
        ```

        Arguments:
            rooms_ready: Whether rooms are ready to be synchronized when opened.
            auto_clean_rooms: Whether rooms should be deleted when no client is there anymore.
            log: An optional logger.
        """
        self.rooms_ready = rooms_ready
        self.auto_clean_rooms = auto_clean_rooms
        self.log = log or getLogger(__name__)
        self.rooms = {}
        # The `started` event is created lazily by the property of the same name.
        self._started = None
        self._starting = False
        # `None` means "not running"; set by `__aenter__` or `start()`.
        self._task_group = None

    @property
    def started(self) -> Event:
        """An async event that is set when the WebSocket server has started."""
        if self._started is None:
            self._started = Event()
        return self._started

    async def get_room(self, name: str) -> YRoom:
        """Get or create a room with the given name, and start it.

        Arguments:
            name: The room name.

        Returns:
            The room with the given name, or a new one if no room with that name was found.

        Raises:
            RuntimeError: If the server is not running (raised by `start_room`).
        """
        if name not in self.rooms:
            self.rooms[name] = YRoom(ready=self.rooms_ready, log=self.log)
        room = self.rooms[name]
        await self.start_room(room)
        return room

    async def start_room(self, room: YRoom) -> None:
        """Start a room, if not already started.

        Arguments:
            room: The room to start.

        Raises:
            RuntimeError: If the server is not running.
        """
        if self._task_group is None:
            raise RuntimeError(
                "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
            )

        if not room.started.is_set():
            # The room runs inside the server's task group, so cancelling the
            # server's task group also cancels the room.
            await self._task_group.start(room.start)

    def get_room_name(self, room: YRoom) -> str:
        """Get the name of a room.

        Arguments:
            room: The room to get the name from.

        Returns:
            The room name.

        Raises:
            ValueError: If the room is not registered in this server.
        """
        return list(self.rooms.keys())[list(self.rooms.values()).index(room)]

    def rename_room(
        self, to_name: str, *, from_name: str | None = None, from_room: YRoom | None = None
    ) -> None:
        """Rename a room.

        Exactly one of `from_name` and `from_room` is expected.

        Arguments:
            to_name: The new name of the room.
            from_name: The previous name of the room (if `from_room` is not passed).
            from_room: The room to be renamed (if `from_name` is not passed).

        Raises:
            RuntimeError: If both `from_name` and `from_room` are passed.
            KeyError: If no room is registered under `from_name`.
        """
        if from_name is not None and from_room is not None:
            raise RuntimeError("Cannot pass from_name and from_room")
        if from_name is None:
            assert from_room is not None
            from_name = self.get_room_name(from_room)
        self.rooms[to_name] = self.rooms.pop(from_name)

    def delete_room(self, *, name: str | None = None, room: YRoom | None = None) -> None:
        """Delete a room.

        The room is removed from the registry and then stopped.

        Arguments:
            name: The name of the room to delete (if `room` is not passed).
            room: The room to delete (if `name` is not passed).

        Raises:
            RuntimeError: If both `name` and `room` are passed.
            KeyError: If no room is registered under `name`.
        """
        if name is not None and room is not None:
            raise RuntimeError("Cannot pass name and room")
        if name is None:
            assert room is not None
            name = self.get_room_name(room)
        room = self.rooms.pop(name)
        room.stop()

    async def serve(self, websocket: Websocket) -> None:
        """Serve a client through a WebSocket.

        Arguments:
            websocket: The WebSocket through which to serve the client.

        Raises:
            RuntimeError: If the server is not running.
        """
        if self._task_group is None:
            raise RuntimeError(
                "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
            )

        # Each client gets its own task group, which `_serve` cancels once the
        # client has been served.
        async with create_task_group() as tg:
            tg.start_soon(self._serve, websocket, tg)

    async def _serve(self, websocket: Websocket, tg: TaskGroup):
        """Serve one client in the room named after the WebSocket's path.

        Arguments:
            websocket: The WebSocket through which to serve the client.
            tg: The per-client task group, cancelled when serving is done.
        """
        room = await self.get_room(websocket.path)
        # `get_room` already started the room; `start_room` does nothing when
        # the room's `started` event is set.
        await self.start_room(room)
        await room.serve(websocket)

        if self.auto_clean_rooms and not room.clients:
            self.delete_room(room=room)
        tg.cancel_scope.cancel()

    async def __aenter__(self) -> WebsocketServer:
        """Start the server by entering its task group.

        Raises:
            RuntimeError: If the server is already running.
        """
        if self._task_group is not None:
            raise RuntimeError("WebsocketServer already running")

        async with AsyncExitStack() as exit_stack:
            tg = create_task_group()
            self._task_group = await exit_stack.enter_async_context(tg)
            # `pop_all()` moves the task group to a new stack, so leaving this
            # `async with` does not exit it; `__aexit__` does.
            self._exit_stack = exit_stack.pop_all()
            self.started.set()

        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        """Stop the server: cancel its task group and exit it.

        Raises:
            RuntimeError: If the server is not running.
        """
        if self._task_group is None:
            raise RuntimeError("WebsocketServer not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None
        return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
        """Start the WebSocket server.

        Returns immediately if another call is already in the process of
        starting the server. Otherwise this coroutine only completes once the
        server's task group has been cancelled (see `stop`).

        Arguments:
            task_status: The status to set when the task has started.

        Raises:
            RuntimeError: If the server is already running.
        """
        if self._starting:
            return

        # Check for a running server *before* raising the `_starting` flag:
        # otherwise the flag would stay set after the error below, and every
        # later call to `start()` would silently return without starting.
        if self._task_group is not None:
            raise RuntimeError("WebsocketServer already running")

        self._starting = True

        # create the task group and wait forever
        async with create_task_group() as self._task_group:
            # Waiting on an event that is never set keeps the task group open
            # until its cancel scope is cancelled.
            self._task_group.start_soon(Event().wait)
            self.started.set()
            self._starting = False
            task_status.started()

    def stop(self) -> None:
        """Stop the WebSocket server.

        Raises:
            RuntimeError: If the server is not running.
        """
        if self._task_group is None:
            raise RuntimeError("WebsocketServer not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None

started: Event property

An async event that is set when the WebSocket server has started.

__init__(rooms_ready=True, auto_clean_rooms=True, log=None)

Initialize the object.

The WebsocketServer instance should preferably be used as an async context manager:

async with websocket_server:
    ...
However, a lower-level API can also be used:
task = asyncio.create_task(websocket_server.start())
await websocket_server.started.wait()
...
websocket_server.stop()

Parameters:

Name Type Description Default
rooms_ready bool

Whether rooms are ready to be synchronized when opened.

True
auto_clean_rooms bool

Whether rooms should be deleted when no client is there anymore.

True
log Logger | None

An optional logger.

None
Source code in pycrdt_websocket/websocket_server.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self, rooms_ready: bool = True, auto_clean_rooms: bool = True, log: Logger | None = None
) -> None:
    """Set up a server that is not running yet and has no rooms.

    The preferred way to run the server is as an async context manager:
    ```py
    async with websocket_server:
        ...
    ```
    A lower-level API is also available:
    ```py
    task = asyncio.create_task(websocket_server.start())
    await websocket_server.started.wait()
    ...
    websocket_server.stop()
    ```

    Arguments:
        rooms_ready: Whether rooms are ready to be synchronized when opened.
        auto_clean_rooms: Whether rooms should be deleted when no client is there anymore.
        log: An optional logger; when omitted, a logger named after this module is used.
    """
    # Lifecycle state: nothing is running, no `started` event exists yet.
    self._task_group = None
    self._starting = False
    self._started = None

    # Room registry, indexed by room name.
    self.rooms = {}

    # User-provided configuration.
    self.log = log if log else getLogger(__name__)
    self.auto_clean_rooms = auto_clean_rooms
    self.rooms_ready = rooms_ready

delete_room(*, name=None, room=None)

Delete a room.

Parameters:

Name Type Description Default
name str | None

The name of the room to delete (if room is not passed).

None
room YRoom | None

The room to delete (if name is not passed).

None
Source code in pycrdt_websocket/websocket_server.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def delete_room(self, *, name: str | None = None, room: YRoom | None = None) -> None:
    """Remove a room from the registry and stop it.

    The room is identified either by its name or by the room object itself.

    Arguments:
        name: The name of the room to delete (if `room` is not passed).
        room: The room to delete (if `name` is not passed).
    """
    if name is None:
        # No name given: look it up from the room object.
        assert room is not None
        name = self.get_room_name(room)
    elif room is not None:
        raise RuntimeError("Cannot pass name and room")
    # Unregister first, then stop whatever was registered under that name.
    self.rooms.pop(name).stop()

get_room(name) async

Get or create a room with the given name, and start it.

Parameters:

Name Type Description Default
name str

The room name.

required

Returns:

Type Description
YRoom

The room with the given name, or a new one if no room with that name was found.

Source code in pycrdt_websocket/websocket_server.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def get_room(self, name: str) -> YRoom:
    """Return the started room registered under a name, creating it if needed.

    Arguments:
        name: The room name.

    Returns:
        The room with the given name, or a new one if no room with that name was found.
    """
    registry = self.rooms
    if name not in registry:
        # First request for this name: register a fresh room.
        registry[name] = YRoom(ready=self.rooms_ready, log=self.log)
    room = registry[name]
    await self.start_room(room)
    return room

get_room_name(room)

Get the name of a room.

Parameters:

Name Type Description Default
room YRoom

The room to get the name from.

required

Returns:

Type Description
str

The room name.

Source code in pycrdt_websocket/websocket_server.py
89
90
91
92
93
94
95
96
97
98
def get_room_name(self, room: YRoom) -> str:
    """Look up the name under which a room is registered.

    Arguments:
        room: The room to get the name from.

    Returns:
        The room name.
    """
    # Names and rooms are listed in the same (dict) order, so the position of
    # the room among the values is also the position of its name.
    names = list(self.rooms)
    registered = list(self.rooms.values())
    position = registered.index(room)
    return names[position]

rename_room(to_name, *, from_name=None, from_room=None)

Rename a room.

Parameters:

Name Type Description Default
to_name str

The new name of the room.

required
from_name str | None

The previous name of the room (if from_room is not passed).

None
from_room YRoom | None

The room to be renamed (if from_name is not passed).

None
Source code in pycrdt_websocket/websocket_server.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def rename_room(
    self, to_name: str, *, from_name: str | None = None, from_room: YRoom | None = None
) -> None:
    """Register an existing room under a new name.

    The room is identified either by its current name or by the room object itself.

    Arguments:
        to_name: The new name of the room.
        from_name: The previous name of the room (if `from_room` is not passed).
        from_room: The room to be renamed (if `from_name` is not passed).
    """
    if from_name is None:
        # No name given: look it up from the room object.
        assert from_room is not None
        from_name = self.get_room_name(from_room)
    elif from_room is not None:
        raise RuntimeError("Cannot pass from_name and from_room")
    # Drop the old entry before adding the new one.
    moved = self.rooms.pop(from_name)
    self.rooms[to_name] = moved

serve(websocket) async

Serve a client through a WebSocket.

Parameters:

Name Type Description Default
websocket Websocket

The WebSocket through which to serve the client.

required
Source code in pycrdt_websocket/websocket_server.py
132
133
134
135
136
137
138
139
140
141
142
143
144
async def serve(self, websocket: Websocket) -> None:
    """Serve one client over its WebSocket, in a task group of its own.

    Arguments:
        websocket: The WebSocket through which to serve the client.
    """
    server_running = self._task_group is not None
    if not server_running:
        raise RuntimeError(
            "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
        )

    # The per-client task group is handed to `_serve` along with the WebSocket.
    async with create_task_group() as client_group:
        client_group.start_soon(self._serve, websocket, client_group)

start(*, task_status=TASK_STATUS_IGNORED) async

Start the WebSocket server.

Parameters:

Name Type Description Default
task_status TaskStatus[None]

The status to set when the task has started.

TASK_STATUS_IGNORED
Source code in pycrdt_websocket/websocket_server.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """Start the WebSocket server.

    Returns immediately if another call is already in the process of starting
    the server. Otherwise this coroutine only completes once the server's task
    group has been cancelled.

    Arguments:
        task_status: The status to set when the task has started.

    Raises:
        RuntimeError: If the server is already running.
    """
    if self._starting:
        return

    # Check for a running server *before* raising the `_starting` flag:
    # otherwise the flag would stay set after the error below, and every later
    # call to `start()` would silently return without starting anything.
    if self._task_group is not None:
        raise RuntimeError("WebsocketServer already running")

    self._starting = True

    # create the task group and wait forever
    async with create_task_group() as self._task_group:
        # Waiting on an event that is never set keeps the task group open
        # until its cancel scope is cancelled.
        self._task_group.start_soon(Event().wait)
        self.started.set()
        self._starting = False
        task_status.started()

start_room(room) async

Start a room, if not already started.

Parameters:

Name Type Description Default
room YRoom

The room to start.

required
Source code in pycrdt_websocket/websocket_server.py
75
76
77
78
79
80
81
82
83
84
85
86
87
async def start_room(self, room: YRoom) -> None:
    """Start a room in the server's task group, unless it has already started.

    Arguments:
        room: The room to start.
    """
    task_group = self._task_group
    if task_group is None:
        raise RuntimeError(
            "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
        )

    # Nothing to do for a room whose `started` event is already set.
    if room.started.is_set():
        return
    await task_group.start(room.start)

stop()

Stop the WebSocket server.

Source code in pycrdt_websocket/websocket_server.py
196
197
198
199
200
201
202
def stop(self) -> None:
    """Stop the WebSocket server by cancelling its task group."""
    task_group = self._task_group
    if task_group is None:
        raise RuntimeError("WebsocketServer not running")

    task_group.cancel_scope.cancel()
    # Mark the server as not running.
    self._task_group = None