Setup job to expire inactive browser sessions

This commit is contained in:
Quentin Gliech
2025-02-12 14:03:11 +01:00
parent dcc23421c9
commit d8b9bb1d9a
3 changed files with 103 additions and 3 deletions

View File

@@ -5,7 +5,8 @@
use chrono::{DateTime, Utc};
use mas_data_model::{
CompatSession, Device, Session, User, UserEmailAuthentication, UserRecoverySession,
BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication,
UserRecoverySession,
};
use serde::{Deserialize, Serialize};
use ulid::Ulid;
@@ -437,3 +438,60 @@ impl ExpireInactiveCompatSessionsJob {
impl InsertableJob for ExpireInactiveCompatSessionsJob {
const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions";
}
/// Expire inactive user sessions
#[derive(Debug, Serialize, Deserialize)]
pub struct ExpireInactiveUserSessionsJob {
threshold: DateTime<Utc>,
after: Option<Ulid>,
}
impl ExpireInactiveUserSessionsJob {
/// Create a new job to expire inactive user/browser sessions
///
/// # Parameters
///
/// * `threshold` - The threshold to expire sessions at
#[must_use]
pub fn new(threshold: DateTime<Utc>) -> Self {
Self {
threshold,
after: None,
}
}
/// Get the threshold to expire sessions at
#[must_use]
pub fn threshold(&self) -> DateTime<Utc> {
self.threshold
}
/// Get the pagination cursor
#[must_use]
pub fn pagination(&self, batch_size: usize) -> Pagination {
let pagination = Pagination::first(batch_size);
if let Some(after) = self.after {
pagination.after(after)
} else {
pagination
}
}
/// Get the next job given the page returned by the database
#[must_use]
pub fn next(&self, page: &Page<BrowserSession>) -> Option<Self> {
if !page.has_next_page {
return None;
}
let last_edge = page.edges.last()?;
Some(Self {
threshold: self.threshold,
after: Some(last_edge.id),
})
}
}
impl InsertableJob for ExpireInactiveUserSessionsJob {
const QUEUE_NAME: &'static str = "expire-inactive-user-sessions";
}

View File

@@ -131,6 +131,7 @@ pub async fn init(
.register_handler::<mas_storage::queue::VerifyEmailJob>()
.register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
.register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
.register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
.add_schedule(
"cleanup-expired-tokens",
"0 0 * * * *".parse()?,

View File

@@ -11,9 +11,10 @@ use mas_storage::{
compat::CompatSessionFilter,
oauth2::OAuth2SessionFilter,
queue::{
ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt,
SyncDevicesJob,
ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob,
ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob,
},
user::BrowserSessionFilter,
};
use crate::{
@@ -146,3 +147,43 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
Ok(())
}
}
#[async_trait]
impl RunnableJob for ExpireInactiveUserSessionsJob {
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let clock = state.clock();
let mut rng = state.rng();
let filter = BrowserSessionFilter::new()
.with_last_active_before(self.threshold())
.active_only();
let pagination = self.pagination(100);
let page = repo
.browser_session()
.list(filter, pagination)
.await
.map_err(JobError::retry)?;
if let Some(job) = self.next(&page) {
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
repo.queue_job()
.schedule_job(&mut rng, &clock, job)
.await
.map_err(JobError::retry)?;
}
for edge in page.edges {
repo.browser_session()
.finish(&clock, edge)
.await
.map_err(JobError::retry)?;
}
repo.save().await.map_err(JobError::retry)?;
Ok(())
}
}