diff --git a/experiment/resources/schemas/dbscripts/postgresql/exp-26.006-26.007.sql b/experiment/resources/schemas/dbscripts/postgresql/exp-26.006-26.007.sql new file mode 100644 index 00000000000..13d567036cc --- /dev/null +++ b/experiment/resources/schemas/dbscripts/postgresql/exp-26.006-26.007.sql @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026 LabKey Corporation + * + * Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + */ +-- For samples, incremental materialized-view updates filter exp.material by (CpasType, Modified) to find rows changed +-- since modification began. This index allows for the query to avoid a full table scan. +CREATE INDEX IX_Material_CpasType_Modified ON exp.material (CpasType, Modified); diff --git a/experiment/resources/schemas/dbscripts/sqlserver/exp-26.006-26.007.sql b/experiment/resources/schemas/dbscripts/sqlserver/exp-26.006-26.007.sql new file mode 100644 index 00000000000..796a05c2ea7 --- /dev/null +++ b/experiment/resources/schemas/dbscripts/sqlserver/exp-26.006-26.007.sql @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2026 LabKey Corporation + * + * Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + */ +-- For samples, incremental materialized-view updates filter exp.material by (CpasType, Modified) to find rows changed +-- since modification began. This index allows for the query to avoid a full table scan. +CREATE INDEX IX_Material_CpasType_Modified ON exp.Material (CpasType, Modified); diff --git a/experiment/src/org/labkey/experiment/ExperimentModule.java b/experiment/src/org/labkey/experiment/ExperimentModule.java index 5aa431cc2b1..679035c9d02 100644 --- a/experiment/src/org/labkey/experiment/ExperimentModule.java +++ b/experiment/src/org/labkey/experiment/ExperimentModule.java @@ -123,6 +123,7 @@ import org.labkey.experiment.api.ExpDataClassType; import org.labkey.experiment.api.ExpDataImpl; import org.labkey.experiment.api.ExpDataTableImpl; +import org.labkey.experiment.api.ExpMaterialTableImpl; import org.labkey.experiment.api.ExpMaterialImpl; import org.labkey.experiment.api.ExpProtocolImpl; import org.labkey.experiment.api.ExpSampleTypeImpl; @@ -207,7 +208,7 @@ public String getName() @Override public Double getSchemaVersion() { - return 26.006; + return 26.007; } @Nullable @@ -1119,6 +1120,7 @@ public Collection getSummary(Container c) DomainImpl.TestCase.class, DomainPropertyImpl.TestCase.class, ExpDataTableImpl.TestCase.class, + ExpMaterialTableImpl.IncrementalUpdateTestCase.class, ExperimentServiceImpl.AuditDomainUriTest.class, ExperimentServiceImpl.LineageQueryTestCase.class, ExperimentServiceImpl.ParseInputOutputAliasTestCase.class, diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index e8d702b06e9..e0a7c7c3bfe 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -21,6 +21,10 @@ import org.apache.commons.math3.util.Precision; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; import org.labkey.api.assay.plate.AssayPlateMetadataService; import org.labkey.api.audit.AuditHandler; import org.labkey.api.cache.BlockingCache; @@ -37,6 +41,7 @@ import org.labkey.api.data.CoreSchema; import org.labkey.api.data.DataColumn; import org.labkey.api.data.DataRegion; +import org.labkey.api.data.DatabaseIdentifier; import org.labkey.api.data.DbSchema; import org.labkey.api.data.DbScope; import org.labkey.api.data.ForeignKey; @@ -48,11 +53,14 @@ import org.labkey.api.data.RenderContext; import org.labkey.api.data.SQLFragment; import org.labkey.api.data.Sort; +import org.labkey.api.data.SqlSelector; import org.labkey.api.data.TableInfo; import org.labkey.api.data.UnionContainerFilter; +import org.labkey.api.data.dialect.SqlDialect; import org.labkey.api.dataiterator.DataIteratorBuilder; import org.labkey.api.dataiterator.DataIteratorContext; import org.labkey.api.dataiterator.LoggingDataIterator; +import org.labkey.api.dataiterator.MapDataIterator; import org.labkey.api.dataiterator.SimpleTranslator; import org.labkey.api.exp.Lsid; import org.labkey.api.exp.MvColumn; @@ -61,6 +69,7 @@ import org.labkey.api.exp.api.ExpMaterial; import org.labkey.api.exp.api.ExpProtocol; import org.labkey.api.exp.api.ExpSampleType; +import org.labkey.api.exp.api.SampleTypeService; import org.labkey.api.exp.api.ExperimentService; import org.labkey.api.exp.api.ExperimentUrls; import org.labkey.api.exp.api.NameExpressionOptionService; @@ -77,6 +86,7 @@ import org.labkey.api.exp.query.ExpSchema; import org.labkey.api.exp.query.SamplesSchema; import org.labkey.api.gwt.client.AuditBehaviorType; +import org.labkey.api.gwt.client.model.GWTPropertyDescriptor; import org.labkey.api.gwt.client.model.PropertyValidatorType; import org.labkey.api.inventory.InventoryService; import org.labkey.api.ontology.Quantity; @@ -89,6 +99,7 @@ import org.labkey.api.query.LookupForeignKey; import org.labkey.api.query.QueryException; import org.labkey.api.query.QueryForeignKey; +import org.labkey.api.query.BatchValidationException; import org.labkey.api.query.QueryService; import org.labkey.api.query.QueryUpdateService; import org.labkey.api.query.QueryUrls; @@ -105,11 +116,14 @@ import org.labkey.api.security.permissions.Permission; import org.labkey.api.security.permissions.ReadPermission; import org.labkey.api.security.permissions.UpdatePermission; +import org.labkey.api.test.TestWhen; import org.labkey.api.util.GUID; import org.labkey.api.util.HeartBeat; +import org.labkey.api.util.JunitUtil; import org.labkey.api.util.PageFlowUtil; import org.labkey.api.util.Pair; import org.labkey.api.util.StringExpression; +import org.labkey.api.util.TestContext; import org.labkey.api.util.UnexpectedException; import org.labkey.api.view.ActionURL; import org.labkey.api.view.ViewContext; @@ -121,6 +135,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -134,6 +149,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -147,6 +163,7 @@ import static org.labkey.api.exp.api.SampleTypeDomainKind.SAMPLE_TYPE_FILE_DIRECTORY_NAME; import static org.labkey.api.exp.query.ExpMaterialTable.Column.*; import static org.labkey.api.util.StringExpressionFactory.AbstractStringExpression.NullValueBehavior.NullResult; +import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.merge; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.schema; public class ExpMaterialTableImpl extends ExpRunItemTableImpl implements ExpMaterialTable @@ -165,7 +182,7 @@ public class ExpMaterialTableImpl extends ExpRunItemTableImpl new SampleTypeAmountPrecisionDisplayColumn(colInfo, typeUnit)); columnInfo.setDescription("The amount of this sample, in the display unit for the sample type, currently on hand."); columnInfo.setShownInUpdateView(true); @@ -1215,6 +1232,8 @@ public boolean supportTableRules() // intentional override static class InvalidationCounters { public final AtomicLong update, insert, delete, rollup; + public final AtomicReference pendingUpdateSince = new AtomicReference<>(); + InvalidationCounters() { long l = System.currentTimeMillis(); @@ -1223,70 +1242,104 @@ static class InvalidationCounters delete = new AtomicLong(l); rollup = new AtomicLong(l); } + + void recordPendingUpdate(@NotNull Timestamp changedSince) + { + pendingUpdateSince.accumulateAndGet(changedSince, (cur, next) -> cur == null || next.before(cur) ? next : cur); + } + + @Nullable Timestamp drainPendingUpdate() + { + return pendingUpdateSince.getAndSet(null); + } } static final BlockingCache _materializedQueries = CacheManager.getBlockingStringKeyCache(CacheManager.UNLIMITED, CacheManager.HOUR, "materialized sample types", null); static final Map _invalidationCounters = Collections.synchronizedMap(new HashMap<>()); static final AtomicBoolean initializedListeners = new AtomicBoolean(false); - // used by SampleTypeServiceImpl.refreshSampleTypeMaterializedView() public static void refreshMaterializedView(final String lsid, SampleTypeServiceImpl.SampleChangeType reason) + { + refreshMaterializedView(lsid, reason, null); + } + + /** + * @param changedSince a watermark (from the database clock, captured before the update's writes) at or after which + * the changed {@code exp.material} rows were modified; only meaningful for update. null means + * the caller could not capture a watermark, forcing a full re-sync on the next read. + */ + public static void refreshMaterializedView(final String lsid, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { var scope = ExperimentServiceImpl.getExpSchema().getScope(); - var runnable = new RefreshMaterializedViewRunnable(lsid, reason); + var runnable = new RefreshMaterializedViewRunnable(lsid, reason, changedSince); scope.addCommitTask(runnable, DbScope.CommitTaskOption.POSTCOMMIT); } - private static class RefreshMaterializedViewRunnable implements Runnable + /** + * POSTCOMMIT task that turns a committed data change into an invalidation: a schema change (or any update/merge that + * could not capture a watermark) drops the cached MQH so the next read rebuilds the whole view; otherwise it bumps the + * matching per-LSID counter(s), and for {@code update}/{@code merge} also records the change watermark for a targeted + * incremental update. + */ + private record RefreshMaterializedViewRunnable( + String lsid, + SampleTypeServiceImpl.SampleChangeType reason, + @Nullable Timestamp changedSince + ) implements Runnable { - private final String _lsid; - private final SampleTypeServiceImpl.SampleChangeType _reason; - - public RefreshMaterializedViewRunnable(String lsid, SampleTypeServiceImpl.SampleChangeType reason) - { - _lsid = lsid; - _reason = reason; - } - @Override public void run() { - if (_reason == schema) + if (reason == schema) { /* NOTE: MaterializedQueryHelper can detect data changes and refresh the materialized view using the provided SQL. * It does not handle schema changes where the SQL itself needs to be updated. In this case, we remove the * MQH from the cache to force the SQL to be regenerated. */ - _materializedQueries.remove(_lsid); + _materializedQueries.remove(lsid); return; } - var counters = getInvalidateCounters(_lsid); - switch (_reason) + + var counters = getInvalidateCounters(lsid); + switch (reason) { case insert -> counters.insert.incrementAndGet(); case rollup -> counters.rollup.incrementAndGet(); - case update -> counters.update.incrementAndGet(); case delete -> counters.delete.incrementAndGet(); - default -> throw new IllegalStateException("Unexpected value: " + _reason); + case update, merge -> { + // An update or merge did not capture a watermark, so it cannot be targeted incrementally; drop the + // cached MQH so the next read rebuilds the whole view. Clear any pending watermark since the + // rebuild supersedes it. + if (changedSince == null) + { + counters.pendingUpdateSince.set(null); + _materializedQueries.remove(lsid); + return; + } + + counters.recordPendingUpdate(changedSince); + counters.update.incrementAndGet(); + + if (reason == merge) + counters.insert.incrementAndGet(); + } + default -> throw new IllegalStateException("Unexpected value: " + reason); } } @Override - public boolean equals(Object obj) + public @NotNull String toString() { - return obj instanceof RefreshMaterializedViewRunnable other && _lsid.equals(other._lsid) && _reason.equals(other._reason); + return "RefreshMaterializedViewRunnable{lsid=" + lsid + ", reason=" + reason + ", changedSince=" + changedSince + "}"; } } private static InvalidationCounters getInvalidateCounters(String lsid) { if (!initializedListeners.getAndSet(true)) - { CacheManager.addListener(_invalidationCounters::clear); - } - return _invalidationCounters.computeIfAbsent(lsid, (unused) -> - new InvalidationCounters() - ); + + return _invalidationCounters.computeIfAbsent(lsid, (_) -> new InvalidationCounters()); } /* SELECT and JOIN, does not include WHERE, same as getJoinSQL() */ @@ -1301,21 +1354,21 @@ private SQLFragment getMaterializedSQL() * Previously it has been used on non-provisioned tables. It might be helpful to have a pattern, * even if just to help with race-conditions. * - * Maybe have a callback to generate the SQL dynamically, and verify that the sql is unchanged. + * Maybe have a callback to generate the SQL dynamically and verify that the SQL is unchanged. */ - SQLFragment viewSql = getJoinSQL(null).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); + List updateColumns = new ArrayList<>(); + SQLFragment viewSql = getJoinSQL(null, updateColumns).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); return (_MaterializedQueryHelper) new _MaterializedQueryHelper.Builder(_ss.getLSID(), "", getExpSchema().getDbSchema().getScope(), viewSql) + .updateColumns(updateColumns) .addIndex("CREATE UNIQUE INDEX uq_${NAME}_rowid ON temp.${NAME} (rowid)") .addIndex("CREATE UNIQUE INDEX uq_${NAME}_lsid ON temp.${NAME} (lsid)") .addIndex("CREATE INDEX idx_${NAME}_container ON temp.${NAME} (container)") .addIndex("CREATE INDEX idx_${NAME}_root ON temp.${NAME} (rootmaterialrowid)") - .addInvalidCheck(() -> String.valueOf(getInvalidateCounters(_ss.getLSID()).update.get())) .build(); }); return new SQLFragment("SELECT * FROM ").append(mqh.getFromSql("_cached_view_")); } - /** * MaterializedQueryHelper has a built-in mechanism for tracking when a temp table needs to be recomputed. * It does not help with incremental updates (except for providing the upsert() method). @@ -1324,10 +1377,12 @@ private SQLFragment getMaterializedSQL() static class _MaterializedQueryHelper extends MaterializedQueryHelper { final String _lsid; + final List _updateColumns; static class Builder extends MaterializedQueryHelper.Builder { String _lsid; + List _updateColumns = List.of(); public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) { @@ -1335,18 +1390,35 @@ public Builder(String lsid, String prefix, DbScope scope, SQLFragment select) this._lsid = lsid; } + public Builder updateColumns(List updateColumns) + { + this._updateColumns = updateColumns; + return this; + } + @Override public _MaterializedQueryHelper build() { - return new _MaterializedQueryHelper(_lsid, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); + return new _MaterializedQueryHelper(_lsid, _updateColumns, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); } } - _MaterializedQueryHelper(String lsid, String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, - boolean isSelectIntoSql) + _MaterializedQueryHelper( + String lsid, + List updateColumns, + String prefix, + DbScope scope, + SQLFragment select, + @Nullable SQLFragment uptodate, + Supplier supplier, + @Nullable Collection indexes, + long maxTimeToCache, + boolean isSelectIntoSql + ) { super(prefix, scope, select, uptodate, supplier, indexes, maxTimeToCache, isSelectIntoSql); this._lsid = lsid; + this._updateColumns = updateColumns; } @Override @@ -1373,6 +1445,8 @@ protected void incrementalUpdateBeforeSelect(Materialized m) if (!materialized.incrementalDeleteCheck.stillValid(0)) executeIncrementalDelete(); + if (!materialized.incrementalUpdateCheck.stillValid(0)) + executeIncrementalUpdate(); if (!materialized.incrementalRollupCheck.stillValid(0)) executeIncrementalRollup(); if (!materialized.incrementalInsertCheck.stillValid(0)) @@ -1399,7 +1473,7 @@ protected void incrementalUpdateBeforeSelect(Materialized m) void upsertWithRetry(SQLFragment sql) { // not actually read-only, but we don't want to start an explicit transaction - _scope.executeWithRetryReadOnly((tx) -> upsert(sql)); + _scope.executeWithRetryReadOnly((_) -> upsert(sql)); } void executeIncrementalInsert() @@ -1457,6 +1531,67 @@ void executeIncrementalRollup() } upsertWithRetry(incremental); } + + void executeIncrementalUpdate() + { + InvalidationCounters counters = getInvalidateCounters(_lsid); + Timestamp since = counters.drainPendingUpdate(); + if (since == null) + return; + + try + { + upsertWithRetry(buildIncrementalUpdateSql(since)); + } + catch (RuntimeException ex) + { + // Restore the drained watermark so nothing is lost; the caller's handler will drop the MQH and rebuild cleanly. + counters.recordPendingUpdate(since); + throw ex; + } + } + + SQLFragment buildIncrementalUpdateSql(@NotNull Timestamp changedSince) + { + SqlDialect d = CoreSchema.getInstance().getSchema().getSqlDialect(); + // SQL Server's datetime type lacks microsecond precision + Timestamp comparison = d.isSqlServer() ? new Timestamp(changedSince.getTime() - 500) : changedSince; + + SQLFragment src = new SQLFragment() + .append(getViewSourceSql().append(" AND m.modified >= ?").add(comparison)) + .append("\nUNION ALL\n") + .append(getViewSourceSql() + .append(" AND m.rootmaterialrowid <> m.rowid") + .append(" AND EXISTS (SELECT 1 FROM exp.material r WHERE r.rowid = m.rootmaterialrowid AND r.cpastype = ").appendValue(_lsid, d) + .append(" AND r.modified >= ?").add(comparison).append(")")); + + SQLFragment sql = new SQLFragment(); + if (d.isPostgreSQL()) + { + sql.append("UPDATE temp.${NAME} AS st\nSET "); + appendSetFromSrc(sql); + sql.append("\nFROM (").append(src).append("\n) src\n").append("WHERE st.rowid = src.rowid"); + } + else + { + sql.append("UPDATE st\nSET "); + appendSetFromSrc(sql); + sql.append("\nFROM temp.${NAME} st INNER JOIN (").append(src).append("\n) src ON st.rowid = src.rowid"); + } + + return sql; + } + + private void appendSetFromSrc(SQLFragment sql) + { + String comma = ""; + for (ColumnInfo col : _updateColumns) + { + DatabaseIdentifier identifier = col.getSelectIdentifier(); + sql.append(comma).appendIdentifier(identifier).append(" = src.").appendIdentifier(identifier); + comma = ", "; + } + } } static class _Materialized extends MaterializedQueryHelper.Materialized @@ -1464,6 +1599,7 @@ static class _Materialized extends MaterializedQueryHelper.Materialized final MaterializedQueryHelper.Invalidator incrementalInsertCheck; final MaterializedQueryHelper.Invalidator incrementalRollupCheck; final MaterializedQueryHelper.Invalidator incrementalDeleteCheck; + final MaterializedQueryHelper.Invalidator incrementalUpdateCheck; _Materialized(_MaterializedQueryHelper mqh, String tableName, String cacheKey, long created, String sql) { @@ -1472,6 +1608,7 @@ static class _Materialized extends MaterializedQueryHelper.Materialized incrementalInsertCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.insert.get())); incrementalRollupCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.rollup.get())); incrementalDeleteCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.delete.get())); + incrementalUpdateCheck = new MaterializedQueryHelper.SupplierInvalidator(() -> String.valueOf(counters.update.get())); } @Override @@ -1482,6 +1619,7 @@ public void reset() incrementalInsertCheck.stillValid(now); incrementalRollupCheck.stillValid(now); incrementalDeleteCheck.stillValid(now); + incrementalUpdateCheck.stillValid(now); } Lock getLock() @@ -1490,35 +1628,44 @@ Lock getLock() } } + /** Immutable join-key columns that an incremental UPDATE never re-derives (excluded from the re-assign list). */ + static final Set IMMUTABLE_UPDATE_COLUMNS = Set.of(RowId.fieldKey(), LSID.fieldKey(), CpasType.fieldKey(), RootMaterialRowId.fieldKey()); /* SELECT and JOIN, does not include WHERE */ private SQLFragment getJoinSQL(Set selectedColumns) + { + return getJoinSQL(selectedColumns, null); + } + + private SQLFragment getJoinSQL(Set selectedColumns, @Nullable List outUpdateColumns) { TableInfo provisioned = null == _ss ? null : _ss.getTinfo(); Set provisionedCols = new CaseInsensitiveHashSet(provisioned != null ? provisioned.getColumnNameSet() : Collections.emptySet()); provisionedCols.remove(RowId.name()); provisionedCols.remove(Name.name()); boolean hasProvisionedColumns = containsProvisionedColumns(selectedColumns, provisionedCols); - boolean hasSampleColumns = false; boolean hasAliquotColumns = false; - Set materialCols = new CaseInsensitiveHashSet(_rootTable.getColumnNameSet()); + List materialCols = _rootTable.getColumns(); selectedColumns = computeInnerSelectedColumns(selectedColumns); SQLFragment sql = new SQLFragment(); sql.appendComment("", getSqlDialect()); sql.append("SELECT "); String comma = ""; - for (String materialCol : materialCols) + for (ColumnInfo materialCol : materialCols) { // don't need to generate SQL for columns that aren't selected - if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(new FieldKey(null, materialCol))) + if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(materialCol.getFieldKey())) { - sql.append(comma).append("m.").appendIdentifier(materialCol); + sql.append(comma).append("m.").appendIdentifier(materialCol.getSelectIdentifier()); comma = ", "; + if (null != outUpdateColumns && !IMMUTABLE_UPDATE_COLUMNS.contains(materialCol.getFieldKey())) + outUpdateColumns.add(materialCol); } } + if (null != provisioned && hasProvisionedColumns) { for (ColumnInfo propertyColumn : provisioned.getColumns()) @@ -1535,25 +1682,31 @@ private SQLFragment getJoinSQL(Set selectedColumns) // don't need to generate SQL for columns that aren't selected if (ALL_COLUMNS == selectedColumns || selectedColumns.contains(propertyColumn.getFieldKey()) || propertyColumn.isMvIndicatorColumn()) { - sql.append(comma); boolean rootField = StringUtils.isEmpty(propertyColumn.getDerivationDataScope()) || ExpSchema.DerivationDataScopeType.ParentOnly.name().equalsIgnoreCase(propertyColumn.getDerivationDataScope()); + String alias; if ("genid".equalsIgnoreCase(propertyColumn.getColumnName()) || propertyColumn.isUniqueIdField()) { - sql.append(propertyColumn.getValueSql("m_aliquot")).append(" AS ").appendIdentifier(propertyColumn.getSelectIdentifier()); + alias = "m_aliquot"; hasAliquotColumns = true; } else if (rootField) { - sql.append(propertyColumn.getValueSql("m_sample")).append(" AS ").appendIdentifier(propertyColumn.getSelectIdentifier()); + alias = "m_sample"; hasSampleColumns = true; } else { - sql.append(propertyColumn.getValueSql("m_aliquot")).append(" AS ").appendIdentifier(propertyColumn.getSelectIdentifier()); + alias = "m_aliquot"; hasAliquotColumns = true; } + + sql.append(comma).append(propertyColumn.getValueSql(alias)).append(" AS ").appendIdentifier(propertyColumn.getSelectIdentifier()); comma = ", "; + + // provisioned columns are never immutable join keys, so always re-derivable on update + if (null != outUpdateColumns) + outUpdateColumns.add(propertyColumn); } } } @@ -1830,7 +1983,7 @@ public List> getImportTemplates(ViewContext ctx) url.addParameter("includeColumn", aliasKey); } } - catch (IOException e) + catch (IOException ignored) {} templates.add(Pair.of("Download Template", url.toString())); return templates; @@ -1874,4 +2027,248 @@ public Object getDisplayValue(RenderContext ctx) return value; } } + + @TestWhen(TestWhen.When.BVT) + public static class IncrementalUpdateTestCase extends Assert + { + private static User _user; + private static Container _c; + + @BeforeClass + public static void setup() + { + JunitUtil.deleteTestContainer(); + + _c = JunitUtil.getTestContainer(); + _user = TestContext.get().getUser(); + } + + @AfterClass + public static void tearDown() + { + JunitUtil.deleteTestContainer(); + } + + @Test + public void testTargetedSingleRowUpdate() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdTargeted"); + int root = insertRoots(st, "R1").getFirst(); + int aliquot = insertAliquots(st, "R1", 1).getFirst(); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); // baseline materialization + + // Targeted update of one aliquot's aliquot-scoped property. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), aliquot, "aliquotProp", "changed"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // And a targeted update of the root's own (non-derived-onto-aliquot) name-adjacent material column path. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "rootChanged0"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testBatchUpdate() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdBatch"); + List roots = insertRoots(st, "R1", "R2", "R3", "R4", "R5"); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + List> updates = new ArrayList<>(); + for (int i = 0; i < roots.size(); i++) + updates.add(CaseInsensitiveHashMap.of(RowId.name(), roots.get(i), "rootProp", "batch" + i)); + updateRows(st, updates); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testRootFanoutUpdate() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdFanout"); + int root = insertRoots(st, "R1").getFirst(); + insertAliquots(st, "R1", 25); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // Editing the root's ParentOnly property must re-derive m_sample.* for every aliquot beneath it. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "fannedOut"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testFullResync() throws Exception + { + ExpSampleType st = createSampleType("IncrUpdFull"); + List roots = insertRoots(st, "R1", "R2", "R3"); + insertAliquots(st, "R1", 3); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // Make a real change, then force the full rebuild path (as a no-watermark update/merge would: drop the MQH) + // before reading. The next read must rebuild the whole view from scratch. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), roots.getFirst(), "rootProp", "fullResynced"))); + _materializedQueries.remove(st.getLSID()); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testModifiedBoundaryNotSkipped() throws Exception + { + // The watermark predicate uses >= so a row modified at exactly the watermark is never skipped. + ExpSampleType st = createSampleType("IncrUpdBoundary"); + int root = insertRoots(st, "R1").getFirst(); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // Drive the targeted update with a watermark equal to the row's own Modified, simulating same-tick capture. + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "boundary"))); + Timestamp rowModified = new SqlSelector(ExperimentServiceImpl.getExpSchema().getScope(), + new SQLFragment("SELECT modified FROM exp.material WHERE rowid = ?").add(root)).getObject(Timestamp.class); + InvalidationCounters counters = getInvalidateCounters(st.getLSID()); + counters.pendingUpdateSince.set(rowModified); + counters.update.incrementAndGet(); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testSequentialUpdates() throws Exception + { + // Two updates in sequence: each read must apply its own watermark and leave the cache consistent. + ExpSampleType st = createSampleType("IncrUpdSequential"); + int root = insertRoots(st, "R1").getFirst(); + insertAliquots(st, "R1", 2); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "first"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + updateRows(st, List.of(CaseInsensitiveHashMap.of(RowId.name(), root, "rootProp", "second"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + @Test + public void testMergeInsertAndUpdate() throws Exception + { + // A single merge that both updates existing rows and inserts new ones must leave the cache consistent: the + // merge signals both the update path (existing rows, watermark-targeted) and the insert path (new rows). + ExpSampleType st = createSampleType("IncrUpdMerge"); + insertRoots(st, "R1", "R2"); + insertAliquots(st, "R1", 2); + + ExpMaterialTableImpl table = getSamplesTable(st); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + + // R1/R2 already exist (matched by name -> updated); R3/R4 are new (-> inserted), all in one merge. + mergeRows(st, List.of( + CaseInsensitiveHashMap.of("name", "R1", "rootProp", "mergedR1"), + CaseInsensitiveHashMap.of("name", "R2", "rootProp", "mergedR2"), + CaseInsensitiveHashMap.of("name", "R3", "rootProp", "mergedR3"), + CaseInsensitiveHashMap.of("name", "R4", "rootProp", "mergedR4"))); + assertCacheMatchesFreshDerivation(table, st.getLSID()); + } + + private ExpSampleType createSampleType(String name) throws Exception + { + List props = new ArrayList<>(); + props.add(new GWTPropertyDescriptor("name", "string")); + GWTPropertyDescriptor rootProp = new GWTPropertyDescriptor("rootProp", "string"); + rootProp.setDerivationDataScope(ExpSchema.DerivationDataScopeType.ParentOnly.name()); // -> m_sample join + props.add(rootProp); + GWTPropertyDescriptor aliquotProp = new GWTPropertyDescriptor("aliquotProp", "string"); + aliquotProp.setDerivationDataScope(ExpSchema.DerivationDataScopeType.ChildOnly.name()); // -> m_aliquot join + props.add(aliquotProp); + return SampleTypeService.get().createSampleType(_c, _user, name, null, props, Collections.emptyList(), -1, -1, -1, -1, null); + } + + private ExpMaterialTableImpl getSamplesTable(ExpSampleType st) + { + return (ExpMaterialTableImpl) QueryService.get().getUserSchema(_user, _c, SamplesSchema.SCHEMA_NAME).getTable(st.getName()); + } + + private @NotNull QueryUpdateService getUpdateService(ExpSampleType st) + { + TableInfo table = getSamplesTable(st); + QueryUpdateService service = table.getUpdateService(); + if (service == null) + throw new IllegalArgumentException("No QueryUpdateService found for table " + st.getName()); + + return service; + } + + private List insertRoots(ExpSampleType st, String... names) throws Exception + { + List> rows = new ArrayList<>(); + for (int i = 0; i < names.length; i++) + rows.add(CaseInsensitiveHashMap.of("name", names[i], "rootProp", "root" + i)); + return insert(st, rows); + } + + private List insertAliquots(ExpSampleType st, String rootName, int count) throws Exception + { + List> rows = new ArrayList<>(); + for (int i = 0; i < count; i++) + rows.add(CaseInsensitiveHashMap.of("name", rootName + "-" + i, "AliquotedFrom", rootName, "aliquotProp", "aliquot" + i)); + return insert(st, rows); + } + + private List insert(ExpSampleType st, List> rows) throws Exception + { + BatchValidationException errors = new BatchValidationException(); + List> inserted = getUpdateService(st).insertRows(_user, _c, rows, errors, null, null); + if (errors.hasErrors()) + throw errors; + + if (inserted == null || inserted.isEmpty()) + return Collections.emptyList(); + + List rowIds = new ArrayList<>(); + for (Map row : inserted) + rowIds.add(((Number) row.get(RowId.name())).intValue()); + return rowIds; + } + + private void updateRows(ExpSampleType st, List> rows) throws Exception + { + BatchValidationException errors = new BatchValidationException(); + getUpdateService(st).updateRows(_user, _c, rows, null, errors, null, null); + if (errors.hasErrors()) + throw errors; + } + + private void mergeRows(ExpSampleType st, List> rows) throws Exception + { + BatchValidationException errors = new BatchValidationException(); + getUpdateService(st).mergeRows(_user, _c, MapDataIterator.of(rows), errors, null, null); + if (errors.hasErrors()) + throw errors; + } + + /** + * Trigger materialization (and any pending incremental update) as a side effect of {@link #getMaterializedSQL()}, + * then assert the cached temp table equals a fresh derivation of the join query in both directions. + */ + private void assertCacheMatchesFreshDerivation(ExpMaterialTableImpl table, String lsid) + { + SQLFragment cached = table.getMaterializedSQL(); // builds/refreshes the temp table via the normal read path + SQLFragment fresh = table.getJoinSQL(null).append(" WHERE CpasType = ").appendValue(lsid); + DbScope scope = ExperimentServiceImpl.getExpSchema().getScope(); + assertEquals("Cached rows not found in fresh derivation", 0, countDiff(scope, cached, fresh)); + assertEquals("Fresh-derivation rows not found in cache", 0, countDiff(scope, fresh, cached)); + } + + private int countDiff(DbScope scope, SQLFragment a, SQLFragment b) + { + SQLFragment sql = new SQLFragment("SELECT COUNT(*) FROM (\n").append(a).append("\nEXCEPT\n").append(b).append("\n) diff_"); + Integer count = new SqlSelector(scope, sql).getObject(Integer.class); + return count == null ? 0 : count; + } + } } diff --git a/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java b/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java index c53185c77f5..1638e121f84 100644 --- a/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpSampleTypeImpl.java @@ -75,6 +75,7 @@ import org.labkey.experiment.controllers.exp.ExperimentController; import java.io.IOException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -865,9 +866,9 @@ public ExpProtocol[] getProtocols(User user) return ret; } - public void onSamplesChanged(User user, List materials, SampleTypeServiceImpl.SampleChangeType reason) + public void onSamplesChanged(User user, List materials, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { - SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(this, reason); + SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(this, reason, changedSince); ExpProtocol[] protocols = getProtocols(user); if (protocols.length != 0) @@ -892,7 +893,6 @@ public void onSamplesChanged(User user, List materials, SampleTypeServ } } - @Override public void setContainer(Container container) { diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 052fbc5b0bc..9ba38e5d89b 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -135,6 +135,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -1205,7 +1206,7 @@ public ValidationException updateSampleType(GWTDomain SampleTypeServiceImpl.get().indexSampleType(st, SearchService.get().defaultTask().getQueue(container, SearchService.PRIORITY.modified)), POSTCOMMIT); + transaction.addCommitTask(() -> indexSampleType(st, SearchService.get().defaultTask().getQueue(container, SearchService.PRIORITY.modified)), POSTCOMMIT); transaction.commit(); refreshSampleTypeMaterializedView(st, SampleChangeType.schema); } @@ -1971,6 +1972,7 @@ public Map moveSamples(Collection sample updateCounts.put("sampleAuditEvents", 0); Map> fileMovesBySampleId = new LongHashMap<>(); ExperimentService expService = ExperimentService.get(); + Timestamp changedSince = SampleTypeUpdateServiceDI.captureChangedSince(); try (DbScope.Transaction transaction = ensureTransaction()) { @@ -1981,7 +1983,7 @@ public Map moveSamples(Collection sample AbstractQueryUpdateService.addTransactionAuditEvent(transaction, user, auditEvent); } - for (Map.Entry> entry: sampleTypesMap.entrySet()) + for (Map.Entry> entry : sampleTypesMap.entrySet()) { ExpSampleType sampleType = entry.getKey(); SamplesSchema schema = new SamplesSchema(user, sampleType.getContainer()); @@ -2055,10 +2057,10 @@ public Map moveSamples(Collection sample for (ExpSampleType sampleType : sampleTypesMap.keySet()) { // force refresh of materialized view - SampleTypeServiceImpl.get().refreshSampleTypeMaterializedView(sampleType, SampleChangeType.update); + refreshSampleTypeMaterializedView(sampleType, SampleChangeType.update, changedSince); // update search index for moved samples via indexSampleType() helper, it filters for samples to index // based on the modified date - SampleTypeServiceImpl.get().indexSampleType(sampleType, SearchService.get().defaultTask().getQueue(sampleType.getContainer(), SearchService.PRIORITY.modified)); + indexSampleType(sampleType, SearchService.get().defaultTask().getQueue(sampleType.getContainer(), SearchService.PRIORITY.modified)); } }, DbScope.CommitTaskOption.IMMEDIATE, POSTCOMMIT, POSTROLLBACK); @@ -2399,13 +2401,22 @@ public long getCurrentCount(NameGenerator.EntityCounter counterType, Container c return getProjectSampleCount(container, counterType == NameGenerator.EntityCounter.rootSampleCount); } - public enum SampleChangeType { insert, update, delete, rollup /* aliquot count */, schema } + public enum SampleChangeType { insert, update, merge, delete, rollup /* aliquot count */, schema } public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason) { - ExpMaterialTableImpl.refreshMaterializedView(st.getLSID(), reason); + refreshSampleTypeMaterializedView(st, reason, null); } + /** + * @param changedSince a database-clock watermark captured before the update's writes, at or after which the changed + * samples were modified (only meaningful for update); null means the caller could not capture a + * watermark, forcing a full re-sync on the next read. + */ + public void refreshSampleTypeMaterializedView(@NotNull ExpSampleType st, SampleChangeType reason, @Nullable Timestamp changedSince) + { + ExpMaterialTableImpl.refreshMaterializedView(st.getLSID(), reason, changedSince); + } public static class TestCase extends Assert { diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java index 9986758935b..39b5dabde21 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeUpdateServiceDI.java @@ -46,6 +46,7 @@ import org.labkey.api.data.RemapCache; import org.labkey.api.data.RuntimeSQLException; import org.labkey.api.data.SimpleFilter; +import org.labkey.api.data.SqlSelector; import org.labkey.api.data.TableInfo; import org.labkey.api.data.TableSelector; import org.labkey.api.data.UpdateableTableInfo; @@ -116,6 +117,7 @@ import java.io.IOException; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -149,6 +151,7 @@ import static org.labkey.api.util.IntegerUtils.asLong; import static org.labkey.experiment.ExpDataIterators.incrementCounts; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.insert; +import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.merge; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.rollup; import static org.labkey.experiment.api.SampleTypeServiceImpl.SampleChangeType.update; @@ -466,12 +469,16 @@ public int loadRows(User user, Container container, DataIteratorBuilder rows, Da context.putConfigParameter(ExperimentService.QueryOptions.GetSampleRecomputeCol, true); ArrayList> outputRows = new ArrayList<>(); + InsertOption insertOption = context.getInsertOption(); + Timestamp changedSince = insertOption.allowUpdate ? captureChangedSince() : null; + int ret = super.loadRows(user, container, rows, outputRows, context, extraScriptContext); if (ret > 0 && !context.getErrors().hasErrors() && _sampleType != null) { - boolean isMediaUpdate = _sampleType.isMedia() && context.getInsertOption().updateOnly; - onSamplesChanged(!isMediaUpdate ? outputRows : null, context.getConfigParameters(), container, context.getInsertOption().allowUpdate ? update : insert); - audit(context.getInsertOption().auditAction); + boolean isMediaUpdate = _sampleType.isMedia() && insertOption.updateOnly; + SampleTypeServiceImpl.SampleChangeType reason = insertOption.updateOnly ? update : insertOption.allowUpdate ? merge : insert; + onSamplesChanged(!isMediaUpdate ? outputRows : null, context.getConfigParameters(), container, reason, changedSince); + audit(insertOption.auditAction); } return ret; } @@ -480,10 +487,11 @@ public int loadRows(User user, Container container, DataIteratorBuilder rows, Da public int mergeRows(User user, Container container, DataIteratorBuilder rows, BatchValidationException errors, @Nullable Map configParameters, Map extraScriptContext) { assert _sampleType != null : "SampleType required for insert/update, but not required for read/delete"; + Timestamp changedSince = captureChangedSince(); int ret = _importRowsUsingDIB(user, container, rows, null, getDataIteratorContext(errors, InsertOption.MERGE, configParameters), extraScriptContext); if (ret > 0 && !errors.hasErrors()) { - onSamplesChanged(null, configParameters, container, update); // mergeRows not really used, skip wiring recalc + onSamplesChanged(null, configParameters, container, merge, changedSince); // mergeRows not really used, skip wiring recalc audit(QueryService.AuditAction.MERGE); } return ret; @@ -510,7 +518,7 @@ public List> insertRows(User user, Container container, List if (results != null && !results.isEmpty() && !errors.hasErrors()) { - onSamplesChanged(results, configParameters, container, SampleTypeServiceImpl.SampleChangeType.insert); + onSamplesChanged(results, configParameters, container, insert); audit(QueryService.AuditAction.INSERT); } return results; @@ -553,6 +561,7 @@ public List> updateRows( List> results; Map finalConfigParameters = configParameters == null ? new HashMap<>() : configParameters; recordDataIteratorUsed(finalConfigParameters); + Timestamp changedSince = captureChangedSince(); try { @@ -567,7 +576,7 @@ public List> updateRows( if (results != null && !results.isEmpty() && !errors.hasErrors()) { - onSamplesChanged(!_sampleType.isMedia() ? results : null, configParameters, container, update); + onSamplesChanged(!_sampleType.isMedia() ? results : null, configParameters, container, update, changedSince); audit(QueryService.AuditAction.UPDATE); } @@ -1139,6 +1148,11 @@ protected Map getRow(User user, Container container, Map> results, Map params, Container container, SampleTypeServiceImpl.SampleChangeType reason) + { + onSamplesChanged(results, params, container, reason, null); + } + + private void onSamplesChanged(List> results, Map params, Container container, SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { var tx = getSchema().getDbSchema().getScope().getCurrentTransaction(); Pair, Set> parentKeys = getSampleParentsForRecalc(results); @@ -1163,7 +1177,7 @@ private void onSamplesChanged(List> results, Map { - fireSamplesChanged(reason); + fireSamplesChanged(reason, changedSince); if (finalUseBackgroundRecalc && !finalSkipRecalc) handleRecalc(parentKeys.first, parentKeys.second, true, container); }, DbScope.CommitTaskOption.POSTCOMMIT); @@ -1173,7 +1187,7 @@ private void onSamplesChanged(List> results, Map rootRowIds, Set parentNames, boolean } } - private void fireSamplesChanged(SampleTypeServiceImpl.SampleChangeType reason) + private void fireSamplesChanged(SampleTypeServiceImpl.SampleChangeType reason, @Nullable Timestamp changedSince) { if (_sampleType != null) - _sampleType.onSamplesChanged(getUser(), null, reason); + _sampleType.onSamplesChanged(getUser(), null, reason, changedSince); + } + + static @Nullable Timestamp captureChangedSince() + { + return new SqlSelector(DbScope.getLabKeyScope(), "SELECT CURRENT_TIMESTAMP").getObject(Timestamp.class); } void audit(QueryService.AuditAction auditAction)