+import logging
import uuid
+from enum import IntEnum
import attr
from .utils import *
-HEADER = b"\xFF\xFF\xFF\xFF"
+logger = logging.getLogger(__name__)
@attr.s(auto_attribs=True, frozen=True, slots=True)
while True:
buf: bytes
buf = yield ret
- parts = buf.decode(UTF_8).split("\\")[1:]
- pairs = zip(*[iter(parts)] * 2)
- args = dict(pairs)
+ args = infostring_decode(buf.decode(UTF_8))
for k in ("gameversion", "sv_maxclients", "clients", "bots", "protocol"):
args[k] = int(args[k])
ret = SVGetInfoResponse(**args)
-SVMessage = Union[SVGetInfoResponse]
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class CLConnect(Writable):
+ info: dict = {"protocol": "darkplaces 3", "protocols": "DP7"}
+
+ def encode(self) -> bytes:
+ return HEADER + b"connect" + infostring_encode(self.info).encode(UTF_8)
+
+
+class NetFlag(IntEnum):
+ DATA = 1 << 0
+ ACK = 1 << 1
+ NAK = 1 << 2
+ EOM = 1 << 3
+ UNRELIABLE = 1 << 4
+
+ CRYPTO0 = 1 << 12
+ CRYPTO1 = 1 << 13
+ CRYPTO2 = 1 << 14
+ CTL = 1 << 15
+
+
+@attr.s(auto_attribs=True, frozen=False, slots=True)
+class Packet(Writable):
+ flags: int
+ messages: List[Writable]
+ seq: Optional[int] = None
+
+ def encode(self) -> bytes:
+ assert self.seq is not None
+ payload = b"".join(map(lambda it: it.encode(), self.messages))
+ return bytes(
+ ByteWriter()
+ .u16_be(self.flags)
+ .u16_be(8 + len(payload))
+ .u32_be(self.seq)
+ ) + payload
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class SVSignonReply(Readable):
+ state: int
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class NOP(Writable):
+ def encode(self) -> bytes:
+ return bytes(
+ ByteWriter()
+ .u8(1)
+ )
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class CLStringCommand(Writable):
+ cmd: str
+
+ def encode(self) -> bytes:
+ return bytes(
+ ByteWriter()
+ .u8(4)
+ .string(self.cmd)
+ )
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class CLAckDownloadData(Writable):
+ start: int
+ size: int
+
+ def encode(self) -> bytes:
+ return bytes(
+ ByteWriter()
+ .u8(51)
+ .u32(self.start)
+ .u16(self.size)
+ )
+
+
+SVMessage = Union[
+ SVGetInfoResponse,
+ SVSignonReply,
+]
+
+
+@attr.s(auto_attribs=True, frozen=False, slots=True)
+class SequenceInfo:
+ recv_r: int = 0
+ recv_u: int = 0
+ send_u: int = 0
@generator
-def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
- getinfo_response = b"infoResponse\n"
+def sv_parse(reply: Callable[[Connection, Packet], None] = lambda _conn, _data: None) -> Generator[
+ Tuple[Optional[SVMessage], SequenceInfo], Tuple[Connection, bytes], None
+]:
ret: Optional[SVMessage] = None
+
+ getinfo_response = b"infoResponse\n"
+
+ seqs = SequenceInfo()
+ recvbuf = bytearray()
+
while True:
+ conn: Connection
buf: bytes
- buf = yield ret
+ conn, buf = yield ret, seqs
ret = None
if buf.startswith(HEADER):
buf = buf[len(HEADER):]
buf = buf[len(getinfo_response):]
ret = SVGetInfoResponse.decode().send(buf)
continue
+ logger.debug(f"unhandled connectionless msg: {buf}")
+ continue
+
+ r = ByteReader(buf)
+ flags = r.u16_be()
+ size = r.u16_be()
+
+ if (flags & NetFlag.CTL) or size != len(buf):
+ logger.debug("discard")
+ continue
+
+ seq = r.u32_be()
+ buf = buf[8:]
+ logger.debug(f"seq={seq}, len={size}, flags={bin(flags)}")
+
+ if flags & NetFlag.UNRELIABLE:
+ if seq < seqs.recv_u:
+ continue # old
+ if seq > seqs.recv_u:
+ pass # dropped a few packets
+ seqs.recv_u = seq + 1
+ elif flags & NetFlag.ACK:
+ continue # todo
+ elif flags & NetFlag.DATA:
+ reply(conn, Packet(NetFlag.ACK, [], seq))
+ if seq != seqs.recv_r:
+ continue
+ seqs.recv_r += 1
+ recvbuf.extend(buf)
+ if not (flags & NetFlag.EOM):
+ continue
+ r = ByteReader(bytes(recvbuf))
+ recvbuf.clear()
+
+ logger.debug(f"game: {r.underflow()}")
+
+ while True:
+ if not len(r.underflow()):
+ break
+ cmd = r.u8()
+ if cmd == 1: # svc_nop
+ logger.debug("<-- server to client keepalive")
+ ret = NOP()
+ elif cmd == 2: # svc_disconnect
+ logger.debug("Server disconnected")
+ elif cmd == 5: # svc_setview
+ ent = r.u16()
+ elif cmd == 7: # svc_time
+ time = r.f32()
+ elif cmd == 8: # svc_print
+ s = r.string()
+ logger.info(f"print: {repr(s)}")
+ elif cmd == 9: # svc_stufftext
+ s = r.string()
+ logger.debug(f"stufftext: {repr(s)}")
+ elif cmd == 11: # svc_serverinfo
+ protocol = r.u32()
+ logger.debug(f"proto: {protocol}")
+ maxclients = r.u8()
+ logger.debug(f"maxclients: {maxclients}")
+ game = r.u8()
+ logger.debug(f"game: {protocol}")
+ mapname = r.string()
+ logger.debug(f"mapname: {mapname}")
+ while True:
+ model = r.string()
+ if model == "":
+ break
+ logger.debug(f"model: {model}")
+ while True:
+ sound = r.string()
+ if sound == "":
+ break
+ logger.debug(f"sound: {sound}")
+ elif cmd == 23: # svc_temp_entity
+ break
+ elif cmd == 25: # svc_signonnum
+ state = r.u8()
+ ret = SVSignonReply(state)
+ elif cmd == 32: # svc_cdtrack
+ track = r.u8()
+ looptrack = r.u8()
+ elif cmd == 50: # svc_downloaddata
+ start = r.u32()
+ size = r.u16_be()
+ data = r.u8_array(size)
+ reply(conn, Packet(NetFlag.DATA | NetFlag.EOM, [CLAckDownloadData(start, size)]))
+ elif cmd == 59: # svc_spawnstaticsound2
+ origin = (r.f32(), r.f32(), r.f32())
+ soundidx = r.u16_be()
+ vol = r.u8()
+ atten = r.u8()
+ else:
+ logger.debug(f"unimplemented: {cmd}")
+ r.skip(-1)
+ break
+ uflow = r.underflow()
+ if len(uflow):
+ logger.debug(f"underflow_1: {uflow}")
#!/usr/bin/env python3
import logging
-from typing import *
from . import game
from . import master
+from .utils import *
logger = logging.getLogger(__name__)
-if __name__ == "__main__":
+
+def main():
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- connection = Tuple[str, int]
- connections: Dict[connection, Generator[Optional[Union[master.SVMessage, game.SVMessage]], bytes, None]] = {}
+ connections: Dict[Connection, Union[
+ Generator[Optional[master.SVMessage], Tuple[Connection, bytes], None],
+ Generator[Tuple[Optional[game.SVMessage], game.SequenceInfo], Tuple[Connection, bytes], None],
+ ]] = {}
count_inforesponse = 0
- q = master.CLGetServersExt(game="Xonotic", protocol=3)
+ q_master = master.CLGetServersExt(game="Xonotic", protocol=3)
conn = (socket.gethostbyname("dpmaster.deathmask.net"), 27950)
- sock.sendto(q.encode(), conn)
connections[conn] = master.sv_parse()
+ sock.sendto(q_master.encode(), conn)
while True:
- logger.debug("wait")
- data, addr = sock.recvfrom(1400)
- logger.debug(f"recv({addr}): {data}")
- msg = connections[addr].send(data)
+ logger.debug("recv(...)")
+ try:
+ data, conn = sock.recvfrom(1400)
+ except KeyboardInterrupt:
+ break
+ logger.debug(f"recv({conn}): {data}")
+ msg = connections[conn].send((conn, data))
+ if isinstance(msg, tuple):
+ msg = msg[0]
if msg:
- logger.info(f"recv({addr}): {msg}")
+ logger.info(f"recv({conn}): {msg}")
if isinstance(msg, master.SVGetServersExtResponse):
logger.info(f"servers: {len(msg.servers)}")
for srv in msg.servers:
+ conn = (str(srv.addr), srv.port)
+ q_server = game.CLGetInfo()
+ connections[conn] = game.sv_parse()
try:
- q_info = game.CLGetInfo()
- conn = (str(srv.addr), srv.port)
- sock.sendto(q_info.encode(), conn)
- connections[conn] = game.sv_parse()
+ sock.sendto(q_server.encode(), conn)
except socket.gaierror:
pass
if isinstance(msg, game.SVGetInfoResponse):
count_inforesponse += 1
logger.info(f"status-{count_inforesponse}: {msg}")
+
+
+if __name__ == "__main__":
+ main()
from .utils import *
-HEADER = b"\xFF\xFF\xFF\xFF"
-
@attr.s(auto_attribs=True, frozen=True, slots=True)
class CLGetServersExt(Writable):
protocol: int
def encode(self) -> bytes:
- return HEADER + f"getserversExt {self.game} {self.protocol} empty full".encode(UTF_8)
+ return HEADER + f"getserversExt {self.game} {self.protocol} empty full ipv4 ipv6".encode(UTF_8)
@attr.s(auto_attribs=True, frozen=True, slots=True)
@generator
-def sv_parse() -> Generator[Optional[SVMessage], bytes, None]:
+def sv_parse() -> Generator[Optional[SVMessage], Tuple[Connection, bytes], None]:
getservers_ext_response = b"getserversExtResponse"
getservers_ext_gen: Optional[Generator[Optional[SVGetServersExtResponse], bytes, None]] = None
ret: Optional[SVMessage] = None
while True:
buf: bytes
- buf = yield ret
+ _, buf = yield ret
ret = None
if buf.startswith(HEADER):
buf = buf[len(HEADER):]
UTF_8 = "utf-8"
-class Readable:
- @classmethod
- def decode(cls) -> Generator[Optional[object], bytes, None]:
- raise NotImplementedError
-
-
-class Writable:
- def encode(self) -> bytes:
- raise NotImplementedError
-
-
def generator(f):
O = TypeVar("O")
I = TypeVar("I")
return prepare(f(*args, **kwargs))
return w
+
+
+class Readable:
+ @classmethod
+ def decode(cls) -> Generator[Optional[object], bytes, None]:
+ raise NotImplemented
+
+
+class Writable:
+ def encode(self) -> bytes:
+ raise NotImplemented
+
+
+class ByteReader:
+ __slots__ = (
+ "_buf",
+ "_ptr",
+ )
+
+ def __init__(self, buf: bytes) -> None:
+ self._buf = buf
+ self._ptr = 0
+
+ def underflow(self) -> bytes:
+ return self._buf[self._ptr:]
+
+ def skip(self, n: int) -> None:
+ self._ptr += n
+
+ def u8(self) -> int:
+ ret = self._buf[self._ptr]
+ self.skip(1)
+ return ret
+
+ def u8_array(self, n: int) -> bytes:
+ ret = self._buf[self._ptr:self._ptr + n]
+ self.skip(n)
+ return ret
+
+ def u16(self) -> int:
+ ret = 0
+ ret |= self.u8() << 0
+ ret |= self.u8() << 8
+ return ret
+
+ def u16_be(self) -> int:
+ ret = 0
+ ret |= self.u8() << 8
+ ret |= self.u8() << 0
+ return ret
+
+ def u32(self) -> int:
+ ret = 0
+ ret |= self.u8() << 0
+ ret |= self.u8() << 8
+ ret |= self.u8() << 16
+ ret |= self.u8() << 24
+ return ret
+
+ def u32_be(self) -> int:
+ ret = 0
+ ret |= self.u8() << 24
+ ret |= self.u8() << 16
+ ret |= self.u8() << 8
+ ret |= self.u8() << 0
+ return ret
+
+ def f32(self) -> float:
+ import struct
+ return struct.unpack("<f", self.u8_array(4))[0]
+
+ def string(self) -> str:
+ arr = bytearray()
+ while True:
+ b = self.u8()
+ if b == 0:
+ break
+ arr.append(b)
+ return arr.decode(UTF_8)
+
+
+class ByteWriter:
+ __slots__ = (
+ "_buf",
+ )
+
+ def __init__(self):
+ self._buf: List[bytes] = []
+
+ def __bytes__(self):
+ return b"".join(self._buf)
+
+ def u8(self, it: int) -> "ByteWriter":
+ self._buf.append(it.to_bytes(1, "little"))
+ return self
+
+ def u16(self, it: int) -> "ByteWriter":
+ self._buf.append(it.to_bytes(2, "little"))
+ return self
+
+ def u16_be(self, it: int) -> "ByteWriter":
+ self._buf.append(it.to_bytes(2, "big"))
+ return self
+
+ def u32(self, it: int) -> "ByteWriter":
+ self._buf.append(it.to_bytes(4, "little"))
+ return self
+
+ def u32_be(self, it: int) -> "ByteWriter":
+ self._buf.append(it.to_bytes(4, "big"))
+ return self
+
+ def f32(self, it: float) -> "ByteWriter":
+ import struct
+ self._buf.append(struct.pack("<f", it))
+ return self
+
+ def string(self, it: str) -> "ByteWriter":
+ self._buf.append(it.encode(UTF_8))
+ self._buf.append(b"\x00")
+ return self
+
+
+Connection = Tuple[str, int]
+
+HEADER = b"\xFF\xFF\xFF\xFF"
+
+
+def infostring_decode(s: str) -> dict:
+ parts = s.split("\\")[1:]
+ pairs = zip(*[iter(parts)] * 2)
+ return dict(pairs)
+
+
+def infostring_encode(d: dict) -> str:
+ return "".join(f"\\{k}\\{v}" for k, v in d.items())