$ emrebener
home personal-projects lichess autobot

Lichess Autobot

published: updated: type: pet

1. What I built and why

A PyQt6 desktop client that finds a game on Lichess, streams its state over NDJSON, hands each position to a UCI engine running as a child process, and ships the engine’s move back to the server. The interesting part isn’t the chess — it’s that the program has to speak three protocols at once and keep them all live on a single thread: HTTP long-poll streaming to Lichess, a stdio request/response loop to the engine, and Qt’s GUI event loop driving the board widget.

I picked this scope deliberately. A web bot would have let me skip the GUI loop entirely. A pure CLI engine harness would have let me skip the network streaming. Putting both into a desktop app forces the part I wanted to learn: how to wire asyncio into a framework that owns the main thread, and how to keep two long-lived async pipelines (the API stream and the engine subprocess) cancellable from a button click without leaving zombies behind.

Lichess Autobot — data flowqasync event loop (single thread)LichessBoard APIapi_client.pyNDJSON readermain_window.pyorchestratorPlaying engine(UCI subprocess)Eval engine(UCI subprocess)every 500msBoard widgetEval barBoard APIUCI stdioUCI stdioLichess Autobot — data flowqasync event loop (single thread)LichessBoard APIapi_client.pyNDJSON readermain_window.pyorchestratorPlaying engine(UCI subprocess)Eval engine(UCI subprocess)every 500msBoard widgetEval barBoard APIUCI stdioUCI stdio

The codebase is split along those three protocol surfaces: src/lichess/api_client.py owns the NDJSON streams, src/engine/uci_engine.py wraps python-chess’s UCI subprocess transport, and src/ui/main_window.py is the orchestrator that subscribes Qt signals to async tasks. Everything else — the board widget, evaluation bar, captured-piece display, SQLite settings store — exists because a desktop app without those feels like a demo, not a project.

2. Bridging asyncio and PyQt6 with qasync

The structural problem with mixing PyQt and asyncio is that they each want to own the thread. QApplication.exec() runs a C++ event loop dispatching Qt events; asyncio.run() runs a Python event loop dispatching coroutine steps. The naive approaches — running asyncio in a worker thread and bouncing results back via signals, or driving the asyncio loop on a QTimer — both work and both are miserable. The first means every coroutine result needs a thread-safe handoff. The second means every await adds at least one timer tick of latency.

qasync solves this by implementing asyncio.AbstractEventLoop on top of Qt’s loop. There’s one event loop in the process; it dispatches Qt events and drives coroutines. await works inside Qt slots. Network sockets, subprocess pipes, and the GUI all share the same scheduler. The entry point is four lines:

app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
window = MainWindow(db, engines_dir)
window.show()
with loop:
    loop.run_forever()

The cost is everywhere else in the codebase. Qt signals are synchronous — a button click calls a slot, the slot returns, the framework moves on. So every “start an async thing from a button” path goes through the same pattern: a sync slot schedules a coroutine and returns immediately.

def on_start_bot_clicked(self):
    asyncio.ensure_future(self._start_bot_async(token, engine_path))

main_window.py has a dozen of these. Each _async method is then responsible for grabbing back onto the GUI thread to update widgets — which under qasync is just calling the widget method directly, because we never left the thread in the first place. That’s the payoff: the bot can await self.lichess_client.send_move(...) from the same call stack that updates the evaluation bar, and Qt sees no nested loop, no thread crossing, no synchronization.

The thing that bit me was shutdown. When the user clicks “Stop Bot,” I need to cancel four tasks — event stream, game stream, seek, evaluation — close the aiohttp session, terminate the engine subprocess, and let the window finish closing. All of those await. If any one of them hangs, the close handler hangs, the window stays half-dead, and the next launch hits a locked database file. The fix is asyncio.wait_for around every cleanup step with a hard ceiling — three seconds for HTTP, two for the engine — and return_exceptions=True on the gather so one stuck task can’t poison the others.

3. NDJSON streaming with cancellation that actually works

