From b4025acc80ba5ca13662a0e79dbfc9d3573f0cbd Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 23 Jan 2026 11:11:07 +0100 Subject: [PATCH 1/2] Add cleanup jobs for inactive session IP addresses This adds three new scheduled cleanup jobs that clear the last_active_ip field from sessions that have been inactive for more than 30 days: - CleanupInactiveOAuth2SessionIpsJob - CleanupInactiveCompatSessionIpsJob - CleanupInactiveUserSessionIpsJob This helps with data minimization by not retaining IP addresses longer --- ...c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json | 23 ++++ ...b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json | 23 ++++ ...afe41b64d198729237dd2cef060992ebd0233.json | 23 ++++ ...90000_idx_oauth2_sessions_inactive_ips.sql | 10 ++ ...90001_idx_compat_sessions_inactive_ips.sql | 10 ++ ...3090002_idx_user_sessions_inactive_ips.sql | 10 ++ crates/storage-pg/src/compat/session.rs | 44 ++++++ crates/storage-pg/src/oauth2/session.rs | 44 ++++++ crates/storage-pg/src/user/session.rs | 44 ++++++ crates/storage/src/compat/session.rs | 26 ++++ crates/storage/src/oauth2/session.rs | 26 ++++ crates/storage/src/queue/tasks.rs | 24 ++++ crates/storage/src/user/session.rs | 26 ++++ crates/tasks/src/database.rs | 129 +++++++++++++++++- crates/tasks/src/lib.rs | 21 +++ 15 files changed, 478 insertions(+), 5 deletions(-) create mode 100644 crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json create mode 100644 crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json create mode 100644 crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json create mode 100644 crates/storage-pg/migrations/20260123090000_idx_oauth2_sessions_inactive_ips.sql create mode 100644 crates/storage-pg/migrations/20260123090001_idx_compat_sessions_inactive_ips.sql create mode 100644 crates/storage-pg/migrations/20260123090002_idx_user_sessions_inactive_ips.sql diff --git a/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json b/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json new file mode 100644 index 000000000..5bbedf86c --- /dev/null +++ b/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT oauth2_session_id\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e" +} diff --git a/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json b/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json new file mode 100644 index 000000000..90b4d1e80 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT user_session_id\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c" +} diff --git a/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json b/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json new file mode 100644 index 000000000..b9234e33a --- /dev/null +++ b/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT compat_session_id\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233" +} diff --git a/crates/storage-pg/migrations/20260123090000_idx_oauth2_sessions_inactive_ips.sql b/crates/storage-pg/migrations/20260123090000_idx_oauth2_sessions_inactive_ips.sql new file mode 100644 index 000000000..6b26e231b --- /dev/null +++ b/crates/storage-pg/migrations/20260123090000_idx_oauth2_sessions_inactive_ips.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Partial index for cleaning up IP addresses from inactive OAuth2 sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "oauth2_sessions_inactive_ips_idx" + ON "oauth2_sessions" ("last_active_at") + WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL; diff --git a/crates/storage-pg/migrations/20260123090001_idx_compat_sessions_inactive_ips.sql b/crates/storage-pg/migrations/20260123090001_idx_compat_sessions_inactive_ips.sql new file mode 100644 index 000000000..01e79bbab --- /dev/null +++ b/crates/storage-pg/migrations/20260123090001_idx_compat_sessions_inactive_ips.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Partial index for cleaning up IP addresses from inactive compat sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "compat_sessions_inactive_ips_idx" + ON "compat_sessions" ("last_active_at") + WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL; diff --git a/crates/storage-pg/migrations/20260123090002_idx_user_sessions_inactive_ips.sql b/crates/storage-pg/migrations/20260123090002_idx_user_sessions_inactive_ips.sql new file mode 100644 index 000000000..c131fb896 --- /dev/null +++ b/crates/storage-pg/migrations/20260123090002_idx_user_sessions_inactive_ips.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Partial index for cleaning up IP addresses from inactive user sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "user_sessions_inactive_ips_idx" + ON "user_sessions" ("last_active_at") + WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL; diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs index df071c752..438cfaa4a 100644 --- a/crates/storage-pg/src/compat/session.rs +++ b/crates/storage-pg/src/compat/session.rs @@ -758,4 +758,48 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { res.last_finished_at, )) } + + #[tracing::instrument( + name = "db.compat_session.cleanup_inactive_ips", + skip_all, + fields( + db.query.text, + threshold = %threshold, + limit = limit, + ), + err, + )] + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result { + let res = sqlx::query_scalar!( + r#" + WITH to_update AS ( + SELECT compat_session_id + FROM compat_sessions + WHERE last_active_ip IS NOT NULL + AND last_active_at IS NOT NULL + AND last_active_at < $1 + LIMIT $2 + ), + updated AS ( + UPDATE compat_sessions + SET last_active_ip = NULL + FROM to_update + WHERE compat_sessions.compat_session_id = to_update.compat_session_id + RETURNING 1 + ) + SELECT COUNT(*) AS "count!" FROM updated + "#, + threshold, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(res.try_into().unwrap_or(usize::MAX)) + } } diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index a6f8dab56..3af51ba70 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -652,4 +652,48 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { res.last_finished_at, )) } + + #[tracing::instrument( + name = "db.oauth2_session.cleanup_inactive_ips", + skip_all, + fields( + db.query.text, + threshold = %threshold, + limit = limit, + ), + err, + )] + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result { + let res = sqlx::query_scalar!( + r#" + WITH to_update AS ( + SELECT oauth2_session_id + FROM oauth2_sessions + WHERE last_active_ip IS NOT NULL + AND last_active_at IS NOT NULL + AND last_active_at < $1 + LIMIT $2 + ), + updated AS ( + UPDATE oauth2_sessions + SET last_active_ip = NULL + FROM to_update + WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id + RETURNING 1 + ) + SELECT COUNT(*) AS "count!" FROM updated + "#, + threshold, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(res.try_into().unwrap_or(usize::MAX)) + } } diff --git a/crates/storage-pg/src/user/session.rs b/crates/storage-pg/src/user/session.rs index 888edeebe..23cddc088 100644 --- a/crates/storage-pg/src/user/session.rs +++ b/crates/storage-pg/src/user/session.rs @@ -707,4 +707,48 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { res.last_finished_at, )) } + + #[tracing::instrument( + name = "db.browser_session.cleanup_inactive_ips", + skip_all, + fields( + db.query.text, + threshold = %threshold, + limit = limit, + ), + err, + )] + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result { + let res = sqlx::query_scalar!( + r#" + WITH to_update AS ( + SELECT user_session_id + FROM user_sessions + WHERE last_active_ip IS NOT NULL + AND last_active_at IS NOT NULL + AND last_active_at < $1 + LIMIT $2 + ), + updated AS ( + UPDATE user_sessions + SET last_active_ip = NULL + FROM to_update + WHERE user_sessions.user_session_id = to_update.user_session_id + RETURNING 1 + ) + SELECT COUNT(*) AS "count!" FROM updated + "#, + threshold, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(res.try_into().unwrap_or(usize::MAX)) + } } diff --git a/crates/storage/src/compat/session.rs b/crates/storage/src/compat/session.rs index 0ed335ed9..e48fa39f1 100644 --- a/crates/storage/src/compat/session.rs +++ b/crates/storage/src/compat/session.rs @@ -386,6 +386,26 @@ pub trait CompatSessionRepository: Send + Sync { until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + /// Clear IP addresses from sessions inactive since the threshold + /// + /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is + /// before the threshold. Returns the number of sessions affected. + /// + /// # Parameters + /// + /// * `threshold`: Clear IPs for sessions with `last_active_at` before this + /// time + /// * `limit`: Maximum number of sessions to update in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result; } repository_impl!(CompatSessionRepository: @@ -445,4 +465,10 @@ repository_impl!(CompatSessionRepository: until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result; ); diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 068512b14..96b3f2cd7 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -485,6 +485,26 @@ pub trait OAuth2SessionRepository: Send + Sync { until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + /// Clear IP addresses from sessions inactive since the threshold + /// + /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is + /// before the threshold. Returns the number of sessions affected. + /// + /// # Parameters + /// + /// * `threshold`: Clear IPs for sessions with `last_active_at` before this + /// time + /// * `limit`: Maximum number of sessions to update in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result; } repository_impl!(OAuth2SessionRepository: @@ -557,4 +577,10 @@ repository_impl!(OAuth2SessionRepository: until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result; ); diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index edf3a3c5e..51845a43b 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -627,3 +627,27 @@ pub struct PruneStalePolicyDataJob; impl InsertableJob for PruneStalePolicyDataJob { const QUEUE_NAME: &'static str = "prune-stale-policy-data"; } + +/// Cleanup IP addresses from inactive OAuth 2.0 sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupInactiveOAuth2SessionIpsJob; + +impl InsertableJob for CleanupInactiveOAuth2SessionIpsJob { + const QUEUE_NAME: &'static str = "cleanup-inactive-oauth2-session-ips"; +} + +/// Cleanup IP addresses from inactive compat sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupInactiveCompatSessionIpsJob; + +impl InsertableJob for CleanupInactiveCompatSessionIpsJob { + const QUEUE_NAME: &'static str = "cleanup-inactive-compat-session-ips"; +} + +/// Cleanup IP addresses from inactive user/browser sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupInactiveUserSessionIpsJob; + +impl InsertableJob for CleanupInactiveUserSessionIpsJob { + const QUEUE_NAME: &'static str = "cleanup-inactive-user-session-ips"; +} diff --git a/crates/storage/src/user/session.rs b/crates/storage/src/user/session.rs index 6d04aefc2..49e44079e 100644 --- a/crates/storage/src/user/session.rs +++ b/crates/storage/src/user/session.rs @@ -332,6 +332,26 @@ pub trait BrowserSessionRepository: Send + Sync { until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + /// Clear IP addresses from sessions inactive since the threshold + /// + /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is + /// before the threshold. Returns the number of sessions affected. + /// + /// # Parameters + /// + /// * `threshold`: Clear IPs for sessions with `last_active_at` before this + /// time + /// * `limit`: Maximum number of sessions to update in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result; } repository_impl!(BrowserSessionRepository: @@ -395,4 +415,10 @@ repository_impl!(BrowserSessionRepository: until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + async fn cleanup_inactive_ips( + &mut self, + threshold: DateTime, + limit: usize, + ) -> Result; ); diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 0df35dbfd..898b689bf 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -13,11 +13,13 @@ use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob, - CleanupFinishedUserSessionsJob, CleanupOAuthAuthorizationGrantsJob, - CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob, - CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob, - CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob, - CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, + CleanupFinishedUserSessionsJob, CleanupInactiveCompatSessionIpsJob, + CleanupInactiveOAuth2SessionIpsJob, CleanupInactiveUserSessionIpsJob, + CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, + CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, + CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, + CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, + CleanupUserRegistrationsJob, PruneStalePolicyDataJob, }; use tracing::{debug, info}; use ulid::Ulid; @@ -789,3 +791,120 @@ impl RunnableJob for PruneStalePolicyDataJob { Ok(()) } } + +#[async_trait] +impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_oauth2_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let count = repo + .oauth2_session() + .cleanup_inactive_ips(threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no OAuth2 session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive OAuth2 session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupInactiveCompatSessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_compat_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let count = repo + .compat_session() + .cleanup_inactive_ips(threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no compat session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive compat session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupInactiveUserSessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_user_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let count = repo + .browser_session() + .cleanup_inactive_ips(threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no user session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive user session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 6be237206..6369b5751 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -158,6 +158,9 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() + .register_handler::() + .register_handler::() .register_deprecated_queue("cleanup-expired-tokens") .add_schedule( "cleanup-revoked-oauth-access-tokens", @@ -264,6 +267,24 @@ pub async fn init( // Run once a day "0 0 2 * * *".parse()?, mas_storage::queue::PruneStalePolicyDataJob, + ) + .add_schedule( + "cleanup-inactive-oauth2-session-ips", + // Run this job every hour + "0 46 * * * *".parse()?, + mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob, + ) + .add_schedule( + "cleanup-inactive-compat-session-ips", + // Run this job every hour + "0 47 * * * *".parse()?, + mas_storage::queue::CleanupInactiveCompatSessionIpsJob, + ) + .add_schedule( + "cleanup-inactive-user-session-ips", + // Run this job every hour + "0 48 * * * *".parse()?, + mas_storage::queue::CleanupInactiveUserSessionIpsJob, ); Ok(worker) From 270236cb4a0eae9cc61ec311d3b09bcb296ecdf3 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 23 Jan 2026 18:52:33 +0100 Subject: [PATCH 2/2] Refactor inactive IP cleanup to use pagination This should avoid dead many dead tuples when processing batches of sessions to cleanup --- ...c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json | 23 -------------- ...268818dc84c37b168ab45e582e0a727796a06.json | 30 +++++++++++++++++++ ...6268efd13d7af3cecb452168d514a379fec30.json | 30 +++++++++++++++++++ ...f79832e5d0748dad18ab44c6671f3196d6f60.json | 30 +++++++++++++++++++ ...b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json | 23 -------------- ...afe41b64d198729237dd2cef060992ebd0233.json | 23 -------------- crates/storage-pg/src/compat/session.rs | 25 +++++++++++----- crates/storage-pg/src/oauth2/session.rs | 25 +++++++++++----- crates/storage-pg/src/user/session.rs | 25 +++++++++++----- crates/storage/src/compat/session.rs | 11 +++++-- crates/storage/src/oauth2/session.rs | 11 +++++-- crates/storage/src/user/session.rs | 11 +++++-- crates/tasks/src/database.rs | 18 +++++++---- 13 files changed, 177 insertions(+), 108 deletions(-) delete mode 100644 crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json create mode 100644 crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json create mode 100644 crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json create mode 100644 crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json delete mode 100644 crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json delete mode 100644 crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json diff --git a/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json b/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json deleted file mode 100644 index 5bbedf86c..000000000 --- a/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH to_update AS (\n SELECT oauth2_session_id\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e" -} diff --git a/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json b/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json new file mode 100644 index 000000000..d7f208530 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT user_session_id, last_active_at\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING user_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06" +} diff --git a/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json b/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json new file mode 100644 index 000000000..a2c99a758 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT oauth2_session_id, last_active_at\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING oauth2_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30" +} diff --git a/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json b/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json new file mode 100644 index 000000000..4aaa3523f --- /dev/null +++ b/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT compat_session_id, last_active_at\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING compat_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60" +} diff --git a/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json b/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json deleted file mode 100644 index 90b4d1e80..000000000 --- a/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH to_update AS (\n SELECT user_session_id\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c" -} diff --git a/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json b/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json deleted file mode 100644 index b9234e33a..000000000 --- a/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH to_update AS (\n SELECT compat_session_id\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233" -} diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs index 438cfaa4a..e7612fdc1 100644 --- a/crates/storage-pg/src/compat/session.rs +++ b/crates/storage-pg/src/compat/session.rs @@ -764,6 +764,7 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), threshold = %threshold, limit = limit, ), @@ -771,28 +772,33 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { )] async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result { - let res = sqlx::query_scalar!( + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( r#" WITH to_update AS ( - SELECT compat_session_id + SELECT compat_session_id, last_active_at FROM compat_sessions WHERE last_active_ip IS NOT NULL AND last_active_at IS NOT NULL - AND last_active_at < $1 - LIMIT $2 + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE ), updated AS ( UPDATE compat_sessions SET last_active_ip = NULL FROM to_update WHERE compat_sessions.compat_session_id = to_update.compat_session_id - RETURNING 1 + RETURNING compat_sessions.last_active_at ) - SELECT COUNT(*) AS "count!" FROM updated + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated "#, + since, threshold, i64::try_from(limit).unwrap_or(i64::MAX), ) @@ -800,6 +806,9 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { .fetch_one(&mut *self.conn) .await?; - Ok(res.try_into().unwrap_or(usize::MAX)) + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) } } diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index 3af51ba70..e1379f74a 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -658,6 +658,7 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), threshold = %threshold, limit = limit, ), @@ -665,28 +666,33 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { )] async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result { - let res = sqlx::query_scalar!( + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( r#" WITH to_update AS ( - SELECT oauth2_session_id + SELECT oauth2_session_id, last_active_at FROM oauth2_sessions WHERE last_active_ip IS NOT NULL AND last_active_at IS NOT NULL - AND last_active_at < $1 - LIMIT $2 + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE ), updated AS ( UPDATE oauth2_sessions SET last_active_ip = NULL FROM to_update WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id - RETURNING 1 + RETURNING oauth2_sessions.last_active_at ) - SELECT COUNT(*) AS "count!" FROM updated + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated "#, + since, threshold, i64::try_from(limit).unwrap_or(i64::MAX), ) @@ -694,6 +700,9 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { .fetch_one(&mut *self.conn) .await?; - Ok(res.try_into().unwrap_or(usize::MAX)) + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) } } diff --git a/crates/storage-pg/src/user/session.rs b/crates/storage-pg/src/user/session.rs index 23cddc088..853023984 100644 --- a/crates/storage-pg/src/user/session.rs +++ b/crates/storage-pg/src/user/session.rs @@ -713,6 +713,7 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), threshold = %threshold, limit = limit, ), @@ -720,28 +721,33 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { )] async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result { - let res = sqlx::query_scalar!( + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( r#" WITH to_update AS ( - SELECT user_session_id + SELECT user_session_id, last_active_at FROM user_sessions WHERE last_active_ip IS NOT NULL AND last_active_at IS NOT NULL - AND last_active_at < $1 - LIMIT $2 + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE ), updated AS ( UPDATE user_sessions SET last_active_ip = NULL FROM to_update WHERE user_sessions.user_session_id = to_update.user_session_id - RETURNING 1 + RETURNING user_sessions.last_active_at ) - SELECT COUNT(*) AS "count!" FROM updated + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated "#, + since, threshold, i64::try_from(limit).unwrap_or(i64::MAX), ) @@ -749,6 +755,9 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { .fetch_one(&mut *self.conn) .await?; - Ok(res.try_into().unwrap_or(usize::MAX)) + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) } } diff --git a/crates/storage/src/compat/session.rs b/crates/storage/src/compat/session.rs index e48fa39f1..8ea08d8f6 100644 --- a/crates/storage/src/compat/session.rs +++ b/crates/storage/src/compat/session.rs @@ -390,10 +390,13 @@ pub trait CompatSessionRepository: Send + Sync { /// Clear IP addresses from sessions inactive since the threshold /// /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is - /// before the threshold. Returns the number of sessions affected. + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. /// /// # Parameters /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. /// * `threshold`: Clear IPs for sessions with `last_active_at` before this /// time /// * `limit`: Maximum number of sessions to update in this batch @@ -403,9 +406,10 @@ pub trait CompatSessionRepository: Send + Sync { /// Returns [`Self::Error`] if the underlying repository fails async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(CompatSessionRepository: @@ -468,7 +472,8 @@ repository_impl!(CompatSessionRepository: async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 96b3f2cd7..75035653f 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -489,10 +489,13 @@ pub trait OAuth2SessionRepository: Send + Sync { /// Clear IP addresses from sessions inactive since the threshold /// /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is - /// before the threshold. Returns the number of sessions affected. + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. /// /// # Parameters /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. /// * `threshold`: Clear IPs for sessions with `last_active_at` before this /// time /// * `limit`: Maximum number of sessions to update in this batch @@ -502,9 +505,10 @@ pub trait OAuth2SessionRepository: Send + Sync { /// Returns [`Self::Error`] if the underlying repository fails async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(OAuth2SessionRepository: @@ -580,7 +584,8 @@ repository_impl!(OAuth2SessionRepository: async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/user/session.rs b/crates/storage/src/user/session.rs index 49e44079e..8cf78aa7f 100644 --- a/crates/storage/src/user/session.rs +++ b/crates/storage/src/user/session.rs @@ -336,10 +336,13 @@ pub trait BrowserSessionRepository: Send + Sync { /// Clear IP addresses from sessions inactive since the threshold /// /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is - /// before the threshold. Returns the number of sessions affected. + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. /// /// # Parameters /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. /// * `threshold`: Clear IPs for sessions with `last_active_at` before this /// time /// * `limit`: Maximum number of sessions to update in this batch @@ -349,9 +352,10 @@ pub trait BrowserSessionRepository: Send + Sync { /// Returns [`Self::Error`] if the underlying repository fails async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(BrowserSessionRepository: @@ -418,7 +422,8 @@ repository_impl!(BrowserSessionRepository: async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 898b689bf..c3fb46f78 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -800,16 +800,18 @@ impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob { let threshold = state.clock.now() - chrono::Duration::days(30); let mut total = 0; + let mut since = None; while !context.cancellation_token.is_cancelled() { let mut repo = state.repository().await.map_err(JobError::retry)?; - let count = repo + let (count, last_active_at) = repo .oauth2_session() - .cleanup_inactive_ips(threshold, BATCH_SIZE) + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; + since = last_active_at; total += count; if count != BATCH_SIZE { @@ -839,16 +841,18 @@ impl RunnableJob for CleanupInactiveCompatSessionIpsJob { let threshold = state.clock.now() - chrono::Duration::days(30); let mut total = 0; + let mut since = None; while !context.cancellation_token.is_cancelled() { let mut repo = state.repository().await.map_err(JobError::retry)?; - let count = repo + let (count, last_active_at) = repo .compat_session() - .cleanup_inactive_ips(threshold, BATCH_SIZE) + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; + since = last_active_at; total += count; if count != BATCH_SIZE { @@ -878,16 +882,18 @@ impl RunnableJob for CleanupInactiveUserSessionIpsJob { let threshold = state.clock.now() - chrono::Duration::days(30); let mut total = 0; + let mut since = None; while !context.cancellation_token.is_cancelled() { let mut repo = state.repository().await.map_err(JobError::retry)?; - let count = repo + let (count, last_active_at) = repo .browser_session() - .cleanup_inactive_ips(threshold, BATCH_SIZE) + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; + since = last_active_at; total += count; if count != BATCH_SIZE {