Add initial uci layer
All checks were successful
Python tests (make) / test (push) Successful in 12s
All checks were successful
Python tests (make) / test (push) Successful in 12s
This commit is contained in:
205
uci/uci_engine.py
Executable file
205
uci/uci_engine.py
Executable file
@@ -0,0 +1,205 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import sys, shlex
|
||||||
|
import ctypes as C
|
||||||
|
from binding.python_c_ffi import ChessFFI, Board, Move, WHITE, Q, R, B, N, q, r, b, n
|
||||||
|
from scripts.evaluation import RandomEval
|
||||||
|
|
||||||
|
|
||||||
|
START_FEN = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"
|
||||||
|
PROMO_MAP = {"q": (Q, q), "r": (R, r), "b": (B, b), "n": (N, n)}
|
||||||
|
ONE_MINUTE = 60000
|
||||||
|
|
||||||
|
def flushln(s: str):
|
||||||
|
sys.stdout.write(s + "\n")
|
||||||
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
|
||||||
|
def sq_to_uci(sq: int) -> str:
|
||||||
|
return chr(ord("a") + (sq % 8)) + chr(ord("1") + (sq // 8))
|
||||||
|
|
||||||
|
|
||||||
|
def move_to_uci(m: Move) -> str:
|
||||||
|
s = sq_to_uci(getattr(m, "from")) + sq_to_uci(m.to)
|
||||||
|
if m.promo:
|
||||||
|
for c, (wpid, bpid) in PROMO_MAP.items():
|
||||||
|
if m.promo in (wpid, bpid):
|
||||||
|
s += c
|
||||||
|
break
|
||||||
|
return s
|
||||||
|
|
||||||
|
|
||||||
|
def uci_to_move(uci: str, ffi, board) -> Move | None:
|
||||||
|
ffile, frank = ord(uci[0])-97, ord(uci[1])-49
|
||||||
|
tfile, trank = ord(uci[2])-97, ord(uci[3])-49
|
||||||
|
from_sq, to_sq = frank*8+ffile, trank*8+tfile
|
||||||
|
promo = uci[4].lower() if len(uci) > 4 else None
|
||||||
|
|
||||||
|
buf = (Move * 256)()
|
||||||
|
n = ffi.get_legal_moves(board, buf)
|
||||||
|
|
||||||
|
want_promo = None
|
||||||
|
if promo:
|
||||||
|
wpid, bpid = PROMO_MAP[promo]
|
||||||
|
want_promo = wpid if board.side_to_move == WHITE else bpid
|
||||||
|
|
||||||
|
for i in range(n):
|
||||||
|
m = buf[i]
|
||||||
|
if getattr(m, "from") == from_sq and m.to == to_sq:
|
||||||
|
if want_promo is None or m.promo == want_promo:
|
||||||
|
return m
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class UCIEngine:
|
||||||
|
def __init__(self):
|
||||||
|
self.ffi = ChessFFI()
|
||||||
|
self.ffi.init_attack_cache()
|
||||||
|
self._seed_engine()
|
||||||
|
self.board = Board()
|
||||||
|
self.ffi.load_fen(self.board, START_FEN)
|
||||||
|
|
||||||
|
self.options = {
|
||||||
|
"Depth": 5,
|
||||||
|
"WindowCp": 50,
|
||||||
|
"MoveOverhead": 100,
|
||||||
|
}
|
||||||
|
|
||||||
|
self.wtime = None
|
||||||
|
self.btime = None
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_engine(self):
|
||||||
|
import ctypes as C, time
|
||||||
|
|
||||||
|
seed = time.time_ns()
|
||||||
|
s32 = int(seed) & 0xFFFFFFFF
|
||||||
|
|
||||||
|
libc = C.CDLL("libc.so.6") # on Ubuntu
|
||||||
|
libc.srand.argtypes = (C.c_uint,)
|
||||||
|
libc.srand.restype = None
|
||||||
|
libc.srand(C.c_uint(s32))
|
||||||
|
return s32
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_uci(self):
|
||||||
|
flushln("id name joshsbot")
|
||||||
|
flushln("id author Josh")
|
||||||
|
# declare supported options properly
|
||||||
|
flushln("option name Depth type spin default 3 min 1 max 64")
|
||||||
|
flushln("option name WindowCp type spin default 50 min 0 max 1000")
|
||||||
|
flushln("option name Move Overhead type spin default 100 min 0 max 5000")
|
||||||
|
flushln("uciok")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_isready(self):
|
||||||
|
flushln("readyok")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_ucinewgame(self):
|
||||||
|
self.ffi.load_fen(self.board, START_FEN)
|
||||||
|
|
||||||
|
|
||||||
|
def get_my_time(self):
|
||||||
|
"""Return remaining time (ms) for the side to move."""
|
||||||
|
if self.board.side_to_move == WHITE:
|
||||||
|
return self.wtime
|
||||||
|
else:
|
||||||
|
return self.btime
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_position(self, args):
|
||||||
|
# Reset to startpos or fen
|
||||||
|
if args[0] == "startpos":
|
||||||
|
self.ffi.load_fen(self.board, START_FEN)
|
||||||
|
args = args[1:]
|
||||||
|
elif args[0] == "fen":
|
||||||
|
i = args.index("moves") if "moves" in args else len(args)
|
||||||
|
fen = " ".join(args[1:i]).decode("ascii")
|
||||||
|
self.ffi.load_fen(self.board, fen)
|
||||||
|
args = args[i:]
|
||||||
|
|
||||||
|
# Apply all moves
|
||||||
|
if args and args[0] == "moves":
|
||||||
|
for mstr in args[1:]:
|
||||||
|
m = uci_to_move(mstr, self.ffi, self.board)
|
||||||
|
if m:
|
||||||
|
nxt = Board()
|
||||||
|
if self.ffi.apply_move_on_copy(self.board, nxt, m):
|
||||||
|
self.board = nxt
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_go(self, args):
|
||||||
|
depth = 3
|
||||||
|
wtime, btime = None, None
|
||||||
|
|
||||||
|
# parse depth
|
||||||
|
if "depth" in args:
|
||||||
|
i = args.index("depth")
|
||||||
|
depth = int(args[i + 1])
|
||||||
|
|
||||||
|
# parse clocks (milliseconds from lichess)
|
||||||
|
if "wtime" in args:
|
||||||
|
wtime = int(args[args.index("wtime") + 1])
|
||||||
|
if "btime" in args:
|
||||||
|
btime = int(args[args.index("btime") + 1])
|
||||||
|
|
||||||
|
# store them so you can use/log later
|
||||||
|
self.wtime, self.btime = wtime, btime
|
||||||
|
|
||||||
|
mytime = self.get_my_time()
|
||||||
|
|
||||||
|
best = Move()
|
||||||
|
if mytime >= ONE_MINUTE:
|
||||||
|
ok = self.ffi._c_ai_find_best_move_with_window(self.board, 6, 3, best)
|
||||||
|
else:
|
||||||
|
ok = self.ffi._c_ai_find_best_move_with_window(self.board, 5, 3, best)
|
||||||
|
|
||||||
|
|
||||||
|
if ok:
|
||||||
|
flushln(f"bestmove {move_to_uci(best)}")
|
||||||
|
else:
|
||||||
|
flushln("bestmove 0000")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_setoption(self, tokens):
|
||||||
|
try:
|
||||||
|
i_name = tokens.index("name")
|
||||||
|
i_val = tokens.index("value")
|
||||||
|
name = " ".join(tokens[i_name+1:i_val])
|
||||||
|
value = " ".join(tokens[i_val+1:]) if i_val + 1 < len(tokens) else ""
|
||||||
|
except ValueError:
|
||||||
|
return
|
||||||
|
|
||||||
|
if name in self.options:
|
||||||
|
try:
|
||||||
|
self.options[name] = int(value)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def loop(self):
|
||||||
|
for line in sys.stdin:
|
||||||
|
parts = shlex.split(line.strip())
|
||||||
|
if not parts:
|
||||||
|
continue
|
||||||
|
cmd, args = parts[0], parts[1:]
|
||||||
|
|
||||||
|
if cmd == "uci":
|
||||||
|
self.cmd_uci()
|
||||||
|
elif cmd == "isready":
|
||||||
|
self.cmd_isready()
|
||||||
|
elif cmd == "ucinewgame":
|
||||||
|
self.cmd_ucinewgame()
|
||||||
|
elif cmd == "setoption":
|
||||||
|
self.cmd_setoption(args)
|
||||||
|
elif cmd == "position":
|
||||||
|
self.cmd_position(args)
|
||||||
|
elif cmd == "go":
|
||||||
|
self.cmd_go(args)
|
||||||
|
elif cmd == "quit":
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
UCIEngine().loop()
|
||||||
Reference in New Issue
Block a user