]> git.xonotic.org Git - xonotic/xonotic.git/blob - misc/infrastructure/python/slist/utils.py
slist: extend parsing
[xonotic/xonotic.git] / misc / infrastructure / python / slist / utils.py
1 from functools import wraps
2 from typing import *
3
4 UTF_8 = "utf-8"
5
6
7 def generator(f):
8     O = TypeVar("O")
9     I = TypeVar("I")
10     R = TypeVar("R")
11
12     def prepare(g: Generator[O, I, R]) -> Generator[O, I, R]:
13         next(g)
14         return g
15
16     @wraps(f)
17     def w(*args, **kwargs):
18         return prepare(f(*args, **kwargs))
19
20     return w
21
22
23 class Readable:
24     @classmethod
25     def decode(cls) -> Generator[Optional[object], bytes, None]:
26         raise NotImplemented
27
28
29 class Writable:
30     def encode(self) -> bytes:
31         raise NotImplemented
32
33
34 class ByteReader:
35     __slots__ = (
36         "_buf",
37         "_ptr",
38     )
39
40     def __init__(self, buf: bytes) -> None:
41         self._buf = buf
42         self._ptr = 0
43
44     def underflow(self) -> bytes:
45         return self._buf[self._ptr:]
46
47     def skip(self, n: int) -> None:
48         self._ptr += n
49
50     def u8(self) -> int:
51         ret = self._buf[self._ptr]
52         self.skip(1)
53         return ret
54
55     def u8_array(self, n: int) -> bytes:
56         ret = self._buf[self._ptr:self._ptr + n]
57         self.skip(n)
58         return ret
59
60     def u16(self) -> int:
61         ret = 0
62         ret |= self.u8() << 0
63         ret |= self.u8() << 8
64         return ret
65
66     def u16_be(self) -> int:
67         ret = 0
68         ret |= self.u8() << 8
69         ret |= self.u8() << 0
70         return ret
71
72     def u32(self) -> int:
73         ret = 0
74         ret |= self.u8() << 0
75         ret |= self.u8() << 8
76         ret |= self.u8() << 16
77         ret |= self.u8() << 24
78         return ret
79
80     def u32_be(self) -> int:
81         ret = 0
82         ret |= self.u8() << 24
83         ret |= self.u8() << 16
84         ret |= self.u8() << 8
85         ret |= self.u8() << 0
86         return ret
87
88     def f32(self) -> float:
89         import struct
90         return struct.unpack("<f", self.u8_array(4))[0]
91
92     def string(self) -> str:
93         arr = bytearray()
94         while True:
95             b = self.u8()
96             if b == 0:
97                 break
98             arr.append(b)
99         return arr.decode(UTF_8)
100
101
102 class ByteWriter:
103     __slots__ = (
104         "_buf",
105     )
106
107     def __init__(self):
108         self._buf: List[bytes] = []
109
110     def __bytes__(self):
111         return b"".join(self._buf)
112
113     def u8(self, it: int) -> "ByteWriter":
114         self._buf.append(it.to_bytes(1, "little"))
115         return self
116
117     def u16(self, it: int) -> "ByteWriter":
118         self._buf.append(it.to_bytes(2, "little"))
119         return self
120
121     def u16_be(self, it: int) -> "ByteWriter":
122         self._buf.append(it.to_bytes(2, "big"))
123         return self
124
125     def u32(self, it: int) -> "ByteWriter":
126         self._buf.append(it.to_bytes(4, "little"))
127         return self
128
129     def u32_be(self, it: int) -> "ByteWriter":
130         self._buf.append(it.to_bytes(4, "big"))
131         return self
132
133     def f32(self, it: float) -> "ByteWriter":
134         import struct
135         self._buf.append(struct.pack("<f", it))
136         return self
137
138     def string(self, it: str) -> "ByteWriter":
139         self._buf.append(it.encode(UTF_8))
140         self._buf.append(b"\x00")
141         return self
142
143
144 Connection = Tuple[str, int]
145
146 HEADER = b"\xFF\xFF\xFF\xFF"
147
148
149 def infostring_decode(s: str) -> dict:
150     parts = s.split("\\")[1:]
151     pairs = zip(*[iter(parts)] * 2)
152     return dict(pairs)
153
154
155 def infostring_encode(d: dict) -> str:
156     return "".join(f"\\{k}\\{v}" for k, v in d.items())