Clear out last active IP on each sessions after 30 days (#5448)
This commit is contained in:
30
crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json
generated
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\n WITH to_update AS (\n SELECT user_session_id, last_active_at\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING user_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "count!",
|
||||||
|
"type_info": "Int8"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ordinal": 1,
|
||||||
|
"name": "last_active_at",
|
||||||
|
"type_info": "Timestamptz"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Timestamptz",
|
||||||
|
"Timestamptz",
|
||||||
|
"Int8"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06"
|
||||||
|
}
|
||||||
30
crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json
generated
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\n WITH to_update AS (\n SELECT oauth2_session_id, last_active_at\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING oauth2_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "count!",
|
||||||
|
"type_info": "Int8"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ordinal": 1,
|
||||||
|
"name": "last_active_at",
|
||||||
|
"type_info": "Timestamptz"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Timestamptz",
|
||||||
|
"Timestamptz",
|
||||||
|
"Int8"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30"
|
||||||
|
}
|
||||||
30
crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json
generated
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\n WITH to_update AS (\n SELECT compat_session_id, last_active_at\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING compat_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"ordinal": 0,
|
||||||
|
"name": "count!",
|
||||||
|
"type_info": "Int8"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ordinal": 1,
|
||||||
|
"name": "last_active_at",
|
||||||
|
"type_info": "Timestamptz"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Timestamptz",
|
||||||
|
"Timestamptz",
|
||||||
|
"Int8"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": [
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"hash": "926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60"
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
-- no-transaction
|
||||||
|
-- Copyright 2026 Element Creations Ltd.
|
||||||
|
--
|
||||||
|
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||||
|
-- Please see LICENSE files in the repository root for full details.
|
||||||
|
|
||||||
|
-- Partial index for cleaning up IP addresses from inactive OAuth2 sessions
|
||||||
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS "oauth2_sessions_inactive_ips_idx"
|
||||||
|
ON "oauth2_sessions" ("last_active_at")
|
||||||
|
WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL;
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
-- no-transaction
|
||||||
|
-- Copyright 2026 Element Creations Ltd.
|
||||||
|
--
|
||||||
|
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||||
|
-- Please see LICENSE files in the repository root for full details.
|
||||||
|
|
||||||
|
-- Partial index for cleaning up IP addresses from inactive compat sessions
|
||||||
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS "compat_sessions_inactive_ips_idx"
|
||||||
|
ON "compat_sessions" ("last_active_at")
|
||||||
|
WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL;
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
-- no-transaction
|
||||||
|
-- Copyright 2026 Element Creations Ltd.
|
||||||
|
--
|
||||||
|
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||||
|
-- Please see LICENSE files in the repository root for full details.
|
||||||
|
|
||||||
|
-- Partial index for cleaning up IP addresses from inactive user sessions
|
||||||
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS "user_sessions_inactive_ips_idx"
|
||||||
|
ON "user_sessions" ("last_active_at")
|
||||||
|
WHERE "last_active_ip" IS NOT NULL AND "last_active_at" IS NOT NULL;
|
||||||
@@ -758,4 +758,57 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
|
|||||||
res.last_finished_at,
|
res.last_finished_at,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "db.compat_session.cleanup_inactive_ips",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
db.query.text,
|
||||||
|
since = since.map(tracing::field::display),
|
||||||
|
threshold = %threshold,
|
||||||
|
limit = limit,
|
||||||
|
),
|
||||||
|
err,
|
||||||
|
)]
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||||
|
let res = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
WITH to_update AS (
|
||||||
|
SELECT compat_session_id, last_active_at
|
||||||
|
FROM compat_sessions
|
||||||
|
WHERE last_active_ip IS NOT NULL
|
||||||
|
AND last_active_at IS NOT NULL
|
||||||
|
AND ($1::timestamptz IS NULL OR last_active_at >= $1)
|
||||||
|
AND last_active_at < $2
|
||||||
|
ORDER BY last_active_at ASC
|
||||||
|
LIMIT $3
|
||||||
|
FOR UPDATE
|
||||||
|
),
|
||||||
|
updated AS (
|
||||||
|
UPDATE compat_sessions
|
||||||
|
SET last_active_ip = NULL
|
||||||
|
FROM to_update
|
||||||
|
WHERE compat_sessions.compat_session_id = to_update.compat_session_id
|
||||||
|
RETURNING compat_sessions.last_active_at
|
||||||
|
)
|
||||||
|
SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
|
||||||
|
"#,
|
||||||
|
since,
|
||||||
|
threshold,
|
||||||
|
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||||
|
)
|
||||||
|
.traced()
|
||||||
|
.fetch_one(&mut *self.conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
res.count.try_into().unwrap_or(usize::MAX),
|
||||||
|
res.last_active_at,
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -652,4 +652,57 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> {
|
|||||||
res.last_finished_at,
|
res.last_finished_at,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "db.oauth2_session.cleanup_inactive_ips",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
db.query.text,
|
||||||
|
since = since.map(tracing::field::display),
|
||||||
|
threshold = %threshold,
|
||||||
|
limit = limit,
|
||||||
|
),
|
||||||
|
err,
|
||||||
|
)]
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||||
|
let res = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
WITH to_update AS (
|
||||||
|
SELECT oauth2_session_id, last_active_at
|
||||||
|
FROM oauth2_sessions
|
||||||
|
WHERE last_active_ip IS NOT NULL
|
||||||
|
AND last_active_at IS NOT NULL
|
||||||
|
AND ($1::timestamptz IS NULL OR last_active_at >= $1)
|
||||||
|
AND last_active_at < $2
|
||||||
|
ORDER BY last_active_at ASC
|
||||||
|
LIMIT $3
|
||||||
|
FOR UPDATE
|
||||||
|
),
|
||||||
|
updated AS (
|
||||||
|
UPDATE oauth2_sessions
|
||||||
|
SET last_active_ip = NULL
|
||||||
|
FROM to_update
|
||||||
|
WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id
|
||||||
|
RETURNING oauth2_sessions.last_active_at
|
||||||
|
)
|
||||||
|
SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
|
||||||
|
"#,
|
||||||
|
since,
|
||||||
|
threshold,
|
||||||
|
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||||
|
)
|
||||||
|
.traced()
|
||||||
|
.fetch_one(&mut *self.conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
res.count.try_into().unwrap_or(usize::MAX),
|
||||||
|
res.last_active_at,
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -695,4 +695,57 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
|
|||||||
res.last_finished_at,
|
res.last_finished_at,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "db.browser_session.cleanup_inactive_ips",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
db.query.text,
|
||||||
|
since = since.map(tracing::field::display),
|
||||||
|
threshold = %threshold,
|
||||||
|
limit = limit,
|
||||||
|
),
|
||||||
|
err,
|
||||||
|
)]
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||||
|
let res = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
WITH to_update AS (
|
||||||
|
SELECT user_session_id, last_active_at
|
||||||
|
FROM user_sessions
|
||||||
|
WHERE last_active_ip IS NOT NULL
|
||||||
|
AND last_active_at IS NOT NULL
|
||||||
|
AND ($1::timestamptz IS NULL OR last_active_at >= $1)
|
||||||
|
AND last_active_at < $2
|
||||||
|
ORDER BY last_active_at ASC
|
||||||
|
LIMIT $3
|
||||||
|
FOR UPDATE
|
||||||
|
),
|
||||||
|
updated AS (
|
||||||
|
UPDATE user_sessions
|
||||||
|
SET last_active_ip = NULL
|
||||||
|
FROM to_update
|
||||||
|
WHERE user_sessions.user_session_id = to_update.user_session_id
|
||||||
|
RETURNING user_sessions.last_active_at
|
||||||
|
)
|
||||||
|
SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
|
||||||
|
"#,
|
||||||
|
since,
|
||||||
|
threshold,
|
||||||
|
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||||
|
)
|
||||||
|
.traced()
|
||||||
|
.fetch_one(&mut *self.conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
res.count.try_into().unwrap_or(usize::MAX),
|
||||||
|
res.last_active_at,
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -386,6 +386,30 @@ pub trait CompatSessionRepository: Send + Sync {
|
|||||||
until: DateTime<Utc>,
|
until: DateTime<Utc>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
|
|
||||||
|
/// Clear IP addresses from sessions inactive since the threshold
|
||||||
|
///
|
||||||
|
/// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is
|
||||||
|
/// before the threshold. Returns the number of sessions affected and the
|
||||||
|
/// last `last_active_at` timestamp processed for pagination.
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
///
|
||||||
|
/// * `since`: Only process sessions with `last_active_at` at or after this
|
||||||
|
/// timestamp (exclusive). If `None`, starts from the beginning.
|
||||||
|
/// * `threshold`: Clear IPs for sessions with `last_active_at` before this
|
||||||
|
/// time
|
||||||
|
/// * `limit`: Maximum number of sessions to update in this batch
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns [`Self::Error`] if the underlying repository fails
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
repository_impl!(CompatSessionRepository:
|
repository_impl!(CompatSessionRepository:
|
||||||
@@ -445,4 +469,11 @@ repository_impl!(CompatSessionRepository:
|
|||||||
until: DateTime<Utc>,
|
until: DateTime<Utc>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
|
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -485,6 +485,30 @@ pub trait OAuth2SessionRepository: Send + Sync {
|
|||||||
until: DateTime<Utc>,
|
until: DateTime<Utc>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
|
|
||||||
|
/// Clear IP addresses from sessions inactive since the threshold
|
||||||
|
///
|
||||||
|
/// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is
|
||||||
|
/// before the threshold. Returns the number of sessions affected and the
|
||||||
|
/// last `last_active_at` timestamp processed for pagination.
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
///
|
||||||
|
/// * `since`: Only process sessions with `last_active_at` at or after this
|
||||||
|
/// timestamp (exclusive). If `None`, starts from the beginning.
|
||||||
|
/// * `threshold`: Clear IPs for sessions with `last_active_at` before this
|
||||||
|
/// time
|
||||||
|
/// * `limit`: Maximum number of sessions to update in this batch
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns [`Self::Error`] if the underlying repository fails
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
repository_impl!(OAuth2SessionRepository:
|
repository_impl!(OAuth2SessionRepository:
|
||||||
@@ -557,4 +581,11 @@ repository_impl!(OAuth2SessionRepository:
|
|||||||
until: DateTime<Utc>,
|
until: DateTime<Utc>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
|
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -627,3 +627,27 @@ pub struct PruneStalePolicyDataJob;
|
|||||||
impl InsertableJob for PruneStalePolicyDataJob {
|
impl InsertableJob for PruneStalePolicyDataJob {
|
||||||
const QUEUE_NAME: &'static str = "prune-stale-policy-data";
|
const QUEUE_NAME: &'static str = "prune-stale-policy-data";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cleanup IP addresses from inactive OAuth 2.0 sessions
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||||
|
pub struct CleanupInactiveOAuth2SessionIpsJob;
|
||||||
|
|
||||||
|
impl InsertableJob for CleanupInactiveOAuth2SessionIpsJob {
|
||||||
|
const QUEUE_NAME: &'static str = "cleanup-inactive-oauth2-session-ips";
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cleanup IP addresses from inactive compat sessions
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||||
|
pub struct CleanupInactiveCompatSessionIpsJob;
|
||||||
|
|
||||||
|
impl InsertableJob for CleanupInactiveCompatSessionIpsJob {
|
||||||
|
const QUEUE_NAME: &'static str = "cleanup-inactive-compat-session-ips";
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cleanup IP addresses from inactive user/browser sessions
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||||
|
pub struct CleanupInactiveUserSessionIpsJob;
|
||||||
|
|
||||||
|
impl InsertableJob for CleanupInactiveUserSessionIpsJob {
|
||||||
|
const QUEUE_NAME: &'static str = "cleanup-inactive-user-session-ips";
|
||||||
|
}
|
||||||
|
|||||||
@@ -331,6 +331,30 @@ pub trait BrowserSessionRepository: Send + Sync {
|
|||||||
until: DateTime<Utc>,
|
until: DateTime<Utc>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
|
|
||||||
|
/// Clear IP addresses from sessions inactive since the threshold
|
||||||
|
///
|
||||||
|
/// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is
|
||||||
|
/// before the threshold. Returns the number of sessions affected and the
|
||||||
|
/// last `last_active_at` timestamp processed for pagination.
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
///
|
||||||
|
/// * `since`: Only process sessions with `last_active_at` at or after this
|
||||||
|
/// timestamp (exclusive). If `None`, starts from the beginning.
|
||||||
|
/// * `threshold`: Clear IPs for sessions with `last_active_at` before this
|
||||||
|
/// time
|
||||||
|
/// * `limit`: Maximum number of sessions to update in this batch
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns [`Self::Error`] if the underlying repository fails
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
repository_impl!(BrowserSessionRepository:
|
repository_impl!(BrowserSessionRepository:
|
||||||
@@ -394,4 +418,11 @@ repository_impl!(BrowserSessionRepository:
|
|||||||
until: DateTime<Utc>,
|
until: DateTime<Utc>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
|
|
||||||
|
async fn cleanup_inactive_ips(
|
||||||
|
&mut self,
|
||||||
|
since: Option<DateTime<Utc>>,
|
||||||
|
threshold: DateTime<Utc>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -13,11 +13,13 @@ use async_trait::async_trait;
|
|||||||
use mas_storage::queue::{
|
use mas_storage::queue::{
|
||||||
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
CleanupConsumedOAuthRefreshTokensJob, CleanupExpiredOAuthAccessTokensJob,
|
||||||
CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob,
|
CleanupFinishedCompatSessionsJob, CleanupFinishedOAuth2SessionsJob,
|
||||||
CleanupFinishedUserSessionsJob, CleanupOAuthAuthorizationGrantsJob,
|
CleanupFinishedUserSessionsJob, CleanupInactiveCompatSessionIpsJob,
|
||||||
CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob, CleanupRevokedOAuthAccessTokensJob,
|
CleanupInactiveOAuth2SessionIpsJob, CleanupInactiveUserSessionIpsJob,
|
||||||
CleanupRevokedOAuthRefreshTokensJob, CleanupUpstreamOAuthLinksJob,
|
CleanupOAuthAuthorizationGrantsJob, CleanupOAuthDeviceCodeGrantsJob, CleanupQueueJobsJob,
|
||||||
CleanupUpstreamOAuthSessionsJob, CleanupUserEmailAuthenticationsJob,
|
CleanupRevokedOAuthAccessTokensJob, CleanupRevokedOAuthRefreshTokensJob,
|
||||||
CleanupUserRecoverySessionsJob, CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
CleanupUpstreamOAuthLinksJob, CleanupUpstreamOAuthSessionsJob,
|
||||||
|
CleanupUserEmailAuthenticationsJob, CleanupUserRecoverySessionsJob,
|
||||||
|
CleanupUserRegistrationsJob, PruneStalePolicyDataJob,
|
||||||
};
|
};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
use ulid::Ulid;
|
use ulid::Ulid;
|
||||||
@@ -789,3 +791,126 @@ impl RunnableJob for PruneStalePolicyDataJob {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob {
|
||||||
|
#[tracing::instrument(name = "job.cleanup_inactive_oauth2_session_ips", skip_all)]
|
||||||
|
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||||
|
// Clear IPs from sessions inactive for 30+ days
|
||||||
|
let threshold = state.clock.now() - chrono::Duration::days(30);
|
||||||
|
let mut total = 0;
|
||||||
|
|
||||||
|
let mut since = None;
|
||||||
|
while !context.cancellation_token.is_cancelled() {
|
||||||
|
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
let (count, last_active_at) = repo
|
||||||
|
.oauth2_session()
|
||||||
|
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
|
||||||
|
.await
|
||||||
|
.map_err(JobError::retry)?;
|
||||||
|
repo.save().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
since = last_active_at;
|
||||||
|
total += count;
|
||||||
|
|
||||||
|
if count != BATCH_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if total == 0 {
|
||||||
|
debug!("no OAuth2 session IPs to clean up");
|
||||||
|
} else {
|
||||||
|
info!(count = total, "cleaned up inactive OAuth2 session IPs");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn timeout(&self) -> Option<Duration> {
|
||||||
|
Some(Duration::from_secs(10 * 60))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RunnableJob for CleanupInactiveCompatSessionIpsJob {
|
||||||
|
#[tracing::instrument(name = "job.cleanup_inactive_compat_session_ips", skip_all)]
|
||||||
|
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||||
|
// Clear IPs from sessions inactive for 30+ days
|
||||||
|
let threshold = state.clock.now() - chrono::Duration::days(30);
|
||||||
|
let mut total = 0;
|
||||||
|
|
||||||
|
let mut since = None;
|
||||||
|
while !context.cancellation_token.is_cancelled() {
|
||||||
|
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
let (count, last_active_at) = repo
|
||||||
|
.compat_session()
|
||||||
|
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
|
||||||
|
.await
|
||||||
|
.map_err(JobError::retry)?;
|
||||||
|
repo.save().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
since = last_active_at;
|
||||||
|
total += count;
|
||||||
|
|
||||||
|
if count != BATCH_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if total == 0 {
|
||||||
|
debug!("no compat session IPs to clean up");
|
||||||
|
} else {
|
||||||
|
info!(count = total, "cleaned up inactive compat session IPs");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn timeout(&self) -> Option<Duration> {
|
||||||
|
Some(Duration::from_secs(10 * 60))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RunnableJob for CleanupInactiveUserSessionIpsJob {
|
||||||
|
#[tracing::instrument(name = "job.cleanup_inactive_user_session_ips", skip_all)]
|
||||||
|
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
|
||||||
|
// Clear IPs from sessions inactive for 30+ days
|
||||||
|
let threshold = state.clock.now() - chrono::Duration::days(30);
|
||||||
|
let mut total = 0;
|
||||||
|
|
||||||
|
let mut since = None;
|
||||||
|
while !context.cancellation_token.is_cancelled() {
|
||||||
|
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
let (count, last_active_at) = repo
|
||||||
|
.browser_session()
|
||||||
|
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
|
||||||
|
.await
|
||||||
|
.map_err(JobError::retry)?;
|
||||||
|
repo.save().await.map_err(JobError::retry)?;
|
||||||
|
|
||||||
|
since = last_active_at;
|
||||||
|
total += count;
|
||||||
|
|
||||||
|
if count != BATCH_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if total == 0 {
|
||||||
|
debug!("no user session IPs to clean up");
|
||||||
|
} else {
|
||||||
|
info!(count = total, "cleaned up inactive user session IPs");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn timeout(&self) -> Option<Duration> {
|
||||||
|
Some(Duration::from_secs(10 * 60))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -158,6 +158,9 @@ pub async fn init(
|
|||||||
.register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
|
.register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
|
||||||
.register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
|
.register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
|
||||||
.register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
|
.register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
|
||||||
|
.register_handler::<mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob>()
|
||||||
|
.register_handler::<mas_storage::queue::CleanupInactiveCompatSessionIpsJob>()
|
||||||
|
.register_handler::<mas_storage::queue::CleanupInactiveUserSessionIpsJob>()
|
||||||
.register_deprecated_queue("cleanup-expired-tokens")
|
.register_deprecated_queue("cleanup-expired-tokens")
|
||||||
.add_schedule(
|
.add_schedule(
|
||||||
"cleanup-revoked-oauth-access-tokens",
|
"cleanup-revoked-oauth-access-tokens",
|
||||||
@@ -260,6 +263,24 @@ pub async fn init(
|
|||||||
// Run once a day
|
// Run once a day
|
||||||
"0 0 2 * * *".parse()?,
|
"0 0 2 * * *".parse()?,
|
||||||
mas_storage::queue::PruneStalePolicyDataJob,
|
mas_storage::queue::PruneStalePolicyDataJob,
|
||||||
|
)
|
||||||
|
.add_schedule(
|
||||||
|
"cleanup-inactive-oauth2-session-ips",
|
||||||
|
// Run this job every hour
|
||||||
|
"0 46 * * * *".parse()?,
|
||||||
|
mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob,
|
||||||
|
)
|
||||||
|
.add_schedule(
|
||||||
|
"cleanup-inactive-compat-session-ips",
|
||||||
|
// Run this job every hour
|
||||||
|
"0 47 * * * *".parse()?,
|
||||||
|
mas_storage::queue::CleanupInactiveCompatSessionIpsJob,
|
||||||
|
)
|
||||||
|
.add_schedule(
|
||||||
|
"cleanup-inactive-user-session-ips",
|
||||||
|
// Run this job every hour
|
||||||
|
"0 48 * * * *".parse()?,
|
||||||
|
mas_storage::queue::CleanupInactiveUserSessionIpsJob,
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(worker)
|
Ok(worker)
|
||||||
|
|||||||
Reference in New Issue
Block a user