Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* [BUGFIX] Ring: Fix ring token conflict resolution only applied to updated instance and make constantly token conflict check during instance observe period.
* [BUGFIX] Distributor: Fix a panic (`slice bounds out of range`) in the stream push path when the context deadline expires while the worker goroutine is still marshalling a `WriteRequest`. #7541
* [BUGFIX] Query Frontend: Fix native histogram responses not being handled correctly in `minTime()` sort ordering for split_by_interval merge. #7555
* [BUGFIX] Compactor: Ensure visit marker heartbeat goroutine completes before blocks cleaner returns. #7386

## 1.21.0 2026-04-24

Expand Down
14 changes: 12 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,14 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
doneChan := make(chan struct{})
go func() {
visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
close(doneChan)
}()
defer func() {
errChan <- nil
<-doneChan
}()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This exact pattern is duplicated verbatim in cleanDeletedUsers below. Consider extracting a small helper to reduce the duplication, e.g.:

func runWithHeartBeat(ctx context.Context, mgr *VisitMarkerManager, interval time.Duration, fn func() error) error {
    errChan := make(chan error, 1)
    doneChan := make(chan struct{})
    go func() {
        mgr.HeartBeat(ctx, errChan, interval, true)
        close(doneChan)
    }()
    defer func() {
        errChan <- nil
        <-doneChan
    }()
    return fn()
}

This is optional — the duplication is small and the current code is clear.

return errors.Wrapf(c.cleanUser(ctx, userLogger, userBucket, userID, firstRun), "failed to delete blocks for user: %s", userID)
})
Expand Down Expand Up @@ -392,9 +397,14 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
doneChan := make(chan struct{})
go func() {
visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
close(doneChan)
}()
defer func() {
errChan <- nil
<-doneChan
}()
return errors.Wrapf(c.deleteUserMarkedForDeletion(ctx, userLogger, userBucket, userID), "failed to delete user marked for deletion: %s", userID)
})
Expand Down
Loading