Refactor inactive IP cleanup to use pagination
This should avoid dead many dead tuples when processing batches of sessions to cleanup
This commit is contained in:
@@ -1,23 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_update AS (\n SELECT oauth2_session_id\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "4b1f70e97a155bf2a352d7f9353c535ea0cc9d3eaf2b35933477c18d8c8bcb2e"
|
||||
}
|
||||
30
crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06.json
generated
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_update AS (\n SELECT user_session_id, last_active_at\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING user_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "last_active_at",
|
||||
"type_info": "Timestamptz"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "535225206622b9190ccf42f7d66268818dc84c37b168ab45e582e0a727796a06"
|
||||
}
|
||||
30
crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30.json
generated
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_update AS (\n SELECT oauth2_session_id, last_active_at\n FROM oauth2_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE oauth2_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id\n RETURNING oauth2_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "last_active_at",
|
||||
"type_info": "Timestamptz"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "7b06e6f21c69056b526538f06f06268efd13d7af3cecb452168d514a379fec30"
|
||||
}
|
||||
30
crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json
generated
Normal file
30
crates/storage-pg/.sqlx/query-926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60.json
generated
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_update AS (\n SELECT compat_session_id, last_active_at\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND ($1::timestamptz IS NULL OR last_active_at >= $1)\n AND last_active_at < $2\n ORDER BY last_active_at ASC\n LIMIT $3\n FOR UPDATE\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING compat_sessions.last_active_at\n )\n SELECT COUNT(*) AS \"count!\", MAX(last_active_at) AS last_active_at FROM updated\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "last_active_at",
|
||||
"type_info": "Timestamptz"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "926cb81dc7931890a02c5a372aef79832e5d0748dad18ab44c6671f3196d6f60"
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_update AS (\n SELECT user_session_id\n FROM user_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE user_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE user_sessions.user_session_id = to_update.user_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "bf3f6a2ac0f3371e32603489744b94dc8713a2fb2c48f8e437ed8ee2dc64f70c"
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n WITH to_update AS (\n SELECT compat_session_id\n FROM compat_sessions\n WHERE last_active_ip IS NOT NULL\n AND last_active_at IS NOT NULL\n AND last_active_at < $1\n LIMIT $2\n ),\n updated AS (\n UPDATE compat_sessions\n SET last_active_ip = NULL\n FROM to_update\n WHERE compat_sessions.compat_session_id = to_update.compat_session_id\n RETURNING 1\n )\n SELECT COUNT(*) AS \"count!\" FROM updated\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "count!",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "d01cb56a7b231393a0725637920afe41b64d198729237dd2cef060992ebd0233"
|
||||
}
|
||||
@@ -764,6 +764,7 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
since = since.map(tracing::field::display),
|
||||
threshold = %threshold,
|
||||
limit = limit,
|
||||
),
|
||||
@@ -771,28 +772,33 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
|
||||
)]
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error> {
|
||||
let res = sqlx::query_scalar!(
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
WITH to_update AS (
|
||||
SELECT compat_session_id
|
||||
SELECT compat_session_id, last_active_at
|
||||
FROM compat_sessions
|
||||
WHERE last_active_ip IS NOT NULL
|
||||
AND last_active_at IS NOT NULL
|
||||
AND last_active_at < $1
|
||||
LIMIT $2
|
||||
AND ($1::timestamptz IS NULL OR last_active_at >= $1)
|
||||
AND last_active_at < $2
|
||||
ORDER BY last_active_at ASC
|
||||
LIMIT $3
|
||||
FOR UPDATE
|
||||
),
|
||||
updated AS (
|
||||
UPDATE compat_sessions
|
||||
SET last_active_ip = NULL
|
||||
FROM to_update
|
||||
WHERE compat_sessions.compat_session_id = to_update.compat_session_id
|
||||
RETURNING 1
|
||||
RETURNING compat_sessions.last_active_at
|
||||
)
|
||||
SELECT COUNT(*) AS "count!" FROM updated
|
||||
SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
|
||||
"#,
|
||||
since,
|
||||
threshold,
|
||||
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||
)
|
||||
@@ -800,6 +806,9 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
|
||||
.fetch_one(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
Ok(res.try_into().unwrap_or(usize::MAX))
|
||||
Ok((
|
||||
res.count.try_into().unwrap_or(usize::MAX),
|
||||
res.last_active_at,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -658,6 +658,7 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> {
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
since = since.map(tracing::field::display),
|
||||
threshold = %threshold,
|
||||
limit = limit,
|
||||
),
|
||||
@@ -665,28 +666,33 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> {
|
||||
)]
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error> {
|
||||
let res = sqlx::query_scalar!(
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
WITH to_update AS (
|
||||
SELECT oauth2_session_id
|
||||
SELECT oauth2_session_id, last_active_at
|
||||
FROM oauth2_sessions
|
||||
WHERE last_active_ip IS NOT NULL
|
||||
AND last_active_at IS NOT NULL
|
||||
AND last_active_at < $1
|
||||
LIMIT $2
|
||||
AND ($1::timestamptz IS NULL OR last_active_at >= $1)
|
||||
AND last_active_at < $2
|
||||
ORDER BY last_active_at ASC
|
||||
LIMIT $3
|
||||
FOR UPDATE
|
||||
),
|
||||
updated AS (
|
||||
UPDATE oauth2_sessions
|
||||
SET last_active_ip = NULL
|
||||
FROM to_update
|
||||
WHERE oauth2_sessions.oauth2_session_id = to_update.oauth2_session_id
|
||||
RETURNING 1
|
||||
RETURNING oauth2_sessions.last_active_at
|
||||
)
|
||||
SELECT COUNT(*) AS "count!" FROM updated
|
||||
SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
|
||||
"#,
|
||||
since,
|
||||
threshold,
|
||||
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||
)
|
||||
@@ -694,6 +700,9 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> {
|
||||
.fetch_one(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
Ok(res.try_into().unwrap_or(usize::MAX))
|
||||
Ok((
|
||||
res.count.try_into().unwrap_or(usize::MAX),
|
||||
res.last_active_at,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -713,6 +713,7 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
since = since.map(tracing::field::display),
|
||||
threshold = %threshold,
|
||||
limit = limit,
|
||||
),
|
||||
@@ -720,28 +721,33 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
|
||||
)]
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error> {
|
||||
let res = sqlx::query_scalar!(
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
WITH to_update AS (
|
||||
SELECT user_session_id
|
||||
SELECT user_session_id, last_active_at
|
||||
FROM user_sessions
|
||||
WHERE last_active_ip IS NOT NULL
|
||||
AND last_active_at IS NOT NULL
|
||||
AND last_active_at < $1
|
||||
LIMIT $2
|
||||
AND ($1::timestamptz IS NULL OR last_active_at >= $1)
|
||||
AND last_active_at < $2
|
||||
ORDER BY last_active_at ASC
|
||||
LIMIT $3
|
||||
FOR UPDATE
|
||||
),
|
||||
updated AS (
|
||||
UPDATE user_sessions
|
||||
SET last_active_ip = NULL
|
||||
FROM to_update
|
||||
WHERE user_sessions.user_session_id = to_update.user_session_id
|
||||
RETURNING 1
|
||||
RETURNING user_sessions.last_active_at
|
||||
)
|
||||
SELECT COUNT(*) AS "count!" FROM updated
|
||||
SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
|
||||
"#,
|
||||
since,
|
||||
threshold,
|
||||
i64::try_from(limit).unwrap_or(i64::MAX),
|
||||
)
|
||||
@@ -749,6 +755,9 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
|
||||
.fetch_one(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
Ok(res.try_into().unwrap_or(usize::MAX))
|
||||
Ok((
|
||||
res.count.try_into().unwrap_or(usize::MAX),
|
||||
res.last_active_at,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -390,10 +390,13 @@ pub trait CompatSessionRepository: Send + Sync {
|
||||
/// Clear IP addresses from sessions inactive since the threshold
|
||||
///
|
||||
/// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is
|
||||
/// before the threshold. Returns the number of sessions affected.
|
||||
/// before the threshold. Returns the number of sessions affected and the
|
||||
/// last `last_active_at` timestamp processed for pagination.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: Only process sessions with `last_active_at` at or after this
|
||||
/// timestamp (exclusive). If `None`, starts from the beginning.
|
||||
/// * `threshold`: Clear IPs for sessions with `last_active_at` before this
|
||||
/// time
|
||||
/// * `limit`: Maximum number of sessions to update in this batch
|
||||
@@ -403,9 +406,10 @@ pub trait CompatSessionRepository: Send + Sync {
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error>;
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(CompatSessionRepository:
|
||||
@@ -468,7 +472,8 @@ repository_impl!(CompatSessionRepository:
|
||||
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error>;
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
);
|
||||
|
||||
@@ -489,10 +489,13 @@ pub trait OAuth2SessionRepository: Send + Sync {
|
||||
/// Clear IP addresses from sessions inactive since the threshold
|
||||
///
|
||||
/// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is
|
||||
/// before the threshold. Returns the number of sessions affected.
|
||||
/// before the threshold. Returns the number of sessions affected and the
|
||||
/// last `last_active_at` timestamp processed for pagination.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: Only process sessions with `last_active_at` at or after this
|
||||
/// timestamp (exclusive). If `None`, starts from the beginning.
|
||||
/// * `threshold`: Clear IPs for sessions with `last_active_at` before this
|
||||
/// time
|
||||
/// * `limit`: Maximum number of sessions to update in this batch
|
||||
@@ -502,9 +505,10 @@ pub trait OAuth2SessionRepository: Send + Sync {
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error>;
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(OAuth2SessionRepository:
|
||||
@@ -580,7 +584,8 @@ repository_impl!(OAuth2SessionRepository:
|
||||
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error>;
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
);
|
||||
|
||||
@@ -336,10 +336,13 @@ pub trait BrowserSessionRepository: Send + Sync {
|
||||
/// Clear IP addresses from sessions inactive since the threshold
|
||||
///
|
||||
/// Sets `last_active_ip` to `NULL` for sessions where `last_active_at` is
|
||||
/// before the threshold. Returns the number of sessions affected.
|
||||
/// before the threshold. Returns the number of sessions affected and the
|
||||
/// last `last_active_at` timestamp processed for pagination.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `since`: Only process sessions with `last_active_at` at or after this
|
||||
/// timestamp (exclusive). If `None`, starts from the beginning.
|
||||
/// * `threshold`: Clear IPs for sessions with `last_active_at` before this
|
||||
/// time
|
||||
/// * `limit`: Maximum number of sessions to update in this batch
|
||||
@@ -349,9 +352,10 @@ pub trait BrowserSessionRepository: Send + Sync {
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error>;
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
}
|
||||
|
||||
repository_impl!(BrowserSessionRepository:
|
||||
@@ -418,7 +422,8 @@ repository_impl!(BrowserSessionRepository:
|
||||
|
||||
async fn cleanup_inactive_ips(
|
||||
&mut self,
|
||||
since: Option<DateTime<Utc>>,
|
||||
threshold: DateTime<Utc>,
|
||||
limit: usize,
|
||||
) -> Result<usize, Self::Error>;
|
||||
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
|
||||
);
|
||||
|
||||
@@ -800,16 +800,18 @@ impl RunnableJob for CleanupInactiveOAuth2SessionIpsJob {
|
||||
let threshold = state.clock.now() - chrono::Duration::days(30);
|
||||
let mut total = 0;
|
||||
|
||||
let mut since = None;
|
||||
while !context.cancellation_token.is_cancelled() {
|
||||
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
|
||||
let count = repo
|
||||
let (count, last_active_at) = repo
|
||||
.oauth2_session()
|
||||
.cleanup_inactive_ips(threshold, BATCH_SIZE)
|
||||
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
|
||||
since = last_active_at;
|
||||
total += count;
|
||||
|
||||
if count != BATCH_SIZE {
|
||||
@@ -839,16 +841,18 @@ impl RunnableJob for CleanupInactiveCompatSessionIpsJob {
|
||||
let threshold = state.clock.now() - chrono::Duration::days(30);
|
||||
let mut total = 0;
|
||||
|
||||
let mut since = None;
|
||||
while !context.cancellation_token.is_cancelled() {
|
||||
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
|
||||
let count = repo
|
||||
let (count, last_active_at) = repo
|
||||
.compat_session()
|
||||
.cleanup_inactive_ips(threshold, BATCH_SIZE)
|
||||
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
|
||||
since = last_active_at;
|
||||
total += count;
|
||||
|
||||
if count != BATCH_SIZE {
|
||||
@@ -878,16 +882,18 @@ impl RunnableJob for CleanupInactiveUserSessionIpsJob {
|
||||
let threshold = state.clock.now() - chrono::Duration::days(30);
|
||||
let mut total = 0;
|
||||
|
||||
let mut since = None;
|
||||
while !context.cancellation_token.is_cancelled() {
|
||||
let mut repo = state.repository().await.map_err(JobError::retry)?;
|
||||
|
||||
let count = repo
|
||||
let (count, last_active_at) = repo
|
||||
.browser_session()
|
||||
.cleanup_inactive_ips(threshold, BATCH_SIZE)
|
||||
.cleanup_inactive_ips(since, threshold, BATCH_SIZE)
|
||||
.await
|
||||
.map_err(JobError::retry)?;
|
||||
repo.save().await.map_err(JobError::retry)?;
|
||||
|
||||
since = last_active_at;
|
||||
total += count;
|
||||
|
||||
if count != BATCH_SIZE {
|
||||
|
||||
Reference in New Issue
Block a user