From faa6962879c1554b030fd887b798835266a9c711 Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Fri, 29 May 2026 14:39:59 -0700 Subject: [PATCH 1/9] 1185: support incremental update sample type materialized view --- .../labkey/experiment/ExperimentModule.java | 2 + .../experiment/api/ExpMaterialTableImpl.java | 610 ++++++++++++++++-- .../experiment/api/ExpSampleTypeImpl.java | 8 +- .../experiment/api/SampleTypeServiceImpl.java | 10 +- .../api/SampleTypeUpdateServiceDI.java | 29 +- 5 files changed, 589 insertions(+), 70 deletions(-) diff --git a/experiment/src/org/labkey/experiment/ExperimentModule.java b/experiment/src/org/labkey/experiment/ExperimentModule.java index 5aa431cc2b1..91e50f06f67 100644 --- a/experiment/src/org/labkey/experiment/ExperimentModule.java +++ b/experiment/src/org/labkey/experiment/ExperimentModule.java @@ -123,6 +123,7 @@ import org.labkey.experiment.api.ExpDataClassType; import org.labkey.experiment.api.ExpDataImpl; import org.labkey.experiment.api.ExpDataTableImpl; +import org.labkey.experiment.api.ExpMaterialTableImpl; import org.labkey.experiment.api.ExpMaterialImpl; import org.labkey.experiment.api.ExpProtocolImpl; import org.labkey.experiment.api.ExpSampleTypeImpl; @@ -1119,6 +1120,7 @@ public Collection getSummary(Container c) DomainImpl.TestCase.class, DomainPropertyImpl.TestCase.class, ExpDataTableImpl.TestCase.class, + ExpMaterialTableImpl.IncrementalUpdateTestCase.class, ExperimentServiceImpl.AuditDomainUriTest.class, ExperimentServiceImpl.LineageQueryTestCase.class, ExperimentServiceImpl.ParseInputOutputAliasTestCase.class, diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index e8d702b06e9..29fb311e9c5 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -21,6 +21,11 @@ import org.apache.commons.math3.util.Precision; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jspecify.annotations.NonNull; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; import org.labkey.api.assay.plate.AssayPlateMetadataService; import org.labkey.api.audit.AuditHandler; import org.labkey.api.cache.BlockingCache; @@ -48,8 +53,10 @@ import org.labkey.api.data.RenderContext; import org.labkey.api.data.SQLFragment; import org.labkey.api.data.Sort; +import org.labkey.api.data.SqlSelector; import org.labkey.api.data.TableInfo; import org.labkey.api.data.UnionContainerFilter; +import org.labkey.api.data.dialect.SqlDialect; import org.labkey.api.dataiterator.DataIteratorBuilder; import org.labkey.api.dataiterator.DataIteratorContext; import org.labkey.api.dataiterator.LoggingDataIterator; @@ -61,6 +68,7 @@ import org.labkey.api.exp.api.ExpMaterial; import org.labkey.api.exp.api.ExpProtocol; import org.labkey.api.exp.api.ExpSampleType; +import org.labkey.api.exp.api.SampleTypeService; import org.labkey.api.exp.api.ExperimentService; import org.labkey.api.exp.api.ExperimentUrls; import org.labkey.api.exp.api.NameExpressionOptionService; @@ -77,6 +85,7 @@ import org.labkey.api.exp.query.ExpSchema; import org.labkey.api.exp.query.SamplesSchema; import org.labkey.api.gwt.client.AuditBehaviorType; +import org.labkey.api.gwt.client.model.GWTPropertyDescriptor; import org.labkey.api.gwt.client.model.PropertyValidatorType; import org.labkey.api.inventory.InventoryService; import org.labkey.api.ontology.Quantity; @@ -89,6 +98,7 @@ import org.labkey.api.query.LookupForeignKey; import org.labkey.api.query.QueryException; import org.labkey.api.query.QueryForeignKey; +import org.labkey.api.query.BatchValidationException; import org.labkey.api.query.QueryService; import org.labkey.api.query.QueryUpdateService; import org.labkey.api.query.QueryUrls; @@ -105,11 +115,14 @@ import org.labkey.api.security.permissions.Permission; import org.labkey.api.security.permissions.ReadPermission; import org.labkey.api.security.permissions.UpdatePermission; +import org.labkey.api.test.TestWhen; import org.labkey.api.util.GUID; import org.labkey.api.util.HeartBeat; +import org.labkey.api.util.JunitUtil; import org.labkey.api.util.PageFlowUtil; import org.labkey.api.util.Pair; import org.labkey.api.util.StringExpression; +import org.labkey.api.util.TestContext; import org.labkey.api.util.UnexpectedException; import org.labkey.api.view.ActionURL; import org.labkey.api.view.ViewContext; @@ -131,6 +144,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -165,7 +179,7 @@ public class ExpMaterialTableImpl extends ExpRunItemTableImpl new SampleTypeAmountPrecisionDisplayColumn(colInfo, typeUnit)); columnInfo.setDescription("The amount of this sample, in the display unit for the sample type, currently on hand."); columnInfo.setShownInUpdateView(true); @@ -1212,9 +1226,35 @@ public boolean supportTableRules() // intentional override } + /** + * Once the pending changed-rowid set for a single update grows past this size, a targeted incremental update is no + * longer worth it (the changed-rowid IN list grows and the win over a whole-join re-sync shrinks), so we escalate to a + * full re-sync instead. Chosen to match the application's bulk-edit cap of 1,000 rows, so a normal single update stays + * targeted. + *

