]> git.xonotic.org Git - xonotic/xonotic.git/blob - misc/infrastructure/python/slist/master.py
12bfd220857b648b1c008f397a727fe2d602941b
[xonotic/xonotic.git] / misc / infrastructure / python / slist / master.py
1 import ipaddress
2 import logging
3 from struct import Struct
4
5 import attr
6
7 from .utils import *
8
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Four 0xFF bytes: prepended to the outgoing query built in this file and
# required at the start of every incoming packet before it is parsed.
HEADER = b"\xFF\xFF\xFF\xFF"
12
13
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLGetServersExt(Writable):
    """Client-side ``getserversExt`` query.

    The encoded form is ``HEADER`` followed by the command text, which
    carries the game name, the protocol number and the fixed keywords
    ``empty full``.
    """

    # Both fields are interpolated verbatim into the command text.
    game: str
    protocol: int

    def encode(self) -> bytes:
        """Return the wire representation of this query as bytes."""
        command = f"getserversExt {self.game} {self.protocol} empty full"
        payload = command.encode(UTF_8)
        return HEADER + payload
21
22
@attr.s(auto_attribs=True, frozen=True, slots=True)
class SVGetServersExtResponse(Readable):
    """Fully assembled ``getserversExtResponse``: every server that was listed.

    A response may be spread over several packets; :meth:`decode` collects
    the records of each packet and only produces an instance once the
    end-of-list record has been seen.
    """

    @attr.s(auto_attribs=True, frozen=True, slots=True)
    class Server:
        """A single server entry (address and port)."""

        # decode() stores ipaddress objects here; only the internal
        # end-of-list sentinel uses a plain (empty) string.
        addr: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address]
        port: int

    servers: List[Server]

    @classmethod
    @generator
    def decode(cls) -> Generator[Optional["SVGetServersExtResponse"], bytes, None]:
        """Incrementally decode the packets of one server-list response.

        Send each packet payload (the bytes that follow the header and the
        response tag) into the generator.  It yields ``None`` until a packet
        containing the end-of-list record arrives, then yields the complete
        response.  One further ``send`` after that finishes the generator.

        Record layout, as parsed here:

        * ``b"\\"`` + 4 address bytes + big-endian unsigned 16-bit port (IPv4)
        * ``b"/"`` + 16 address bytes + big-endian unsigned 16-bit port (IPv6)
        * an IPv4 record whose address bytes are ``b"EOT\\x00"`` and whose
          port is 0 marks the end of the list.

        NOTE(review): ``generator`` comes from ``.utils``; presumably it
        primes the generator so the first ``send`` is accepted -- confirm.

        Raises:
            ValueError: a record starts with an unknown marker byte.
            struct.error: a record is shorter than its fixed size.
        """
        end = SVGetServersExtResponse.Server("", 0)
        ipv4 = Struct(">4sH")
        ipv6 = Struct(">16sH")

        def servers(packet: bytes) -> Iterator[SVGetServersExtResponse.Server]:
            """Yield the records of one packet; the sentinel ``end`` is yielded last if present."""
            offset = 0
            while True:
                marker = packet[offset:offset + 1]
                offset += 1
                if marker == b"":
                    # Ran off the end of this packet without seeing the
                    # end-of-list record: more packets are expected.
                    return
                if marker == b"\\":
                    raw, port = ipv4.unpack_from(packet, offset)
                    if raw == b"EOT\x00" and port == 0:
                        yield end
                        return
                    yield SVGetServersExtResponse.Server(addr=ipaddress.IPv4Address(raw), port=port)
                    offset += ipv4.size
                elif marker == b"/":
                    raw, port = ipv6.unpack_from(packet, offset)
                    yield SVGetServersExtResponse.Server(addr=ipaddress.IPv6Address(raw), port=port)
                    offset += ipv6.size
                else:
                    # Packets come from the network, so this must be a real
                    # exception rather than an assert (asserts vanish under -O).
                    raise ValueError(f"unknown record type: {marker!r}")

        chunks: List[List[SVGetServersExtResponse.Server]] = []
        ret: Optional[SVGetServersExtResponse] = None
        done = False
        while True:
            buf: bytes
            buf = yield ret
            if done:
                return
            chunk = list(servers(buf))
            chunks.append(chunk)
            # A packet may legitimately carry no records at all; guard the
            # index so an empty chunk does not raise IndexError.
            if chunk and chunk[-1] == end:
                chunk.pop()  # drop the sentinel, it is not a real server
                ret = SVGetServersExtResponse(servers=[server for part in chunks for server in part])
                done = True
81
82
# Every message type sv_parse() can produce (currently a single one).
SVMessage = Union[SVGetServersExtResponse]


@generator
def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
    """Parse raw packets sent into the generator, one per ``send``.

    For each packet the generator yields either a completed message or
    ``None``.  Packets that do not start with ``HEADER`` followed by
    ``getserversExtResponse`` are ignored.  Matching packets are forwarded,
    with that prefix removed, to an ``SVGetServersExtResponse.decode()``
    generator, which is kept across packets until it yields a complete
    response and is then discarded.
    """
    response_prefix = HEADER + b"getserversExtResponse"
    pending: Optional[Generator[Optional[SVGetServersExtResponse], bytes, None]] = None
    message: Optional[SVMessage] = None
    while True:
        packet: bytes
        packet = yield message
        message = None
        if not packet.startswith(response_prefix):
            continue
        payload = packet[len(response_prefix):]
        if not pending:
            # First packet of a new response: start a fresh decoder.
            pending = SVGetServersExtResponse.decode()
        assert pending
        message = pending.send(payload)
        if message:
            # Response complete; the next matching packet starts a new one.
            pending = None
106
107
if __name__ == "__main__":
    # Manual smoke test: query a public master server over UDP and log the
    # number of servers in each complete response.  Runs until interrupted.
    import socket

    # The root logger defaults to WARNING, so without this none of the
    # debug/info output below would ever be shown.
    logging.basicConfig(level=logging.DEBUG)

    connection = Tuple[str, int]
    # One parser per remote endpoint, keyed by (host, port).
    connections: Dict[connection, Generator[Optional[SVMessage], bytes, None]] = {}

    conn = (socket.gethostbyname("dpmaster.deathmask.net"), 27950)
    connections[conn] = sv_parse()

    q = CLGetServersExt(game="Xonotic", protocol=3)
    # The context manager closes the socket on any exit, including Ctrl-C.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(q.encode(), conn)
        while True:
            logger.debug("wait")
            data, addr = sock.recvfrom(1400)
            logger.debug("recv(%s): %s", addr, data)
            parser = connections.get(addr)
            if parser is None:
                # UDP accepts datagrams from anyone; ignore senders we
                # never queried instead of failing with KeyError.
                continue
            msg = parser.send(data)
            if msg:
                logger.info("recv(%s): %s", addr, msg)
                if isinstance(msg, SVGetServersExtResponse):
                    logger.info("servers: %s", len(msg.servers))