diff --git a/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json b/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json deleted file mode 100644 index 5bbedf86c..000000000 --- a/crates/storage-pg/.sqlx/query-4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH to_update AS (\n SELECT oauth2_session_id\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e" -} diff --git a/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json b/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json new file mode 100644 index 000000000..d7f208530 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT user_session_id, last_active_at\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING user_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06" +} diff --git a/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json b/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json new file mode 100644 index 000000000..a2c99a758 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT oauth2_session_id, last_active_at\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING oauth2_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30" +} diff --git a/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json b/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json new file mode 100644 index 000000000..4aaa3523f --- /dev/null +++ b/crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH to_update AS (\n SELECT compat_session_id, last_active_at\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING compat_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "last_active_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60" +} diff --git a/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json b/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json deleted file mode 100644 index 90b4d1e80..000000000 --- a/crates/storage-pg/.sqlx/query-bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH to_update AS (\n SELECT user_session_id\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c" -} diff --git a/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json b/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json deleted file mode 100644 index b9234e33a..000000000 --- a/crates/storage-pg/.sqlx/query-d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH to_update AS (\n SELECT compat_session_id\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233" -} diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs index 438cfaa4a..e7612fdc1 100644 --- a/crates/storage-pg/src/compat/session.rs +++ b/crates/storage-pg/src/compat/session.rs @@ -764,6 +764,7 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), threshold = %threshold, limit = limit, ), @@ -771,28 +772,33 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { )] async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result { - let res = sqlx::query_scalar!( + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( r#" WITH to_update AS ( - SELECT compat_session_id + SELECT compat_session_id, last_active_at FROM compat_sessions WHERE last_active_ip IS NOT NULL AND last_active_at IS NOT NULL - AND last_active_at < $1 - LIMIT $2 + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE ), updated AS ( UPDATE compat_sessions SET last_active_ip = NULL FROM to_update WHERE compat_sessions.compat_session_id = to_update.compat_session_id - RETURNING 1 + RETURNING compat_sessions.last_active_at ) - SELECT COUNT(*) AS "count!" FROM updated + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated "#, + since, threshold, i64::try_from(limit).unwrap_or(i64::MAX), ) @@ -800,6 +806,9 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { .fetch_one(&mut *self.conn) .await?; - Ok(res.try_into().unwrap_or(usize::MAX)) + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) } } diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index 3af51ba70..e1379f74a 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -658,6 +658,7 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), threshold = %threshold, limit = limit, ), @@ -665,28 +666,33 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { )] async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result { - let res = sqlx::query_scalar!( + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( r#" WITH to_update AS ( - SELECT oauth2_session_id + SELECT oauth2_session_id, last_active_at FROM oauth2_sessions WHERE last_active_ip IS NOT NULL AND last_active_at IS NOT NULL - AND last_active_at < $1 - LIMIT $2 + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE ), updated AS ( UPDATE oauth2_sessions SET last_active_ip = NULL FROM to_update WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id - RETURNING 1 + RETURNING oauth2_sessions.last_active_at ) - SELECT COUNT(*) AS "count!" FROM updated + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated "#, + since, threshold, i64::try_from(limit).unwrap_or(i64::MAX), ) @@ -694,6 +700,9 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> { .fetch_one(&mut *self.conn) .await?; - Ok(res.try_into().unwrap_or(usize::MAX)) + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) } } diff --git a/crates/storage-pg/src/user/session.rs b/crates/storage-pg/src/user/session.rs index 23cddc088..853023984 100644 --- a/crates/storage-pg/src/user/session.rs +++ b/crates/storage-pg/src/user/session.rs @@ -713,6 +713,7 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { skip_all, fields( db.query.text, + since = since.map(tracing::field::display), threshold = %threshold, limit = limit, ), @@ -720,28 +721,33 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { )] async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result { - let res = sqlx::query_scalar!( + ) -> Result<(usize, Option>), Self::Error> { + let res = sqlx::query!( r#" WITH to_update AS ( - SELECT user_session_id + SELECT user_session_id, last_active_at FROM user_sessions WHERE last_active_ip IS NOT NULL AND last_active_at IS NOT NULL - AND last_active_at < $1 - LIMIT $2 + AND ($1::timestamptz IS NULL OR last_active_at >= $1) + AND last_active_at < $2 + ORDER BY last_active_at ASC + LIMIT $3 + FOR UPDATE ), updated AS ( UPDATE user_sessions SET last_active_ip = NULL FROM to_update WHERE user_sessions.user_session_id = to_update.user_session_id - RETURNING 1 + RETURNING user_sessions.last_active_at ) - SELECT COUNT(*) AS "count!" FROM updated + SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated "#, + since, threshold, i64::try_from(limit).unwrap_or(i64::MAX), ) @@ -749,6 +755,9 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> { .fetch_one(&mut *self.conn) .await?; - Ok(res.try_into().unwrap_or(usize::MAX)) + Ok(( + res.count.try_into().unwrap_or(usize::MAX), + res.last_active_at, + )) } } diff --git a/crates/storage/src/compat/session.rs b/crates/storage/src/compat/session.rs index e48fa39f1..8ea08d8f6 100644 --- a/crates/storage/src/compat/session.rs +++ b/crates/storage/src/compat/session.rs @@ -390,10 +390,13 @@ pub trait CompatSessionRepository: Send + Sync { /// Clear IP addresses from sessions inactive since the threshold /// /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is - /// before the threshold. Returns the number of sessions affected. + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. /// /// # Parameters /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. /// * `threshold`: Clear IPs for sessions with `last_active_at` before this /// time /// * `limit`: Maximum number of sessions to update in this batch @@ -403,9 +406,10 @@ pub trait CompatSessionRepository: Send + Sync { /// Returns [`Self::Error`] if the underlying repository fails async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(CompatSessionRepository: @@ -468,7 +472,8 @@ repository_impl!(CompatSessionRepository: async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 96b3f2cd7..75035653f 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -489,10 +489,13 @@ pub trait OAuth2SessionRepository: Send + Sync { /// Clear IP addresses from sessions inactive since the threshold /// /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is - /// before the threshold. Returns the number of sessions affected. + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. /// /// # Parameters /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. /// * `threshold`: Clear IPs for sessions with `last_active_at` before this /// time /// * `limit`: Maximum number of sessions to update in this batch @@ -502,9 +505,10 @@ pub trait OAuth2SessionRepository: Send + Sync { /// Returns [`Self::Error`] if the underlying repository fails async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(OAuth2SessionRepository: @@ -580,7 +584,8 @@ repository_impl!(OAuth2SessionRepository: async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/storage/src/user/session.rs b/crates/storage/src/user/session.rs index 49e44079e..8cf78aa7f 100644 --- a/crates/storage/src/user/session.rs +++ b/crates/storage/src/user/session.rs @@ -336,10 +336,13 @@ pub trait BrowserSessionRepository: Send + Sync { /// Clear IP addresses from sessions inactive since the threshold /// /// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is - /// before the threshold. Returns the number of sessions affected. + /// before the threshold. Returns the number of sessions affected and the + /// last `last_active_at` timestamp processed for pagination. /// /// # Parameters /// + /// * `since`: Only process sessions with `last_active_at` at or after this + /// timestamp (exclusive). If `None`, starts from the beginning. /// * `threshold`: Clear IPs for sessions with `last_active_at` before this /// time /// * `limit`: Maximum number of sessions to update in this batch @@ -349,9 +352,10 @@ pub trait BrowserSessionRepository: Send + Sync { /// Returns [`Self::Error`] if the underlying repository fails async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; } repository_impl!(BrowserSessionRepository: @@ -418,7 +422,8 @@ repository_impl!(BrowserSessionRepository: async fn cleanup_inactive_ips( &mut self, + since: Option>, threshold: DateTime, limit: usize, - ) -> Result; + ) -> Result<(usize, Option>), Self::Error>; ); diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 898b689bf..c3fb46f78 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -800,16 +800,18 @@ impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob { let threshold = state.clock.now() - chrono::Duration::days(30); let mut total = 0; + let mut since = None; while !context.cancellation_token.is_cancelled() { let mut repo = state.repository().await.map_err(JobError::retry)?; - let count = repo + let (count, last_active_at) = repo .oauth2_session() - .cleanup_inactive_ips(threshold, BATCH_SIZE) + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; + since = last_active_at; total += count; if count != BATCH_SIZE { @@ -839,16 +841,18 @@ impl RunnableJob for CleanupInactiveCompatSessionIpsJob { let threshold = state.clock.now() - chrono::Duration::days(30); let mut total = 0; + let mut since = None; while !context.cancellation_token.is_cancelled() { let mut repo = state.repository().await.map_err(JobError::retry)?; - let count = repo + let (count, last_active_at) = repo .compat_session() - .cleanup_inactive_ips(threshold, BATCH_SIZE) + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; + since = last_active_at; total += count; if count != BATCH_SIZE { @@ -878,16 +882,18 @@ impl RunnableJob for CleanupInactiveUserSessionIpsJob { let threshold = state.clock.now() - chrono::Duration::days(30); let mut total = 0; + let mut since = None; while !context.cancellation_token.is_cancelled() { let mut repo = state.repository().await.map_err(JobError::retry)?; - let count = repo + let (count, last_active_at) = repo .browser_session() - .cleanup_inactive_ips(threshold, BATCH_SIZE) + .cleanup_inactive_ips(since, threshold, BATCH_SIZE) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; + since = last_active_at; total += count; if count != BATCH_SIZE {