diff --git a/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json b/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json new file mode 100644 index 000000000..d7f208530 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT user_session_id, last_active_at\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING user_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06" +} diff --git a/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json b/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json new file mode 100644 index 000000000..a2c99a758 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT oauth2_session_id, last_active_at\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING oauth2_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30" +} diff --git a/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json b/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json new file mode 100644 index 000000000..4aaa3523f --- /dev/null +++ b/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT compat_session_id, last_active_at\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING compat_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60" +} diff --git a/crates/storage-pg/migrations/20260123090000_idx_oauth2_sessions_inactive_ips.sql b/crates/storage-pg/migrations/20260123090000_idx_oauth2_sessions_inactive_ips.sql new file mode 100644 index 000000000..6b26e231b --- /dev/null +++ b/crates/storage-pg/migrations/20260123090000_idx_oauth2_sessions_inactive_ips.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Partial index for cleaning up IP addresses from inactive OAuth2 sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "oauth2_sessions_inactive_ips_idx" + ON "oauth2_sessions" ("last_active_at") + WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL; diff --git a/crates/storage-pg/migrations/20260123090001_idx_compat_sessions_inactive_ips.sql b/crates/storage-pg/migrations/20260123090001_idx_compat_sessions_inactive_ips.sql new file mode 100644 index 000000000..01e79bbab --- /dev/null +++ b/crates/storage-pg/migrations/20260123090001_idx_compat_sessions_inactive_ips.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Partial index for cleaning up IP addresses from inactive compat sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "compat_sessions_inactive_ips_idx" + ON "compat_sessions" ("last_active_at") + WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL; diff --git a/crates/storage-pg/migrations/20260123090002_idx_user_sessions_inactive_ips.sql b/crates/storage-pg/migrations/20260123090002_idx_user_sessions_inactive_ips.sql new file mode 100644 index 000000000..c131fb896 --- /dev/null +++ b/crates/storage-pg/migrations/20260123090002_idx_user_sessions_inactive_ips.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Partial index for cleaning up IP addresses from inactive user sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS "user_sessions_inactive_ips_idx" + ON "user_sessions" ("last_active_at") + WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL; diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs index df071c752..e7612fdc1 100644 --- a/crates/storage-pg/src/compat/session.rs +++ b/crates/storage-pg/src/compat/session.rs @@ -758,4 +758,57 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { res.last_finished_at, )) } + + #[tracing::instrument( + name = "db.compat_session.cleanup_inactive_ips", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + threshold = %threshold, + limit = limit, + ), + err, + )] + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( + r#" + WITH to_update AS ( + SELECT compat_session_id, last_active_at + FROM compat_sessions + WHERE last_active_ip IS NOT NULL + AND last_active_at IS NOT NULL + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE + ), + updated AS ( + UPDATE compat_sessions + SET last_active_ip = NULL + FROM to_update + WHERE compat_sessions.compat_session_id = to_update.compat_session_id + RETURNING compat_sessions.last_active_at + ) + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated + "#, + since, + threshold, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) + } } diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index a6f8dab56..e1379f74a 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -652,4 +652,57 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { res.last_finished_at, )) } + + #[tracing::instrument( + name = "db.oauth2_session.cleanup_inactive_ips", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + threshold = %threshold, + limit = limit, + ), + err, + )] + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( + r#" + WITH to_update AS ( + SELECT oauth2_session_id, last_active_at + FROM oauth2_sessions + WHERE last_active_ip IS NOT NULL + AND last_active_at IS NOT NULL + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE + ), + updated AS ( + UPDATE oauth2_sessions + SET last_active_ip = NULL + FROM to_update + WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id + RETURNING oauth2_sessions.last_active_at + ) + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated + "#, + since, + threshold, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) + } } diff --git a/crates/storage-pg/src/user/session.rs b/crates/storage-pg/src/user/session.rs index 0af857f57..dba42726b 100644 --- a/crates/storage-pg/src/user/session.rs +++ b/crates/storage-pg/src/user/session.rs @@ -695,4 +695,57 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { res.last_finished_at, )) } + + #[tracing::instrument( + name = "db.browser_session.cleanup_inactive_ips", + skip_all, + fields( + db.query.text, + since = since.map(tracing::field::display), + threshold = %threshold, + limit = limit, + ), + err, + )] + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( + r#" + WITH to_update AS ( + SELECT user_session_id, last_active_at + FROM user_sessions + WHERE last_active_ip IS NOT NULL + AND last_active_at IS NOT NULL + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE + ), + updated AS ( + UPDATE user_sessions + SET last_active_ip = NULL + FROM to_update + WHERE user_sessions.user_session_id = to_update.user_session_id + RETURNING user_sessions.last_active_at + ) + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated + "#, + since, + threshold, + i64::try_from(limit).unwrap_or(i64::MAX), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) + } } diff --git a/crates/storage/src/compat/session.rs b/crates/storage/src/compat/session.rs index 0ed335ed9..8ea08d8f6 100644 --- a/crates/storage/src/compat/session.rs +++ b/crates/storage/src/compat/session.rs @@ -386,6 +386,30 @@ pub trait CompatSessionRepository: Send + Sync { until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + /// Clear IP addresses from sessions inactive since the threshold + /// + /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. + /// + /// # Parameters + /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. + /// * `threshold`: Clear IPs for sessions with `last_active_at` before this + /// time + /// * `limit`: Maximum number of sessions to update in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(CompatSessionRepository: @@ -445,4 +469,11 @@ repository_impl!(CompatSessionRepository: until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 068512b14..75035653f 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -485,6 +485,30 @@ pub trait OAuth2SessionRepository: Send + Sync { until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + /// Clear IP addresses from sessions inactive since the threshold + /// + /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. + /// + /// # Parameters + /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. + /// * `threshold`: Clear IPs for sessions with `last_active_at` before this + /// time + /// * `limit`: Maximum number of sessions to update in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(OAuth2SessionRepository: @@ -557,4 +581,11 @@ repository_impl!(OAuth2SessionRepository: until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index edf3a3c5e..51845a43b 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -627,3 +627,27 @@ pub struct PruneStalePolicyDataJob; impl InsertableJob for PruneStalePolicyDataJob { const QUEUE_NAME: &'static str = "prune-stale-policy-data"; } + +/// Cleanup IP addresses from inactive OAuth 2.0 sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupInactiveOAuth2SessionIpsJob; + +impl InsertableJob for CleanupInactiveOAuth2SessionIpsJob { + const QUEUE_NAME: &'static str = "cleanup-inactive-oauth2-session-ips"; +} + +/// Cleanup IP addresses from inactive compat sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupInactiveCompatSessionIpsJob; + +impl InsertableJob for CleanupInactiveCompatSessionIpsJob { + const QUEUE_NAME: &'static str = "cleanup-inactive-compat-session-ips"; +} + +/// Cleanup IP addresses from inactive user/browser sessions +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct CleanupInactiveUserSessionIpsJob; + +impl InsertableJob for CleanupInactiveUserSessionIpsJob { + const QUEUE_NAME: &'static str = "cleanup-inactive-user-session-ips"; +} diff --git a/crates/storage/src/user/session.rs b/crates/storage/src/user/session.rs index fb7517b02..5e4a108ea 100644 --- a/crates/storage/src/user/session.rs +++ b/crates/storage/src/user/session.rs @@ -331,6 +331,30 @@ pub trait BrowserSessionRepository: Send + Sync { until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + /// Clear IP addresses from sessions inactive since the threshold + /// + /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. + /// + /// # Parameters + /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. + /// * `threshold`: Clear IPs for sessions with `last_active_at` before this + /// time + /// * `limit`: Maximum number of sessions to update in this batch + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(BrowserSessionRepository: @@ -394,4 +418,11 @@ repository_impl!(BrowserSessionRepository: until: DateTime, limit: usize, ) -> Result<(usize, Option>), Self::Error>; + + async fn cleanup_inactive_ips( + &mut self, + since: Option>, + threshold: DateTime, + limit: usize, + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 0df35dbfd..c3fb46f78 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -13,11 +13,13 @@ use async_trait::async_trait; use mas_storage::queue::{ CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob, CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob, - CleanupFinishedUserSessionsJob, CleanupOAuthAuthorizationGrantsJob, - CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob, - CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob, - CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob, - CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob, + CleanupFinishedUserSessionsJob, CleanupInactiveCompatSessionIpsJob, + CleanupInactiveOAuth2SessionIpsJob, CleanupInactiveUserSessionIpsJob, + CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, + CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob, + CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob, + CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob, + CleanupUserRegistrationsJob, PruneStalePolicyDataJob, }; use tracing::{debug, info}; use ulid::Ulid; @@ -789,3 +791,126 @@ impl RunnableJob for PruneStalePolicyDataJob { Ok(()) } } + +#[async_trait] +impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_oauth2_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let (count, last_active_at) = repo + .oauth2_session() + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_active_at; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no OAuth2 session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive OAuth2 session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupInactiveCompatSessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_compat_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let (count, last_active_at) = repo + .compat_session() + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_active_at; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no compat session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive compat session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} + +#[async_trait] +impl RunnableJob for CleanupInactiveUserSessionIpsJob { + #[tracing::instrument(name = "job.cleanup_inactive_user_session_ips", skip_all)] + async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> { + // Clear IPs from sessions inactive for 30+ days + let threshold = state.clock.now() - chrono::Duration::days(30); + let mut total = 0; + + let mut since = None; + while !context.cancellation_token.is_cancelled() { + let mut repo = state.repository().await.map_err(JobError::retry)?; + + let (count, last_active_at) = repo + .browser_session() + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) + .await + .map_err(JobError::retry)?; + repo.save().await.map_err(JobError::retry)?; + + since = last_active_at; + total += count; + + if count != BATCH_SIZE { + break; + } + } + + if total == 0 { + debug!("no user session IPs to clean up"); + } else { + info!(count = total, "cleaned up inactive user session IPs"); + } + + Ok(()) + } + + fn timeout(&self) -> Option { + Some(Duration::from_secs(10 * 60)) + } +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index d4f58b23b..ca339f96f 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -158,6 +158,9 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() + .register_handler::() + .register_handler::() .register_deprecated_queue("cleanup-expired-tokens") .add_schedule( "cleanup-revoked-oauth-access-tokens", @@ -260,6 +263,24 @@ pub async fn init( // Run once a day "0 0 2 * * *".parse()?, mas_storage::queue::PruneStalePolicyDataJob, + ) + .add_schedule( + "cleanup-inactive-oauth2-session-ips", + // Run this job every hour + "0 46 * * * *".parse()?, + mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob, + ) + .add_schedule( + "cleanup-inactive-compat-session-ips", + // Run this job every hour + "0 47 * * * *".parse()?, + mas_storage::queue::CleanupInactiveCompatSessionIpsJob, + ) + .add_schedule( + "cleanup-inactive-user-session-ips", + // Run this job every hour + "0 48 * * * *".parse()?, + mas_storage::queue::CleanupInactiveUserSessionIpsJob, ); Ok(worker)