3 from struct import Struct
# Module-level logger, named after this module.
9 logger = logging.getLogger(__name__)
# Four 0xFF bytes. CLGetServersExt.encode() puts them in front of the query
# text, and sv_parse() strips them from the front of each buffer it receives.
11 HEADER = b"\xFF\xFF\xFF\xFF"
# "getserversExt" query message; the __main__ section builds one with
# game= and protocol= keyword arguments and sends its encoded form.
# NOTE(review): this view is an excerpt -- the field declarations that
# encode() reads (self.game, self.protocol) are not visible here.
14 @attr.s(auto_attribs=True, frozen=True, slots=True)
15 class CLGetServersExt(Writable):
# Serialise the query as bytes: HEADER followed by the text
# "getserversExt <game> <protocol> empty full", encoded with UTF_8.
19 def encode(self) -> bytes:
20 return HEADER + f"getserversExt {self.game} {self.protocol} empty full".encode(UTF_8)
# Response message holding a list of servers (it is constructed with
# servers=[...] at the end of decode()).
# NOTE(review): this view is an excerpt; several lines of this class (loop
# headers, yields, the nested class header) are not visible. Comments below
# describe only the visible statements.
23 @attr.s(auto_attribs=True, frozen=True, slots=True)
24 class SVGetServersExtResponse(Readable):
# NOTE(review): the declaration this decorator applies to is not visible --
# presumably the nested Server record (addr, port) used below; confirm.
25 @attr.s(auto_attribs=True, frozen=True, slots=True)
# Generator-based decoder: per its annotation it receives bytes via send()
# and yields Optional[SVGetServersExtResponse].
34 def decode(cls) -> Generator[Optional["SVGetServersExtResponse"], bytes, None]:
# Server value with an empty address and port 0.
# NOTE(review): where it is consumed is not visible here -- presumably an
# end-of-list sentinel; confirm.
35 end = SVGetServersExtResponse.Server("", 0)
# Big-endian record layout: 16-byte address string + unsigned 16-bit port.
37 ipv6 = Struct(">16sH")
# Inner iterator that yields one Server per record found in buf.
39 def servers() -> Iterator[SVGetServersExtResponse.Server]:
# One-byte slice (kept as bytes, not an int) identifying the record type.
42 h = buf[offset:offset + 1]
# Reached when the record type byte is not recognised.
# NOTE(review): assert statements are removed under `python -O`; raising an
# exception would keep this check active in optimised runs.
51 assert False, f"unknown record type: {h}"
# Unpack one record directly from buf at the current offset (no copy).
53 it = record.unpack_from(buf, offset)
# An address of b"EOT\x00" with port 0 is tested for specially.
# NOTE(review): the branch body is not visible -- presumably the
# end-of-transmission marker; confirm.
56 if addr == b"EOT\x00" and port == 0:
# Convert the raw address bytes into an IPv4Address and emit the entry.
59 addr = ipaddress.IPv4Address(addr)
60 yield SVGetServersExtResponse.Server(addr=addr, port=port)
# Same as above for the IPv6 record layout.
63 addr = ipaddress.IPv6Address(addr)
64 yield SVGetServersExtResponse.Server(addr=addr, port=port)
# chunks collects lists of parsed servers; ret is the value handed back to
# the caller (None until a response has been assembled).
67 chunks: List[List[SVGetServersExtResponse.Server]] = []
68 ret: Optional[SVGetServersExtResponse] = None
# Parse every record that servers() yields into one list.
75 chunk = list(servers())
# Flatten all collected chunks into a single response object.
79 ret = SVGetServersExtResponse(servers=[x for l in chunks for x in l])
# Alias for the message types sv_parse() can yield. With a single member,
# typing.Union collapses to that member, so this is currently equivalent to
# SVGetServersExtResponse itself.
83 SVMessage = Union[SVGetServersExtResponse]
# Generator-based parser: per its annotation it receives bytes via send()
# and yields Optional[SVMessage].
# NOTE(review): this view is an excerpt; the loop and yield statements are
# not visible. Comments below describe only the visible statements.
87 def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
# Command name expected right after HEADER.
88 getservers_ext_response = b"getserversExtResponse"
# Decoder generator, created on demand below and reset to None further down.
89 getservers_ext_gen: Optional[Generator[Optional[SVGetServersExtResponse], bytes, None]] = None
90 ret: Optional[SVMessage] = None
# Strip the leading HEADER bytes, then the command name, leaving the payload.
95 if buf.startswith(HEADER):
96 buf = buf[len(HEADER):]
97 if buf.startswith(getservers_ext_response):
98 buf = buf[len(getservers_ext_response):]
# Start a new decoder only when none is currently active.
99 if not getservers_ext_gen:
100 getservers_ext_gen = SVGetServersExtResponse.decode()
# Narrows the Optional for the type checker.
101 assert getservers_ext_gen
# Hand the payload to the decoder; it returns a response or None.
# NOTE(review): a just-created generator must be advanced to its first yield
# before send() accepts a non-None value; that priming step is not visible
# in this excerpt -- confirm it exists.
102 ret = getservers_ext_gen.send(buf)
# Drop the decoder so the next response starts a new one.
# NOTE(review): the condition guarding this reset is not visible here.
104 getservers_ext_gen = None
# Script entry: send one getserversExt query and log what comes back.
# NOTE(review): this view is an excerpt; logging setup and the receive loop
# are not visible. Comments below describe only the visible statements.
108 if __name__ == "__main__":
# (host, port) pair used as the key for per-peer parser state.
111 connection = Tuple[str, int]
# One sv_parse() generator per remote address.
112 connections: Dict[connection, Generator[Optional[SVMessage], bytes, None]] = {}
# Resolve the host name to an IPv4 address string; port 27950.
114 conn = (socket.gethostbyname("dpmaster.deathmask.net"), 27950)
115 connections[conn] = sv_parse()
# IPv4 datagram (UDP) socket.
# NOTE(review): no close()/with-block is visible for this socket.
117 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Build the query for game "Xonotic", protocol 3, and send it to conn.
118 q = CLGetServersExt(game="Xonotic", protocol=3)
119 sock.sendto(q.encode(), conn)
# Receive one datagram of at most 1400 bytes together with the sender address.
122 data, addr = sock.recvfrom(1400)
123 logger.debug(f"recv({addr}): {data}")
# Feed the datagram to the parser registered for its sender.
# NOTE(review): a datagram from an address not in `connections` raises
# KeyError here.
124 msg = connections[addr].send(data)
126 logger.info(f"recv({addr}): {msg}")
# Log the server count once a complete response has been produced.
127 if isinstance(msg, SVGetServersExtResponse):
128 logger.info(f"servers: {len(msg.servers)}")