+ * This value is also bounded by SQL Server's 2,100-bind-parameter limit: the targeted predicate references the changed + * set twice ({@code m.rowid IN (...) OR m.rootmaterialrowid IN (...)}), so a targeted statement binds {@code 2*N + 1} + * parameters. At 1,000 that is 2,001, safely under the cap. Raising this past ~1,049 would require restructuring the + * predicate to avoid using the set twice. + */ + static final int UPDATE_ROWID_THRESHOLD = 1_000; + static class InvalidationCounters { public final AtomicLong update, insert, delete, rollup; + + /** + * The accumulated exp.material rowIds changed by {@code update}s since the last drain, used to drive a targeted + * incremental update. Populated by {@link RefreshMaterializedViewRunnable#run()} before the {@code update} + * counter is incremented, so any reader that observes the new counter-value is guaranteed to see these rowids. + */ + public final Set pendingUpdateRowIds = ConcurrentHashMap.newKeySet(); + /** + * Set when an {@code update} could not supply rowIds (or the pending set crossed {@link #UPDATE_ROWID_THRESHOLD}), + * meaning the next read must do a full re-sync rather than a targeted update. + */ + public volatile boolean fullUpdatePending = false; + InvalidationCounters() { long l = System.currentTimeMillis(); @@ -1223,6 +1263,49 @@ static class InvalidationCounters delete = new AtomicLong(l); rollup = new AtomicLong(l); } + + /** Record the rowIds changed by one update (before its counter-increment). null/empty rowIds force a full re-sync. */ + void recordPendingUpdate(@Nullable Set changedRowIds) + { + if (changedRowIds == null || changedRowIds.isEmpty()) + { + fullUpdatePending = true; + pendingUpdateRowIds.clear(); + } + else if (!fullUpdatePending) + { + pendingUpdateRowIds.addAll(changedRowIds); + if (pendingUpdateRowIds.size() > UPDATE_ROWID_THRESHOLD) + { + fullUpdatePending = true; + pendingUpdateRowIds.clear(); + } + } + // else: a full re-sync is already pending; the individual rowIds are redundant + } + + /** The drained pending update state: either a full re-sync or the specific rowIds to target. */ + record PendingUpdate(boolean full, Set rowIds) {} + + /** + * Atomically take and clear the pending update state. Should be called under the {@code _Materialized} loading lock. + * A snapshot of the rowIds is removed (not the live set) so any rowIds added concurrently by a POSTCOMMIT runnable + * survive for the next drain. + */ + PendingUpdate drainPendingUpdate() + { + if (fullUpdatePending) + { + fullUpdatePending = false; + pendingUpdateRowIds.clear(); + return new PendingUpdate(true, Set.of()); + } + if (pendingUpdateRowIds.isEmpty()) + return new PendingUpdate(false, Set.of()); + Set drained = new HashSet<>(pendingUpdateRowIds); + pendingUpdateRowIds.removeAll(drained); + return new PendingUpdate(false, drained); + } } static final BlockingCache _materializedQueries = CacheManager.getBlockingStringKeyCache(CacheManager.UNLIMITED, CacheManager.HOUR, "materialized sample types", null); @@ -1231,62 +1314,77 @@ static class InvalidationCounters // used by SampleTypeServiceImpl.refreshSampleTypeMaterializedView() public static void refreshMaterializedView(final String lsid, SampleTypeServiceImpl.SampleChangeType reason) + { + refreshMaterializedView(lsid, reason, null); + } + + /** + * @param changedRowIds the exp.material rowIds known to have changed (only meaningful for update); null means + * the caller could not list the changed rows, forcing a full re-sync on the next read. + */ + public static void refreshMaterializedView(final String lsid, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Set changedRowIds) { var scope = ExperimentServiceImpl.getExpSchema().getScope(); - var runnable = new RefreshMaterializedViewRunnable(lsid, reason); + var runnable = new RefreshMaterializedViewRunnable(lsid, reason, changedRowIds); scope.addCommitTask(runnable, DbScope.CommitTaskOption.POSTCOMMIT); } - private static class RefreshMaterializedViewRunnable implements Runnable + /** + * POSTCOMMIT task that turns a committed data change into an invalidation: on a schema change it drops the cached MQH + * (the SQL itself must be regenerated); otherwise it bumps the matching per-LSID counter, and for {@code update} also + * records the changed rowIds for a targeted incremental update. + */ + private record RefreshMaterializedViewRunnable( + String lsid, + SampleTypeServiceImpl.SampleChangeType reason, + @Nullable Set changedRowIds + ) implements Runnable { - private final String _lsid; - private final SampleTypeServiceImpl.SampleChangeType _reason; - - public RefreshMaterializedViewRunnable(String lsid, SampleTypeServiceImpl.SampleChangeType reason) - { - _lsid = lsid; - _reason = reason; - } - @Override public void run() { - if (_reason == schema) + if (reason == schema) { /* NOTE: MaterializedQueryHelper can detect data changes and refresh the materialized view using the provided SQL. * It does not handle schema changes where the SQL itself needs to be updated. In this case, we remove the * MQH from the cache to force the SQL to be regenerated. */ - _materializedQueries.remove(_lsid); + _materializedQueries.remove(lsid); return; } - var counters = getInvalidateCounters(_lsid); - switch (_reason) + + var counters = getInvalidateCounters(lsid); + switch (reason) { case insert -> counters.insert.incrementAndGet(); case rollup -> counters.rollup.incrementAndGet(); - case update -> counters.update.incrementAndGet(); + case update -> + { + // Record the changed rowIds BEFORE bumping the counter: a reader that observes the new counter-value + // is then guaranteed to see the corresponding rowIds when it drains the pending state. + counters.recordPendingUpdate(changedRowIds); + counters.update.incrementAndGet(); + } case delete -> counters.delete.incrementAndGet(); - default -> throw new IllegalStateException("Unexpected value: " + _reason); + default -> throw new IllegalStateException("Unexpected value: " + reason); } } @Override - public boolean equals(Object obj) + public @NonNull String toString() { - return obj instanceof RefreshMaterializedViewRunnable other && _lsid.equals(other._lsid) && _reason.equals(other._reason); + // Concise: report the rowId count, not the (potentially huge) set, so dedup DEBUG logging stays readable. + return "RefreshMaterializedViewRunnable{lsid=" + lsid + ", reason=" + reason + + ", changedRowIds=" + (changedRowIds == null ? "null" : changedRowIds.size() + " rows") + "}"; } } private static InvalidationCounters getInvalidateCounters(String lsid) { if (!initializedListeners.getAndSet(true)) - { CacheManager.addListener(_invalidationCounters::clear); - } - return _invalidationCounters.computeIfAbsent(lsid, (unused) -> - new InvalidationCounters() - ); + + return _invalidationCounters.computeIfAbsent(lsid, (_) -> new InvalidationCounters()); } /* SELECT and JOIN, does not include WHERE, same as getJoinSQL() */ @@ -1304,18 +1402,23 @@ private SQLFragment getMaterializedSQL() * Maybe have a callback to generate the SQL dynamically, and verify that the sql is unchanged. */ SQLFragment viewSql = getJoinSQL(null).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); + // Capture the column mapping for incremental updates. Safe to cache: a schema change drops this MQH from the + // cache (reason == schema), so a changed column set always rebuilds with a fresh mapping. + JoinColumns joinColumns = getJoinColumns(null); return (_MaterializedQueryHelper) new _MaterializedQueryHelper.Builder(_ss.getLSID(), "", getExpSchema().getDbSchema().getScope(), viewSql) + .joinColumns(joinColumns) .addIndex("CREATE UNIQUE INDEX uq_${NAME}_rowid ON temp.${NAME} (rowid)") .addIndex("CREATE UNIQUE INDEX uq_${NAME}_lsid ON temp.${NAME} (lsid)") .addIndex("CREATE INDEX idx_${NAME}_container ON temp.${NAME} (container)") .addIndex("CREATE INDEX idx_${NAME}_root ON temp.${NAME} (rootmaterialrowid)") - .addInvalidCheck(() -> String.valueOf(getInvalidateCounters(_ss.getLSID()).update.get())) + // NOTE: no addInvalidCheck for the update counter. Updates are now applied incrementally by + // executeIncrementalUpdate() (like insert/delete/rollup) rather than triggering a full drop-and-rebuild. + // The MQH is still dropped on schema changes and on incremental-update errors (see incrementalUpdateBeforeSelect). .build(); }); return new SQLFragment("SELECT * FROM ").append(mqh.getFromSql("_cached_view_")); } - /** * MaterializedQueryHelper has a built-in mechanism for tracking when a temp table needs to be recomputed. * It does not help with incremental updates (except for providing the upsert() method). @@ -1323,11 +1426,17 @@ private SQLFragment getMaterializedSQL() */ static class _MaterializedQueryHelper extends MaterializedQueryHelper { + /** Columns that an incremental UPDATE never re-derives */ + static final Set IMMUTABLE_UPDATE_COLUMNS = new CaseInsensitiveHashSet(RowId.name(), LSID.name(), CpasType.name(), RootMaterialRowId.name()); + final String _lsid; + /** The column mapping that produced the temp table, used to build the incremental UPDATE SET clauses. */ + final JoinColumns _joinColumns; static class Builder extends MaterializedQueryHelper.Builder { String _lsid; + JoinColumns _joinColumns; public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) { @@ -1335,18 +1444,25 @@ public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) this._lsid = lsid; } + public Builder joinColumns(JoinColumns joinColumns) + { + this._joinColumns = joinColumns; + return this; + } + @Override public _MaterializedQueryHelper build() { - return new _MaterializedQueryHelper(_lsid, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); + return new _MaterializedQueryHelper(_lsid, _joinColumns, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); } } - _MaterializedQueryHelper(String lsid, String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, + _MaterializedQueryHelper(String lsid, JoinColumns joinColumns, String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, boolean isSelectIntoSql) { super(prefix, scope, select, uptodate, supplier, indexes, maxTimeToCache, isSelectIntoSql); this._lsid = lsid; + this._joinColumns = joinColumns; } @Override @@ -1373,6 +1489,8 @@ protected void incrementalUpdateBeforeSelect(Materialized m) if (!materialized.incrementalDeleteCheck.stillValid(0)) executeIncrementalDelete(); + if (!materialized.incrementalUpdateCheck.stillValid(0)) + executeIncrementalUpdate(); if (!materialized.incrementalRollupCheck.stillValid(0)) executeIncrementalRollup(); if (!materialized.incrementalInsertCheck.stillValid(0)) @@ -1399,7 +1517,7 @@ protected void incrementalUpdateBeforeSelect(Materialized m) void upsertWithRetry(SQLFragment sql) { // not actually read-only, but we don't want to start an explicit transaction - _scope.executeWithRetryReadOnly((tx) -> upsert(sql)); + _scope.executeWithRetryReadOnly((_) -> upsert(sql)); } void executeIncrementalInsert() @@ -1457,6 +1575,140 @@ void executeIncrementalRollup() } upsertWithRetry(incremental); } + + /** + * Generalized incremental update: re-derive the materialized columns from their source joins for the rows that + * changed since the last drain, instead of dropping and fully rebuilding the temp table. Drains the pending state + * under the {@code _Materialized} loading lock (held by the caller) and either does a full re-sync or a single + * targeted update of the changed rows. On failure the drained work is restored, so nothing is lost before the + * caller's handler drops the MQH and rebuilds. + */ + void executeIncrementalUpdate() + { + InvalidationCounters counters = getInvalidateCounters(_lsid); + InvalidationCounters.PendingUpdate pending = counters.drainPendingUpdate(); + if (!pending.full() && pending.rowIds().isEmpty()) + return; + + try + { + // The changed set is capped at UPDATE_ROWID_THRESHOLD, so a targeted update fits in one statement. + upsertWithRetry(buildIncrementalUpdateSql(pending.full() ? null : pending.rowIds())); + } + catch (RuntimeException ex) + { + // Restore the drained work so nothing is lost; the caller's handler will drop the MQH and rebuild cleanly. + if (pending.full()) + counters.fullUpdatePending = true; + else + counters.pendingUpdateRowIds.addAll(pending.rowIds()); + throw ex; + } + } + + /** + * Build one incremental UPDATE that re-derives the materialized columns from their source joins. When + * {@code changedRowIds} is null this is a full re-sync over the whole sample type; otherwise it targets the given + * rows plus the aliquots of any changed root (via {@code rootmaterialrowid}). The {@code IS DISTINCT FROM} + * (PostgreSQL) / {@code EXCEPT}-based (SQL Server) guard skips rows whose values already match, so only genuinely + * differing rows are rewritten. + */ + SQLFragment buildIncrementalUpdateSql(@Nullable Collection changedRowIds) + { + var d = CoreSchema.getInstance().getSchema().getSqlDialect(); + JoinColumns jc = _joinColumns; + + // Re-derive every temp column except the join key and immutable columns + List setColumns = jc.columns.stream() + .filter(c -> !IMMUTABLE_UPDATE_COLUMNS.contains(c.tempColumnName())) + .toList(); + + SQLFragment sql = new SQLFragment(); + if (d.isPostgreSQL()) + { + sql.append("UPDATE temp.${NAME} AS st\nSET "); + appendSetClause(sql, setColumns); + sql.append("\nFROM exp.Material m"); + appendProvisionedJoins(sql, jc); + sql.append("\nWHERE st.rowid = m.rowid AND m.cpastype = ").appendValue(_lsid, d); + appendChangedRowIdPredicate(sql, d, changedRowIds); + sql.append(" AND ("); + String or = ""; + for (JoinColumn c : setColumns) + { + sql.append(or).append("st.").append(c.tempColumnSql()).append(" IS DISTINCT FROM ").append(c.sourceSql()); + or = " OR "; + } + sql.append(")"); + } + else + { + sql.append("UPDATE st\nSET "); + appendSetClause(sql, setColumns); + sql.append("\nFROM temp.${NAME} st INNER JOIN exp.Material m ON st.rowid = m.rowid"); + appendProvisionedJoins(sql, jc); + sql.append("\nWHERE m.cpastype = ").appendValue(_lsid, d); + appendChangedRowIdPredicate(sql, d, changedRowIds); + // SQL Server before 2022 has no IS DISTINCT FROM; "SELECT EXCEPT SELECT " is a portable + // null-safe tuple comparison that yields a row iff the tuples differ. + sql.append(" AND EXISTS (SELECT "); + String comma = ""; + for (JoinColumn c : setColumns) + { + sql.append(comma).append("st.").append(c.tempColumnSql()); + comma = ", "; + } + sql.append(" EXCEPT SELECT "); + comma = ""; + for (JoinColumn c : setColumns) + { + sql.append(comma).append(c.sourceSql()); + comma = ", "; + } + sql.append(")"); + } + return sql; + } + + private static void appendSetClause(SQLFragment sql, List setColumns) + { + String comma = ""; + for (JoinColumn c : setColumns) + { + // PostgreSQL and SQL Server both want bare column names on the SET left-hand side. + sql.append(comma).append(c.tempColumnSql()).append(" = ").append(c.sourceSql()); + comma = ", "; + } + } + + private static void appendProvisionedJoins(SQLFragment sql, JoinColumns jc) + { + if (jc.hasSampleColumns) + sql.append(" INNER JOIN ").append(jc.provisioned, "m_sample").append(" ON m.RootMaterialRowId = m_sample.RowId"); + if (jc.hasAliquotColumns) + sql.append(" INNER JOIN ").append(jc.provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); + } + + /** + * Restrict to the changed rows. The {@code rootmaterialrowid} clause fans a changed root out to all of its aliquots + * (whose root-derived columns must be re-derived); it only matters for changed roots, since an aliquot's rowid is + * never another row's root. No-op for a full re-sync (null). + *

+ * Passing a null large-IN generator to {@code appendInClauseSqlWithCustomInClauseGenerator} forces bind-parameter + * markers (never the temp-table generator), which is required because this runs as a single statement via + * {@code upsert()}. The set is bound twice; UPDATE_ROWID_THRESHOLD keeps the total parameter count under SQL + * Server's cap (see its javadoc). + */ + private static void appendChangedRowIdPredicate(SQLFragment sql, SqlDialect d, @Nullable Collection changedRowIds) + { + if (changedRowIds == null) + return; + sql.append(" AND (m.rowid"); + d.appendInClauseSqlWithCustomInClauseGenerator(sql, changedRowIds, null); + sql.append(" OR m.rootmaterialrowid"); + d.appendInClauseSqlWithCustomInClauseGenerator(sql, changedRowIds, null); + sql.append(")"); + } } static class _Materialized extends MaterializedQueryHelper.Materialized @@ -1464,6 +1716,7 @@ static class _Materialized extends MaterializedQueryHelper.Materialized final MaterializedQueryHelper.Invalidator incrementalInsertCheck; final MaterializedQueryHelper.Invalidator incrementalRollupCheck; final MaterializedQueryHelper.Invalidator incrementalDeleteCheck; + final MaterializedQueryHelper.Invalidator incrementalUpdateCheck; _Materialized(_MaterializedQueryHelper mqh, String tableName, String cacheKey, long created, String sql) { @@ -1472,6 +1725,7 @@ static class _Materialized extends MaterializedQueryHelper.Materialized incrementalInsertCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.insert.get())); incrementalRollupCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.rollup.get())); incrementalDeleteCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.delete.get())); + incrementalUpdateCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.update.get())); } @Override @@ -1482,6 +1736,7 @@ public void reset() incrementalInsertCheck.stillValid(now); incrementalRollupCheck.stillValid(now); incrementalDeleteCheck.stillValid(now); + incrementalUpdateCheck.stillValid(now); } Lock getLock() @@ -1490,38 +1745,68 @@ Lock getLock() } } + /** + * Which join alias a materialized-view column is sourced from: + *

    + *
  • {@code m} – the row's own {@code exp.material} record
  • + *
  • {@code m_sample} – the root material's provisioned row (root-derived / {@code ParentOnly} columns)
  • + *
  • {@code m_aliquot} – the row's own provisioned row (aliquot-scoped columns plus {@code genid} / unique-id fields)
  • + *
+ */ + enum JoinAlias { m, m_sample, m_aliquot } + + /** + * One column of the materialized join: the unencoded temp-table column name, the SQL reference to that column, the SQL + * expression that produces its value, and which join alias that expression reads from. This is the single source of + * truth shared by {@link #getJoinSQL} (which builds the SELECT that creates the temp table) and the incremental + * {@code UPDATE} paths (which build the SET clause). Keeping both consumers on this one mapping guarantees the temp + * column references and the SET expressions can never drift. {@code tempColumnSql} is the valid SQL reference (e.g., a + * quoted identifier) by which the column is named in the temp table; use it bare as an UPDATE SET target, or prefix it + * with a table alias (e.g., {@code st.}) to reference it elsewhere. {@code tempColumnName} is the unencoded name, useful + * for identifying a specific column (e.g., the {@code rowid} join key). + */ + record JoinColumn(String tempColumnName, SQLFragment tempColumnSql, SQLFragment sourceSql, JoinAlias alias) {} - /* SELECT and JOIN, does not include WHERE */ - private SQLFragment getJoinSQL(Set selectedColumns) + /** + * The ordered column mapping for the materialized join plus which provisioned joins are actually needed. Mirrors the + * {@code hasSampleColumns} / {@code hasAliquotColumns} bookkeeping that {@link #getJoinSQL} previously tracked inline. + */ + static class JoinColumns + { + final List columns = new ArrayList<>(); + TableInfo provisioned = null; + boolean hasSampleColumns = false; // m_sample join required + boolean hasAliquotColumns = false; // m_aliquot join required + } + + /** + * Classify every selected column into its temp-table name, source expression, and join alias, preserving exactly the + * rules previously inlined in {@link #getJoinSQL}: material columns → {@code m}; {@code genid} / unique-id fields + * → {@code m_aliquot}; root-derived ({@code ParentOnly} or empty derivation scope) → {@code m_sample}; all + * other provisioned columns → {@code m_aliquot}; MV-indicator columns are always included. + */ + private JoinColumns getJoinColumns(Set selectedColumns) { - TableInfo provisioned = null == _ss ? null : _ss.getTinfo(); - Set provisionedCols = new CaseInsensitiveHashSet(provisioned != null ? provisioned.getColumnNameSet() : Collections.emptySet()); + JoinColumns result = new JoinColumns(); + result.provisioned = null == _ss ? null : _ss.getTinfo(); + Set provisionedCols = new CaseInsensitiveHashSet(result.provisioned != null ? result.provisioned.getColumnNameSet() : Collections.emptySet()); provisionedCols.remove(RowId.name()); provisionedCols.remove(Name.name()); boolean hasProvisionedColumns = containsProvisionedColumns(selectedColumns, provisionedCols); - boolean hasSampleColumns = false; - boolean hasAliquotColumns = false; - Set materialCols = new CaseInsensitiveHashSet(_rootTable.getColumnNameSet()); selectedColumns = computeInnerSelectedColumns(selectedColumns); - SQLFragment sql = new SQLFragment(); - sql.appendComment("", getSqlDialect()); - sql.append("SELECT "); - String comma = ""; for (String materialCol : materialCols) { // don't need to generate SQL for columns that aren't selected if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(new FieldKey(null, materialCol))) - { - sql.append(comma).append("m.").appendIdentifier(materialCol); - comma = ", "; - } + result.columns.add(new JoinColumn(materialCol, new SQLFragment().appendIdentifier(materialCol), new SQLFragment("m.").appendIdentifier(materialCol), JoinAlias.m)); } - if (null != provisioned && hasProvisionedColumns) + + if (null != result.provisioned && hasProvisionedColumns) { - for (ColumnInfo propertyColumn : provisioned.getColumns()) + for (ColumnInfo propertyColumn : result.provisioned.getColumns()) { // don't select twice if ( @@ -1535,35 +1820,56 @@ private SQLFragment getJoinSQL(Set selectedColumns) // don't need to generate SQL for columns that aren't selected if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(propertyColumn.getFieldKey()) || propertyColumn.isMvIndicatorColumn()) { - sql.append(comma); boolean rootField = StringUtils.isEmpty(propertyColumn.getDerivationDataScope()) || ExpSchema.DerivationDataScopeType.ParentOnly.name().equalsIgnoreCase(propertyColumn.getDerivationDataScope()); + String tempColumnName = propertyColumn.getSelectIdentifier().getId(); + SQLFragment tempColumnSql = propertyColumn.getSelectIdentifier().getSql(); if ("genid".equalsIgnoreCase(propertyColumn.getColumnName()) || propertyColumn.isUniqueIdField()) { - sql.append(propertyColumn.getValueSql("m_aliquot")).append(" AS ").appendIdentifier(propertyColumn.getSelectIdentifier()); - hasAliquotColumns = true; + result.columns.add(new JoinColumn(tempColumnName, tempColumnSql, propertyColumn.getValueSql("m_aliquot"), JoinAlias.m_aliquot)); + result.hasAliquotColumns = true; } else if (rootField) { - sql.append(propertyColumn.getValueSql("m_sample")).append(" AS ").appendIdentifier(propertyColumn.getSelectIdentifier()); - hasSampleColumns = true; + result.columns.add(new JoinColumn(tempColumnName, tempColumnSql, propertyColumn.getValueSql("m_sample"), JoinAlias.m_sample)); + result.hasSampleColumns = true; } else { - sql.append(propertyColumn.getValueSql("m_aliquot")).append(" AS ").appendIdentifier(propertyColumn.getSelectIdentifier()); - hasAliquotColumns = true; + result.columns.add(new JoinColumn(tempColumnName, tempColumnSql, propertyColumn.getValueSql("m_aliquot"), JoinAlias.m_aliquot)); + result.hasAliquotColumns = true; } - comma = ", "; } } } + return result; + } + + /* SELECT and JOIN, does not include WHERE */ + private SQLFragment getJoinSQL(Set selectedColumns) + { + JoinColumns joinColumns = getJoinColumns(selectedColumns); + + SQLFragment sql = new SQLFragment(); + sql.appendComment("", getSqlDialect()); + sql.append("SELECT "); + String comma = ""; + for (JoinColumn joinColumn : joinColumns.columns) + { + sql.append(comma).append(joinColumn.sourceSql()); + // material columns are selected by name (no alias); provisioned columns are aliased to their temp-table column name + if (JoinAlias.m != joinColumn.alias()) + sql.append(" AS ").append(joinColumn.tempColumnSql()); + comma = ", "; + } + sql.append("\nFROM "); sql.append(_rootTable, "m"); - if (hasSampleColumns) - sql.append(" INNER JOIN ").append(provisioned, "m_sample").append(" ON m.RootMaterialRowId = m_sample.RowId"); - if (hasAliquotColumns) - sql.append(" INNER JOIN ").append(provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); + if (joinColumns.hasSampleColumns) + sql.append(" INNER JOIN ").append(joinColumns.provisioned, "m_sample").append(" ON m.RootMaterialRowId = m_sample.RowId"); + if (joinColumns.hasAliquotColumns) + sql.append(" INNER JOIN ").append(joinColumns.provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); sql.appendComment("", getSqlDialect()); return sql; @@ -1830,7 +2136,7 @@ public List> getImportTemplates(ViewContext ctx) url.addParameter("includeColumn", aliasKey); } } - catch (IOException e) + catch (IOException ignored) {} templates.add(Pair.of("Download Template", url.toString())); return templates; @@ -1874,4 +2180,182 @@ public Object getDisplayValue(RenderContext ctx) return value; } } + + @TestWhen(TestWhen.When.BVT) + public static class IncrementalUpdateTestCase extends Assert + { + private static User _user; + private static Container _c; + + @BeforeClass + public static void setup() + { + JunitUtil.deleteTestContainer(); + + _c = JunitUtil.getTestContainer(); + _user = TestContext.get().getUser(); + } + + @AfterClass + public static void tearDown() + { + JunitUtil.deleteTestContainer(); + } + + @Test + public void testTargetedSingleRowUpdate() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdTargeted"); + int root = insertRoots(st, "R1").getFirst(); + int aliquot = insertAliquots(st, "R1", 1).getFirst(); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); // baseline materialization + + // Targeted update of one aliquot's aliquot-scoped property. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), aliquot, "aliquotProp", "changed"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // And a targeted update of the root's own (non-derived-onto-aliquot) name-adjacent material column path. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "rootChanged0"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testBatchUpdate() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdBatch"); + List roots = insertRoots(st, "R1", "R2", "R3", "R4", "R5"); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + List> updates = new ArrayList<>(); + for (int i = 0; i < roots.size(); i++) + updates.add(CaseInsensitiveHashMap.of(RowId.name(), roots.get(i), "rootProp", "batch" + i)); + updateRows(st, updates); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testRootFanoutUpdate() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdFanout"); + int root = insertRoots(st, "R1").getFirst(); + insertAliquots(st, "R1", 25); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // Editing the root's ParentOnly property must re-derive m_sample.* for every aliquot beneath it. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "fannedOut"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testFullResync() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdFull"); + List roots = insertRoots(st, "R1", "R2", "R3"); + insertAliquots(st, "R1", 3); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // Make a real change, then force the full-re-sync branch (as a no-rowId write path would) before reading. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), roots.getFirst(), "rootProp", "fullResynced"))); + InvalidationCounters counters = getInvalidateCounters(st.getLSID()); + counters.pendingUpdateRowIds.clear(); + counters.fullUpdatePending = true; + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + private ExpSampleType createSampleType(String name) throws Exception + { + List props = new ArrayList<>(); + props.add(new GWTPropertyDescriptor("name", "string")); + GWTPropertyDescriptor rootProp = new GWTPropertyDescriptor("rootProp", "string"); + rootProp.setDerivationDataScope(ExpSchema.DerivationDataScopeType.ParentOnly.name()); // -> m_sample join + props.add(rootProp); + GWTPropertyDescriptor aliquotProp = new GWTPropertyDescriptor("aliquotProp", "string"); + aliquotProp.setDerivationDataScope(ExpSchema.DerivationDataScopeType.ChildOnly.name()); // -> m_aliquot join + props.add(aliquotProp); + return SampleTypeService.get().createSampleType(_c, _user, name, null, props, Collections.emptyList(), -1, -1, -1, -1, null); + } + + private ExpMaterialTableImpl getSamplesTable(ExpSampleType st) + { + return (ExpMaterialTableImpl) QueryService.get().getUserSchema(_user, _c, SamplesSchema.SCHEMA_NAME).getTable(st.getName()); + } + + private @NotNull QueryUpdateService getUpdateService(ExpSampleType st) + { + TableInfo table = getSamplesTable(st); + QueryUpdateService service = table.getUpdateService(); + if (service == null) + throw new IllegalArgumentException("No QueryUpdateService found for table " + st.getName()); + + return service; + } + + private List insertRoots(ExpSampleType st, String... names) throws Exception + { + List> rows = new ArrayList<>(); + for (int i = 0; i < names.length; i++) + rows.add(CaseInsensitiveHashMap.of("name", names[i], "rootProp", "root" + i)); + return insert(st, rows); + } + + private List insertAliquots(ExpSampleType st, String rootName, int count) throws Exception + { + List> rows = new ArrayList<>(); + for (int i = 0; i < count; i++) + rows.add(CaseInsensitiveHashMap.of("name", rootName + "-" + i, "AliquotedFrom", rootName, "aliquotProp", "aliquot" + i)); + return insert(st, rows); + } + + private List insert(ExpSampleType st, List> rows) throws Exception + { + BatchValidationException errors = new BatchValidationException(); + List> inserted = getUpdateService(st).insertRows(_user, _c, rows, errors, null, null); + if (errors.hasErrors()) + throw errors; + + if (inserted == null || inserted.isEmpty()) + return Collections.emptyList(); + + List rowIds = new ArrayList<>(); + for (Map row : inserted) + rowIds.add(((Number) row.get(RowId.name())).intValue()); + return rowIds; + } + + private void updateRows(ExpSampleType st, List> rows) throws Exception + { + BatchValidationException errors = new BatchValidationException(); + getUpdateService(st).updateRows(_user, _c, rows, null, errors, null, null); + if (errors.hasErrors()) + throw errors; + } + + /** + * Trigger materialization (and any pending incremental update) as a side effect of {@link #getMaterializedSQL()}, + * then assert the cached temp table equals a fresh derivation of the join query in both directions. + */ + private void assertCacheMatchesFreshDerivation(ExpMaterialTableImpl table, String lsid) + { + SQLFragment cached = table.getMaterializedSQL(); // builds/refreshes the temp table via the normal read path + SQLFragment fresh = table.getJoinSQL(null).append(" WHERE CpasType = ").appendValue(lsid); + DbScope scope = ExperimentServiceImpl.getExpSchema().getScope(); + assertEquals("Cached rows not found in fresh derivation", 0, countDiff(scope, cached, fresh)); + assertEquals("Fresh-derivation rows not found in cache", 0, countDiff(scope, fresh, cached)); + } + + private int countDiff(DbScope scope, SQLFragment a, SQLFragment b) + { + SQLFragment sql = new SQLFragment("SELECT COUNT(*) FROM (\n").append(a).append("\nEXCEPT\n").append(b).append("\n) diff_"); + Integer count = new SqlSelector(scope, sql).getObject(Integer.class); + return count == null ? 0 : count; + } + } } diff --git a/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java b/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java index c53185c77f5..91b52ef6697 100644 --- a/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java @@ -865,9 +865,13 @@ public ExpProtocol[] getProtocols(User user) return ret; } - public void onSamplesChanged(User user, List materials, SampleTypeServiceImpl.SampleChangeType reason) + /** + * @param changedRowIds RowIds of samples known to have changed (only meaningful for update); null means + * the caller could not list the changed rows, forcing a full re-sync on the next read. + */ + public void onSamplesChanged(User user, List materials, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Set changedRowIds) { - SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(this, reason); + SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(this, reason, changedRowIds); ExpProtocol[] protocols = getProtocols(user); if (protocols.length != 0) diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 052fbc5b0bc..44675ee35f6 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -2403,9 +2403,17 @@ public enum SampleChangeType { insert, update, delete, rollup /* aliquot count * public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason) { - ExpMaterialTableImpl.refreshMaterializedView(st.getLSID(), reason); + refreshSampleTypeMaterializedView(st, reason, null); } + /** + * @param changedRowIds RowIds of samples known to have changed (only meaningful for update); null means + * the caller could not list the changed rows, forcing a full re-sync on the next read. + */ + public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason, @Nullable Set changedRowIds) + { + ExpMaterialTableImpl.refreshMaterializedView(st.getLSID(), reason, changedRowIds); + } public static class TestCase extends Assert { diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java index 9986758935b..d7d7527e5cf 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java @@ -1141,6 +1141,7 @@ protected Map getRow(User user, Container container, Map> results, Map params, Container container, SampleTypeServiceImpl.SampleChangeType reason) { var tx = getSchema().getDbSchema().getScope().getCurrentTransaction(); + Set changedRowIds = collectChangedRowIds(results, reason); Pair, Set> parentKeys = getSampleParentsForRecalc(results); boolean useBackgroundRecalc = false; if (parentKeys != null) @@ -1163,7 +1164,7 @@ private void onSamplesChanged(List> results, Map { - fireSamplesChanged(reason); + fireSamplesChanged(reason, changedRowIds); if (finalUseBackgroundRecalc && !finalSkipRecalc) handleRecalc(parentKeys.first, parentKeys.second, true, container); }, DbScope.CommitTaskOption.POSTCOMMIT); @@ -1173,7 +1174,7 @@ private void onSamplesChanged(List> results, Map rootRowIds, Set parentNames, boolean } } - private void fireSamplesChanged(SampleTypeServiceImpl.SampleChangeType reason) + private void fireSamplesChanged(SampleTypeServiceImpl.SampleChangeType reason, @Nullable Set changedRowIds) { if (_sampleType != null) - _sampleType.onSamplesChanged(getUser(), null, reason); + _sampleType.onSamplesChanged(getUser(), null, reason, changedRowIds); + } + + /** + * Collect the changed rowIds so an update refresh can do a targeted incremental update instead of a full rebuild. + */ + private @Nullable Set collectChangedRowIds(List> results, SampleTypeServiceImpl.SampleChangeType reason) + { + if (reason != update || results == null || results.isEmpty() || results.size() > ExpMaterialTableImpl.UPDATE_ROWID_THRESHOLD) + return null; + + Set changedRowIds = new HashSet<>(results.size()); + for (Map row : results) + { + Long rowId = getMaterialRowId(row); + if (rowId == null) + return null; // can't enumerate the full changed set -> fall back to full re-sync + changedRowIds.add(rowId.intValue()); + } + + return changedRowIds; } void audit(QueryService.AuditAction auditAction) From a5864ac040f32957326c90916f0228a8d5fa383c Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Thu, 4 Jun 2026 06:47:55 -0700 Subject: [PATCH 2/9] Use modified for incremental update --- .../experiment/api/ExpMaterialTableImpl.java | 188 +++++++++--------- .../experiment/api/ExpSampleTypeImpl.java | 10 +- .../experiment/api/SampleTypeServiceImpl.java | 10 +- .../api/SampleTypeUpdateServiceDI.java | 44 ++-- 4 files changed, 126 insertions(+), 126 deletions(-) diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index 29fb311e9c5..d4fd1148d30 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -134,6 +134,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -144,10 +145,10 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -1226,32 +1227,23 @@ public boolean supportTableRules() // intentional override } - /** - * Once the pending changed-rowid set for a single update grows past this size, a targeted incremental update is no - * longer worth it (the changed-rowid IN list grows and the win over a whole-join re-sync shrinks), so we escalate to a - * full re-sync instead. Chosen to match the application's bulk-edit cap of 1,000 rows, so a normal single update stays - * targeted. - *

- * This value is also bounded by SQL Server's 2,100-bind-parameter limit: the targeted predicate references the changed - * set twice ({@code m.rowid IN (...) OR m.rootmaterialrowid IN (...)}), so a targeted statement binds {@code 2*N + 1} - * parameters. At 1,000 that is 2,001, safely under the cap. Raising this past ~1,049 would require restructuring the - * predicate to avoid using the set twice. - */ - static final int UPDATE_ROWID_THRESHOLD = 1_000; - static class InvalidationCounters { public final AtomicLong update, insert, delete, rollup; /** - * The accumulated exp.material rowIds changed by {@code update}s since the last drain, used to drive a targeted + * The earliest {@code exp.material.Modified} timestamp from which rows must be re-derived by the next targeted * incremental update. Populated by {@link RefreshMaterializedViewRunnable#run()} before the {@code update} - * counter is incremented, so any reader that observes the new counter-value is guaranteed to see these rowids. + * counter is incremented, so any reader that observes the new counter-value is guaranteed to see this watermark + * when it drains the pending state. {@code null} means no targeted update is pending. Min-merged so the watermark + * always covers the oldest not-yet-applied change. Because {@code Modified} is written with the database clock + * (the constant {@code NowTimestamp} is inlined as {@code CURRENT_TIMESTAMP}), this timestamp is comparable to it + * directly, with no web-server-vs-database clock skew. */ - public final Set pendingUpdateRowIds = ConcurrentHashMap.newKeySet(); + public final AtomicReference pendingUpdateSince = new AtomicReference<>(); /** - * Set when an {@code update} could not supply rowIds (or the pending set crossed {@link #UPDATE_ROWID_THRESHOLD}), - * meaning the next read must do a full re-sync rather than a targeted update. + * Set when an {@code update} could not supply a watermark timestamp, meaning the next read must do a full re-sync + * rather than a targeted update. */ public volatile boolean fullUpdatePending = false; @@ -1264,47 +1256,39 @@ static class InvalidationCounters rollup = new AtomicLong(l); } - /** Record the rowIds changed by one update (before its counter-increment). null/empty rowIds force a full re-sync. */ - void recordPendingUpdate(@Nullable Set changedRowIds) + /** + * Record the watermark for one update (before its counter-increment). A null timestamp means the caller could + * not capture a change time, forcing a full re-sync; otherwise the watermark is min-merged so it covers the + * oldest pending change. + */ + void recordPendingUpdate(@Nullable Timestamp changedSince) { - if (changedRowIds == null || changedRowIds.isEmpty()) - { + if (changedSince == null) fullUpdatePending = true; - pendingUpdateRowIds.clear(); - } - else if (!fullUpdatePending) - { - pendingUpdateRowIds.addAll(changedRowIds); - if (pendingUpdateRowIds.size() > UPDATE_ROWID_THRESHOLD) - { - fullUpdatePending = true; - pendingUpdateRowIds.clear(); - } - } - // else: a full re-sync is already pending; the individual rowIds are redundant + else + pendingUpdateSince.accumulateAndGet(changedSince, (cur, next) -> cur == null || next.before(cur) ? next : cur); } - /** The drained pending update state: either a full re-sync or the specific rowIds to target. */ - record PendingUpdate(boolean full, Set rowIds) {} + /** The drained pending update state: either a full re-sync or the watermark to target. */ + record PendingUpdate(boolean full, @Nullable Timestamp since) {} /** - * Atomically take and clear the pending update state. Should be called under the {@code _Materialized} loading lock. - * A snapshot of the rowIds is removed (not the live set) so any rowIds added concurrently by a POSTCOMMIT runnable - * survive for the next drain. + * Atomically take and clear the pending update state. Should be called under the {@code _Materialized} loading + * lock. The watermark is read with {@code getAndSet(null)} so a timestamp recorded concurrently by a later + * POSTCOMMIT runnable survives for the next drain. A full re-sync clears any pending watermark: the re-sync + * already covers every committed row (a recorded watermark only ever corresponds to an already-committed change, + * since the POSTCOMMIT runnable runs after commit), so the watermark would be redundant. */ PendingUpdate drainPendingUpdate() { if (fullUpdatePending) { fullUpdatePending = false; - pendingUpdateRowIds.clear(); - return new PendingUpdate(true, Set.of()); + pendingUpdateSince.set(null); + return new PendingUpdate(true, null); } - if (pendingUpdateRowIds.isEmpty()) - return new PendingUpdate(false, Set.of()); - Set drained = new HashSet<>(pendingUpdateRowIds); - pendingUpdateRowIds.removeAll(drained); - return new PendingUpdate(false, drained); + + return new PendingUpdate(false, pendingUpdateSince.getAndSet(null)); } } @@ -1312,32 +1296,32 @@ PendingUpdate drainPendingUpdate() static final Map _invalidationCounters = Collections.synchronizedMap(new HashMap<>()); static final AtomicBoolean initializedListeners = new AtomicBoolean(false); - // used by SampleTypeServiceImpl.refreshSampleTypeMaterializedView() public static void refreshMaterializedView(final String lsid, SampleTypeServiceImpl.SampleChangeType reason) { refreshMaterializedView(lsid, reason, null); } /** - * @param changedRowIds the exp.material rowIds known to have changed (only meaningful for update); null means - * the caller could not list the changed rows, forcing a full re-sync on the next read. + * @param changedSince a watermark (from the database clock, captured before the update's writes) at or after which + * the changed {@code exp.material} rows were modified; only meaningful for update. null means + * the caller could not capture a watermark, forcing a full re-sync on the next read. */ - public static void refreshMaterializedView(final String lsid, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Set changedRowIds) + public static void refreshMaterializedView(final String lsid, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { var scope = ExperimentServiceImpl.getExpSchema().getScope(); - var runnable = new RefreshMaterializedViewRunnable(lsid, reason, changedRowIds); + var runnable = new RefreshMaterializedViewRunnable(lsid, reason, changedSince); scope.addCommitTask(runnable, DbScope.CommitTaskOption.POSTCOMMIT); } /** * POSTCOMMIT task that turns a committed data change into an invalidation: on a schema change it drops the cached MQH * (the SQL itself must be regenerated); otherwise it bumps the matching per-LSID counter, and for {@code update} also - * records the changed rowIds for a targeted incremental update. + * records the change watermark for a targeted incremental update. */ private record RefreshMaterializedViewRunnable( String lsid, SampleTypeServiceImpl.SampleChangeType reason, - @Nullable Set changedRowIds + @Nullable Timestamp changedSince ) implements Runnable { @Override @@ -1360,9 +1344,9 @@ public void run() case rollup -> counters.rollup.incrementAndGet(); case update -> { - // Record the changed rowIds BEFORE bumping the counter: a reader that observes the new counter-value - // is then guaranteed to see the corresponding rowIds when it drains the pending state. - counters.recordPendingUpdate(changedRowIds); + // Record the watermark BEFORE bumping the counter: a reader that observes the new counter-value is + // then guaranteed to see the corresponding watermark when it drains the pending state. + counters.recordPendingUpdate(changedSince); counters.update.incrementAndGet(); } case delete -> counters.delete.incrementAndGet(); @@ -1373,9 +1357,7 @@ public void run() @Override public @NonNull String toString() { - // Concise: report the rowId count, not the (potentially huge) set, so dedup DEBUG logging stays readable. - return "RefreshMaterializedViewRunnable{lsid=" + lsid + ", reason=" + reason + - ", changedRowIds=" + (changedRowIds == null ? "null" : changedRowIds.size() + " rows") + "}"; + return "RefreshMaterializedViewRunnable{lsid=" + lsid + ", reason=" + reason + ", changedSince=" + changedSince + "}"; } } @@ -1587,13 +1569,12 @@ void executeIncrementalUpdate() { InvalidationCounters counters = getInvalidateCounters(_lsid); InvalidationCounters.PendingUpdate pending = counters.drainPendingUpdate(); - if (!pending.full() && pending.rowIds().isEmpty()) + if (!pending.full() && pending.since() == null) return; try { - // The changed set is capped at UPDATE_ROWID_THRESHOLD, so a targeted update fits in one statement. - upsertWithRetry(buildIncrementalUpdateSql(pending.full() ? null : pending.rowIds())); + upsertWithRetry(buildIncrementalUpdateSql(pending.full() ? null : pending.since())); } catch (RuntimeException ex) { @@ -1601,19 +1582,19 @@ void executeIncrementalUpdate() if (pending.full()) counters.fullUpdatePending = true; else - counters.pendingUpdateRowIds.addAll(pending.rowIds()); + counters.recordPendingUpdate(pending.since()); throw ex; } } /** * Build one incremental UPDATE that re-derives the materialized columns from their source joins. When - * {@code changedRowIds} is null this is a full re-sync over the whole sample type; otherwise it targets the given - * rows plus the aliquots of any changed root (via {@code rootmaterialrowid}). The {@code IS DISTINCT FROM} - * (PostgreSQL) / {@code EXCEPT}-based (SQL Server) guard skips rows whose values already match, so only genuinely - * differing rows are rewritten. + * {@code changedSince} is null this is a full re-sync over the whole sample type; otherwise it targets the rows + * modified at or after the watermark plus the aliquots of any changed root (via {@code rootmaterialrowid}). The + * {@code IS DISTINCT FROM} (PostgreSQL) / {@code EXCEPT}-based (SQL Server) guard skips rows whose values already + * match, so only genuinely differing rows are rewritten. */ - SQLFragment buildIncrementalUpdateSql(@Nullable Collection changedRowIds) + SQLFragment buildIncrementalUpdateSql(@Nullable Timestamp changedSince) { var d = CoreSchema.getInstance().getSchema().getSqlDialect(); JoinColumns jc = _joinColumns; @@ -1631,7 +1612,7 @@ SQLFragment buildIncrementalUpdateSql(@Nullable Collection changedRowId sql.append("\nFROM exp.Material m"); appendProvisionedJoins(sql, jc); sql.append("\nWHERE st.rowid = m.rowid AND m.cpastype = ").appendValue(_lsid, d); - appendChangedRowIdPredicate(sql, d, changedRowIds); + appendModifiedSincePredicate(sql, d, changedSince); sql.append(" AND ("); String or = ""; for (JoinColumn c : setColumns) @@ -1648,7 +1629,7 @@ SQLFragment buildIncrementalUpdateSql(@Nullable Collection changedRowId sql.append("\nFROM temp.${NAME} st INNER JOIN exp.Material m ON st.rowid = m.rowid"); appendProvisionedJoins(sql, jc); sql.append("\nWHERE m.cpastype = ").appendValue(_lsid, d); - appendChangedRowIdPredicate(sql, d, changedRowIds); + appendModifiedSincePredicate(sql, d, changedSince); // SQL Server before 2022 has no IS DISTINCT FROM; "SELECT EXCEPT SELECT " is a portable // null-safe tuple comparison that yields a row iff the tuples differ. sql.append(" AND EXISTS (SELECT "); @@ -1689,25 +1670,13 @@ private static void appendProvisionedJoins(SQLFragment sql, JoinColumns jc) sql.append(" INNER JOIN ").append(jc.provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); } - /** - * Restrict to the changed rows. The {@code rootmaterialrowid} clause fans a changed root out to all of its aliquots - * (whose root-derived columns must be re-derived); it only matters for changed roots, since an aliquot's rowid is - * never another row's root. No-op for a full re-sync (null). - *

- * Passing a null large-IN generator to {@code appendInClauseSqlWithCustomInClauseGenerator} forces bind-parameter - * markers (never the temp-table generator), which is required because this runs as a single statement via - * {@code upsert()}. The set is bound twice; UPDATE_ROWID_THRESHOLD keeps the total parameter count under SQL - * Server's cap (see its javadoc). - */ - private static void appendChangedRowIdPredicate(SQLFragment sql, SqlDialect d, @Nullable Collection changedRowIds) + private void appendModifiedSincePredicate(SQLFragment sql, SqlDialect d, @Nullable Timestamp changedSince) { - if (changedRowIds == null) + if (changedSince == null) return; - sql.append(" AND (m.rowid"); - d.appendInClauseSqlWithCustomInClauseGenerator(sql, changedRowIds, null); - sql.append(" OR m.rootmaterialrowid"); - d.appendInClauseSqlWithCustomInClauseGenerator(sql, changedRowIds, null); - sql.append(")"); + sql.append(" AND (m.modified >= ?").add(changedSince); + sql.append(" OR m.rootmaterialrowid IN (SELECT rowid FROM exp.material WHERE cpastype = ").appendValue(_lsid, d) + .append(" AND modified >= ?").add(changedSince).append("))"); } } @@ -2262,14 +2231,53 @@ public void testFullResync() throws Exception ExpMaterialTableImpl table = getSamplesTable(st); assertCacheMatchesFreshDerivation(table, st.getLSID()); - // Make a real change, then force the full-re-sync branch (as a no-rowId write path would) before reading. + // Make a real change, then force the full-re-sync branch (as a no-watermark write path would) before reading. updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), roots.getFirst(), "rootProp", "fullResynced"))); InvalidationCounters counters = getInvalidateCounters(st.getLSID()); - counters.pendingUpdateRowIds.clear(); + counters.pendingUpdateSince.set(null); counters.fullUpdatePending = true; assertCacheMatchesFreshDerivation(table, st.getLSID()); } + @Test + public void testModifiedBoundaryNotSkipped() throws Exception + { + // The watermark predicate uses >= so a row modified at exactly the watermark is never skipped. + ExpSampleType st = createSampleType("IncrUpdBoundary"); + int root = insertRoots(st, "R1").getFirst(); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // Drive the targeted update with a watermark equal to the row's own Modified, simulating same-tick capture. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "boundary"))); + Timestamp rowModified = new SqlSelector(ExperimentServiceImpl.getExpSchema().getScope(), + new SQLFragment("SELECT modified FROM exp.material WHERE rowid = ?").add(root)).getObject(Timestamp.class); + InvalidationCounters counters = getInvalidateCounters(st.getLSID()); + counters.fullUpdatePending = false; + counters.pendingUpdateSince.set(rowModified); + counters.update.incrementAndGet(); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testSequentialUpdates() throws Exception + { + // Two updates in sequence: each read must apply its own watermark and leave the cache consistent. + ExpSampleType st = createSampleType("IncrUpdSequential"); + int root = insertRoots(st, "R1").getFirst(); + insertAliquots(st, "R1", 2); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "first"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "second"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + private ExpSampleType createSampleType(String name) throws Exception { List props = new ArrayList<>(); diff --git a/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java b/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java index 91b52ef6697..1638e121f84 100644 --- a/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java @@ -75,6 +75,7 @@ import org.labkey.experiment.controllers.exp.ExperimentController; import java.io.IOException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -865,13 +866,9 @@ public ExpProtocol[] getProtocols(User user) return ret; } - /** - * @param changedRowIds RowIds of samples known to have changed (only meaningful for update); null means - * the caller could not list the changed rows, forcing a full re-sync on the next read. - */ - public void onSamplesChanged(User user, List materials, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Set changedRowIds) + public void onSamplesChanged(User user, List materials, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { - SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(this, reason, changedRowIds); + SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(this, reason, changedSince); ExpProtocol[] protocols = getProtocols(user); if (protocols.length != 0) @@ -896,7 +893,6 @@ public void onSamplesChanged(User user, List materials, SampleTypeServ } } - @Override public void setContainer(Container container) { diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 44675ee35f6..4d98067d44e 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -135,6 +135,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -2407,12 +2408,13 @@ public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleC } /** - * @param changedRowIds RowIds of samples known to have changed (only meaningful for update); null means - * the caller could not list the changed rows, forcing a full re-sync on the next read. + * @param changedSince a database-clock watermark captured before the update's writes, at or after which the changed + * samples were modified (only meaningful for update); null means the caller could not capture a + * watermark, forcing a full re-sync on the next read. */ - public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason, @Nullable Set changedRowIds) + public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason, @Nullable Timestamp changedSince) { - ExpMaterialTableImpl.refreshMaterializedView(st.getLSID(), reason, changedRowIds); + ExpMaterialTableImpl.refreshMaterializedView(st.getLSID(), reason, changedSince); } public static class TestCase extends Assert diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java index d7d7527e5cf..303de4f46d3 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java @@ -46,6 +46,7 @@ import org.labkey.api.data.RemapCache; import org.labkey.api.data.RuntimeSQLException; import org.labkey.api.data.SimpleFilter; +import org.labkey.api.data.SqlSelector; import org.labkey.api.data.TableInfo; import org.labkey.api.data.TableSelector; import org.labkey.api.data.UpdateableTableInfo; @@ -116,6 +117,7 @@ import java.io.IOException; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -466,11 +468,13 @@ public int loadRows(User user, Container container, DataIteratorBuilder rows, Da context.putConfigParameter(ExperimentService.QueryOptions.GetSampleRecomputeCol, true); ArrayList> outputRows = new ArrayList<>(); + Timestamp changedSince = context.getInsertOption().allowUpdate ? captureChangedSince() : null; + int ret = super.loadRows(user, container, rows, outputRows, context, extraScriptContext); if (ret > 0 && !context.getErrors().hasErrors() && _sampleType != null) { boolean isMediaUpdate = _sampleType.isMedia() && context.getInsertOption().updateOnly; - onSamplesChanged(!isMediaUpdate ? outputRows : null, context.getConfigParameters(), container, context.getInsertOption().allowUpdate ? update : insert); + onSamplesChanged(!isMediaUpdate ? outputRows : null, context.getConfigParameters(), container, context.getInsertOption().allowUpdate ? update : insert, changedSince); audit(context.getInsertOption().auditAction); } return ret; @@ -510,7 +514,7 @@ public List> insertRows(User user, Container container, List if (results != null && !results.isEmpty() && !errors.hasErrors()) { - onSamplesChanged(results, configParameters, container, SampleTypeServiceImpl.SampleChangeType.insert); + onSamplesChanged(results, configParameters, container, insert); audit(QueryService.AuditAction.INSERT); } return results; @@ -553,6 +557,7 @@ public List> updateRows( List> results; Map finalConfigParameters = configParameters == null ? new HashMap<>() : configParameters; recordDataIteratorUsed(finalConfigParameters); + Timestamp changedSince = captureChangedSince(); try { @@ -567,7 +572,7 @@ public List> updateRows( if (results != null && !results.isEmpty() && !errors.hasErrors()) { - onSamplesChanged(!_sampleType.isMedia() ? results : null, configParameters, container, update); + onSamplesChanged(!_sampleType.isMedia() ? results : null, configParameters, container, update, changedSince); audit(QueryService.AuditAction.UPDATE); } @@ -1139,9 +1144,13 @@ protected Map getRow(User user, Container container, Map> results, Map params, Container container, SampleTypeServiceImpl.SampleChangeType reason) + { + onSamplesChanged(results, params, container, reason, null); + } + + private void onSamplesChanged(List> results, Map params, Container container, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { var tx = getSchema().getDbSchema().getScope().getCurrentTransaction(); - Set changedRowIds = collectChangedRowIds(results, reason); Pair, Set> parentKeys = getSampleParentsForRecalc(results); boolean useBackgroundRecalc = false; if (parentKeys != null) @@ -1164,7 +1173,7 @@ private void onSamplesChanged(List> results, Map { - fireSamplesChanged(reason, changedRowIds); + fireSamplesChanged(reason, changedSince); if (finalUseBackgroundRecalc && !finalSkipRecalc) handleRecalc(parentKeys.first, parentKeys.second, true, container); }, DbScope.CommitTaskOption.POSTCOMMIT); @@ -1174,7 +1183,7 @@ private void onSamplesChanged(List> results, Map rootRowIds, Set parentNames, boolean } } - private void fireSamplesChanged(SampleTypeServiceImpl.SampleChangeType reason, @Nullable Set changedRowIds) + private void fireSamplesChanged(SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { if (_sampleType != null) - _sampleType.onSamplesChanged(getUser(), null, reason, changedRowIds); + _sampleType.onSamplesChanged(getUser(), null, reason, changedSince); } - /** - * Collect the changed rowIds so an update refresh can do a targeted incremental update instead of a full rebuild. - */ - private @Nullable Set collectChangedRowIds(List> results, SampleTypeServiceImpl.SampleChangeType reason) + private static @Nullable Timestamp captureChangedSince() { - if (reason != update || results == null || results.isEmpty() || results.size() > ExpMaterialTableImpl.UPDATE_ROWID_THRESHOLD) - return null; - - Set changedRowIds = new HashSet<>(results.size()); - for (Map row : results) - { - Long rowId = getMaterialRowId(row); - if (rowId == null) - return null; // can't enumerate the full changed set -> fall back to full re-sync - changedRowIds.add(rowId.intValue()); - } - - return changedRowIds; + return new SqlSelector(DbScope.getLabKeyScope(), "SELECT CURRENT_TIMESTAMP").getObject(Timestamp.class); } void audit(QueryService.AuditAction auditAction) From 9bd9df0d681933775aa33cb6376d6033cf1c8c53 Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Thu, 4 Jun 2026 09:18:42 -0700 Subject: [PATCH 3/9] Incremental merge --- .../experiment/api/ExpMaterialTableImpl.java | 151 +++++++++--------- .../experiment/api/SampleTypeServiceImpl.java | 2 +- .../api/SampleTypeUpdateServiceDI.java | 19 ++- 3 files changed, 88 insertions(+), 84 deletions(-) diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index d4fd1148d30..ef838437f48 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -60,6 +60,7 @@ import org.labkey.api.dataiterator.DataIteratorBuilder; import org.labkey.api.dataiterator.DataIteratorContext; import org.labkey.api.dataiterator.LoggingDataIterator; +import org.labkey.api.dataiterator.MapDataIterator; import org.labkey.api.dataiterator.SimpleTranslator; import org.labkey.api.exp.Lsid; import org.labkey.api.exp.MvColumn; @@ -162,6 +163,7 @@ import static org.labkey.api.exp.api.SampleTypeDomainKind.SAMPLE_TYPE_FILE_DIRECTORY_NAME; import static org.labkey.api.exp.query.ExpMaterialTable.Column.*; import static org.labkey.api.util.StringExpressionFactory.AbstractStringExpression.NullValueBehavior.NullResult; +import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.merge; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.schema; public class ExpMaterialTableImpl extends ExpRunItemTableImpl implements ExpMaterialTable @@ -1232,20 +1234,13 @@ static class InvalidationCounters public final AtomicLong update, insert, delete, rollup; /** - * The earliest {@code exp.material.Modified} timestamp from which rows must be re-derived by the next targeted - * incremental update. Populated by {@link RefreshMaterializedViewRunnable#run()} before the {@code update} - * counter is incremented, so any reader that observes the new counter-value is guaranteed to see this watermark - * when it drains the pending state. {@code null} means no targeted update is pending. Min-merged so the watermark - * always covers the oldest not-yet-applied change. Because {@code Modified} is written with the database clock - * (the constant {@code NowTimestamp} is inlined as {@code CURRENT_TIMESTAMP}), this timestamp is comparable to it - * directly, with no web-server-vs-database clock skew. + * The earliest {@code exp.material.Modified} timestamp from which existing rows must be re-derived by the next + * targeted incremental update. Populated by {@link RefreshMaterializedViewRunnable#run()} before the + * {@code update} counter is incremented, so any reader that observes the new counter-value is guaranteed to see + * this watermark when it drains the pending state. {@code null} means no targeted update is pending. Min-merged so + * the watermark always covers the oldest not-yet-applied change. */ public final AtomicReference pendingUpdateSince = new AtomicReference<>(); - /** - * Set when an {@code update} could not supply a watermark timestamp, meaning the next read must do a full re-sync - * rather than a targeted update. - */ - public volatile boolean fullUpdatePending = false; InvalidationCounters() { @@ -1256,39 +1251,20 @@ static class InvalidationCounters rollup = new AtomicLong(l); } - /** - * Record the watermark for one update (before its counter-increment). A null timestamp means the caller could - * not capture a change time, forcing a full re-sync; otherwise the watermark is min-merged so it covers the - * oldest pending change. - */ - void recordPendingUpdate(@Nullable Timestamp changedSince) + /** Record the watermark for one update (before its counter-increment), min-merged so it covers the oldest pending change. */ + void recordPendingUpdate(@NotNull Timestamp changedSince) { - if (changedSince == null) - fullUpdatePending = true; - else - pendingUpdateSince.accumulateAndGet(changedSince, (cur, next) -> cur == null || next.before(cur) ? next : cur); + pendingUpdateSince.accumulateAndGet(changedSince, (cur, next) -> cur == null || next.before(cur) ? next : cur); } - /** The drained pending update state: either a full re-sync or the watermark to target. */ - record PendingUpdate(boolean full, @Nullable Timestamp since) {} - /** - * Atomically take and clear the pending update state. Should be called under the {@code _Materialized} loading - * lock. The watermark is read with {@code getAndSet(null)} so a timestamp recorded concurrently by a later - * POSTCOMMIT runnable survives for the next drain. A full re-sync clears any pending watermark: the re-sync - * already covers every committed row (a recorded watermark only ever corresponds to an already-committed change, - * since the POSTCOMMIT runnable runs after commit), so the watermark would be redundant. + * Atomically take and clear the pending watermark. Should be called under the {@code _Materialized} loading lock. + * Read with {@code getAndSet(null)} so a timestamp recorded concurrently by a later POSTCOMMIT runnable survives + * for the next drain. Returns null when no targeted update is pending. */ - PendingUpdate drainPendingUpdate() + @Nullable Timestamp drainPendingUpdate() { - if (fullUpdatePending) - { - fullUpdatePending = false; - pendingUpdateSince.set(null); - return new PendingUpdate(true, null); - } - - return new PendingUpdate(false, pendingUpdateSince.getAndSet(null)); + return pendingUpdateSince.getAndSet(null); } } @@ -1314,9 +1290,10 @@ public static void refreshMaterializedView(final String lsid, SampleTypeServiceI } /** - * POSTCOMMIT task that turns a committed data change into an invalidation: on a schema change it drops the cached MQH - * (the SQL itself must be regenerated); otherwise it bumps the matching per-LSID counter, and for {@code update} also - * records the change watermark for a targeted incremental update. + * POSTCOMMIT task that turns a committed data change into an invalidation: a schema change (or any update/merge that + * could not capture a watermark) drops the cached MQH so the next read rebuilds the whole view; otherwise it bumps the + * matching per-LSID counter(s), and for {@code update}/{@code merge} also records the change watermark for a targeted + * incremental update. */ private record RefreshMaterializedViewRunnable( String lsid, @@ -1342,14 +1319,24 @@ public void run() { case insert -> counters.insert.incrementAndGet(); case rollup -> counters.rollup.incrementAndGet(); - case update -> - { - // Record the watermark BEFORE bumping the counter: a reader that observes the new counter-value is - // then guaranteed to see the corresponding watermark when it drains the pending state. + case delete -> counters.delete.incrementAndGet(); + case update, merge -> { + // An update or merge did not capture a watermark, so it cannot be targeted incrementally; drop the + // cached MQH so the next read rebuilds the whole view. Clear any pending watermark since the + // rebuild supersedes it. + if (changedSince == null) + { + counters.pendingUpdateSince.set(null); + _materializedQueries.remove(lsid); + return; + } + counters.recordPendingUpdate(changedSince); counters.update.incrementAndGet(); + + if (reason == merge) + counters.insert.incrementAndGet(); } - case delete -> counters.delete.incrementAndGet(); default -> throw new IllegalStateException("Unexpected value: " + reason); } } @@ -1558,43 +1545,26 @@ void executeIncrementalRollup() upsertWithRetry(incremental); } - /** - * Generalized incremental update: re-derive the materialized columns from their source joins for the rows that - * changed since the last drain, instead of dropping and fully rebuilding the temp table. Drains the pending state - * under the {@code _Materialized} loading lock (held by the caller) and either does a full re-sync or a single - * targeted update of the changed rows. On failure the drained work is restored, so nothing is lost before the - * caller's handler drops the MQH and rebuilds. - */ void executeIncrementalUpdate() { InvalidationCounters counters = getInvalidateCounters(_lsid); - InvalidationCounters.PendingUpdate pending = counters.drainPendingUpdate(); - if (!pending.full() && pending.since() == null) + Timestamp since = counters.drainPendingUpdate(); + if (since == null) return; try { - upsertWithRetry(buildIncrementalUpdateSql(pending.full() ? null : pending.since())); + upsertWithRetry(buildIncrementalUpdateSql(since)); } catch (RuntimeException ex) { - // Restore the drained work so nothing is lost; the caller's handler will drop the MQH and rebuild cleanly. - if (pending.full()) - counters.fullUpdatePending = true; - else - counters.recordPendingUpdate(pending.since()); + // Restore the drained watermark so nothing is lost; the caller's handler will drop the MQH and rebuild cleanly. + counters.recordPendingUpdate(since); throw ex; } } - /** - * Build one incremental UPDATE that re-derives the materialized columns from their source joins. When - * {@code changedSince} is null this is a full re-sync over the whole sample type; otherwise it targets the rows - * modified at or after the watermark plus the aliquots of any changed root (via {@code rootmaterialrowid}). The - * {@code IS DISTINCT FROM} (PostgreSQL) / {@code EXCEPT}-based (SQL Server) guard skips rows whose values already - * match, so only genuinely differing rows are rewritten. - */ - SQLFragment buildIncrementalUpdateSql(@Nullable Timestamp changedSince) + SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) { var d = CoreSchema.getInstance().getSchema().getSqlDialect(); JoinColumns jc = _joinColumns; @@ -1670,10 +1640,8 @@ private static void appendProvisionedJoins(SQLFragment sql, JoinColumns jc) sql.append(" INNER JOIN ").append(jc.provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); } - private void appendModifiedSincePredicate(SQLFragment sql, SqlDialect d, @Nullable Timestamp changedSince) + private void appendModifiedSincePredicate(SQLFragment sql, SqlDialect d, @NotNull Timestamp changedSince) { - if (changedSince == null) - return; sql.append(" AND (m.modified >= ?").add(changedSince); sql.append(" OR m.rootmaterialrowid IN (SELECT rowid FROM exp.material WHERE cpastype = ").appendValue(_lsid, d) .append(" AND modified >= ?").add(changedSince).append("))"); @@ -2231,11 +2199,10 @@ public void testFullResync() throws Exception ExpMaterialTableImpl table = getSamplesTable(st); assertCacheMatchesFreshDerivation(table, st.getLSID()); - // Make a real change, then force the full-re-sync branch (as a no-watermark write path would) before reading. + // Make a real change, then force the full rebuild path (as a no-watermark update/merge would: drop the MQH) + // before reading. The next read must rebuild the whole view from scratch. updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), roots.getFirst(), "rootProp", "fullResynced"))); - InvalidationCounters counters = getInvalidateCounters(st.getLSID()); - counters.pendingUpdateSince.set(null); - counters.fullUpdatePending = true; + _materializedQueries.remove(st.getLSID()); assertCacheMatchesFreshDerivation(table, st.getLSID()); } @@ -2254,7 +2221,6 @@ public void testModifiedBoundaryNotSkipped() throws Exception Timestamp rowModified = new SqlSelector(ExperimentServiceImpl.getExpSchema().getScope(), new SQLFragment("SELECT modified FROM exp.material WHERE rowid = ?").add(root)).getObject(Timestamp.class); InvalidationCounters counters = getInvalidateCounters(st.getLSID()); - counters.fullUpdatePending = false; counters.pendingUpdateSince.set(rowModified); counters.update.incrementAndGet(); assertCacheMatchesFreshDerivation(table, st.getLSID()); @@ -2278,6 +2244,27 @@ public void testSequentialUpdates() throws Exception assertCacheMatchesFreshDerivation(table, st.getLSID()); } + @Test + public void testMergeInsertAndUpdate() throws Exception + { + // A single merge that both updates existing rows and inserts new ones must leave the cache consistent: the + // merge signals both the update path (existing rows, watermark-targeted) and the insert path (new rows). + ExpSampleType st = createSampleType("IncrUpdMerge"); + insertRoots(st, "R1", "R2"); + insertAliquots(st, "R1", 2); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // R1/R2 already exist (matched by name -> updated); R3/R4 are new (-> inserted), all in one merge. + mergeRows(st, List.of( + CaseInsensitiveHashMap.of("name", "R1", "rootProp", "mergedR1"), + CaseInsensitiveHashMap.of("name", "R2", "rootProp", "mergedR2"), + CaseInsensitiveHashMap.of("name", "R3", "rootProp", "mergedR3"), + CaseInsensitiveHashMap.of("name", "R4", "rootProp", "mergedR4"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + private ExpSampleType createSampleType(String name) throws Exception { List props = new ArrayList<>(); @@ -2346,6 +2333,14 @@ private void updateRows(ExpSampleType st, List> rows) throws throw errors; } + private void mergeRows(ExpSampleType st, List> rows) throws Exception + { + BatchValidationException errors = new BatchValidationException(); + getUpdateService(st).mergeRows(_user, _c, MapDataIterator.of(rows), errors, null, null); + if (errors.hasErrors()) + throw errors; + } + /** * Trigger materialization (and any pending incremental update) as a side effect of {@link #getMaterializedSQL()}, * then assert the cached temp table equals a fresh derivation of the join query in both directions. diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 4d98067d44e..cf256e61b4b 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -2400,7 +2400,7 @@ public long getCurrentCount(NameGenerator.EntityCounter counterType, Container c return getProjectSampleCount(container, counterType == NameGenerator.EntityCounter.rootSampleCount); } - public enum SampleChangeType { insert, update, delete, rollup /* aliquot count */, schema } + public enum SampleChangeType { insert, update, merge /* insert + update */, delete, rollup /* aliquot count */, schema } public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason) { diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java index 303de4f46d3..a47813c1e60 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java @@ -151,6 +151,7 @@ import static org.labkey.api.util.IntegerUtils.asLong; import static org.labkey.experiment.ExpDataIterators.incrementCounts; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.insert; +import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.merge; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.rollup; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.update; @@ -468,14 +469,19 @@ public int loadRows(User user, Container container, DataIteratorBuilder rows, Da context.putConfigParameter(ExperimentService.QueryOptions.GetSampleRecomputeCol, true); ArrayList> outputRows = new ArrayList<>(); - Timestamp changedSince = context.getInsertOption().allowUpdate ? captureChangedSince() : null; + InsertOption io = context.getInsertOption(); + // Capture the watermark BEFORE the writes for any operation that updates existing rows (update or merge), so it is + // a lower bound on every exp.material.Modified the operation sets (ignored for a pure insert). + Timestamp changedSince = io.allowUpdate ? captureChangedSince() : null; int ret = super.loadRows(user, container, rows, outputRows, context, extraScriptContext); if (ret > 0 && !context.getErrors().hasErrors() && _sampleType != null) { - boolean isMediaUpdate = _sampleType.isMedia() && context.getInsertOption().updateOnly; - onSamplesChanged(!isMediaUpdate ? outputRows : null, context.getConfigParameters(), container, context.getInsertOption().allowUpdate ? update : insert, changedSince); - audit(context.getInsertOption().auditAction); + boolean isMediaUpdate = _sampleType.isMedia() && io.updateOnly; + // updateOnly -> update; insert+update (MERGE/UPSERT/REPLACE) -> merge; pure insert -> insert. + SampleTypeServiceImpl.SampleChangeType reason = io.updateOnly ? update : io.allowUpdate ? merge : insert; + onSamplesChanged(!isMediaUpdate ? outputRows : null, context.getConfigParameters(), container, reason, changedSince); + audit(io.auditAction); } return ret; } @@ -484,10 +490,13 @@ public int loadRows(User user, Container container, DataIteratorBuilder rows, Da public int mergeRows(User user, Container container, DataIteratorBuilder rows, BatchValidationException errors, @Nullable Map configParameters, Map extraScriptContext) { assert _sampleType != null : "SampleType required for insert/update, but not required for read/delete"; + // Capture the watermark BEFORE the writes so the merge's update portion can be targeted (the insert portion is + // handled by the insert path). A null watermark would route the merge to a full rebuild. + Timestamp changedSince = captureChangedSince(); int ret = _importRowsUsingDIB(user, container, rows, null, getDataIteratorContext(errors, InsertOption.MERGE, configParameters), extraScriptContext); if (ret > 0 && !errors.hasErrors()) { - onSamplesChanged(null, configParameters, container, update); // mergeRows not really used, skip wiring recalc + onSamplesChanged(null, configParameters, container, merge, changedSince); // mergeRows not really used, skip wiring recalc audit(QueryService.AuditAction.MERGE); } return ret; From 06039f7e69338e1b3142970b1a383dd816881ce6 Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Thu, 4 Jun 2026 10:00:42 -0700 Subject: [PATCH 4/9] Remove JoinColumns construct --- .../experiment/api/ExpMaterialTableImpl.java | 212 ++++++------------ 1 file changed, 65 insertions(+), 147 deletions(-) diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index ef838437f48..b3a831af80c 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -1370,12 +1370,13 @@ private SQLFragment getMaterializedSQL() * * Maybe have a callback to generate the SQL dynamically, and verify that the sql is unchanged. */ - SQLFragment viewSql = getJoinSQL(null).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); - // Capture the column mapping for incremental updates. Safe to cache: a schema change drops this MQH from the - // cache (reason == schema), so a changed column set always rebuilds with a fresh mapping. - JoinColumns joinColumns = getJoinColumns(null); + // Collect the temp-table column identifiers (minus immutable join keys) from the same SELECT that builds the + // view, so the incremental UPDATE re-assigns exactly the columns this SELECT produces. Safe to cache: a schema + // change drops this MQH from the cache (reason == schema), so a changed column set always rebuilds fresh. + List updateColumns = new ArrayList<>(); + SQLFragment viewSql = getJoinSQL(null, updateColumns).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); return (_MaterializedQueryHelper) new _MaterializedQueryHelper.Builder(_ss.getLSID(), "", getExpSchema().getDbSchema().getScope(), viewSql) - .joinColumns(joinColumns) + .updateColumns(updateColumns) .addIndex("CREATE UNIQUE INDEX uq_${NAME}_rowid ON temp.${NAME} (rowid)") .addIndex("CREATE UNIQUE INDEX uq_${NAME}_lsid ON temp.${NAME} (lsid)") .addIndex("CREATE INDEX idx_${NAME}_container ON temp.${NAME} (container)") @@ -1395,17 +1396,14 @@ private SQLFragment getMaterializedSQL() */ static class _MaterializedQueryHelper extends MaterializedQueryHelper { - /** Columns that an incremental UPDATE never re-derives */ - static final Set IMMUTABLE_UPDATE_COLUMNS = new CaseInsensitiveHashSet(RowId.name(), LSID.name(), CpasType.name(), RootMaterialRowId.name()); - final String _lsid; - /** The column mapping that produced the temp table, used to build the incremental UPDATE SET clauses. */ - final JoinColumns _joinColumns; + /** The temp-table column identifiers (minus immutable join keys) that an incremental UPDATE re-derives. */ + final List _updateColumns; static class Builder extends MaterializedQueryHelper.Builder { String _lsid; - JoinColumns _joinColumns; + List _updateColumns = List.of(); public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) { @@ -1413,25 +1411,25 @@ public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) this._lsid = lsid; } - public Builder joinColumns(JoinColumns joinColumns) + public Builder updateColumns(List updateColumns) { - this._joinColumns = joinColumns; + this._updateColumns = updateColumns; return this; } @Override public _MaterializedQueryHelper build() { - return new _MaterializedQueryHelper(_lsid, _joinColumns, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); + return new _MaterializedQueryHelper(_lsid, _updateColumns, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); } } - _MaterializedQueryHelper(String lsid, JoinColumns joinColumns, String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, + _MaterializedQueryHelper(String lsid, List updateColumns, String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, boolean isSelectIntoSql) { super(prefix, scope, select, uptodate, supplier, indexes, maxTimeToCache, isSelectIntoSql); this._lsid = lsid; - this._joinColumns = joinColumns; + this._updateColumns = updateColumns; } @Override @@ -1567,84 +1565,41 @@ void executeIncrementalUpdate() SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) { var d = CoreSchema.getInstance().getSchema().getSqlDialect(); - JoinColumns jc = _joinColumns; - - // Re-derive every temp column except the join key and immutable columns - List setColumns = jc.columns.stream() - .filter(c -> !IMMUTABLE_UPDATE_COLUMNS.contains(c.tempColumnName())) - .toList(); + SQLFragment src = appendChangedSincePredicate(getViewSourceSql(), changedSince); SQLFragment sql = new SQLFragment(); if (d.isPostgreSQL()) { sql.append("UPDATE temp.${NAME} AS st\nSET "); - appendSetClause(sql, setColumns); - sql.append("\nFROM exp.Material m"); - appendProvisionedJoins(sql, jc); - sql.append("\nWHERE st.rowid = m.rowid AND m.cpastype = ").appendValue(_lsid, d); - appendModifiedSincePredicate(sql, d, changedSince); - sql.append(" AND ("); - String or = ""; - for (JoinColumn c : setColumns) - { - sql.append(or).append("st.").append(c.tempColumnSql()).append(" IS DISTINCT FROM ").append(c.sourceSql()); - or = " OR "; - } - sql.append(")"); + appendSetFromSrc(sql); + sql.append("\nFROM (").append(src).append("\n) src\n").append("WHERE st.rowid = src.rowid"); } else { sql.append("UPDATE st\nSET "); - appendSetClause(sql, setColumns); - sql.append("\nFROM temp.${NAME} st INNER JOIN exp.Material m ON st.rowid = m.rowid"); - appendProvisionedJoins(sql, jc); - sql.append("\nWHERE m.cpastype = ").appendValue(_lsid, d); - appendModifiedSincePredicate(sql, d, changedSince); - // SQL Server before 2022 has no IS DISTINCT FROM; "SELECT EXCEPT SELECT " is a portable - // null-safe tuple comparison that yields a row iff the tuples differ. - sql.append(" AND EXISTS (SELECT "); - String comma = ""; - for (JoinColumn c : setColumns) - { - sql.append(comma).append("st.").append(c.tempColumnSql()); - comma = ", "; - } - sql.append(" EXCEPT SELECT "); - comma = ""; - for (JoinColumn c : setColumns) - { - sql.append(comma).append(c.sourceSql()); - comma = ", "; - } - sql.append(")"); + appendSetFromSrc(sql); + sql.append("\nFROM temp.${NAME} st INNER JOIN (").append(src).append("\n) src ON st.rowid = src.rowid"); } return sql; } - private static void appendSetClause(SQLFragment sql, List setColumns) + /** {@code col = src.col} for every re-derivable column. Both PostgreSQL and SQL Server want a bare column name on the SET left-hand side. */ + private void appendSetFromSrc(SQLFragment sql) { String comma = ""; - for (JoinColumn c : setColumns) + for (SQLFragment col : _updateColumns) { - // PostgreSQL and SQL Server both want bare column names on the SET left-hand side. - sql.append(comma).append(c.tempColumnSql()).append(" = ").append(c.sourceSql()); + sql.append(comma).append(col).append(" = src.").append(col); comma = ", "; } } - private static void appendProvisionedJoins(SQLFragment sql, JoinColumns jc) + private SQLFragment appendChangedSincePredicate(SQLFragment viewSource, @NotNull Timestamp changedSince) { - if (jc.hasSampleColumns) - sql.append(" INNER JOIN ").append(jc.provisioned, "m_sample").append(" ON m.RootMaterialRowId = m_sample.RowId"); - if (jc.hasAliquotColumns) - sql.append(" INNER JOIN ").append(jc.provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); - } - - private void appendModifiedSincePredicate(SQLFragment sql, SqlDialect d, @NotNull Timestamp changedSince) - { - sql.append(" AND (m.modified >= ?").add(changedSince); - sql.append(" OR m.rootmaterialrowid IN (SELECT rowid FROM exp.material WHERE cpastype = ").appendValue(_lsid, d) + viewSource.append(" AND (m.modified >= ?").add(changedSince); + viewSource.append(" OR m.rootmaterialrowid IN (SELECT rowid FROM exp.material WHERE cpastype = ").appendValue(_lsid) .append(" AND modified >= ?").add(changedSince).append("))"); + return viewSource; } } @@ -1682,68 +1637,47 @@ Lock getLock() } } - /** - * Which join alias a materialized-view column is sourced from: - *

    - *
  • {@code m} – the row's own {@code exp.material} record
  • - *
  • {@code m_sample} – the root material's provisioned row (root-derived / {@code ParentOnly} columns)
  • - *
  • {@code m_aliquot} – the row's own provisioned row (aliquot-scoped columns plus {@code genid} / unique-id fields)
  • - *
- */ - enum JoinAlias { m, m_sample, m_aliquot } - - /** - * One column of the materialized join: the unencoded temp-table column name, the SQL reference to that column, the SQL - * expression that produces its value, and which join alias that expression reads from. This is the single source of - * truth shared by {@link #getJoinSQL} (which builds the SELECT that creates the temp table) and the incremental - * {@code UPDATE} paths (which build the SET clause). Keeping both consumers on this one mapping guarantees the temp - * column references and the SET expressions can never drift. {@code tempColumnSql} is the valid SQL reference (e.g., a - * quoted identifier) by which the column is named in the temp table; use it bare as an UPDATE SET target, or prefix it - * with a table alias (e.g., {@code st.}) to reference it elsewhere. {@code tempColumnName} is the unencoded name, useful - * for identifying a specific column (e.g., the {@code rowid} join key). - */ - record JoinColumn(String tempColumnName, SQLFragment tempColumnSql, SQLFragment sourceSql, JoinAlias alias) {} + /** Immutable join-key columns that an incremental UPDATE never re-derives (excluded from the re-assign list). */ + static final Set IMMUTABLE_UPDATE_COLUMNS = new CaseInsensitiveHashSet(RowId.name(), LSID.name(), CpasType.name(), RootMaterialRowId.name()); - /** - * The ordered column mapping for the materialized join plus which provisioned joins are actually needed. Mirrors the - * {@code hasSampleColumns} / {@code hasAliquotColumns} bookkeeping that {@link #getJoinSQL} previously tracked inline. - */ - static class JoinColumns + /* SELECT and JOIN, does not include WHERE */ + private SQLFragment getJoinSQL(Set selectedColumns) { - final List columns = new ArrayList<>(); - TableInfo provisioned = null; - boolean hasSampleColumns = false; // m_sample join required - boolean hasAliquotColumns = false; // m_aliquot join required + return getJoinSQL(selectedColumns, null); } - /** - * Classify every selected column into its temp-table name, source expression, and join alias, preserving exactly the - * rules previously inlined in {@link #getJoinSQL}: material columns → {@code m}; {@code genid} / unique-id fields - * → {@code m_aliquot}; root-derived ({@code ParentOnly} or empty derivation scope) → {@code m_sample}; all - * other provisioned columns → {@code m_aliquot}; MV-indicator columns are always included. - */ - private JoinColumns getJoinColumns(Set selectedColumns) + private SQLFragment getJoinSQL(Set selectedColumns, @Nullable List outUpdateColumns) { - JoinColumns result = new JoinColumns(); - result.provisioned = null == _ss ? null : _ss.getTinfo(); - Set provisionedCols = new CaseInsensitiveHashSet(result.provisioned != null ? result.provisioned.getColumnNameSet() : Collections.emptySet()); + TableInfo provisioned = null == _ss ? null : _ss.getTinfo(); + Set provisionedCols = new CaseInsensitiveHashSet(provisioned != null ? provisioned.getColumnNameSet() : Collections.emptySet()); provisionedCols.remove(RowId.name()); provisionedCols.remove(Name.name()); boolean hasProvisionedColumns = containsProvisionedColumns(selectedColumns, provisionedCols); + boolean hasSampleColumns = false; + boolean hasAliquotColumns = false; Set materialCols = new CaseInsensitiveHashSet(_rootTable.getColumnNameSet()); selectedColumns = computeInnerSelectedColumns(selectedColumns); + SQLFragment sql = new SQLFragment(); + sql.appendComment("", getSqlDialect()); + sql.append("SELECT "); + String comma = ""; for (String materialCol : materialCols) { // don't need to generate SQL for columns that aren't selected if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(new FieldKey(null, materialCol))) - result.columns.add(new JoinColumn(materialCol, new SQLFragment().appendIdentifier(materialCol), new SQLFragment("m.").appendIdentifier(materialCol), JoinAlias.m)); + { + sql.append(comma).append("m.").appendIdentifier(materialCol); + comma = ", "; + if (null != outUpdateColumns && !IMMUTABLE_UPDATE_COLUMNS.contains(materialCol)) + outUpdateColumns.add(new SQLFragment().appendIdentifier(materialCol)); + } } - if (null != result.provisioned && hasProvisionedColumns) + if (null != provisioned && hasProvisionedColumns) { - for (ColumnInfo propertyColumn : result.provisioned.getColumns()) + for (ColumnInfo propertyColumn : provisioned.getColumns()) { // don't select twice if ( @@ -1759,54 +1693,38 @@ private JoinColumns getJoinColumns(Set selectedColumns) { boolean rootField = StringUtils.isEmpty(propertyColumn.getDerivationDataScope()) || ExpSchema.DerivationDataScopeType.ParentOnly.name().equalsIgnoreCase(propertyColumn.getDerivationDataScope()); - String tempColumnName = propertyColumn.getSelectIdentifier().getId(); SQLFragment tempColumnSql = propertyColumn.getSelectIdentifier().getSql(); + String alias; if ("genid".equalsIgnoreCase(propertyColumn.getColumnName()) || propertyColumn.isUniqueIdField()) { - result.columns.add(new JoinColumn(tempColumnName, tempColumnSql, propertyColumn.getValueSql("m_aliquot"), JoinAlias.m_aliquot)); - result.hasAliquotColumns = true; + alias = "m_aliquot"; + hasAliquotColumns = true; } else if (rootField) { - result.columns.add(new JoinColumn(tempColumnName, tempColumnSql, propertyColumn.getValueSql("m_sample"), JoinAlias.m_sample)); - result.hasSampleColumns = true; + alias = "m_sample"; + hasSampleColumns = true; } else { - result.columns.add(new JoinColumn(tempColumnName, tempColumnSql, propertyColumn.getValueSql("m_aliquot"), JoinAlias.m_aliquot)); - result.hasAliquotColumns = true; + alias = "m_aliquot"; + hasAliquotColumns = true; } + sql.append(comma).append(propertyColumn.getValueSql(alias)).append(" AS ").append(tempColumnSql); + comma = ", "; + // provisioned columns are never immutable join keys, so always re-derivable on update + if (null != outUpdateColumns) + outUpdateColumns.add(tempColumnSql); } } } - return result; - } - - /* SELECT and JOIN, does not include WHERE */ - private SQLFragment getJoinSQL(Set selectedColumns) - { - JoinColumns joinColumns = getJoinColumns(selectedColumns); - - SQLFragment sql = new SQLFragment(); - sql.appendComment("", getSqlDialect()); - sql.append("SELECT "); - String comma = ""; - for (JoinColumn joinColumn : joinColumns.columns) - { - sql.append(comma).append(joinColumn.sourceSql()); - // material columns are selected by name (no alias); provisioned columns are aliased to their temp-table column name - if (JoinAlias.m != joinColumn.alias()) - sql.append(" AS ").append(joinColumn.tempColumnSql()); - comma = ", "; - } - sql.append("\nFROM "); sql.append(_rootTable, "m"); - if (joinColumns.hasSampleColumns) - sql.append(" INNER JOIN ").append(joinColumns.provisioned, "m_sample").append(" ON m.RootMaterialRowId = m_sample.RowId"); - if (joinColumns.hasAliquotColumns) - sql.append(" INNER JOIN ").append(joinColumns.provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); + if (hasSampleColumns) + sql.append(" INNER JOIN ").append(provisioned, "m_sample").append(" ON m.RootMaterialRowId = m_sample.RowId"); + if (hasAliquotColumns) + sql.append(" INNER JOIN ").append(provisioned, "m_aliquot").append(" ON m.RowId = m_aliquot.RowId"); sql.appendComment("", getSqlDialect()); return sql; From decc2d03cd9880df123e7d6dc7c9ca361cce26b1 Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Thu, 4 Jun 2026 12:35:32 -0700 Subject: [PATCH 5/9] UNION ALL --- .../experiment/api/ExpMaterialTableImpl.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index b3a831af80c..aa4fa3a469d 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -1564,8 +1564,13 @@ void executeIncrementalUpdate() SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) { - var d = CoreSchema.getInstance().getSchema().getSqlDialect(); - SQLFragment src = appendChangedSincePredicate(getViewSourceSql(), changedSince); + SqlDialect d = CoreSchema.getInstance().getSchema().getSqlDialect(); + SQLFragment src = new SQLFragment() + .append(getViewSourceSql().append(" AND m.modified >= ?").add(changedSince)) + .append("\nUNION ALL\n") + .append(getViewSourceSql() + .append(" AND EXISTS (SELECT 1 FROM exp.material r WHERE r.rowid = m.rootmaterialrowid AND r.cpastype = ").appendValue(_lsid, d) + .append(" AND r.modified >= ?").add(changedSince).append(")")); SQLFragment sql = new SQLFragment(); if (d.isPostgreSQL()) @@ -1583,7 +1588,6 @@ SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) return sql; } - /** {@code col = src.col} for every re-derivable column. Both PostgreSQL and SQL Server want a bare column name on the SET left-hand side. */ private void appendSetFromSrc(SQLFragment sql) { String comma = ""; @@ -1593,14 +1597,6 @@ private void appendSetFromSrc(SQLFragment sql) comma = ", "; } } - - private SQLFragment appendChangedSincePredicate(SQLFragment viewSource, @NotNull Timestamp changedSince) - { - viewSource.append(" AND (m.modified >= ?").add(changedSince); - viewSource.append(" OR m.rootmaterialrowid IN (SELECT rowid FROM exp.material WHERE cpastype = ").appendValue(_lsid) - .append(" AND modified >= ?").add(changedSince).append("))"); - return viewSource; - } } static class _Materialized extends MaterializedQueryHelper.Materialized From cbaa018a1cb64ba21472117cf1efe36c96c19bbe Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Thu, 4 Jun 2026 12:36:17 -0700 Subject: [PATCH 6/9] Upgrade: add index for incremental update --- .../schemas/dbscripts/postgresql/exp-26.006-26.007.sql | 8 ++++++++ .../schemas/dbscripts/sqlserver/exp-26.006-26.007.sql | 8 ++++++++ .../src/org/labkey/experiment/ExperimentModule.java | 2 +- 3 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 experiment/resources/schemas/dbscripts/postgresql/exp-26.006-26.007.sql create mode 100644 experiment/resources/schemas/dbscripts/sqlserver/exp-26.006-26.007.sql diff --git a/experiment/resources/schemas/dbscripts/postgresql/exp-26.006-26.007.sql b/experiment/resources/schemas/dbscripts/postgresql/exp-26.006-26.007.sql new file mode 100644 index 00000000000..13d567036cc --- /dev/null +++ b/experiment/resources/schemas/dbscripts/postgresql/exp-26.006-26.007.sql @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026 LabKey Corporation + * + * Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + */ +-- For samples, incremental materialized-view updates filter exp.material by (CpasType, Modified) to find rows changed +-- since modification began. This index allows for the query to avoid a full table scan. +CREATE INDEX IX_Material_CpasType_Modified ON exp.material (CpasType, Modified); diff --git a/experiment/resources/schemas/dbscripts/sqlserver/exp-26.006-26.007.sql b/experiment/resources/schemas/dbscripts/sqlserver/exp-26.006-26.007.sql new file mode 100644 index 00000000000..796a05c2ea7 --- /dev/null +++ b/experiment/resources/schemas/dbscripts/sqlserver/exp-26.006-26.007.sql @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026 LabKey Corporation + * + * Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + */ +-- For samples, incremental materialized-view updates filter exp.material by (CpasType, Modified) to find rows changed +-- since modification began. This index allows for the query to avoid a full table scan. +CREATE INDEX IX_Material_CpasType_Modified ON exp.Material (CpasType, Modified); diff --git a/experiment/src/org/labkey/experiment/ExperimentModule.java b/experiment/src/org/labkey/experiment/ExperimentModule.java index 91e50f06f67..679035c9d02 100644 --- a/experiment/src/org/labkey/experiment/ExperimentModule.java +++ b/experiment/src/org/labkey/experiment/ExperimentModule.java @@ -208,7 +208,7 @@ public String getName() @Override public Double getSchemaVersion() { - return 26.006; + return 26.007; } @Nullable From 219f73f236962355e8fef5ab73496da58c55f382 Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Fri, 5 Jun 2026 07:22:43 -0700 Subject: [PATCH 7/9] Precision drift, remove duplicate parent rows --- .../experiment/api/ExpMaterialTableImpl.java | 49 ++++++++----------- .../experiment/api/SampleTypeServiceImpl.java | 2 +- .../api/SampleTypeUpdateServiceDI.java | 15 ++---- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index aa4fa3a469d..b6985d4a3ec 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -21,7 +21,6 @@ import org.apache.commons.math3.util.Precision; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.jspecify.annotations.NonNull; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -1232,14 +1231,6 @@ public boolean supportTableRules() // intentional override static class InvalidationCounters { public final AtomicLong update, insert, delete, rollup; - - /** - * The earliest {@code exp.material.Modified} timestamp from which existing rows must be re-derived by the next - * targeted incremental update. Populated by {@link RefreshMaterializedViewRunnable#run()} before the - * {@code update} counter is incremented, so any reader that observes the new counter-value is guaranteed to see - * this watermark when it drains the pending state. {@code null} means no targeted update is pending. Min-merged so - * the watermark always covers the oldest not-yet-applied change. - */ public final AtomicReference pendingUpdateSince = new AtomicReference<>(); InvalidationCounters() @@ -1251,17 +1242,11 @@ static class InvalidationCounters rollup = new AtomicLong(l); } - /** Record the watermark for one update (before its counter-increment), min-merged so it covers the oldest pending change. */ void recordPendingUpdate(@NotNull Timestamp changedSince) { pendingUpdateSince.accumulateAndGet(changedSince, (cur, next) -> cur == null || next.before(cur) ? next : cur); } - /** - * Atomically take and clear the pending watermark. Should be called under the {@code _Materialized} loading lock. - * Read with {@code getAndSet(null)} so a timestamp recorded concurrently by a later POSTCOMMIT runnable survives - * for the next drain. Returns null when no targeted update is pending. - */ @Nullable Timestamp drainPendingUpdate() { return pendingUpdateSince.getAndSet(null); @@ -1342,7 +1327,7 @@ public void run() } @Override - public @NonNull String toString() + public @NotNull String toString() { return "RefreshMaterializedViewRunnable{lsid=" + lsid + ", reason=" + reason + ", changedSince=" + changedSince + "}"; } @@ -1368,11 +1353,8 @@ private SQLFragment getMaterializedSQL() * Previously it has been used on non-provisioned tables. It might be helpful to have a pattern, * even if just to help with race-conditions. * - * Maybe have a callback to generate the SQL dynamically, and verify that the sql is unchanged. + * Maybe have a callback to generate the SQL dynamically and verify that the SQL is unchanged. */ - // Collect the temp-table column identifiers (minus immutable join keys) from the same SELECT that builds the - // view, so the incremental UPDATE re-assigns exactly the columns this SELECT produces. Safe to cache: a schema - // change drops this MQH from the cache (reason == schema), so a changed column set always rebuilds fresh. List updateColumns = new ArrayList<>(); SQLFragment viewSql = getJoinSQL(null, updateColumns).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); return (_MaterializedQueryHelper) new _MaterializedQueryHelper.Builder(_ss.getLSID(), "", getExpSchema().getDbSchema().getScope(), viewSql) @@ -1381,9 +1363,6 @@ private SQLFragment getMaterializedSQL() .addIndex("CREATE UNIQUE INDEX uq_${NAME}_lsid ON temp.${NAME} (lsid)") .addIndex("CREATE INDEX idx_${NAME}_container ON temp.${NAME} (container)") .addIndex("CREATE INDEX idx_${NAME}_root ON temp.${NAME} (rootmaterialrowid)") - // NOTE: no addInvalidCheck for the update counter. Updates are now applied incrementally by - // executeIncrementalUpdate() (like insert/delete/rollup) rather than triggering a full drop-and-rebuild. - // The MQH is still dropped on schema changes and on incremental-update errors (see incrementalUpdateBeforeSelect). .build(); }); return new SQLFragment("SELECT * FROM ").append(mqh.getFromSql("_cached_view_")); @@ -1397,7 +1376,6 @@ private SQLFragment getMaterializedSQL() static class _MaterializedQueryHelper extends MaterializedQueryHelper { final String _lsid; - /** The temp-table column identifiers (minus immutable join keys) that an incremental UPDATE re-derives. */ final List _updateColumns; static class Builder extends MaterializedQueryHelper.Builder @@ -1424,8 +1402,18 @@ public _MaterializedQueryHelper build() } } - _MaterializedQueryHelper(String lsid, List updateColumns, String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, - boolean isSelectIntoSql) + _MaterializedQueryHelper( + String lsid, + List updateColumns, + String prefix, + DbScope scope, + SQLFragment select, + @Nullable SQLFragment uptodate, + Supplier supplier, + @Nullable Collection indexes, + long maxTimeToCache, + boolean isSelectIntoSql + ) { super(prefix, scope, select, uptodate, supplier, indexes, maxTimeToCache, isSelectIntoSql); this._lsid = lsid; @@ -1565,12 +1553,16 @@ void executeIncrementalUpdate() SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) { SqlDialect d = CoreSchema.getInstance().getSchema().getSqlDialect(); + // SQL Server's datetime type lacks microsecond precision + Timestamp comparison = d.isSqlServer() ? new Timestamp(changedSince.getTime() - 500) : changedSince; + SQLFragment src = new SQLFragment() - .append(getViewSourceSql().append(" AND m.modified >= ?").add(changedSince)) + .append(getViewSourceSql().append(" AND m.modified >= ?").add(comparison)) .append("\nUNION ALL\n") .append(getViewSourceSql() + .append(" AND m.rootmaterialrowid <> m.rowid") .append(" AND EXISTS (SELECT 1 FROM exp.material r WHERE r.rowid = m.rootmaterialrowid AND r.cpastype = ").appendValue(_lsid, d) - .append(" AND r.modified >= ?").add(changedSince).append(")")); + .append(" AND r.modified >= ?").add(comparison).append(")")); SQLFragment sql = new SQLFragment(); if (d.isPostgreSQL()) @@ -1585,6 +1577,7 @@ SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) appendSetFromSrc(sql); sql.append("\nFROM temp.${NAME} st INNER JOIN (").append(src).append("\n) src ON st.rowid = src.rowid"); } + return sql; } diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index cf256e61b4b..28c8fd4eb8c 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -2400,7 +2400,7 @@ public long getCurrentCount(NameGenerator.EntityCounter counterType, Container c return getProjectSampleCount(container, counterType == NameGenerator.EntityCounter.rootSampleCount); } - public enum SampleChangeType { insert, update, merge /* insert + update */, delete, rollup /* aliquot count */, schema } + public enum SampleChangeType { insert, update, merge, delete, rollup /* aliquot count */, schema } public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason) { diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java index a47813c1e60..d05f0cc6020 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java @@ -469,19 +469,16 @@ public int loadRows(User user, Container container, DataIteratorBuilder rows, Da context.putConfigParameter(ExperimentService.QueryOptions.GetSampleRecomputeCol, true); ArrayList> outputRows = new ArrayList<>(); - InsertOption io = context.getInsertOption(); - // Capture the watermark BEFORE the writes for any operation that updates existing rows (update or merge), so it is - // a lower bound on every exp.material.Modified the operation sets (ignored for a pure insert). - Timestamp changedSince = io.allowUpdate ? captureChangedSince() : null; + InsertOption insertOption = context.getInsertOption(); + Timestamp changedSince = insertOption.allowUpdate ? captureChangedSince() : null; int ret = super.loadRows(user, container, rows, outputRows, context, extraScriptContext); if (ret > 0 && !context.getErrors().hasErrors() && _sampleType != null) { - boolean isMediaUpdate = _sampleType.isMedia() && io.updateOnly; - // updateOnly -> update; insert+update (MERGE/UPSERT/REPLACE) -> merge; pure insert -> insert. - SampleTypeServiceImpl.SampleChangeType reason = io.updateOnly ? update : io.allowUpdate ? merge : insert; + boolean isMediaUpdate = _sampleType.isMedia() && insertOption.updateOnly; + SampleTypeServiceImpl.SampleChangeType reason = insertOption.updateOnly ? update : insertOption.allowUpdate ? merge : insert; onSamplesChanged(!isMediaUpdate ? outputRows : null, context.getConfigParameters(), container, reason, changedSince); - audit(io.auditAction); + audit(insertOption.auditAction); } return ret; } @@ -490,8 +487,6 @@ public int loadRows(User user, Container container, DataIteratorBuilder rows, Da public int mergeRows(User user, Container container, DataIteratorBuilder rows, BatchValidationException errors, @Nullable Map configParameters, Map extraScriptContext) { assert _sampleType != null : "SampleType required for insert/update, but not required for read/delete"; - // Capture the watermark BEFORE the writes so the merge's update portion can be targeted (the insert portion is - // handled by the insert path). A null watermark would route the merge to a full rebuild. Timestamp changedSince = captureChangedSince(); int ret = _importRowsUsingDIB(user, container, rows, null, getDataIteratorContext(errors, InsertOption.MERGE, configParameters), extraScriptContext); if (ret > 0 && !errors.hasErrors()) From 68433f7d900abe3f55cd5d929a29cde46160c391 Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Fri, 5 Jun 2026 10:37:30 -0700 Subject: [PATCH 8/9] Capture changedSince for moving samples --- .../org/labkey/experiment/api/SampleTypeServiceImpl.java | 9 +++++---- .../labkey/experiment/api/SampleTypeUpdateServiceDI.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 28c8fd4eb8c..9ba38e5d89b 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -1206,7 +1206,7 @@ public ValidationException updateSampleType(GWTDomain SampleTypeServiceImpl.get().indexSampleType(st, SearchService.get().defaultTask().getQueue(container, SearchService.PRIORITY.modified)), POSTCOMMIT); + transaction.addCommitTask(() -> indexSampleType(st, SearchService.get().defaultTask().getQueue(container, SearchService.PRIORITY.modified)), POSTCOMMIT); transaction.commit(); refreshSampleTypeMaterializedView(st, SampleChangeType.schema); } @@ -1972,6 +1972,7 @@ public Map moveSamples(Collection sample updateCounts.put("sampleAuditEvents", 0); Map> fileMovesBySampleId = new LongHashMap<>(); ExperimentService expService = ExperimentService.get(); + Timestamp changedSince = SampleTypeUpdateServiceDI.captureChangedSince(); try (DbScope.Transaction transaction = ensureTransaction()) { @@ -1982,7 +1983,7 @@ public Map moveSamples(Collection sample AbstractQueryUpdateService.addTransactionAuditEvent(transaction, user, auditEvent); } - for (Map.Entry> entry: sampleTypesMap.entrySet()) + for (Map.Entry> entry : sampleTypesMap.entrySet()) { ExpSampleType sampleType = entry.getKey(); SamplesSchema schema = new SamplesSchema(user, sampleType.getContainer()); @@ -2056,10 +2057,10 @@ public Map moveSamples(Collection sample for (ExpSampleType sampleType : sampleTypesMap.keySet()) { // force refresh of materialized view - SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(sampleType, SampleChangeType.update); + refreshSampleTypeMaterializedView(sampleType, SampleChangeType.update, changedSince); // update search index for moved samples via indexSampleType() helper, it filters for samples to index // based on the modified date - SampleTypeServiceImpl.get().indexSampleType(sampleType, SearchService.get().defaultTask().getQueue(sampleType.getContainer(), SearchService.PRIORITY.modified)); + indexSampleType(sampleType, SearchService.get().defaultTask().getQueue(sampleType.getContainer(), SearchService.PRIORITY.modified)); } }, DbScope.CommitTaskOption.IMMEDIATE, POSTCOMMIT, POSTROLLBACK); diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java index d05f0cc6020..39b5dabde21 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java @@ -1225,7 +1225,7 @@ private void fireSamplesChanged(SampleTypeServiceImpl.SampleChangeType reason, @ _sampleType.onSamplesChanged(getUser(), null, reason, changedSince); } - private static @Nullable Timestamp captureChangedSince() + static @Nullable Timestamp captureChangedSince() { return new SqlSelector(DbScope.getLabKeyScope(), "SELECT CURRENT_TIMESTAMP").getObject(Timestamp.class); } From aaf4b5b3425d38cff7bde9fb56b39330e95d2818 Mon Sep 17 00:00:00 2001 From: labkey-nicka Date: Fri, 5 Jun 2026 12:38:45 -0700 Subject: [PATCH 9/9] ColumnInfo --- .../experiment/api/ExpMaterialTableImpl.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index b6985d4a3ec..e0a7c7c3bfe 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -41,6 +41,7 @@ import org.labkey.api.data.CoreSchema; import org.labkey.api.data.DataColumn; import org.labkey.api.data.DataRegion; +import org.labkey.api.data.DatabaseIdentifier; import org.labkey.api.data.DbSchema; import org.labkey.api.data.DbScope; import org.labkey.api.data.ForeignKey; @@ -1355,7 +1356,7 @@ private SQLFragment getMaterializedSQL() * * Maybe have a callback to generate the SQL dynamically and verify that the SQL is unchanged. */ - List updateColumns = new ArrayList<>(); + List updateColumns = new ArrayList<>(); SQLFragment viewSql = getJoinSQL(null, updateColumns).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); return (_MaterializedQueryHelper) new _MaterializedQueryHelper.Builder(_ss.getLSID(), "", getExpSchema().getDbSchema().getScope(), viewSql) .updateColumns(updateColumns) @@ -1376,12 +1377,12 @@ private SQLFragment getMaterializedSQL() static class _MaterializedQueryHelper extends MaterializedQueryHelper { final String _lsid; - final List _updateColumns; + final List _updateColumns; static class Builder extends MaterializedQueryHelper.Builder { String _lsid; - List _updateColumns = List.of(); + List _updateColumns = List.of(); public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) { @@ -1389,7 +1390,7 @@ public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) this._lsid = lsid; } - public Builder updateColumns(List updateColumns) + public Builder updateColumns(List updateColumns) { this._updateColumns = updateColumns; return this; @@ -1404,7 +1405,7 @@ public _MaterializedQueryHelper build() _MaterializedQueryHelper( String lsid, - List updateColumns, + List updateColumns, String prefix, DbScope scope, SQLFragment select, @@ -1584,9 +1585,10 @@ SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) private void appendSetFromSrc(SQLFragment sql) { String comma = ""; - for (SQLFragment col : _updateColumns) + for (ColumnInfo col : _updateColumns) { - sql.append(comma).append(col).append(" = src.").append(col); + DatabaseIdentifier identifier = col.getSelectIdentifier(); + sql.append(comma).appendIdentifier(identifier).append(" = src.").appendIdentifier(identifier); comma = ", "; } } @@ -1627,7 +1629,7 @@ Lock getLock() } /** Immutable join-key columns that an incremental UPDATE never re-derives (excluded from the re-assign list). */ - static final Set IMMUTABLE_UPDATE_COLUMNS = new CaseInsensitiveHashSet(RowId.name(), LSID.name(), CpasType.name(), RootMaterialRowId.name()); + static final Set IMMUTABLE_UPDATE_COLUMNS = Set.of(RowId.fieldKey(), LSID.fieldKey(), CpasType.fieldKey(), RootMaterialRowId.fieldKey()); /* SELECT and JOIN, does not include WHERE */ private SQLFragment getJoinSQL(Set selectedColumns) @@ -1635,7 +1637,7 @@ private SQLFragment getJoinSQL(Set selectedColumns) return getJoinSQL(selectedColumns, null); } - private SQLFragment getJoinSQL(Set selectedColumns, @Nullable List outUpdateColumns) + private SQLFragment getJoinSQL(Set selectedColumns, @Nullable List outUpdateColumns) { TableInfo provisioned = null == _ss ? null : _ss.getTinfo(); Set provisionedCols = new CaseInsensitiveHashSet(provisioned != null ? provisioned.getColumnNameSet() : Collections.emptySet()); @@ -1645,22 +1647,22 @@ private SQLFragment getJoinSQL(Set selectedColumns, @Nullable List materialCols = new CaseInsensitiveHashSet(_rootTable.getColumnNameSet()); + List materialCols = _rootTable.getColumns(); selectedColumns = computeInnerSelectedColumns(selectedColumns); SQLFragment sql = new SQLFragment(); sql.appendComment("", getSqlDialect()); sql.append("SELECT "); String comma = ""; - for (String materialCol : materialCols) + for (ColumnInfo materialCol : materialCols) { // don't need to generate SQL for columns that aren't selected - if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(new FieldKey(null, materialCol))) + if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(materialCol.getFieldKey())) { - sql.append(comma).append("m.").appendIdentifier(materialCol); + sql.append(comma).append("m.").appendIdentifier(materialCol.getSelectIdentifier()); comma = ", "; - if (null != outUpdateColumns && !IMMUTABLE_UPDATE_COLUMNS.contains(materialCol)) - outUpdateColumns.add(new SQLFragment().appendIdentifier(materialCol)); + if (null != outUpdateColumns && !IMMUTABLE_UPDATE_COLUMNS.contains(materialCol.getFieldKey())) + outUpdateColumns.add(materialCol); } } @@ -1682,7 +1684,6 @@ private SQLFragment getJoinSQL(Set selectedColumns, @Nullable List