From 889b4e59277d7dd2b1477d042324e5818797df43 Mon Sep 17 00:00:00 2001 From: mac Date: Wed, 10 Jun 2026 23:36:44 +0800 Subject: [PATCH 1/5] update conftest --- tests/conftest.py | 26 +++++++++++++++++++-- tests/test_storage.py | 53 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 6db6308..5e7af27 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -35,8 +35,9 @@ def __init__(self): class _FakeCursor: - def __init__(self, store: _FakeStore): + def __init__(self, store: _FakeStore, pool: FakePool | None = None): self._s = store + self._pool = pool self.rowcount = 0 self._row = None self._rows: list = [] @@ -48,6 +49,8 @@ def __exit__(self, *_): pass def execute(self, sql: str, params=()): + if self._pool is not None: + self._pool.call_log.append((sql, params)) self._row = None self._rows = [] self.rowcount = 0 @@ -149,7 +152,7 @@ def fetchall(self): class _FakeConn: def __init__(self, store: _FakeStore, pool: FakePool | None = None): - self._cur = _FakeCursor(store) + self._cur = _FakeCursor(store, pool) self._pool = pool self.rollback_called = False @@ -187,6 +190,25 @@ def __init__(self): self._store = _FakeStore() self.fail_on_commit = False self.rollback_count = 0 + self.call_log: list[tuple[str, tuple]] = [] + + def calls_matching(self, sql_fragment: str) -> list[tuple[str, tuple]]: + """Return all recorded (sql, params) pairs whose normalised SQL contains sql_fragment.""" + frag = " ".join(sql_fragment.split()).upper() + return [ + (sql, params) + for sql, params in self.call_log + if frag in " ".join(sql.split()).upper() + ] + + def assert_param_at(self, sql_fragment: str, position: int, expected) -> None: + """Assert that the last call matching sql_fragment has params[position] == expected.""" + matches = self.calls_matching(sql_fragment) + assert matches, f"No SQL call matching {sql_fragment!r} in call_log" + _, params = matches[-1] + assert params[position] == expected, ( + f"params[{position}] expected {expected!r}, got {params[position]!r}" + ) def getconn(self): return _FakeConn(self._store, self) diff --git a/tests/test_storage.py b/tests/test_storage.py index ded2160..95e4ccc 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -415,3 +415,56 @@ def test_matches_paper_with_none_number_never_paper_matched(self, fake_pool): assert paper.number is None result = wl.matches_for_users([paper], []) assert "U1" not in result + + +# ── FakePool parameter capture ──────────────────────────────────────────────── + + +class TestFakePoolParamCapture: + """FakePool captures bound parameters; column-value binding errors are detectable.""" + + def test_call_log_records_sql_and_params(self, fake_pool): + wl = UserWatchlist(fake_pool) + wl.add("U1", "alice") + assert fake_pool.calls_matching("INSERT INTO user_watchlist") + + def test_watchlist_user_id_is_first_param(self, fake_pool): + wl = UserWatchlist(fake_pool) + wl.add("U1", "alice") + fake_pool.assert_param_at("INSERT INTO user_watchlist", 0, "U1") + + def test_watchlist_entry_is_second_param(self, fake_pool): + wl = UserWatchlist(fake_pool) + wl.add("U1", "alice") + fake_pool.assert_param_at("INSERT INTO user_watchlist", 1, "alice") + + def test_watchlist_entry_type_is_third_param(self, fake_pool): + wl = UserWatchlist(fake_pool) + wl.add("U1", "2300") + fake_pool.assert_param_at("INSERT INTO user_watchlist", 2, "paper") + + def test_paper_cache_key_is_first_param(self, fake_pool): + cache = PaperCache(fake_pool) + cache.write({"a": 1}) + fake_pool.assert_param_at("INSERT INTO paper_cache", 0, "wg21_index") + + def test_discovered_url_is_first_param(self, fake_pool): + state = ProbeState(fake_pool) + url = "https://isocpp.org/files/papers/D2300R11.pdf" + state.mark_discovered(url, last_modified_ts=42.0) + fake_pool.assert_param_at("INSERT INTO discovered_urls", 0, url) + + def test_discovered_last_modified_is_second_param(self, fake_pool): + state = ProbeState(fake_pool) + url = "https://isocpp.org/files/papers/D2300R11.pdf" + state.mark_discovered(url, last_modified_ts=42.0) + fake_pool.assert_param_at("INSERT INTO discovered_urls", 1, 42.0) + + def test_regression_swapped_user_id_and_entry_would_fail(self, fake_pool): + """Prove assert_param_at catches binding transposition.""" + wl = UserWatchlist(fake_pool) + wl.add("U1", "alice") + # Correct binding: position 0 == "U1", position 1 == "alice" + # If code had (entry, user_id, etype) the assertion below would raise: + with pytest.raises(AssertionError): + fake_pool.assert_param_at("INSERT INTO user_watchlist", 0, "alice") From d75977f763fa5708171c247deeec8cf9a58d5bdb Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 11 Jun 2026 00:49:34 +0800 Subject: [PATCH 2/5] fixed lint error --- tests/conftest.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5e7af27..d02fafe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -196,9 +196,7 @@ def calls_matching(self, sql_fragment: str) -> list[tuple[str, tuple]]: """Return all recorded (sql, params) pairs whose normalised SQL contains sql_fragment.""" frag = " ".join(sql_fragment.split()).upper() return [ - (sql, params) - for sql, params in self.call_log - if frag in " ".join(sql.split()).upper() + (sql, params) for sql, params in self.call_log if frag in " ".join(sql.split()).upper() ] def assert_param_at(self, sql_fragment: str, position: int, expected) -> None: From ce1eafe5c9df6ba3ae844bb40ff1a8f2376ebda5 Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 11 Jun 2026 02:55:03 +0800 Subject: [PATCH 3/5] addressed ai reviews --- tests/conftest.py | 8 ++++++-- tests/test_storage.py | 17 +++++++++++++---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index d02fafe..0af69ae 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,6 +11,7 @@ import json as _json import tempfile +from collections.abc import Sequence from pathlib import Path import pytest @@ -190,9 +191,9 @@ def __init__(self): self._store = _FakeStore() self.fail_on_commit = False self.rollback_count = 0 - self.call_log: list[tuple[str, tuple]] = [] + self.call_log: list[tuple[str, Sequence]] = [] - def calls_matching(self, sql_fragment: str) -> list[tuple[str, tuple]]: + def calls_matching(self, sql_fragment: str) -> list[tuple[str, Sequence]]: """Return all recorded (sql, params) pairs whose normalised SQL contains sql_fragment.""" frag = " ".join(sql_fragment.split()).upper() return [ @@ -204,6 +205,9 @@ def assert_param_at(self, sql_fragment: str, position: int, expected) -> None: matches = self.calls_matching(sql_fragment) assert matches, f"No SQL call matching {sql_fragment!r} in call_log" _, params = matches[-1] + assert position < len(params), ( + f"params has only {len(params)} element(s); position {position} is out of range" + ) assert params[position] == expected, ( f"params[{position}] expected {expected!r}, got {params[position]!r}" ) diff --git a/tests/test_storage.py b/tests/test_storage.py index 95e4ccc..31466b4 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -426,7 +426,9 @@ class TestFakePoolParamCapture: def test_call_log_records_sql_and_params(self, fake_pool): wl = UserWatchlist(fake_pool) wl.add("U1", "alice") - assert fake_pool.calls_matching("INSERT INTO user_watchlist") + calls = fake_pool.calls_matching("INSERT INTO user_watchlist") + assert calls + assert calls[-1][1] # params sequence is non-empty def test_watchlist_user_id_is_first_param(self, fake_pool): wl = UserWatchlist(fake_pool) @@ -460,11 +462,18 @@ def test_discovered_last_modified_is_second_param(self, fake_pool): state.mark_discovered(url, last_modified_ts=42.0) fake_pool.assert_param_at("INSERT INTO discovered_urls", 1, 42.0) + def test_assert_param_at_out_of_bounds_raises_assertion_error(self, fake_pool): + wl = UserWatchlist(fake_pool) + wl.add("U1", "alice") + with pytest.raises(AssertionError, match="out of range"): + fake_pool.assert_param_at("INSERT INTO user_watchlist", 99, "anything") + def test_regression_swapped_user_id_and_entry_would_fail(self, fake_pool): - """Prove assert_param_at catches binding transposition.""" + """ + Prove assert_param_at catches binding transposition. + This test exercises the FakePool test helper itself, not production code. + """ wl = UserWatchlist(fake_pool) wl.add("U1", "alice") - # Correct binding: position 0 == "U1", position 1 == "alice" - # If code had (entry, user_id, etype) the assertion below would raise: with pytest.raises(AssertionError): fake_pool.assert_param_at("INSERT INTO user_watchlist", 0, "alice") From a0708bb5d6a1316db611ba550f4b2ceb4a836202 Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 11 Jun 2026 02:59:39 +0800 Subject: [PATCH 4/5] update test --- tests/conftest.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 0af69ae..771b1c4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -126,7 +126,9 @@ def execute(self, sql: str, params=()): self._s.watchlist[key] = etype self.rowcount = 1 - elif "DELETE FROM USER_WATCHLIST WHERE SLACK_USER_ID" in su and "AND ENTRY" in su: + elif ( + "DELETE FROM USER_WATCHLIST WHERE SLACK_USER_ID" in su and "AND ENTRY" in su + ): uid, entry = params[0], params[1] key = (uid, entry) if key in self._s.watchlist: @@ -139,7 +141,9 @@ def execute(self, sql: str, params=()): self._rows = sorted(rows, key=lambda x: (x[1], x[0])) elif "SELECT ENTRY FROM USER_WATCHLIST WHERE ENTRY_TYPE" in su: - self._rows = [(e,) for (_, e), t in self._s.watchlist.items() if t == "paper"] + self._rows = [ + (e,) for (_, e), t in self._s.watchlist.items() if t == "paper" + ] elif "SELECT SLACK_USER_ID, ENTRY, ENTRY_TYPE FROM USER_WATCHLIST" in su: self._rows = [(u, e, t) for (u, e), t in self._s.watchlist.items()] @@ -197,7 +201,9 @@ def calls_matching(self, sql_fragment: str) -> list[tuple[str, Sequence]]: """Return all recorded (sql, params) pairs whose normalised SQL contains sql_fragment.""" frag = " ".join(sql_fragment.split()).upper() return [ - (sql, params) for sql, params in self.call_log if frag in " ".join(sql.split()).upper() + (sql, params) + for sql, params in self.call_log + if frag in " ".join(sql.split()).upper() ] def assert_param_at(self, sql_fragment: str, position: int, expected) -> None: @@ -205,12 +211,12 @@ def assert_param_at(self, sql_fragment: str, position: int, expected) -> None: matches = self.calls_matching(sql_fragment) assert matches, f"No SQL call matching {sql_fragment!r} in call_log" _, params = matches[-1] - assert position < len(params), ( - f"params has only {len(params)} element(s); position {position} is out of range" - ) - assert params[position] == expected, ( - f"params[{position}] expected {expected!r}, got {params[position]!r}" - ) + assert ( + 0 <= position < len(params) + ), f"params has only {len(params)} element(s); position {position} is out of range" + assert ( + params[position] == expected + ), f"params[{position}] expected {expected!r}, got {params[position]!r}" def getconn(self): return _FakeConn(self._store, self) From b2dd90b75fc066375f2ae0e5222bdebaa2948e44 Mon Sep 17 00:00:00 2001 From: mac Date: Thu, 11 Jun 2026 03:00:22 +0800 Subject: [PATCH 5/5] fixed lint error --- tests/conftest.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 771b1c4..dca2179 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -126,9 +126,7 @@ def execute(self, sql: str, params=()): self._s.watchlist[key] = etype self.rowcount = 1 - elif ( - "DELETE FROM USER_WATCHLIST WHERE SLACK_USER_ID" in su and "AND ENTRY" in su - ): + elif "DELETE FROM USER_WATCHLIST WHERE SLACK_USER_ID" in su and "AND ENTRY" in su: uid, entry = params[0], params[1] key = (uid, entry) if key in self._s.watchlist: @@ -141,9 +139,7 @@ def execute(self, sql: str, params=()): self._rows = sorted(rows, key=lambda x: (x[1], x[0])) elif "SELECT ENTRY FROM USER_WATCHLIST WHERE ENTRY_TYPE" in su: - self._rows = [ - (e,) for (_, e), t in self._s.watchlist.items() if t == "paper" - ] + self._rows = [(e,) for (_, e), t in self._s.watchlist.items() if t == "paper"] elif "SELECT SLACK_USER_ID, ENTRY, ENTRY_TYPE FROM USER_WATCHLIST" in su: self._rows = [(u, e, t) for (u, e), t in self._s.watchlist.items()] @@ -201,9 +197,7 @@ def calls_matching(self, sql_fragment: str) -> list[tuple[str, Sequence]]: """Return all recorded (sql, params) pairs whose normalised SQL contains sql_fragment.""" frag = " ".join(sql_fragment.split()).upper() return [ - (sql, params) - for sql, params in self.call_log - if frag in " ".join(sql.split()).upper() + (sql, params) for sql, params in self.call_log if frag in " ".join(sql.split()).upper() ] def assert_param_at(self, sql_fragment: str, position: int, expected) -> None: @@ -211,12 +205,12 @@ def assert_param_at(self, sql_fragment: str, position: int, expected) -> None: matches = self.calls_matching(sql_fragment) assert matches, f"No SQL call matching {sql_fragment!r} in call_log" _, params = matches[-1] - assert ( - 0 <= position < len(params) - ), f"params has only {len(params)} element(s); position {position} is out of range" - assert ( - params[position] == expected - ), f"params[{position}] expected {expected!r}, got {params[position]!r}" + assert 0 <= position < len(params), ( + f"params has only {len(params)} element(s); position {position} is out of range" + ) + assert params[position] == expected, ( + f"params[{position}] expected {expected!r}, got {params[position]!r}" + ) def getconn(self): return _FakeConn(self._store, self)