diff --git a/Cargo.lock b/Cargo.lock index 318c189..448308e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -360,6 +360,28 @@ dependencies = [ "syn", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -1826,8 +1848,11 @@ name = "hyperdb-api" version = "0.3.1" dependencies = [ "arrow", + "async-stream", "bytes", "deadpool", + "futures", + "futures-core", "hyperdb-api-core", "hyperdb-api-derive", "libc", diff --git a/docs/ROW_MAPPING.md b/docs/ROW_MAPPING.md new file mode 100644 index 0000000..76d3ed8 --- /dev/null +++ b/docs/ROW_MAPPING.md @@ -0,0 +1,322 @@ +# Row Mapping: Five Forms + +When querying Hyper, there are five ways to map result rows into Rust values — +from fully manual to fully automatic. Forms 1–4 trade manual control for +convenience; Form 5 combines the automatic struct mapping of Form 4 with the +constant-memory streaming of Form 1. Start with the simplest that fits your +situation. + +All five forms are demonstrated end-to-end in one runnable example: + +``` +cargo run -p hyperdb-api --example row_mapping_forms +``` + +The examples below all use the same schema: + +```sql +CREATE TABLE products ( + id INT NOT NULL, + name TEXT NOT NULL, + price DOUBLE PRECISION NOT NULL, + in_stock BOOLEAN NOT NULL +) +``` + +--- + +## Form 1 — Manual streaming (lowest level) + +`Connection::execute_query` returns a `Rowset` that you drain chunk by chunk. +Column access is positional (`row.get(0)`) and returns `Option`. + +This is the right choice when you need streaming (constant memory for huge +result sets), want to process rows without allocating a `Vec`, or are building +infrastructure that works with arbitrary schemas. + +```rust +use hyperdb_api::{Connection, CreateMode, HyperProcess, Result}; + +fn main() -> Result<()> { + let hyper = HyperProcess::new(None, None)?; + let conn = Connection::new(&hyper, "products.hyper", CreateMode::DoNotCreate)?; + + let mut result = conn.execute_query( + "SELECT id, name, price, in_stock FROM products ORDER BY id", + )?; + + while let Some(chunk) = result.next_chunk()? { + for row in &chunk { + // Positional access — column order must match the SELECT list. + let id: Option = row.get(0); + let name: Option = row.get(1); + let price: Option = row.get(2); + let in_stock: Option = row.get(3); + + println!( + "{:>2} {:<10} ${:.2} in_stock={}", + id.unwrap_or(-1), + name.unwrap_or_default(), + price.unwrap_or(0.0), + in_stock.unwrap_or(false), + ); + } + } + Ok(()) +} +``` + +**Trade-offs:** Maximum control and minimum allocations. Column indices are +fragile — a reordered `SELECT` silently breaks the mapping. All values come +back as `Option`, so you handle nullability at every call site. + +--- + +## Form 2 — Named access with `fetch_all` + `Row::get_by_name` + +`Connection::fetch_all` collects every row into a `Vec`. Access each field +by name with `row.get_by_name("col")`, which returns `Result` (error on NULL +or missing column). + +Use this when you want name-based safety without defining a struct — good for +one-off scripts, exploration, or when the struct would only be used in one place. + +```rust +use hyperdb_api::{Connection, CreateMode, HyperProcess, Result}; + +fn main() -> Result<()> { + let hyper = HyperProcess::new(None, None)?; + let conn = Connection::new(&hyper, "products.hyper", CreateMode::DoNotCreate)?; + + let rows = conn.fetch_all( + "SELECT id, name, price, in_stock FROM products ORDER BY id", + )?; + + for row in &rows { + // Named access — column order in the SELECT doesn't matter. + let id: i32 = row.get_by_name("id")?; + let name: String = row.get_by_name("name")?; + let price: f64 = row.get_by_name("price")?; + let in_stock: bool = row.get_by_name("in_stock")?; + + println!( + "{:>2} {:<10} ${:.2} in_stock={}", + id, name, price, in_stock, + ); + } + Ok(()) +} +``` + +**Trade-offs:** Column order independence and `Result` on every access (NULL +→ error, missing column → error). The name-to-index lookup is a linear scan per +call — fine for small result sets, but for large ones prefer Form 3 or 4 which +build the lookup once. + +--- + +## Form 3 — Manual `FromRow` impl + `fetch_all_as` + +Implement `FromRow` on your struct, then call `Connection::fetch_all_as::`. +The engine builds the column-name → index map once per query and hands every +`from_row` call a `RowAccessor` that reuses it — a single `HashMap` lookup +per field instead of a linear scan. + +Use this when you need a named struct but can't use derive (generic struct, custom +mapping logic, non-matching field/column names without `rename`, etc.). + +```rust +use hyperdb_api::{ + Connection, CreateMode, FromRow, HyperProcess, Result, RowAccessor, +}; + +#[derive(Debug)] +struct Product { + id: i32, + name: String, + price: f64, + in_stock: bool, +} + +impl FromRow for Product { + fn from_row(row: RowAccessor<'_>) -> Result { + Ok(Product { + id: row.get("id")?, + name: row.get("name")?, + price: row.get("price")?, + in_stock: row.get("in_stock")?, + }) + } +} + +fn main() -> Result<()> { + let hyper = HyperProcess::new(None, None)?; + let conn = Connection::new(&hyper, "products.hyper", CreateMode::DoNotCreate)?; + + let products: Vec = conn.fetch_all_as( + "SELECT id, name, price, in_stock FROM products ORDER BY id", + )?; + + for p in &products { + println!( + "{:>2} {:<10} ${:.2} in_stock={}", + p.id, p.name, p.price, p.in_stock, + ); + } + Ok(()) +} +``` + +**Trade-offs:** Explicit control — you see every field mapping, can add +transformation logic, and can map columns to differently-named fields. The +downside is boilerplate: adding or renaming a field means updating the `impl` +block by hand. Form 4 removes that boilerplate. + +--- + +## Form 4 — `#[derive(FromRow)]` (simplest) + +Add `#[derive(FromRow)]` to the struct. The proc-macro generates the same +`FromRow` impl as Form 3 — field names are matched to column names by default, +and `Option` fields use `get_opt` (NULL → `None`) instead of `get` (NULL → error). + +Use `#[hyperdb(rename = "col_name")]` when a field name doesn't match its +column name, or `#[hyperdb(index = N)]` for positional access. + +```rust +use hyperdb_api::{ + Connection, CreateMode, FromRow, HyperProcess, Result, +}; + +// The derive generates: impl FromRow for Product { fn from_row(...) { ... } } +// Each field maps to the column with the same name. +#[derive(Debug, FromRow)] +struct Product { + id: i32, + name: String, + price: f64, + in_stock: bool, +} + +fn main() -> Result<()> { + let hyper = HyperProcess::new(None, None)?; + let conn = Connection::new(&hyper, "products.hyper", CreateMode::DoNotCreate)?; + + let products: Vec = conn.fetch_all_as( + "SELECT id, name, price, in_stock FROM products ORDER BY id", + )?; + + for p in &products { + println!( + "{:>2} {:<10} ${:.2} in_stock={}", + p.id, p.name, p.price, p.in_stock, + ); + } + Ok(()) +} +``` + +**Trade-offs:** Zero boilerplate — add or rename a struct field and the mapping +updates automatically. Use Form 3 when you need custom logic in `from_row`; use +Form 4 for everything else. + +### Attribute reference + +| Attribute | Effect | +|---|---| +| *(none)* | Field `foo` maps to column `"foo"` | +| `#[hyperdb(rename = "col")]` | Field maps to column `"col"` | +| `#[hyperdb(index = N)]` | Field maps to column at position `N` (positional, not named) | +| Field type `Option` | NULL → `None`; non-NULL decoded as `T` | +| Field type `T` (non-Option) | NULL → error; non-NULL decoded as `T` | + +--- + +## Form 5 — Streaming `FromRow` mapping + +Forms 1–4 leave a gap: Form 4 (`fetch_all_as`) gives you automatic struct +mapping but calls `fetch_all` first — collecting **all rows** into a `Vec` +before any mapping happens, so memory is O(total rows). Form 1 streams with +constant memory but is positional and untyped. + +`Connection::stream_as::()` closes the gap: it returns a **lazy iterator** +that maps each row to `T` via `FromRow` (hand-written or `#[derive(FromRow)]`, +exactly as in Forms 3 and 4) while holding only one chunk in memory at a time. +The column-name → index lookup is built once from the first chunk's schema and +reused for every row, so per-row mapping stays O(1) in the column count. Rows +arrive one transport chunk at a time (up to ~64K rows per chunk), and only the +current chunk is held in memory — so peak memory is bounded by the chunk size, +not by how many rows the query returns. + +```rust +use hyperdb_api::{Connection, CreateMode, FromRow, HyperProcess, Result}; + +fn main() -> Result<()> { + let hyper = HyperProcess::new(None, None)?; + let conn = Connection::new(&hyper, "products.hyper", CreateMode::DoNotCreate)?; + + // Product derives FromRow (see Form 4); fields match the column names. + for row_result in conn.stream_as::( + "SELECT id, name, price, in_stock FROM products ORDER BY id", + )? { + let p: Product = row_result?; + println!("{:>2} {:<10} ${:.2} in_stock={}", p.id, p.name, p.price, p.in_stock); + } + Ok(()) +} +``` + +### Error handling + +`stream_as` reports errors in two places, and robust code handles both: + +- The outer `Result` (the `?` after `stream_as(...)`) carries failures detected + while *opening* the stream. On the gRPC transport that includes SQL parse and + server errors; on the default TCP transport the query streams lazily, so a SQL + error such as a missing table is usually reported as the **first iterator + item** instead. +- Each item is itself a `Result` — `Err` for a per-row mapping failure + (missing column, type mismatch, NULL in a non-`Option` field) or for a + server/transport error hit while fetching a later chunk. + +Do not assume a successfully-returned iterator means the query succeeded; always +handle the per-item `Result` too (the `let p = row_result?;` above does this). + +### Async + +`AsyncConnection::stream_as::()` is the async equivalent, returning +`impl Stream>`. The stream is lazy — nothing executes until +first polled — so a submission failure surfaces as the first `Err` item. Pin the +stream before polling: + +```rust +use futures::StreamExt; + +async fn print_products(conn: &hyperdb_api::AsyncConnection) -> hyperdb_api::Result<()> { + let stream = conn.stream_as::( + "SELECT id, name, price, in_stock FROM products ORDER BY id", + ); + tokio::pin!(stream); + while let Some(row_result) = stream.next().await { + let p: Product = row_result?; + println!("{}: {}", p.id, p.name); + } + Ok(()) +} +``` + +--- + +## Choosing a form + +| Need | Use | +|---|---| +| Streaming / billion-row result sets, no struct | Form 1 (`execute_query` + `next_chunk`) | +| Ad-hoc access, no struct needed | Form 2 (`fetch_all` + `get_by_name`) | +| Named struct, custom mapping logic | Form 3 (`impl FromRow` manually) | +| Named struct, fields match columns | Form 4 (`#[derive(FromRow)]`) | +| Streaming + named struct (constant memory) | Form 5 (`stream_as`) | + +For scalar values (a single `COUNT(*)`, `MAX`, etc.), use +[`fetch_scalar`](https://docs.rs/hyperdb-api/latest/hyperdb_api/struct.Connection.html#method.fetch_scalar) +instead — it skips the struct entirely. diff --git a/hyperdb-api/Cargo.toml b/hyperdb-api/Cargo.toml index 9a7a796..b243bf2 100644 --- a/hyperdb-api/Cargo.toml +++ b/hyperdb-api/Cargo.toml @@ -33,6 +33,13 @@ deadpool = { version = "0.13" } # tokio sync primitives — already present transitively via hyperdb-api-core's # async TCP client; declared here for the pool's first-connection mutex. tokio = { workspace = true, features = ["sync"] } +# async-stream's try_stream! powers AsyncConnection::stream_as, letting us +# await next_chunk() inside a natural loop and surface the submit error as +# the stream's first item. +async-stream = "0.3" +# futures-core provides the Stream trait used in stream_as's return type. +# Already present transitively; pinned here as a direct dep for the public API. +futures-core = "0.3" # Serde JSON - for query stats log parsing serde = { workspace = true } serde_json = { workspace = true } @@ -48,6 +55,8 @@ tempfile = { workspace = true } libc = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tokio = { version = "1", features = ["full"] } +# StreamExt/TryStreamExt for draining stream_as in async tests +futures = "0.3" rand = { workspace = true } serde = { workspace = true } sysinfo = { workspace = true } @@ -66,6 +75,10 @@ harness = true # Additional examples (in additional_examples/ subdirectory) +[[example]] +name = "row_mapping_forms" +path = "examples/additional_examples/row_mapping_forms.rs" + [[example]] name = "compile_time_validation" path = "examples/additional_examples/compile_time_validation.rs" diff --git a/hyperdb-api/examples/additional_examples/row_mapping_forms.rs b/hyperdb-api/examples/additional_examples/row_mapping_forms.rs new file mode 100644 index 0000000..c59b71a --- /dev/null +++ b/hyperdb-api/examples/additional_examples/row_mapping_forms.rs @@ -0,0 +1,254 @@ +// Copyright (c) 2026, Salesforce, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! Example: the five forms of row mapping +//! +//! Runnable companion to `docs/ROW_MAPPING.md`. Loads the same `products` +//! table the doc uses, then maps its rows five different ways — from fully +//! manual to fully automatic, plus the streaming variant: +//! +//! - **Form 1** — manual streaming: `execute_query` + `next_chunk`, positional +//! `row.get(N)` returning `Option`. +//! - **Form 2** — named access: `fetch_all` + `Row::get_by_name`. +//! - **Form 3** — hand-written `FromRow` impl + `fetch_all_as`. +//! - **Form 4** — `#[derive(FromRow)]` + `fetch_all_as`. +//! - **Form 5** — streaming `FromRow`: `stream_as`, constant memory. +//! +//! Every form prints the same four products, so you can see they are +//! equivalent. Form 5 is shown on both the sync `Connection` and the async +//! `AsyncConnection` (which returns an `impl Stream` instead of an iterator). +//! Run with: +//! +//! cargo run -p hyperdb-api --example row_mapping_forms +//! +//! `#[derive(FromRow)]` lives in the `hyperdb-api-derive` crate (it is not +//! re-exported from `hyperdb-api`), so Forms 4 and 5 import it directly. + +use futures::StreamExt; +use hyperdb_api::{ + AsyncConnection, Connection, CreateMode, FromRow, HyperProcess, Parameters, Result, RowAccessor, +}; +use hyperdb_api_derive::FromRow; + +const QUERY: &str = "SELECT id, name, price, in_stock FROM products ORDER BY id"; + +fn main() -> Result<()> { + std::fs::create_dir_all("test_results")?; + + let mut params = Parameters::new(); + params.set("log_dir", "test_results"); + let hyper = HyperProcess::new(None, Some(¶ms))?; + + let conn = Connection::new( + &hyper, + "test_results/row_mapping_forms.hyper", + CreateMode::CreateAndReplace, + )?; + seed_products(&conn)?; + + form1_manual_streaming(&conn)?; + form2_named_access(&conn)?; + form3_manual_from_row(&conn)?; + form4_derive_from_row(&conn)?; + form5_streaming_from_row(&conn)?; + + // Form 5 also has an async flavor. Drop the sync connection first so the + // async one reopens the same database file cleanly, then drive the stream + // on a small Tokio runtime built just for this section. + drop(conn); + let endpoint = hyper.require_endpoint()?.to_string(); + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| hyperdb_api::Error::config(format!("failed to build Tokio runtime: {e}")))? + .block_on(form5_streaming_from_row_async(&endpoint))?; + + Ok(()) +} + +/// Creates the `products` table from `docs/ROW_MAPPING.md` and inserts four +/// rows that every form below reads back. +fn seed_products(conn: &Connection) -> Result<()> { + conn.execute_command( + "CREATE TABLE products ( + id INT NOT NULL, + name TEXT NOT NULL, + price DOUBLE PRECISION NOT NULL, + in_stock BOOLEAN NOT NULL + )", + )?; + conn.execute_command( + "INSERT INTO products VALUES + (1, 'Widget', 9.99, true), + (2, 'Gadget', 19.95, false), + (3, 'Gizmo', 4.50, true), + (4, 'Doohickey', 14.00, true)", + )?; + Ok(()) +} + +/// Form 1 — manual streaming. `execute_query` returns a `Rowset` drained chunk +/// by chunk; column access is positional and returns `Option`. Maximum +/// control, minimum allocation, but indices are fragile and every value is an +/// `Option`. +fn form1_manual_streaming(conn: &Connection) -> Result<()> { + println!("== Form 1 — manual streaming (execute_query + next_chunk) =="); + + let mut result = conn.execute_query(QUERY)?; + while let Some(chunk) = result.next_chunk()? { + for row in &chunk { + // Positional access — column order must match the SELECT list. + let id: Option = row.get(0); + let name: Option = row.get(1); + let price: Option = row.get(2); + let in_stock: Option = row.get(3); + + print_row( + id.unwrap_or(-1), + &name.unwrap_or_default(), + price.unwrap_or(0.0), + in_stock.unwrap_or(false), + ); + } + } + println!(); + Ok(()) +} + +/// Form 2 — named access. `fetch_all` collects every row into a `Vec`; +/// `Row::get_by_name` looks each field up by column name (order-independent) +/// and returns `Result` (NULL or missing column → error). The name lookup +/// is a linear scan per call — fine for small results. +fn form2_named_access(conn: &Connection) -> Result<()> { + println!("== Form 2 — named access (fetch_all + Row::get_by_name) =="); + + let rows = conn.fetch_all(QUERY)?; + for row in &rows { + // Named access — column order in the SELECT doesn't matter. + let id: i32 = row.get_by_name("id")?; + let name: String = row.get_by_name("name")?; + let price: f64 = row.get_by_name("price")?; + let in_stock: bool = row.get_by_name("in_stock")?; + + print_row(id, &name, price, in_stock); + } + println!(); + Ok(()) +} + +/// Form 3 — hand-written `FromRow`. The struct controls its own field mapping; +/// `fetch_all_as` builds the column-name → index map once per query and hands +/// each `from_row` call a `RowAccessor` that reuses it (one `HashMap` lookup +/// per field, not a linear scan). Use when you need custom mapping logic or +/// can't use the derive. +#[derive(Debug)] +struct ProductManual { + id: i32, + name: String, + price: f64, + in_stock: bool, +} + +impl FromRow for ProductManual { + fn from_row(row: RowAccessor<'_>) -> Result { + Ok(ProductManual { + id: row.get("id")?, + name: row.get("name")?, + price: row.get("price")?, + in_stock: row.get("in_stock")?, + }) + } +} + +fn form3_manual_from_row(conn: &Connection) -> Result<()> { + println!("== Form 3 — manual FromRow impl (fetch_all_as) =="); + + let products: Vec = conn.fetch_all_as(QUERY)?; + for p in &products { + print_row(p.id, &p.name, p.price, p.in_stock); + } + println!(); + Ok(()) +} + +/// Form 4 — `#[derive(FromRow)]`. The proc-macro generates the same impl as +/// Form 3; field names match column names by default (use +/// `#[hyperdb(rename = "...")]` or `#[hyperdb(index = N)]` to override). +/// `Option` fields map NULL to `None`. Zero boilerplate. +#[derive(Debug, FromRow)] +struct ProductDerived { + id: i32, + name: String, + price: f64, + in_stock: bool, +} + +fn form4_derive_from_row(conn: &Connection) -> Result<()> { + println!("== Form 4 — #[derive(FromRow)] (fetch_all_as) =="); + + let products: Vec = conn.fetch_all_as(QUERY)?; + for p in &products { + print_row(p.id, &p.name, p.price, p.in_stock); + } + println!(); + Ok(()) +} + +/// Form 5 — streaming `FromRow`. `stream_as` returns a lazy iterator of +/// `Result`, mapping each row via `FromRow` (here the derived +/// `ProductDerived`) while holding only one transport chunk in memory at a +/// time. The column-index map is built once on the first chunk and reused — +/// peak memory is bounded by the chunk size, not the total row count, so this +/// is the form to reach for on large or unbounded result sets. +/// +/// `stream_as` reports errors in two places, and this loop handles both: the +/// `?` after `stream_as(...)` surfaces stream-open failures, and each +/// `row_result?` surfaces a per-row mapping error (or a transport error hit +/// while fetching a later chunk). +fn form5_streaming_from_row(conn: &Connection) -> Result<()> { + println!("== Form 5 — streaming FromRow (stream_as, constant memory) =="); + + for row_result in conn.stream_as::(QUERY)? { + let p = row_result?; + print_row(p.id, &p.name, p.price, p.in_stock); + } + println!(); + Ok(()) +} + +/// Form 5, async flavor. `AsyncConnection::stream_as` returns an +/// `impl Stream>` rather than an iterator — otherwise the +/// shape is identical to the sync version: lazy, one chunk in memory at a +/// time, index map built once. The stream is `!Unpin`, so it must be pinned +/// (here with `tokio::pin!`) before polling with `StreamExt::next`. +async fn form5_streaming_from_row_async(endpoint: &str) -> Result<()> { + println!("== Form 5 (async) — AsyncConnection::stream_as (impl Stream) =="); + + let conn = AsyncConnection::connect( + endpoint, + "test_results/row_mapping_forms.hyper", + CreateMode::DoNotCreate, + ) + .await?; + + // Scope the stream so it (and its borrow of `conn`) is dropped before the + // `conn.close()` below, which moves `conn`. + { + let stream = conn.stream_as::(QUERY); + tokio::pin!(stream); + while let Some(row_result) = stream.next().await { + let p = row_result?; + print_row(p.id, &p.name, p.price, p.in_stock); + } + } + + conn.close().await?; + println!(); + Ok(()) +} + +/// Shared row formatter so every form prints identically — making it obvious +/// the five forms return the same data. +fn print_row(id: i32, name: &str, price: f64, in_stock: bool) { + println!("{id:>2} {name:<10} ${price:.2} in_stock={in_stock}"); +} diff --git a/hyperdb-api/src/async_connection.rs b/hyperdb-api/src/async_connection.rs index 2c795e0..c6dd6ad 100644 --- a/hyperdb-api/src/async_connection.rs +++ b/hyperdb-api/src/async_connection.rs @@ -386,6 +386,85 @@ impl AsyncConnection { .collect() } + /// Returns a lazy `Stream` over rows, mapping each to `T` via + /// [`FromRow`]. + /// + /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as): + /// memory usage is bounded by the chunk size (default 64K rows), not by + /// the total row count. Use this for large result sets where collecting + /// all rows into a `Vec` would exceed memory limits. + /// + /// The column-name → index lookup table is built exactly once (on the + /// first non-empty chunk) and reused for all rows, so per-row mapping is + /// O(1) in column count. + /// + /// # Example + /// + /// ```no_run + /// # use hyperdb_api::{AsyncConnection, CreateMode, FromRow, RowAccessor, Result}; + /// # use futures::StreamExt; + /// # struct User { id: i32, name: String } + /// # impl FromRow for User { + /// # fn from_row(row: RowAccessor<'_>) -> Result { + /// # Ok(User { id: row.get("id")?, name: row.get("name")? }) + /// # } + /// # } + /// # async fn example(conn: &AsyncConnection) -> Result<()> { + /// let stream = conn.stream_as::("SELECT id, name FROM users"); + /// tokio::pin!(stream); + /// while let Some(row_result) = stream.next().await { + /// let user = row_result?; + /// println!("{}: {}", user.id, user.name); + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// # Errors + /// + /// Each yielded item is a `Result`: + /// - The first item will be `Err(e)` if query submission fails (parse + /// failures, server errors, transport failures). The stream is lazy and + /// does not execute the query until first polled. + /// - Subsequent items are `Ok(T)` if the row was successfully mapped via + /// `FromRow`, or `Err(e)` if mapping failed (missing column, type + /// mismatch, NULL in a non-optional field). These errors surface lazily + /// during iteration. + /// + /// [`FromRow`]: crate::FromRow + pub fn stream_as<'a, T: crate::FromRow + 'a>( + &'a self, + query: &str, + ) -> impl futures_core::Stream> + 'a { + // Own the query string so the stream doesn't borrow the &str arg + // across await points. + let query = query.to_owned(); + async_stream::try_stream! { + let mut rs = self.execute_query(&query).await?; // submit err → first Err item + let mut indices: Option> = None; + while let Some(chunk) = rs.next_chunk().await? { + // Build the name→index map once, on the first chunk, after + // next_chunk() has materialized the schema (TCP sends the + // RowDescription as the first stream message). If the schema is + // somehow unavailable, fall back to an empty map so per-row + // lookups surface a `Missing` error — matching `fetch_all_as`'s + // `unwrap_or_default()` and the sync `stream_as`, rather than + // silently skipping the chunk. + if indices.is_none() { + let map = rs + .schema() + .map(|schema| crate::RowAccessor::build_owned_indices(&schema)) + .unwrap_or_default(); + indices = Some(map); + } + let idx = indices.get_or_insert_with(Default::default); + for row in &chunk { + yield T::from_row(crate::RowAccessor::new_owned(row, idx))?; + } + } + } + } + /// Fetches a single non-NULL scalar value. Errors on empty / NULL. /// /// # Errors diff --git a/hyperdb-api/src/connection.rs b/hyperdb-api/src/connection.rs index 5629233..a3b41b8 100644 --- a/hyperdb-api/src/connection.rs +++ b/hyperdb-api/src/connection.rs @@ -787,6 +787,64 @@ impl Connection { .collect() } + /// Returns a lazy iterator over rows, mapping each to `T` via + /// [`FromRow`]. + /// + /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as): + /// memory usage is bounded by the chunk size (default 64K rows), not by + /// the total row count. Use this for large result sets where collecting + /// all rows into a `Vec` would exceed memory limits. + /// + /// The column-name → index lookup table is built exactly once (on the + /// first non-empty chunk) and reused for all rows, so per-row mapping is + /// O(1) in column count. + /// + /// # Example + /// + /// ```no_run + /// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result}; + /// # struct User { id: i32, name: String } + /// # impl FromRow for User { + /// # fn from_row(row: RowAccessor<'_>) -> Result { + /// # Ok(User { id: row.get("id")?, name: row.get("name")? }) + /// # } + /// # } + /// # fn example(conn: &Connection) -> Result<()> { + /// for row_result in conn.stream_as::("SELECT id, name FROM users")? { + /// let user = row_result?; + /// println!("{}: {}", user.id, user.name); + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// # Errors + /// + /// - The returned `Result` wraps errors detected while *opening* the + /// result stream — transport/connection failures, and (on the gRPC + /// transport, which establishes the query stream eagerly) SQL parse and + /// server errors. On the default TCP transport the query is streamed + /// lazily, so SQL errors such as a missing table are typically reported + /// as the **first yielded item** rather than by this outer `Result`. + /// - Each yielded item is itself a `Result`: + /// - `Ok(T)` if the row was successfully mapped via `FromRow`. + /// - `Err(e)` for a server/transport error encountered while streaming a + /// later chunk, or for a per-row mapping failure (missing column, type + /// mismatch, NULL in a non-optional field). + /// + /// In short: always handle errors *both* on the outer `Result` and on each + /// item — do not assume a successfully-returned iterator means the query + /// succeeded. + /// + /// [`FromRow`]: crate::FromRow + pub fn stream_as<'a, T>(&'a self, query: &str) -> Result> + 'a> + where + T: crate::FromRow + 'a, + { + let rowset = self.execute_query(query)?; + Ok(crate::result::TypedRowIterator::::new(rowset)) + } + /// Fetches a single scalar value from a query. /// /// Returns an error if the query returns no rows or NULL. diff --git a/hyperdb-api/src/result.rs b/hyperdb-api/src/result.rs index 7cd98d5..5a7fa38 100644 --- a/hyperdb-api/src/result.rs +++ b/hyperdb-api/src/result.rs @@ -1557,6 +1557,118 @@ impl Iterator for RowIterator<'_> { } } +/// Iterator over rows of a result set with `FromRow` deserialization. +/// +/// Returned by [`Connection::stream_as`]. Lazily fetches chunks from the +/// server and maps each row to `T` via [`FromRow::from_row`]. Memory usage is +/// bounded by the chunk size: at most one chunk of rows is held in memory at a +/// time. +/// +/// The column-name → index lookup table is built exactly once (on the first +/// non-empty chunk) and reused for all rows, making per-row mapping O(1) in +/// column count. +/// +/// [`Connection::stream_as`]: crate::Connection::stream_as +/// [`FromRow::from_row`]: crate::FromRow::from_row +/// +/// # Example +/// +/// ```no_run +/// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result}; +/// # struct User { id: i32, name: String } +/// # impl FromRow for User { +/// # fn from_row(row: RowAccessor<'_>) -> Result { +/// # Ok(User { id: row.get("id")?, name: row.get("name")? }) +/// # } +/// # } +/// # fn example(conn: &Connection) -> Result<()> { +/// for row in conn.stream_as::("SELECT id, name FROM users")? { +/// let user = row?; +/// println!("{}: {}", user.id, user.name); +/// } +/// # Ok(()) +/// # } +/// ``` +/// +/// # Errors +/// +/// Each yielded item is a `Result`: +/// - `Ok(T)` if the row was successfully mapped via `FromRow`. +/// - `Err(e)` if mapping failed (e.g., missing column, type mismatch, NULL in +/// a non-optional field). +/// +/// Transport-level errors (chunk-fetching failures) are also surfaced as +/// `Err` items. +pub(crate) struct TypedRowIterator<'conn, T> { + rowset: Rowset<'conn>, + current_iter: std::vec::IntoIter, + indices: Option>, + _marker: std::marker::PhantomData T>, +} + +impl std::fmt::Debug for TypedRowIterator<'_, T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TypedRowIterator") + .field("rowset", &self.rowset) + .field("current_iter_len", &self.current_iter.len()) + .field("indices_built", &self.indices.is_some()) + .finish_non_exhaustive() + } +} + +impl<'conn, T> TypedRowIterator<'conn, T> { + /// Constructs a new `TypedRowIterator` over the given rowset. + /// Crate-internal: callers go through `Connection::stream_as`. + pub(crate) fn new(rowset: Rowset<'conn>) -> Self { + Self { + rowset, + current_iter: Vec::new().into_iter(), + indices: None, + _marker: std::marker::PhantomData, + } + } +} + +impl Iterator for TypedRowIterator<'_, T> { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + // Try to get next row from current chunk. `get_or_insert_with` is + // a no-op here (the map was already built when this chunk was + // fetched, below) but lets us borrow it without an Option unwrap. + if let Some(row) = self.current_iter.next() { + let indices = self.indices.get_or_insert_with(Default::default); + return Some(T::from_row(crate::RowAccessor::new_owned(&row, indices))); + } + + // Current chunk exhausted, fetch next chunk + match self.rowset.next_chunk() { + Ok(Some(chunk)) => { + // Build the index map once, on the first chunk. The schema + // is populated by `next_chunk` as a side effect. If the + // schema is somehow unavailable, fall back to an empty map + // so column lookups surface a `Missing` error per row — + // matching `fetch_all_as`'s `unwrap_or_default()` rather + // than silently truncating the stream. + if self.indices.is_none() { + let map = self + .rowset + .schema() + .map(|schema| crate::RowAccessor::build_owned_indices(&schema)) + .unwrap_or_default(); + self.indices = Some(map); + } + self.current_iter = chunk.into_iter(); + // loop around to drain the freshly fetched chunk + } + Ok(None) => return None, // No more rows + Err(e) => return Some(Err(e)), // Error fetching chunk + } + } + } +} + // ============================================================================= // Unit tests that don't need a live hyperd backend. // diff --git a/hyperdb-api/src/row_accessor.rs b/hyperdb-api/src/row_accessor.rs index fa012dd..0a27f85 100644 --- a/hyperdb-api/src/row_accessor.rs +++ b/hyperdb-api/src/row_accessor.rs @@ -25,6 +25,30 @@ use std::collections::HashMap; use crate::error::{ColumnErrorKind, Error, Result}; use crate::result::{Row, RowValue}; +/// Storage for the column-name → index lookup behind a [`RowAccessor`]. +/// +/// `fetch_*_as` builds a zero-alloc `&str`-keyed map borrowing from the +/// `ResultSchema` (the `Borrowed` variant). The streaming path +/// (`Connection::stream_as`) needs to own its map across iterator steps, +/// which a `&str`-keyed map can't do (its keys borrow the schema), so it +/// uses an owned `String`-keyed map (the `Owned` variant). Both look up by +/// `&str` via `HashMap`'s `Borrow` bound, so the getters are agnostic. +#[derive(Debug)] +enum Indices<'a> { + Borrowed(&'a HashMap<&'a str, usize>), + Owned(&'a HashMap), +} + +impl Indices<'_> { + /// Resolves a column name to its index, agnostic to key ownership. + fn get(&self, name: &str) -> Option { + match self { + Indices::Borrowed(m) => m.get(name).copied(), + Indices::Owned(m) => m.get(name).copied(), + } + } +} + /// A view over a [`Row`] that supports name-based access via a /// pre-resolved column-name → index lookup table. /// @@ -53,7 +77,7 @@ use crate::result::{Row, RowValue}; #[derive(Debug)] pub struct RowAccessor<'a> { row: &'a Row, - indices: &'a HashMap<&'a str, usize>, + indices: Indices<'a>, } impl<'a> RowAccessor<'a> { @@ -61,7 +85,20 @@ impl<'a> RowAccessor<'a> { /// lookup map. Crate-internal: callers go through `fetch_*_as` to /// get a `RowAccessor`, never construct one directly. pub(crate) fn new(row: &'a Row, indices: &'a HashMap<&'a str, usize>) -> Self { - Self { row, indices } + Self { + row, + indices: Indices::Borrowed(indices), + } + } + + /// Constructs a new `RowAccessor` over the given row and an owned + /// lookup map. Crate-internal: used by `stream_as` where the map must + /// persist across iterator steps. + pub(crate) fn new_owned(row: &'a Row, indices: &'a HashMap) -> Self { + Self { + row, + indices: Indices::Owned(indices), + } } /// Builds a `name → index` lookup table from a [`ResultSchema`]. @@ -79,6 +116,21 @@ impl<'a> RowAccessor<'a> { map } + /// Builds an owned `name → index` lookup table from a [`ResultSchema`]. + /// + /// Used by `stream_as` where the map must persist across iterator + /// steps, requiring owned keys. Consumes O(N) time and allocates one + /// entry per column plus string copies. + /// + /// [`ResultSchema`]: crate::ResultSchema + pub(crate) fn build_owned_indices(schema: &crate::ResultSchema) -> HashMap { + let mut map = HashMap::with_capacity(schema.column_count()); + for i in 0..schema.column_count() { + map.insert(schema.column(i).name().to_string(), i); + } + map + } + /// Returns the named column's value, decoded as `T`. /// /// # Errors @@ -93,7 +145,6 @@ impl<'a> RowAccessor<'a> { let idx = self .indices .get(name) - .copied() .ok_or_else(|| Error::column(name, ColumnErrorKind::Missing))?; match self.row.get::(idx) { Some(v) => Ok(v), @@ -133,7 +184,6 @@ impl<'a> RowAccessor<'a> { let idx = self .indices .get(name) - .copied() .ok_or_else(|| Error::column(name, ColumnErrorKind::Missing))?; if self.row.is_null(idx) { return Ok(None); @@ -416,4 +466,60 @@ mod tests { other => panic!("expected Error::Column {{ kind: Null }}, got {other:?}"), } } + + // --- Owned-key variant tests --- + + #[test] + fn owned_happy_path_get_returns_value() { + let (row, schema) = user_row(Some(42), Some("alice")); + let indices = RowAccessor::build_owned_indices(&schema); + let accessor = RowAccessor::new_owned(&row, &indices); + + let id: i32 = accessor.get("id").expect("get id"); + let name: String = accessor.get("name").expect("get name"); + assert_eq!(id, 42); + assert_eq!(name, "alice"); + } + + #[test] + fn owned_missing_column_errors_with_kind_missing() { + let (row, schema) = user_row(Some(1), Some("alice")); + let indices = RowAccessor::build_owned_indices(&schema); + let accessor = RowAccessor::new_owned(&row, &indices); + + let err = accessor.get::("does_not_exist").unwrap_err(); + match err { + Error::Column { name, kind } => { + assert_eq!(name, "does_not_exist"); + assert!(matches!(kind, ColumnErrorKind::Missing)); + } + other => panic!("expected Error::Column {{ kind: Missing }}, got {other:?}"), + } + } + + #[test] + fn owned_null_in_required_column_errors_with_kind_null() { + let (row, schema) = user_row(Some(1), None); + let indices = RowAccessor::build_owned_indices(&schema); + let accessor = RowAccessor::new_owned(&row, &indices); + + let err = accessor.get::("name").unwrap_err(); + match err { + Error::Column { name, kind } => { + assert_eq!(name, "name"); + assert!(matches!(kind, ColumnErrorKind::Null)); + } + other => panic!("expected Error::Column {{ kind: Null }}, got {other:?}"), + } + } + + #[test] + fn owned_null_in_optional_column_returns_none() { + let (row, schema) = user_row(Some(1), None); + let indices = RowAccessor::build_owned_indices(&schema); + let accessor = RowAccessor::new_owned(&row, &indices); + + let v: Option = accessor.get_opt("name").expect("get_opt for NULL"); + assert_eq!(v, None); + } } diff --git a/hyperdb-api/tests/async_connection_tests.rs b/hyperdb-api/tests/async_connection_tests.rs index 0b4611a..dbc4007 100644 --- a/hyperdb-api/tests/async_connection_tests.rs +++ b/hyperdb-api/tests/async_connection_tests.rs @@ -9,6 +9,7 @@ mod common; use common::{test_hyper_params, test_result_path}; +use futures::{StreamExt, TryStreamExt}; use hyperdb_api::{AsyncConnection, CreateMode, FromRow, HyperProcess, Result}; async fn fresh_async_conn(name: &str) -> Result<(HyperProcess, AsyncConnection)> { @@ -266,3 +267,163 @@ async fn transaction_with_commit() { conn.close().await.unwrap(); } + +#[tokio::test(flavor = "current_thread")] +async fn stream_as_happy_path() { + let (_hyper, conn) = fresh_async_conn("async_stream_as_happy").await.unwrap(); + + conn.execute_command("CREATE TABLE users (id INT NOT NULL, name TEXT)") + .await + .unwrap(); + conn.execute_command("INSERT INTO users VALUES (1, 'alice'), (2, 'bob'), (3, NULL)") + .await + .unwrap(); + + let users = { + let stream = conn.stream_as::("SELECT id, name FROM users ORDER BY id"); + tokio::pin!(stream); + stream.try_collect::>().await.unwrap() + }; + + assert_eq!(users.len(), 3); + assert_eq!( + users[0], + User { + id: 1, + name: Some("alice".to_string()) + } + ); + assert_eq!( + users[1], + User { + id: 2, + name: Some("bob".to_string()) + } + ); + assert_eq!(users[2], User { id: 3, name: None }); + + // Verify it matches fetch_all_as + let fetch_all: Vec = conn + .fetch_all_as("SELECT id, name FROM users ORDER BY id") + .await + .unwrap(); + assert_eq!(users, fetch_all); + + conn.close().await.unwrap(); +} + +#[tokio::test(flavor = "current_thread")] +async fn stream_as_multi_chunk() { + let (_hyper, conn) = fresh_async_conn("async_stream_as_multi_chunk") + .await + .unwrap(); + + conn.execute_command("CREATE TABLE big (id INT NOT NULL, name TEXT)") + .await + .unwrap(); + // The TCP client accumulates up to DEFAULT_BINARY_CHUNK_SIZE (65_536) rows + // per chunk, so insert > 2× that to force at least two non-empty chunks and + // genuinely exercise the cross-chunk re-entry path (index map built once, + // reused on the second chunk). + const ROWS: i32 = 140_000; + conn.execute_command(&format!( + "INSERT INTO big SELECT id, 'user_' || id::TEXT FROM GENERATE_SERIES(1, {ROWS}) AS id", + )) + .await + .unwrap(); + + let users = { + let stream = conn.stream_as::("SELECT id, name FROM big ORDER BY id"); + tokio::pin!(stream); + stream.try_collect::>().await.unwrap() + }; + + let last = usize::try_from(ROWS).expect("row count fits usize") - 1; + assert_eq!(users.len(), ROWS as usize); + // Verify first and last + assert_eq!( + users[0], + User { + id: 1, + name: Some("user_1".to_string()) + } + ); + assert_eq!( + users[last], + User { + id: ROWS, + name: Some(format!("user_{ROWS}")) + } + ); + + conn.close().await.unwrap(); +} + +#[tokio::test(flavor = "current_thread")] +async fn stream_as_submit_error() { + let (_hyper, conn) = fresh_async_conn("async_stream_as_submit_error") + .await + .unwrap(); + + // Query a nonexistent table + { + let stream = conn.stream_as::("SELECT id, name FROM nonexistent_table"); + tokio::pin!(stream); + + // The first item should be an Err (submit error surfaces lazily) + let first = stream.next().await; + assert!(first.is_some()); + assert!(first.unwrap().is_err()); + } + + conn.close().await.unwrap(); +} + +#[tokio::test(flavor = "current_thread")] +async fn stream_as_empty() { + let (_hyper, conn) = fresh_async_conn("async_stream_as_empty").await.unwrap(); + + conn.execute_command("CREATE TABLE empty_users (id INT NOT NULL, name TEXT)") + .await + .unwrap(); + + let users = { + let stream = conn.stream_as::("SELECT id, name FROM empty_users WHERE 1=0"); + tokio::pin!(stream); + stream.try_collect::>().await.unwrap() + }; + + assert_eq!(users.len(), 0); + + conn.close().await.unwrap(); +} + +#[tokio::test(flavor = "current_thread")] +async fn stream_as_lenient_extra_column() { + let (_hyper, conn) = fresh_async_conn("async_stream_as_lenient").await.unwrap(); + + conn.execute_command("CREATE TABLE users_extra (id INT NOT NULL, name TEXT, extra TEXT)") + .await + .unwrap(); + conn.execute_command("INSERT INTO users_extra VALUES (1, 'alice', 'data')") + .await + .unwrap(); + + // SELECT * includes the extra column, but User only maps id and name + let users = { + let stream = conn.stream_as::("SELECT * FROM users_extra"); + tokio::pin!(stream); + stream.try_collect::>().await.unwrap() + }; + + assert_eq!(users.len(), 1); + assert_eq!( + users[0], + User { + id: 1, + name: Some("alice".to_string()) + } + ); + + conn.close().await.unwrap(); +} diff --git a/hyperdb-api/tests/remaining_features_tests.rs b/hyperdb-api/tests/remaining_features_tests.rs index f05e009..1e549d8 100644 --- a/hyperdb-api/tests/remaining_features_tests.rs +++ b/hyperdb-api/tests/remaining_features_tests.rs @@ -477,6 +477,183 @@ fn test_derive_from_row_missing_column_errors() { } } +// ============================================================================= +// #17 cont: stream_as — Lazy FromRow Iterator +// ============================================================================= + +#[test] +fn test_stream_as_happy_path() { + let test = TestConnection::new().expect("Failed to create test connection"); + + test.execute_command( + "CREATE TABLE stream_as_test (id INT NOT NULL, name TEXT, score DOUBLE PRECISION)", + ) + .expect("create"); + test.execute_command( + "INSERT INTO stream_as_test VALUES (1, 'Alice', 95.5), (2, 'Bob', 87.0), (3, 'Carol', 92.3)", + ) + .expect("insert"); + + let users: Vec = test + .connection + .stream_as::("SELECT id, name, score FROM stream_as_test ORDER BY id") + .expect("stream_as") + .collect::>>() + .expect("collect"); + + let expected: Vec = test + .connection + .fetch_all_as("SELECT id, name, score FROM stream_as_test ORDER BY id") + .expect("fetch_all_as"); + + assert_eq!(users, expected); + assert_eq!(users.len(), 3); + assert_eq!(users[0].id, 1); + assert_eq!(users[0].name, "Alice"); + assert_eq!(users[2].id, 3); + assert_eq!(users[2].name, "Carol"); +} + +#[test] +fn test_stream_as_multi_chunk() { + // Insert enough rows to span multiple transport chunks, proving the + // index map is built once and reused across chunk boundaries. The TCP + // client accumulates up to DEFAULT_BINARY_CHUNK_SIZE (65_536) rows per + // chunk, so we insert > 2× that to force at least two non-empty chunks + // and exercise the cross-chunk re-entry path. + const ROWS: i32 = 140_000; + let test = TestConnection::new().expect("Failed to create test connection"); + + test.execute_command( + "CREATE TABLE stream_multi_chunk (id INT NOT NULL, name TEXT, score DOUBLE PRECISION)", + ) + .expect("create"); + test.execute_command(&format!( + "INSERT INTO stream_multi_chunk SELECT id, 'name' || id, id * 1.0 \ + FROM GENERATE_SERIES(1, {ROWS}) AS g(id)", + )) + .expect("insert rows"); + + let users: Vec = test + .connection + .stream_as::("SELECT id, name, score FROM stream_multi_chunk ORDER BY id") + .expect("stream_as") + .collect::>>() + .expect("collect"); + + let last = usize::try_from(ROWS).expect("row count fits usize") - 1; + assert_eq!(users.len(), ROWS as usize); + assert_eq!(users[0].id, 1); + assert_eq!(users[0].name, "name1"); + assert!((users[0].score - 1.0).abs() < 0.001); + assert_eq!(users[last].id, ROWS); + assert_eq!(users[last].name, format!("name{ROWS}")); + assert!((users[last].score - f64::from(ROWS)).abs() < 0.001); +} + +#[test] +fn test_stream_as_submit_error() { + // SQL errors (like referencing a non-existent table) surface during + // iteration, not at stream creation. Streaming queries defer validation + // until the first chunk is fetched. + let test = TestConnection::new().expect("Failed to create test connection"); + + let mut iter = test + .connection + .stream_as::("SELECT * FROM no_such_table_xyz") + .expect("stream creation succeeds"); + + let first = iter.next().expect("iterator yields an item"); + assert!( + first.is_err(), + "Expected Err for non-existent table, got Ok(_)" + ); +} + +#[test] +fn test_stream_as_per_row_map_error() { + // Per-row mapping errors surface lazily as Err items during iteration. + // Define a struct whose FromRow requests a non-existent column. + #[derive(Debug)] + #[allow( + dead_code, + reason = "fields are accessed via FromRow impl, not directly" + )] + struct BadUser { + id: i32, + does_not_exist: String, + } + + impl FromRow for BadUser { + fn from_row(row: hyperdb_api::RowAccessor<'_>) -> hyperdb_api::Result { + Ok(BadUser { + id: row.get("id")?, + does_not_exist: row.get("does_not_exist")?, + }) + } + } + + let test = TestConnection::new().expect("Failed to create test connection"); + test.execute_command("CREATE TABLE bad_map (id INT NOT NULL)") + .expect("create"); + test.execute_command("INSERT INTO bad_map VALUES (1)") + .expect("insert"); + + let mut iter = test + .connection + .stream_as::("SELECT id FROM bad_map") + .expect("stream_as succeeds (submit)"); + + let first = iter.next().expect("iterator yields an item"); + assert!(first.is_err(), "Expected Err for missing column, got Ok(_)"); +} + +#[test] +fn test_stream_as_empty() { + // Empty result set yields an empty iterator, no error. + let test = TestConnection::new().expect("Failed to create test connection"); + + test.execute_command( + "CREATE TABLE stream_empty (id INT NOT NULL, name TEXT, score DOUBLE PRECISION)", + ) + .expect("create"); + // no INSERT + + let users: Vec = test + .connection + .stream_as::("SELECT id, name, score FROM stream_empty WHERE 1=0") + .expect("stream_as") + .collect::>>() + .expect("collect"); + + assert_eq!(users.len(), 0); +} + +#[test] +fn test_stream_as_lenient_extra_column() { + // Selecting extra columns not present in TestUser's FromRow impl should + // not cause errors — mirror fetch_all_as lenient semantics. + let test = TestConnection::new().expect("Failed to create test connection"); + + test.execute_command( + "CREATE TABLE stream_lenient (id INT NOT NULL, name TEXT, score DOUBLE PRECISION, extra TEXT)", + ) + .expect("create"); + test.execute_command("INSERT INTO stream_lenient VALUES (1, 'Alice', 95.5, 'extra_val')") + .expect("insert"); + + let users: Vec = test + .connection + .stream_as::("SELECT id, name, score, extra FROM stream_lenient ORDER BY id") + .expect("stream_as") + .collect::>>() + .expect("collect"); + + assert_eq!(users.len(), 1); + assert_eq!(users[0].id, 1); + assert_eq!(users[0].name, "Alice"); +} + // ============================================================================= // #16: Connection Health (ping) // ============================================================================= diff --git a/run_all_examples.ps1 b/run_all_examples.ps1 index 1385b8b..4b6d788 100644 --- a/run_all_examples.ps1 +++ b/run_all_examples.ps1 @@ -54,7 +54,10 @@ $examples = @( "threaded_inserter", "grpc_query", "connection_pool", - "transactions" + "transactions", + "async_parity_smoke", + "prepared_statements", + "row_mapping_forms" ) # No feature-gated examples — hyperdb-api has zero feature flags. diff --git a/run_all_examples.sh b/run_all_examples.sh index ccad6fe..fe7dfac 100755 --- a/run_all_examples.sh +++ b/run_all_examples.sh @@ -59,6 +59,7 @@ examples=( "transactions" "async_parity_smoke" "prepared_statements" + "row_mapping_forms" ) # Examples that require additional feature flags diff --git a/run_examples_wsl.sh b/run_examples_wsl.sh index e137eb4..6981fce 100644 --- a/run_examples_wsl.sh +++ b/run_examples_wsl.sh @@ -33,37 +33,30 @@ echo "=== Running All Rust API Examples ===" echo "HYPERD_PATH=$HYPERD_PATH" echo "" +# Keep this list in sync with run_all_examples.sh — both run the same set of +# registered hyperdb-api examples (see the [[example]] targets in +# hyperdb-api/Cargo.toml). Benchmarks and the feature-gated +# compile_time_validation example are intentionally excluded. examples=( + # Core canonical examples (matching C++/Python APIs) "insert_data_into_single_table" "insert_data_into_multiple_tables" "create_hyper_file_from_csv" + "delete_data_in_existing_hyper_file" + "update_data_in_existing_hyper_file" "read_and_print_data_from_existing_hyper_file" "insert_data_with_expressions" "insert_geospatial_data_to_a_hyper_file" - "delete_data_in_existing_hyper_file" - "update_data_in_existing_hyper_file" - # Consolidated examples - "reading_data" # Was: read_data + result_types - "inserter" - "threaded_inserter" + # Rust-specific value-add examples "arrow" - "catalog_and_schema" # Was: catalog_operations + schema_introspection - "multiple_databases" - "name_types" - "type_system" - "geography" - "logging" - "notice_receiver" - "parameterized_queries" - "struct_mapping" - "async_usage" # Was: async_connection + async_integration - "sharding_cluster" - "query_builder" # Was: query_builder_demo + advanced_query_builder - "future_improvements" - "grpc_query" # Now includes grpc_query_builder_demo content + "async_usage" + "threaded_inserter" + "grpc_query" + "connection_pool" "transactions" - "grpc_benchmark_tests" - "grpc_compilation_check" + "async_parity_smoke" + "prepared_statements" + "row_mapping_forms" # All five FromRow mapping forms (sync + async stream_as) ) passed=0 @@ -88,35 +81,6 @@ for ex in "${examples[@]}"; do echo "" done -# Examples that require additional feature flags -# Format: "example_name:feature1,feature2" -feature_examples=( - "sea_query:sea-query" - "connection_pool:pool" -) - -for entry in "${feature_examples[@]}"; do - # Parse "example_name:features" format - ex="${entry%%:*}" - features="${entry#*:}" - - echo "----------------------------------------" - echo "Running: $ex (features: $features)" - echo "----------------------------------------" - - if cargo run --release -p hyperdb-api --features "$features" --example "$ex" > "/tmp/rust_ex_${ex}.log" 2>&1; then - echo "✓ PASSED: $ex" - passed=$((passed + 1)) - else - echo "✗ FAILED: $ex" - echo "Last few lines:" - tail -5 "/tmp/rust_ex_${ex}.log" | sed 's/^/ /' - failed=$((failed + 1)) - failed_list+=("$ex") - fi - echo "" -done - echo "========================================" echo "Summary:" echo " Passed: $passed"