From 256d11c5a165d3e9f004d86452ee4461493ad8e3 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 22 Jan 2026 15:21:19 +0100 Subject: [PATCH 1/3] Include pagination params in the tracing fields of cleanup methods --- crates/storage-pg/src/oauth2/access_token.rs | 6 ++++++ crates/storage-pg/src/upstream_oauth2/link.rs | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/storage-pg/src/oauth2/access_token.rs b/crates/storage-pg/src/oauth2/access_token.rs index 2b950aa67..a8d49ac36 100644 --- a/crates/storage-pg/src/oauth2/access_token.rs +++ b/crates/storage-pg/src/oauth2/access_token.rs @@ -255,6 +255,9 @@ impl OAuth2AccessTokenRepository for PgOAuth2AccessTokenRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, ), err, )] @@ -309,6 +312,9 @@ impl OAuth2AccessTokenRepository for PgOAuth2AccessTokenRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), + until = %until, + limit = limit, ), err, )] diff --git a/crates/storage-pg/src/upstream_oauth2/link.rs b/crates/storage-pg/src/upstream_oauth2/link.rs index ca01dc97d..8814bc3e6 100644 --- a/crates/storage-pg/src/upstream_oauth2/link.rs +++ b/crates/storage-pg/src/upstream_oauth2/link.rs @@ -449,9 +449,9 @@ impl UpstreamOAuthLinkRepository for PgUpstreamOAuthLinkRepository<'_> { skip_all, fields( db.query.text, - since, - until, - limit, + since = since.map(tracing::field::display), + until = %until, + limit = limit, ), err, )] From f7db25bce2303342158c384c5c2043d91426dd19 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 22 Jan 2026 11:13:27 +0100 Subject: [PATCH 2/3] Add cleanup jobs developer documentation --- docs/SUMMARY.md | 1 + docs/development/cleanup-jobs.md | 266 +++++++++++++++++++++++++++++++ 2 files changed, 267 insertions(+) create mode 100644 docs/development/cleanup-jobs.md diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 211047872..27182d5f0 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -44,6 +44,7 @@ - [Releasing](./development/releasing.md) - [Architecture](./development/architecture.md) - [Database](./development/database.md) +- [Cleanup jobs](./development/cleanup-jobs.md) - [Internal GraphQL API](./development/graphql.md) --- diff --git a/docs/development/cleanup-jobs.md b/docs/development/cleanup-jobs.md new file mode 100644 index 000000000..045946435 --- /dev/null +++ b/docs/development/cleanup-jobs.md @@ -0,0 +1,266 @@ +# Cleanup Jobs + +In MAS, most of the data only soft-deleted, through setting a `deleted_at`, `finished_at`, `consumed_at` timestamp on the row, instead of actually deleting the row. +They are kept around for a short period of time, for audit purposes or to help with the user experience in some case. +This document describes the cleanup jobs in MAS which delete those stale rows after some time, including how to add new cleanup jobs and understand the existing ones. + +## Cleanup Job Architecture + +Cleanup jobs are scheduled tasks that hard-delete old data from the database. They follow a consistent pattern: + +1. **Job struct** in `crates/storage/src/queue/tasks.rs` - Defines the job and queue name +2. **Storage trait** in `crates/storage/src/{domain}/` - Declares the cleanup method interface +3. **PostgreSQL implementation** in `crates/storage-pg/src/{domain}/` - Implements the actual cleanup logic +4. **Job runner** in `crates/tasks/src/database.rs` - Implements the `RunnableJob` trait with batching logic +5. **Registration** in `crates/tasks/src/lib.rs` - Registers the handler and schedules execution + +## All Cleanup Jobs + +| Job | Entity | Retention | Notes | +|-----|--------|-----------|-------| +| `CleanupRevokedOAuthAccessTokensJob` | `oauth2_access_tokens` | 1 hour after `revoked_at` | | +| `CleanupExpiredOAuthAccessTokensJob` | `oauth2_access_tokens` | 30 days after `expires_at` | For idempotency | +| `CleanupRevokedOAuthRefreshTokensJob` | `oauth2_refresh_tokens` | 1 hour after `revoked_at` | | +| `CleanupConsumedOAuthRefreshTokensJob` | `oauth2_refresh_tokens` | 1 hour after `consumed_at` | | +| `CleanupUserRegistrationsJob` | `user_registrations` | 30 days | For abuse investigation | +| `CleanupFinishedCompatSessionsJob` | `compat_sessions` | 30 days after `finished_at` | Cascades to tokens | +| `CleanupFinishedOAuth2SessionsJob` | `oauth2_sessions` | 30 days after `finished_at` | Cascades to tokens | +| `CleanupFinishedUserSessionsJob` | `user_sessions` | 30 days after `finished_at` | Only if no child sessions | +| `CleanupOAuthAuthorizationGrantsJob` | `oauth2_authorization_grants` | 7 days | | +| `CleanupOAuthDeviceCodeGrantsJob` | `oauth2_device_code_grant` | 7 days | | +| `CleanupUserRecoverySessionsJob` | `user_recovery_sessions` | 7 days | Codes expire in 10 min | +| `CleanupUserEmailAuthenticationsJob` | `user_email_authentications` | 7 days | Codes expire in 10 min | +| `CleanupUpstreamOAuthSessionsJob` | `upstream_oauth_authorization_sessions` | 7 days (orphaned) | Where `user_session_id IS NULL` | +| `CleanupUpstreamOAuthLinksJob` | `upstream_oauth_links` | 7 days (orphaned) | Where `user_id IS NULL` | +| `CleanupInactiveOAuth2SessionIpsJob` | `oauth2_sessions.last_active_ip` | 30 days | Clears out IPs after inactivity | +| `CleanupInactiveCompatSessionIpsJob` | `compat_sessions.last_active_ip` | 30 days | Clears out IPs after inactivity | +| `CleanupInactiveUserSessionIpsJob` | `user_sessions.last_active_ip` | 30 days | Clears out IPs after inactivity | +| `CleanupQueueJobsJob` | `queue_jobs` | 30 days | Completed/failed jobs | + +## Session Cleanup and Backchannel Logout + +The session cleanup jobs must preserve the dependency chain required for backchannel logout to work correctly. + +### Backchannel Logout Flow + +When an upstream IdP sends a backchannel logout notification, MAS must trace through the session hierarchy to find and finish all related sessions: + +``` + Upstream IdP logout notification + │ + ▼ + ┌───────────────────────────────────────┐ + │ upstream_oauth_authorization_sessions │ + │ (matched by sub/sid claims) │ + └──────────────┬────────────────────────┘ + │ user_session_id + ▼ + ┌─────────────────────────────────────┐ + │ user_sessions │ + │ (browser sessions) │ + └──────────────┬──────────────────────┘ + │ user_session_id FK + ┌────┴──────────────┐ + │ │ + ▼ ▼ + ┌─────────────────┐ ┌─────────────────┐ + │ compat_sessions │ │ oauth2_sessions │ + └─────────────────┘ └─────────────────┘ +``` + +### Cleanup Order + +The cleanup jobs run in an order that respects this hierarchy: + +1. **Compat sessions** (`CleanupFinishedCompatSessionsJob`) + - Also deletes `compat_access_tokens`, `compat_refresh_tokens` +1. **OAuth2 sessions** (`CleanupFinishedOAuth2SessionsJob`) + - Also deletes `oauth2_access_tokens`, `oauth2_refresh_tokens` +1. **User sessions** (`CleanupFinishedUserSessionsJob`) + - Only deletes if NO `compat_sessions` or `oauth2_sessions` reference it. + This can make this job inefficient if there are lots of finished `user_sessions` that are still referenced by active `compat_sessions` or `oauth2_sessions`. + - Also deletes `user_session_authentications` + - Cascades to `SET NULL` the `user_session_id` on `upstream_oauth_authorization_sessions` +1. **Upstream OAuth authorization sessions** (`CleanupUpstreamOAuthSessionsJob`) + - Only deletes if `user_session_id` is `NULL`, so if the authentication session was never finished *or* the user session was cleaned up. + +### Why User Sessions Require Special Handling + +The `user_session_id` foreign key links must be preserved for backchannel logout to work: + +1. **Backchannel logout** traces: `upstream_oauth_authorization_sessions` → `user_sessions` → `compat_sessions`/`oauth2_sessions` +2. If `user_sessions` is deleted while child sessions exist, the link is broken and logout propagation fails +3. The `NOT EXISTS` checks in the cleanup query ensure we only delete `user_sessions` after all children are cleaned up +4. FK constraints (`ON DELETE NO ACTION`) provide a safety net - attempting to delete a referenced `user_session` will fail + +## Adding a New Cleanup Job + +### 1. Add Job Struct + +In `crates/storage/src/queue/tasks.rs`: + +```rust +/// Cleanup old foo records +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupFooJob; + +impl InsertableJob for CleanupFooJob { + const QUEUE_NAME: &'static str = "cleanup-foo"; +} +``` + +### 2. Add Storage Trait Method + +In `crates/storage/src/{domain}/foo.rs`, add to the repository trait and `repository_impl!` macro: + +```rust +async fn cleanup( + &mut self, + since: Option>, + until: DateTime, + limit: usize, +) -> Result<(usize, Option>), Self::Error>; +``` + +### 3. Implement in PostgreSQL + +In `crates/storage-pg/src/{domain}/foo.rs`, use the CTE pattern: + +```rust +async fn cleanup( + &mut self, + since: Option>, + until: DateTime, + limit: usize, +) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( + r#" + WITH + to_delete AS ( + SELECT id, timestamp_col + FROM table + WHERE timestamp_col IS NOT NULL + AND ($1::timestamptz IS NULL OR timestamp_col >= $1) + AND timestamp_col < $2 + ORDER BY timestamp_col ASC + LIMIT $3 + FOR UPDATE + ), + deleted AS ( + DELETE FROM table USING to_delete + WHERE table.id = to_delete.id + RETURNING timestamp_col + ) + SELECT COUNT(*) as "count!", MAX(timestamp_col) as last_timestamp FROM deleted + "#, + since, + until, + limit as i64, + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_timestamp, + )) +} +``` + +### 4. Add Index Migration + +Make sure to add an index on that timestamp column used by this cleanup job: + +```sql +-- no-transaction +CREATE INDEX CONCURRENTLY IF NOT EXISTS "table_timestamp_idx" + ON "table" ("timestamp_col") + WHERE "timestamp_col" IS NOT NULL; +``` + +The partial index (`WHERE timestamp_col IS NOT NULL`) makes queries more efficient by only indexing rows that will actually be cleaned up. + +### 5. Implement RunnableJob + +In `crates/tasks/src/database.rs`: + +```rust +#[async_trait] +impl RunnableJob for CleanupFooJob { + #[tracing::instrument(name = "job.cleanup_foo", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Cleanup records older than X days + let until = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let (count, last_timestamp) = repo + .foo() + .cleanup(since, until, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_timestamp; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no foo records to clean up"); + } else { + info!(count = total, "cleaned up foo records"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} +``` + +### 6. Register and Schedule + +In `crates/tasks/src/lib.rs`: + +```rust +// Add to register_handler chain +.register_handler::() + +// Add schedule +.add_schedule( + "cleanup-foo", + // Run this job every hour + "0 XX * * * *".parse()?, // Choose a minute offset + mas_storage::queue::CleanupFooJob, +) +``` + +## Implementation Notes + +### Batching Pattern + +All cleanup jobs use a batching pattern to avoid long-running transactions: + +- Process records in batches (typically 1000 at a time) +- Use pagination cursor (`since`) to track progress +- Create a new transaction for each batch +- Check for cancellation between batches +- Log total count at the end + +### Retention Policies + +Retention periods vary by use case: + +- **1 hour**: Revoked/consumed tokens (no longer useful) +- **7 days**: Short-lived grants/codes (abuse investigation) +- **30 days**: Sessions and registrations (longer audit trail) From 7abedeba83acfedc0bfcee08c9a58a27ca9845b9 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 23 Jan 2026 17:43:13 +0100 Subject: [PATCH 3/3] Minor reword in the cleanup jobs documentation Co-authored-by: Olivier 'reivilibre' --- docs/development/cleanup-jobs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/cleanup-jobs.md b/docs/development/cleanup-jobs.md index 045946435..639d0e7a3 100644 --- a/docs/development/cleanup-jobs.md +++ b/docs/development/cleanup-jobs.md @@ -1,6 +1,6 @@ # Cleanup Jobs -In MAS, most of the data only soft-deleted, through setting a `deleted_at`, `finished_at`, `consumed_at` timestamp on the row, instead of actually deleting the row. +In MAS, most of the data are initially only soft-deleted, by setting a `deleted_at`, `finished_at`, `consumed_at` timestamp on the row, instead of actually deleting the row. They are kept around for a short period of time, for audit purposes or to help with the user experience in some case. This document describes the cleanup jobs in MAS which delete those stale rows after some time, including how to add new cleanup jobs and understand the existing ones.