24-add-uci-layer (#39)
Reviewed-on: #39 Co-authored-by: Josh <josh@joshuaschuett.com> Co-committed-by: Josh <josh@joshuaschuett.com>
This commit is contained in:
207
uci/uci_engine.py
Executable file
207
uci/uci_engine.py
Executable file
@@ -0,0 +1,207 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys, shlex
|
||||
import ctypes as C
|
||||
from binding.python_c_ffi import ChessFFI, Board, Move, WHITE, Q, R, B, N, q, r, b, n
|
||||
|
||||
|
||||
START_FEN = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"
|
||||
PROMO_MAP = {"q": (Q, q), "r": (R, r), "b": (B, b), "n": (N, n)}
|
||||
ONE_MINUTE = 60000
|
||||
THREE_MINUTES = ONE_MINUTE * 3
|
||||
FIVE_MINUTES = ONE_MINUTE * 5
|
||||
TEN_MINUTES = ONE_MINUTE * 10
|
||||
THIRTY_MINUTES = ONE_MINUTE * 30
|
||||
|
||||
|
||||
def flushln(s: str):
|
||||
sys.stdout.write(s + "\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def sq_to_uci(sq: int) -> str:
|
||||
return chr(ord("a") + (sq % 8)) + chr(ord("1") + (sq // 8))
|
||||
|
||||
|
||||
def move_to_uci(m: Move) -> str:
|
||||
s = sq_to_uci(getattr(m, "from")) + sq_to_uci(m.to)
|
||||
if m.promo:
|
||||
for c, (wpid, bpid) in PROMO_MAP.items():
|
||||
if m.promo in (wpid, bpid):
|
||||
s += c
|
||||
break
|
||||
return s
|
||||
|
||||
|
||||
def uci_to_move(uci: str, ffi, board) -> Move | None:
|
||||
ffile, frank = ord(uci[0])-97, ord(uci[1])-49
|
||||
tfile, trank = ord(uci[2])-97, ord(uci[3])-49
|
||||
from_sq, to_sq = frank*8+ffile, trank*8+tfile
|
||||
promo = uci[4].lower() if len(uci) > 4 else None
|
||||
|
||||
buf = (Move * 256)()
|
||||
n = ffi.get_legal_moves(board, buf)
|
||||
|
||||
want_promo = None
|
||||
if promo:
|
||||
wpid, bpid = PROMO_MAP[promo]
|
||||
want_promo = wpid if board.side_to_move == WHITE else bpid
|
||||
|
||||
for i in range(n):
|
||||
m = buf[i]
|
||||
if getattr(m, "from") == from_sq and m.to == to_sq:
|
||||
if want_promo is None or m.promo == want_promo:
|
||||
return m
|
||||
return None
|
||||
|
||||
|
||||
class UCIEngine:
|
||||
def __init__(self):
|
||||
self.ffi = ChessFFI()
|
||||
self.ffi.init_attack_cache()
|
||||
self._seed_engine()
|
||||
self.board = Board()
|
||||
self.ffi.load_fen(self.board, START_FEN)
|
||||
|
||||
self.options = {
|
||||
"Depth": 5,
|
||||
"WindowCp": 50,
|
||||
"MoveOverhead": 100,
|
||||
}
|
||||
|
||||
self.default_depth = 6
|
||||
self.short_depth = 5
|
||||
self.long_depth = 7
|
||||
|
||||
self.wtime = None
|
||||
self.btime = None
|
||||
|
||||
|
||||
def _seed_engine(self):
|
||||
import ctypes as C, time
|
||||
|
||||
seed = time.time_ns()
|
||||
s32 = int(seed) & 0xFFFFFFFF
|
||||
|
||||
libc = C.CDLL("libc.so.6")
|
||||
libc.srand.argtypes = (C.c_uint,)
|
||||
libc.srand.restype = None
|
||||
libc.srand(C.c_uint(s32))
|
||||
return s32
|
||||
|
||||
|
||||
def cmd_uci(self):
|
||||
flushln("id name joshsbot")
|
||||
flushln("id author Josh")
|
||||
# UCI options. Do we even need these? We just use this interface
|
||||
# for the lichess-bot. It's not like we plan on using them?
|
||||
flushln("option name Depth type spin default 3 min 1 max 64")
|
||||
flushln("option name WindowCp type spin default 50 min 0 max 1000")
|
||||
flushln("option name Move Overhead type spin default 100 min 0 max 5000")
|
||||
flushln("uciok")
|
||||
|
||||
|
||||
def cmd_isready(self):
|
||||
flushln("readyok")
|
||||
|
||||
|
||||
def cmd_ucinewgame(self):
|
||||
self.ffi.load_fen(self.board, START_FEN)
|
||||
|
||||
|
||||
def cmd_position(self, args):
|
||||
# Reset to startpos or fen
|
||||
if args[0] == "startpos":
|
||||
self.ffi.load_fen(self.board, START_FEN)
|
||||
args = args[1:]
|
||||
elif args[0] == "fen":
|
||||
i = args.index("moves") if "moves" in args else len(args)
|
||||
fen = " ".join(args[1:i]).decode("ascii")
|
||||
self.ffi.load_fen(self.board, fen)
|
||||
args = args[i:]
|
||||
|
||||
# Apply all moves
|
||||
if args and args[0] == "moves":
|
||||
for mstr in args[1:]:
|
||||
m = uci_to_move(mstr, self.ffi, self.board)
|
||||
if m:
|
||||
next_board = Board()
|
||||
if self.ffi.apply_move_on_copy(self.board, next_board, m):
|
||||
self.board = next_board
|
||||
|
||||
|
||||
def cmd_go(self, args):
|
||||
wtime, btime = None, None
|
||||
|
||||
# parse clocks (milliseconds from lichess)
|
||||
if "wtime" in args:
|
||||
wtime = int(args[args.index("wtime") + 1])
|
||||
if "btime" in args:
|
||||
btime = int(args[args.index("btime") + 1])
|
||||
|
||||
self.wtime, self.btime = wtime, btime
|
||||
|
||||
mytime = self.get_my_time()
|
||||
|
||||
best = Move()
|
||||
if mytime >= TEN_MINUTES:
|
||||
ok = self.ffi._c_ai_find_best_move_with_window(self.board, self.long_depth, 3, best)
|
||||
elif mytime < TEN_MINUTES and mytime >= ONE_MINUTE:
|
||||
ok = self.ffi._c_ai_find_best_move_with_window(self.board, self.default_depth, 3, best)
|
||||
else:
|
||||
ok = self.ffi._c_ai_find_best_move_with_window(self.board, self.short_depth, 3, best)
|
||||
|
||||
if ok:
|
||||
flushln(f"bestmove {move_to_uci(best)}")
|
||||
else:
|
||||
flushln("bestmove 0000")
|
||||
|
||||
|
||||
def get_my_time(self):
|
||||
"""Return remaining time (ms) for the side to move."""
|
||||
if self.board.side_to_move == WHITE:
|
||||
return self.wtime
|
||||
else:
|
||||
return self.btime
|
||||
|
||||
|
||||
def cmd_setoption(self, tokens):
|
||||
try:
|
||||
i_name = tokens.index("name")
|
||||
i_val = tokens.index("value")
|
||||
name = " ".join(tokens[i_name+1:i_val])
|
||||
value = " ".join(tokens[i_val+1:]) if i_val + 1 < len(tokens) else ""
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
if name in self.options:
|
||||
try:
|
||||
self.options[name] = int(value)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def loop(self):
|
||||
for line in sys.stdin:
|
||||
parts = shlex.split(line.strip())
|
||||
if not parts:
|
||||
continue
|
||||
cmd, args = parts[0], parts[1:]
|
||||
|
||||
if cmd == "uci":
|
||||
self.cmd_uci()
|
||||
elif cmd == "isready":
|
||||
self.cmd_isready()
|
||||
elif cmd == "ucinewgame":
|
||||
self.cmd_ucinewgame()
|
||||
elif cmd == "setoption":
|
||||
self.cmd_setoption(args)
|
||||
elif cmd == "position":
|
||||
self.cmd_position(args)
|
||||
elif cmd == "go":
|
||||
self.cmd_go(args)
|
||||
elif cmd == "quit":
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
UCIEngine().loop()
|
||||
Reference in New Issue
Block a user