The Board API exposes two long-lived streams: /api/stream/event for account-level events (game starts, challenges) and /api/board/game/stream/{id} for per-game state. Both are NDJSON — newline-delimited JSON objects pushed as the server has something to say, with empty lines mixed in every ~20 seconds as keepalives. A connection might be quiet for a full minute during the opponent’s think, then deliver six events in 50 milliseconds when the clock starts running out.

aiohttp’s default reader handles HTTP chunking transparently, which sounds like what you want until you realize it means you can’t actually tell when a single NDJSON line ends — chunks don’t line up with JSON object boundaries. The current implementation reads raw bytes and does its own line-splitting:

buffer = b""
while not self._stopping:
    chunk = await response.content.read(1024)
    if not chunk:
        break
    buffer += chunk
    while b'\n' in buffer:
        line, buffer = buffer.split(b'\n', 1)
        line_str = line.decode('utf-8').strip()
        if line_str:
            yield json.loads(line_str)

Two design choices in there are deliberate. First, sock_read=60 on the ClientTimeout instead of a total timeout — a total timeout fires after 60 seconds even on a healthy stream that’s just quiet, which is exactly what the Board API looks like between moves. Second, the cancel path doesn’t use asyncio.wait_for around the read. I tried that first and it caused timer exhaustion under qasync — every cancelled timer left a small amount of state Qt couldn’t reap fast enough during a long session. The cooperative-cancel pattern below uses a flag that the read loop checks every two seconds, plus asyncio.CancelledError propagation from the outer task. It’s less elegant on paper and behaves much better in practice.

now = time.time()
if now - last_stop_check > 2.0:
    last_stop_check = now
    if self._stopping:
        break

The reconnection logic on top of this lives in main_window._run_event_stream. The Board API will drop the stream after a few minutes of inactivity even on healthy connections; the loop catches the disconnect, sleeps a couple of seconds, and reopens. Game streams are different — those should die with the game, not reconnect — so each stream type owns its own loop policy rather than sharing a generic reconnect wrapper.

4. Two UCI engines, one event loop

python-chess’s popen_uci does the heavy lifting of UCI: spawning the subprocess, sending uci/isready/ucinewgame/position/go, parsing the bestmove response, surfacing info lines for analysis. What it doesn’t do — and what most UCI tutorials skip — is option discovery. Every UCI engine declares its tunables at startup as option name X type Y default Z lines, and the set is wildly different between engines. Stockfish offers Threads, Hash, MultiPV, SyzygyPath. Leela adds WeightsFile and a hundred network-tuning knobs. Maia overrides Skill Level with neural-network weights and refuses to search past one node.

The engine options dialog has to be generated dynamically from whatever the engine declared. UCIOption.from_engine_option translates python-chess’s Option into a typed record with min/max/var-list/default, then the dialog walks the dict and emits the right widget per type — QSpinBox for spin, QCheckBox for check, QComboBox for combo, QLineEdit for string, QPushButton for button. Anything new an engine throws at us shows up correctly without code changes.

A subtler thing is that the playing engine and the evaluation engine are independent instances. The evaluation bar runs continuously — every 500ms it asks its engine for a fresh score on the current position. The playing engine runs once per move with a real time budget. Putting them in one process means I could share a UCI instance and just reconfigure between calls, but that’s slow (engines warm up their hash table) and brittle (interrupting a go to start a new go is a UCI implementation detail that varies by engine). Two UCIEngine instances, two subprocesses, two stdio loops, both driven from the same qasync event loop — costs more memory, costs zero in throughput, and the code stays linear.

The Maia case is the one place this gets weird. Maia doesn’t search — it’s a single forward pass through a neural network — so the usual go movetime 5000 does nothing useful (you get the same answer at 50ms as 5s). The fix is a “single node?” checkbox that swaps movetime for nodes=1 in the go parameters:

limit = chess.engine.Limit(nodes=1) if single_node else chess.engine.Limit(time=move_time)
result = await self.engine.play(board, limit)

That one boolean changes the whole search semantics. It’s the kind of detail that you’d never find in a tutorial — you’d find it the same way I did, by running Maia against a movetime limit, watching it return instantly, and digging through the engine’s docs to find the line that says “nodes=1 only”.