diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index faa12afc8..5fe53f7f2 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -5,7 +5,8 @@ use chrono::{DateTime, Utc}; use mas_data_model::{ - CompatSession, Device, Session, User, UserEmailAuthentication, UserRecoverySession, + BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication, + UserRecoverySession, }; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -437,3 +438,60 @@ impl ExpireInactiveCompatSessionsJob { impl InsertableJob for ExpireInactiveCompatSessionsJob { const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions"; } + +/// Expire inactive user sessions +#[derive(Debug, Serialize, Deserialize)] +pub struct ExpireInactiveUserSessionsJob { + threshold: DateTime, + after: Option, +} + +impl ExpireInactiveUserSessionsJob { + /// Create a new job to expire inactive user/browser sessions + /// + /// # Parameters + /// + /// * `threshold` - The threshold to expire sessions at + #[must_use] + pub fn new(threshold: DateTime) -> Self { + Self { + threshold, + after: None, + } + } + + /// Get the threshold to expire sessions at + #[must_use] + pub fn threshold(&self) -> DateTime { + self.threshold + } + + /// Get the pagination cursor + #[must_use] + pub fn pagination(&self, batch_size: usize) -> Pagination { + let pagination = Pagination::first(batch_size); + if let Some(after) = self.after { + pagination.after(after) + } else { + pagination + } + } + + /// Get the next job given the page returned by the database + #[must_use] + pub fn next(&self, page: &Page) -> Option { + if !page.has_next_page { + return None; + } + + let last_edge = page.edges.last()?; + Some(Self { + threshold: self.threshold, + after: Some(last_edge.id), + }) + } +} + +impl InsertableJob for ExpireInactiveUserSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-user-sessions"; +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 527ec1624..ba21086f4 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -131,6 +131,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .add_schedule( "cleanup-expired-tokens", "0 0 * * * *".parse()?, diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs index d3dc2e865..a9e28705c 100644 --- a/crates/tasks/src/sessions.rs +++ b/crates/tasks/src/sessions.rs @@ -11,9 +11,10 @@ use mas_storage::{ compat::CompatSessionFilter, oauth2::OAuth2SessionFilter, queue::{ - ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt, - SyncDevicesJob, + ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, + ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob, }, + user::BrowserSessionFilter, }; use crate::{ @@ -146,3 +147,43 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { Ok(()) } } + +#[async_trait] +impl RunnableJob for ExpireInactiveUserSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let clock = state.clock(); + let mut rng = state.rng(); + + let filter = BrowserSessionFilter::new() + .with_last_active_before(self.threshold()) + .active_only(); + + let pagination = self.pagination(100); + + let page = repo + .browser_session() + .list(filter, pagination) + .await + .map_err(JobError::retry)?; + + if let Some(job) = self.next(&page) { + tracing::info!("Scheduling job to expire the next batch of inactive sessions"); + repo.queue_job() + .schedule_job(&mut rng, &clock, job) + .await + .map_err(JobError::retry)?; + } + + for edge in page.edges { + repo.browser_session() + .finish(&clock, edge) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +}