]> git.xonotic.org Git - xonotic/xonotic.git/blob - misc/infrastructure/python/slist/master.py
slist: add game server query support
[xonotic/xonotic.git] / misc / infrastructure / python / slist / master.py
1 import ipaddress
2 from struct import Struct
3
4 import attr
5
6 from .utils import *
7
# Four 0xFF bytes: prepended to encoded requests and required at the start
# of every packet that sv_parse() accepts.
HEADER = b"\xFF\xFF\xFF\xFF"
9
10
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLGetServersExt(Writable):
    """Outgoing ``getserversExt`` query for a game name and protocol number."""

    game: str
    protocol: int

    def encode(self) -> bytes:
        """Return ``HEADER`` followed by the encoded query text."""
        query = f"getserversExt {self.game} {self.protocol} empty full"
        return HEADER + query.encode(UTF_8)
18
19
@attr.s(auto_attribs=True, frozen=True, slots=True)
class SVGetServersExtResponse(Readable):
    """Server list assembled from one or more ``getserversExtResponse`` payloads."""

    @attr.s(auto_attribs=True, frozen=True, slots=True)
    class Server:
        """Address and port of one listed server."""

        # Parsed records carry ``ipaddress`` objects; only the internal
        # end-of-list marker created in ``decode`` uses an empty string.
        addr: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address]
        port: int

    servers: List[Server]

    @classmethod
    @generator
    def decode(cls) -> Generator[Optional["SVGetServersExtResponse"], bytes, None]:
        """Incrementally decode response payloads into a single server list.

        Each value sent into the generator is one payload (the bytes that
        follow the ``getserversExtResponse`` name). The generator yields
        ``None`` until a payload containing the ``EOT`` record has been seen,
        then yields the completed response; one further ``send`` ends it.

        NOTE(review): the first ``yield`` runs before any payload exists, so
        this presumably relies on ``@generator`` priming the generator;
        confirm against ``utils``.

        Raises:
            AssertionError: a record starts with an unknown marker byte.
            struct.error: a record is shorter than its fixed-size layout.
        """
        end = SVGetServersExtResponse.Server("", 0)
        ipv4 = Struct(">4sH")
        ipv6 = Struct(">16sH")

        def servers(data: bytes) -> Iterator[SVGetServersExtResponse.Server]:
            """Yield the records of one payload; ``end`` is yielded for EOT."""
            offset = 0
            while True:
                # Slicing (rather than indexing) yields b"" at end of data.
                h = data[offset:offset + 1]
                offset += 1
                if h == b"":
                    return
                if h == b"\\":
                    raw, port = ipv4.unpack_from(data, offset)
                    if raw == b"EOT\x00" and port == 0:
                        # End-of-list marker: anything after it is ignored.
                        yield end
                        return
                    yield SVGetServersExtResponse.Server(
                        addr=ipaddress.IPv4Address(raw), port=port
                    )
                    offset += ipv4.size
                elif h == b"/":
                    raw, port = ipv6.unpack_from(data, offset)
                    yield SVGetServersExtResponse.Server(
                        addr=ipaddress.IPv6Address(raw), port=port
                    )
                    offset += ipv6.size
                else:
                    # Raised explicitly (not via ``assert``) so the check
                    # survives ``python -O``; the exception type is unchanged.
                    raise AssertionError(f"unknown record type: {h}")

        collected: List[SVGetServersExtResponse.Server] = []
        ret: Optional[SVGetServersExtResponse] = None
        done = False
        while True:
            buf: bytes
            buf = yield ret
            if done:
                return
            chunk = list(servers(buf))
            # A payload without any record produces an empty chunk; guard the
            # index so it is treated as "no end marker yet" instead of raising.
            if chunk and chunk[-1] == end:
                chunk.pop()
                done = True
            collected.extend(chunk)
            if done:
                ret = SVGetServersExtResponse(servers=collected)
78
79
# Type of the messages sv_parse() can yield; currently a single member.
SVMessage = Union[SVGetServersExtResponse]
81
82
@generator
def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
    """Parse packets sent into the generator, yielding completed messages.

    Each ``send`` supplies one raw packet. The value yielded back is ``None``
    unless that packet completed a ``getserversExtResponse`` list, in which
    case the decoded ``SVGetServersExtResponse`` is yielded.
    """
    response_name = b"getserversExtResponse"
    header_len = len(HEADER)
    # Decoder for a response that is still in progress across packets.
    decoder: Optional[Generator[Optional[SVGetServersExtResponse], bytes, None]] = None
    ret: Optional[SVMessage] = None
    while True:
        buf: bytes
        buf = yield ret
        ret = None
        if buf.startswith(HEADER):
            buf = buf[header_len:]
            if buf.startswith(response_name):
                buf = buf[len(response_name):]
                if decoder is None:
                    decoder = SVGetServersExtResponse.decode()
                ret = decoder.send(buf)
                if ret:
                    # List complete: the next response gets a fresh decoder.
                    decoder = None
                continue