From 70936ba0f749b8860eefd0b5c80a811134ca60c4 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 13:00:22 +0100 Subject: [PATCH 1/8] Allow filtering OAuth sessions with any/no user --- crates/storage-pg/src/oauth2/mod.rs | 2 +- crates/storage-pg/src/oauth2/session.rs | 7 +++++++ crates/storage/src/oauth2/session.rs | 23 +++++++++++++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/crates/storage-pg/src/oauth2/mod.rs b/crates/storage-pg/src/oauth2/mod.rs index 54ce32fb7..556d4314d 100644 --- a/crates/storage-pg/src/oauth2/mod.rs +++ b/crates/storage-pg/src/oauth2/mod.rs @@ -525,7 +525,7 @@ mod tests { let pagination = Pagination::first(10); // First, list all the sessions - let filter = OAuth2SessionFilter::new(); + let filter = OAuth2SessionFilter::new().for_any_user(); let list = repo .oauth2_session() .list(filter, pagination) diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index 961094821..869e0c546 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -125,6 +125,13 @@ impl Filter for OAuth2SessionFilter<'_> { let scope: Vec = scope.iter().map(|s| s.as_str().to_owned()).collect(); Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)).contains(scope) })) + .add_option(self.any_user().map(|any_user| { + if any_user { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)).is_not_null() + } else { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)).is_null() + } + })) .add_option(self.last_active_after().map(|last_active_after| { Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)) .gt(last_active_after) diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 231ca3138..6c1214517 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -35,6 +35,7 @@ impl OAuth2SessionState { #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] pub struct OAuth2SessionFilter<'a> { user: Option<&'a User>, + any_user: Option, browser_session: Option<&'a BrowserSession>, device: Option<&'a Device>, client: Option<&'a Client>, @@ -66,6 +67,28 @@ impl<'a> OAuth2SessionFilter<'a> { self.user } + /// List sessions which belong to any user + #[must_use] + pub fn for_any_user(mut self) -> Self { + self.any_user = Some(true); + self + } + + /// List sessions which belong to no user + #[must_use] + pub fn for_no_user(mut self) -> Self { + self.any_user = Some(false); + self + } + + /// Get the 'any user' filter + /// + /// Returns [`None`] if no 'any user' filter was set + #[must_use] + pub fn any_user(&self) -> Option { + self.any_user + } + /// List sessions started by a specific browser session #[must_use] pub fn for_browser_session(mut self, browser_session: &'a BrowserSession) -> Self { From 0a624eb92c9046e5e7cd16edd9ff8fd3dcc8b131 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 13:47:48 +0100 Subject: [PATCH 2/8] Setup a job to expire OAuth 2.0 sessions --- crates/storage/src/queue/tasks.rs | 70 +++++++++++++++++++++++++- crates/tasks/src/lib.rs | 2 + crates/tasks/src/sessions.rs | 82 +++++++++++++++++++++++++++++++ 3 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 crates/tasks/src/sessions.rs diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 3e3eec5e6..07e40c85f 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -3,11 +3,13 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. -use mas_data_model::{Device, User, UserEmailAuthentication, UserRecoverySession}; +use chrono::{DateTime, Utc}; +use mas_data_model::{Device, Session, User, UserEmailAuthentication, UserRecoverySession}; use serde::{Deserialize, Serialize}; use ulid::Ulid; use super::InsertableJob; +use crate::{Page, Pagination}; /// This is the previous iteration of the email verification job. It has been /// replaced by [`SendEmailAuthenticationCodeJob`]. This struct is kept to be @@ -193,6 +195,15 @@ impl SyncDevicesJob { Self { user_id: user.id } } + /// Create a new job to sync the list of devices of a user with the + /// homeserver for the given user ID + /// + /// This is useful to use in cases where the [`User`] object isn't loaded + #[must_use] + pub fn new_for_id(user_id: Ulid) -> Self { + Self { user_id } + } + /// The ID of the user to sync the devices for #[must_use] pub fn user_id(&self) -> Ulid { @@ -310,3 +321,60 @@ pub struct CleanupExpiredTokensJob; impl InsertableJob for CleanupExpiredTokensJob { const QUEUE_NAME: &'static str = "cleanup-expired-tokens"; } + +/// Expire inactive OAuth 2.0 sessions +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExpireInactiveOAuthSessionsJob { + threshold: DateTime, + after: Option, +} + +impl ExpireInactiveOAuthSessionsJob { + /// Create a new job to expire inactive OAuth 2.0 sessions + /// + /// # Parameters + /// + /// * `threshold` - The threshold to expire sessions at + #[must_use] + pub fn new(threshold: DateTime) -> Self { + Self { + threshold, + after: None, + } + } + + /// Get the threshold to expire sessions at + #[must_use] + pub fn threshold(&self) -> DateTime { + self.threshold + } + + /// Get the pagination cursor + #[must_use] + pub fn pagination(&self, batch_size: usize) -> Pagination { + let pagination = Pagination::first(batch_size); + if let Some(after) = self.after { + pagination.after(after) + } else { + pagination + } + } + + /// Get the next job given the page returned by the database + #[must_use] + pub fn next(&self, page: &Page) -> Option { + if !page.has_next_page { + return None; + } + + let last_edge = page.edges.last()?; + Some(Self { + threshold: self.threshold, + after: Some(last_edge.id), + }) + } +} + +impl InsertableJob for ExpireInactiveOAuthSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions"; +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 4ee635266..7edcbba55 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -22,6 +22,7 @@ mod email; mod matrix; mod new_queue; mod recovery; +mod sessions; mod user; static METER: LazyLock = LazyLock::new(|| { @@ -128,6 +129,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .add_schedule( "cleanup-expired-tokens", "0 0 * * * *".parse()?, diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs new file mode 100644 index 000000000..5061f2d93 --- /dev/null +++ b/crates/tasks/src/sessions.rs @@ -0,0 +1,82 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +use std::collections::HashSet; + +use async_trait::async_trait; +use chrono::Duration; +use mas_storage::{ + oauth2::OAuth2SessionFilter, + queue::{ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt, SyncDevicesJob}, +}; + +use crate::{ + new_queue::{JobContext, JobError, RunnableJob}, + State, +}; + +#[async_trait] +impl RunnableJob for ExpireInactiveOAuthSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let clock = state.clock(); + let mut rng = state.rng(); + let mut users_synced = HashSet::new(); + + // This delay is used to space out the device sync jobs + // We add 10 seconds between each device sync, meaning that it will spread out + // the syncs over ~16 minutes max if we get a full batch of 100 users + let mut delay = Duration::minutes(1); + + let filter = OAuth2SessionFilter::new() + .with_last_active_before(self.threshold()) + .for_any_user() + .active_only(); + + let pagination = self.pagination(100); + + let page = repo + .oauth2_session() + .list(filter, pagination) + .await + .map_err(JobError::retry)?; + + if let Some(job) = self.next(&page) { + tracing::info!("Scheduling job to expire the next batch of inactive sessions"); + repo.queue_job() + .schedule_job(&mut rng, &clock, job) + .await + .map_err(JobError::retry)?; + } + + for edge in page.edges { + if let Some(user_id) = edge.user_id { + let inserted = users_synced.insert(user_id); + if inserted { + tracing::info!(user.id = %user_id, "Scheduling devices sync for user"); + repo.queue_job() + .schedule_job_later( + &mut rng, + &clock, + SyncDevicesJob::new_for_id(user_id), + clock.now() + delay, + ) + .await + .map_err(JobError::retry)?; + delay += Duration::seconds(10); + } + } + + repo.oauth2_session() + .finish(&clock, edge) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} From dcc23421c97c0e7ae809f1b1caaf5ec58df98e90 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 13:57:36 +0100 Subject: [PATCH 3/8] Setup a job to expire compatibility sessions --- crates/storage/src/queue/tasks.rs | 61 ++++++++++++++++++++++++++- crates/tasks/src/lib.rs | 1 + crates/tasks/src/sessions.rs | 68 ++++++++++++++++++++++++++++++- 3 files changed, 128 insertions(+), 2 deletions(-) diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 07e40c85f..faa12afc8 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -4,7 +4,9 @@ // Please see LICENSE in the repository root for full details. use chrono::{DateTime, Utc}; -use mas_data_model::{Device, Session, User, UserEmailAuthentication, UserRecoverySession}; +use mas_data_model::{ + CompatSession, Device, Session, User, UserEmailAuthentication, UserRecoverySession, +}; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -378,3 +380,60 @@ impl ExpireInactiveOAuthSessionsJob { impl InsertableJob for ExpireInactiveOAuthSessionsJob { const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions"; } + +/// Expire inactive compatibility sessions +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExpireInactiveCompatSessionsJob { + threshold: DateTime, + after: Option, +} + +impl ExpireInactiveCompatSessionsJob { + /// Create a new job to expire inactive compatibility sessions + /// + /// # Parameters + /// + /// * `threshold` - The threshold to expire sessions at + #[must_use] + pub fn new(threshold: DateTime) -> Self { + Self { + threshold, + after: None, + } + } + + /// Get the threshold to expire sessions at + #[must_use] + pub fn threshold(&self) -> DateTime { + self.threshold + } + + /// Get the pagination cursor + #[must_use] + pub fn pagination(&self, batch_size: usize) -> Pagination { + let pagination = Pagination::first(batch_size); + if let Some(after) = self.after { + pagination.after(after) + } else { + pagination + } + } + + /// Get the next job given the page returned by the database + #[must_use] + pub fn next(&self, page: &Page) -> Option { + if !page.has_next_page { + return None; + } + + let last_edge = page.edges.last()?; + Some(Self { + threshold: self.threshold, + after: Some(last_edge.id), + }) + } +} + +impl InsertableJob for ExpireInactiveCompatSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions"; +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 7edcbba55..527ec1624 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -129,6 +129,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .add_schedule( "cleanup-expired-tokens", diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs index 5061f2d93..d3dc2e865 100644 --- a/crates/tasks/src/sessions.rs +++ b/crates/tasks/src/sessions.rs @@ -8,8 +8,12 @@ use std::collections::HashSet; use async_trait::async_trait; use chrono::Duration; use mas_storage::{ + compat::CompatSessionFilter, oauth2::OAuth2SessionFilter, - queue::{ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt, SyncDevicesJob}, + queue::{ + ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt, + SyncDevicesJob, + }, }; use crate::{ @@ -80,3 +84,65 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { Ok(()) } } + +#[async_trait] +impl RunnableJob for ExpireInactiveCompatSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let clock = state.clock(); + let mut rng = state.rng(); + let mut users_synced = HashSet::new(); + + // This delay is used to space out the device sync jobs + // We add 10 seconds between each device sync, meaning that it will spread out + // the syncs over ~16 minutes max if we get a full batch of 100 users + let mut delay = Duration::minutes(1); + + let filter = CompatSessionFilter::new() + .with_last_active_before(self.threshold()) + .active_only(); + + let pagination = self.pagination(100); + + let page = repo + .compat_session() + .list(filter, pagination) + .await + .map_err(JobError::retry)? + .map(|(c, _)| c); + + if let Some(job) = self.next(&page) { + tracing::info!("Scheduling job to expire the next batch of inactive sessions"); + repo.queue_job() + .schedule_job(&mut rng, &clock, job) + .await + .map_err(JobError::retry)?; + } + + for edge in page.edges { + let inserted = users_synced.insert(edge.user_id); + if inserted { + tracing::info!(user.id = %edge.user_id, "Scheduling devices sync for user"); + repo.queue_job() + .schedule_job_later( + &mut rng, + &clock, + SyncDevicesJob::new_for_id(edge.user_id), + clock.now() + delay, + ) + .await + .map_err(JobError::retry)?; + delay += Duration::seconds(10); + } + + repo.compat_session() + .finish(&clock, edge) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} From d8b9bb1d9a76fe17070edd4118508e2ec36bc651 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 14:03:11 +0100 Subject: [PATCH 4/8] Setup job to expire inactive browser sessions --- crates/storage/src/queue/tasks.rs | 60 ++++++++++++++++++++++++++++++- crates/tasks/src/lib.rs | 1 + crates/tasks/src/sessions.rs | 45 +++++++++++++++++++++-- 3 files changed, 103 insertions(+), 3 deletions(-) diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index faa12afc8..5fe53f7f2 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -5,7 +5,8 @@ use chrono::{DateTime, Utc}; use mas_data_model::{ - CompatSession, Device, Session, User, UserEmailAuthentication, UserRecoverySession, + BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication, + UserRecoverySession, }; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -437,3 +438,60 @@ impl ExpireInactiveCompatSessionsJob { impl InsertableJob for ExpireInactiveCompatSessionsJob { const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions"; } + +/// Expire inactive user sessions +#[derive(Debug, Serialize, Deserialize)] +pub struct ExpireInactiveUserSessionsJob { + threshold: DateTime, + after: Option, +} + +impl ExpireInactiveUserSessionsJob { + /// Create a new job to expire inactive user/browser sessions + /// + /// # Parameters + /// + /// * `threshold` - The threshold to expire sessions at + #[must_use] + pub fn new(threshold: DateTime) -> Self { + Self { + threshold, + after: None, + } + } + + /// Get the threshold to expire sessions at + #[must_use] + pub fn threshold(&self) -> DateTime { + self.threshold + } + + /// Get the pagination cursor + #[must_use] + pub fn pagination(&self, batch_size: usize) -> Pagination { + let pagination = Pagination::first(batch_size); + if let Some(after) = self.after { + pagination.after(after) + } else { + pagination + } + } + + /// Get the next job given the page returned by the database + #[must_use] + pub fn next(&self, page: &Page) -> Option { + if !page.has_next_page { + return None; + } + + let last_edge = page.edges.last()?; + Some(Self { + threshold: self.threshold, + after: Some(last_edge.id), + }) + } +} + +impl InsertableJob for ExpireInactiveUserSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-user-sessions"; +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 527ec1624..ba21086f4 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -131,6 +131,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .add_schedule( "cleanup-expired-tokens", "0 0 * * * *".parse()?, diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs index d3dc2e865..a9e28705c 100644 --- a/crates/tasks/src/sessions.rs +++ b/crates/tasks/src/sessions.rs @@ -11,9 +11,10 @@ use mas_storage::{ compat::CompatSessionFilter, oauth2::OAuth2SessionFilter, queue::{ - ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt, - SyncDevicesJob, + ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, + ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob, }, + user::BrowserSessionFilter, }; use crate::{ @@ -146,3 +147,43 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { Ok(()) } } + +#[async_trait] +impl RunnableJob for ExpireInactiveUserSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let clock = state.clock(); + let mut rng = state.rng(); + + let filter = BrowserSessionFilter::new() + .with_last_active_before(self.threshold()) + .active_only(); + + let pagination = self.pagination(100); + + let page = repo + .browser_session() + .list(filter, pagination) + .await + .map_err(JobError::retry)?; + + if let Some(job) = self.next(&page) { + tracing::info!("Scheduling job to expire the next batch of inactive sessions"); + repo.queue_job() + .schedule_job(&mut rng, &clock, job) + .await + .map_err(JobError::retry)?; + } + + for edge in page.edges { + repo.browser_session() + .finish(&clock, edge) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} From b40fcdd7120ef31720294ec4692a16ecffd9c848 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 16:26:50 +0100 Subject: [PATCH 5/8] Experimental feature to timeout inactive sessions --- crates/cli/src/commands/server.rs | 1 + crates/cli/src/commands/worker.rs | 1 + crates/cli/src/util.rs | 12 ++++- crates/config/src/sections/experimental.rs | 38 ++++++++++++++- crates/data-model/src/lib.rs | 2 +- crates/data-model/src/site_config.rs | 10 ++++ crates/handlers/src/test_utils.rs | 1 + crates/storage/src/queue/tasks.rs | 11 +++++ crates/tasks/src/lib.rs | 17 +++++++ crates/tasks/src/sessions.rs | 54 +++++++++++++++++++++- docs/config.schema.json | 39 ++++++++++++++++ 11 files changed, 181 insertions(+), 5 deletions(-) diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index e56fd087e..c0f4f5108 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -166,6 +166,7 @@ impl Options { &mailer, homeserver_connection.clone(), url_builder.clone(), + &site_config, shutdown.soft_shutdown_token(), shutdown.task_tracker(), ) diff --git a/crates/cli/src/commands/worker.rs b/crates/cli/src/commands/worker.rs index 8d2b4cd33..187d6b7cb 100644 --- a/crates/cli/src/commands/worker.rs +++ b/crates/cli/src/commands/worker.rs @@ -73,6 +73,7 @@ impl Options { &mailer, conn, url_builder, + &site_config, shutdown.soft_shutdown_token(), shutdown.task_tracker(), ) diff --git a/crates/cli/src/util.rs b/crates/cli/src/util.rs index 02a03b0dc..b5030eb2f 100644 --- a/crates/cli/src/util.rs +++ b/crates/cli/src/util.rs @@ -12,7 +12,7 @@ use mas_config::{ EmailTransportKind, ExperimentalConfig, MatrixConfig, PasswordsConfig, PolicyConfig, TemplatesConfig, }; -use mas_data_model::SiteConfig; +use mas_data_model::{SessionExpirationConfig, SiteConfig}; use mas_email::{MailTransport, Mailer}; use mas_handlers::passwords::PasswordManager; use mas_policy::PolicyFactory; @@ -180,6 +180,15 @@ pub fn site_config_from_config( captcha_config: &CaptchaConfig, ) -> Result { let captcha = captcha_config_from_config(captcha_config)?; + let session_expiration = experimental_config + .inactive_session_expiration + .as_ref() + .map(|c| SessionExpirationConfig { + oauth_session_inactivity_ttl: c.expire_oauth_sessions.then_some(c.ttl), + compat_session_inactivity_ttl: c.expire_compat_sessions.then_some(c.ttl), + user_session_inactivity_ttl: c.expire_user_sessions.then_some(c.ttl), + }); + Ok(SiteConfig { access_token_ttl: experimental_config.access_token_ttl, compat_token_ttl: experimental_config.compat_token_ttl, @@ -198,6 +207,7 @@ pub fn site_config_from_config( && account_config.password_recovery_enabled, captcha, minimum_password_complexity: password_config.minimum_complexity(), + session_expiration, }) } diff --git a/crates/config/src/sections/experimental.rs b/crates/config/src/sections/experimental.rs index 2c86772a3..17ffa6c4d 100644 --- a/crates/config/src/sections/experimental.rs +++ b/crates/config/src/sections/experimental.rs @@ -11,6 +11,10 @@ use serde_with::serde_as; use crate::ConfigurationSection; +fn default_true() -> bool { + true +} + fn default_token_ttl() -> Duration { Duration::microseconds(5 * 60 * 1000 * 1000) } @@ -19,11 +23,32 @@ fn is_default_token_ttl(value: &Duration) -> bool { *value == default_token_ttl() } +/// Configuration options for the inactive session expiration feature +#[serde_as] +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct InactiveSessionExpirationConfig { + /// Time after which an inactive session is automatically finished + #[schemars(with = "u64", range(min = 600, max = 7_776_000))] + #[serde_as(as = "serde_with::DurationSeconds")] + pub ttl: Duration, + + /// Should compatibility sessions expire after inactivity + #[serde(default = "default_true")] + pub expire_compat_sessions: bool, + + /// Should OAuth 2.0 sessions expire after inactivity + #[serde(default = "default_true")] + pub expire_oauth_sessions: bool, + + /// Should user sessions expire after inactivity + #[serde(default = "default_true")] + pub expire_user_sessions: bool, +} + /// Configuration sections for experimental options /// /// Do not change these options unless you know what you are doing. #[serde_as] -#[allow(clippy::struct_excessive_bools)] #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] pub struct ExperimentalConfig { /// Time-to-live of access tokens in seconds. Defaults to 5 minutes. @@ -44,6 +69,12 @@ pub struct ExperimentalConfig { )] #[serde_as(as = "serde_with::DurationSeconds")] pub compat_token_ttl: Duration, + + /// Experimetal feature to automatically expire inactive sessions + /// + /// Disabled by default + #[serde(skip_serializing_if = "Option::is_none")] + pub inactive_session_expiration: Option, } impl Default for ExperimentalConfig { @@ -51,13 +82,16 @@ impl Default for ExperimentalConfig { Self { access_token_ttl: default_token_ttl(), compat_token_ttl: default_token_ttl(), + inactive_session_expiration: None, } } } impl ExperimentalConfig { pub(crate) fn is_default(&self) -> bool { - is_default_token_ttl(&self.access_token_ttl) && is_default_token_ttl(&self.compat_token_ttl) + is_default_token_ttl(&self.access_token_ttl) + && is_default_token_ttl(&self.compat_token_ttl) + && self.inactive_session_expiration.is_none() } } diff --git a/crates/data-model/src/lib.rs b/crates/data-model/src/lib.rs index 19a81f098..b26f74f1b 100644 --- a/crates/data-model/src/lib.rs +++ b/crates/data-model/src/lib.rs @@ -32,7 +32,7 @@ pub use self::{ AuthorizationCode, AuthorizationGrant, AuthorizationGrantStage, Client, DeviceCodeGrant, DeviceCodeGrantState, InvalidRedirectUriError, JwksOrJwksUri, Pkce, Session, SessionState, }, - site_config::{CaptchaConfig, CaptchaService, SiteConfig}, + site_config::{CaptchaConfig, CaptchaService, SessionExpirationConfig, SiteConfig}, tokens::{ AccessToken, AccessTokenState, RefreshToken, RefreshTokenState, TokenFormatError, TokenType, }, diff --git a/crates/data-model/src/site_config.rs b/crates/data-model/src/site_config.rs index 5aba98a9c..0e09f8a31 100644 --- a/crates/data-model/src/site_config.rs +++ b/crates/data-model/src/site_config.rs @@ -28,6 +28,14 @@ pub struct CaptchaConfig { pub secret_key: String, } +/// Automatic session expiration configuration +#[derive(Debug, Clone)] +pub struct SessionExpirationConfig { + pub user_session_inactivity_ttl: Option, + pub oauth_session_inactivity_ttl: Option, + pub compat_session_inactivity_ttl: Option, +} + /// Random site configuration we want accessible in various places. #[allow(clippy::struct_excessive_bools)] #[derive(Debug, Clone)] @@ -74,4 +82,6 @@ pub struct SiteConfig { /// Minimum password complexity, between 0 and 4. /// This is a score from zxcvbn. pub minimum_password_complexity: u8, + + pub session_expiration: Option, } diff --git a/crates/handlers/src/test_utils.rs b/crates/handlers/src/test_utils.rs index 4e69ab5df..b295fcaa4 100644 --- a/crates/handlers/src/test_utils.rs +++ b/crates/handlers/src/test_utils.rs @@ -139,6 +139,7 @@ pub fn test_site_config() -> SiteConfig { account_recovery_allowed: true, captcha: None, minimum_password_complexity: 1, + session_expiration: None, } } diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 5fe53f7f2..2172edfdc 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -325,6 +325,17 @@ impl InsertableJob for CleanupExpiredTokensJob { const QUEUE_NAME: &'static str = "cleanup-expired-tokens"; } +/// Scheduled job to expire inactive sessions +/// +/// This job will trigger jobs to expire inactive compat, oauth and user +/// sessions. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExpireInactiveSessionsJob; + +impl InsertableJob for ExpireInactiveSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-sessions"; +} + /// Expire inactive OAuth 2.0 sessions #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ExpireInactiveOAuthSessionsJob { diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index ba21086f4..d95941b8a 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, LazyLock}; +use mas_data_model::SiteConfig; use mas_email::Mailer; use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; @@ -41,6 +42,7 @@ struct State { clock: SystemClock, homeserver: Arc>, url_builder: UrlBuilder, + site_config: SiteConfig, } impl State { @@ -50,6 +52,7 @@ impl State { mailer: Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, + site_config: SiteConfig, ) -> Self { Self { pool, @@ -57,6 +60,7 @@ impl State { clock, homeserver: Arc::new(homeserver), url_builder, + site_config, } } @@ -94,6 +98,10 @@ impl State { pub fn url_builder(&self) -> &UrlBuilder { &self.url_builder } + + pub fn site_config(&self) -> &SiteConfig { + &self.site_config + } } /// Initialise the workers. @@ -106,6 +114,7 @@ pub async fn init( mailer: &Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, + site_config: &SiteConfig, cancellation_token: CancellationToken, task_tracker: &TaskTracker, ) -> Result<(), QueueRunnerError> { @@ -115,6 +124,7 @@ pub async fn init( mailer.clone(), homeserver, url_builder, + site_config.clone(), ); let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?; @@ -129,6 +139,7 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() .register_handler::() .register_handler::() .register_handler::() @@ -136,6 +147,12 @@ pub async fn init( "cleanup-expired-tokens", "0 0 * * * *".parse()?, mas_storage::queue::CleanupExpiredTokensJob, + ) + .add_schedule( + "expire-inactive-sessions", + // Run this job every 15 minutes + "30 */15 * * * *".parse()?, + mas_storage::queue::ExpireInactiveSessionsJob, ); task_tracker.spawn(worker.run()); diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs index a9e28705c..fee7ca863 100644 --- a/crates/tasks/src/sessions.rs +++ b/crates/tasks/src/sessions.rs @@ -11,7 +11,7 @@ use mas_storage::{ compat::CompatSessionFilter, oauth2::OAuth2SessionFilter, queue::{ - ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, + ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, ExpireInactiveSessionsJob, ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob, }, user::BrowserSessionFilter, @@ -22,6 +22,58 @@ use crate::{ State, }; +#[async_trait] +impl RunnableJob for ExpireInactiveSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let Some(config) = state.site_config().session_expiration.as_ref() else { + // Automatic session expiration is disabled + return Ok(()); + }; + + let clock = state.clock(); + let mut rng = state.rng(); + let now = clock.now(); + let mut repo = state.repository().await.map_err(JobError::retry)?; + + if let Some(ttl) = config.oauth_session_inactivity_ttl { + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + ExpireInactiveOAuthSessionsJob::new(now - ttl), + ) + .await + .map_err(JobError::retry)?; + } + + if let Some(ttl) = config.compat_session_inactivity_ttl { + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + ExpireInactiveCompatSessionsJob::new(now - ttl), + ) + .await + .map_err(JobError::retry)?; + } + + if let Some(ttl) = config.user_session_inactivity_ttl { + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + ExpireInactiveUserSessionsJob::new(now - ttl), + ) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} + #[async_trait] impl RunnableJob for ExpireInactiveOAuthSessionsJob { async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { diff --git a/docs/config.schema.json b/docs/config.schema.json index 269ae46c1..04d8a1bba 100644 --- a/docs/config.schema.json +++ b/docs/config.schema.json @@ -2457,6 +2457,45 @@ "format": "uint64", "maximum": 86400.0, "minimum": 60.0 + }, + "inactive_session_expiration": { + "description": "Experimetal feature to automatically expire inactive sessions\n\nDisabled by default", + "allOf": [ + { + "$ref": "#/definitions/InactiveSessionExpirationConfig" + } + ] + } + } + }, + "InactiveSessionExpirationConfig": { + "description": "Configuration options for the inactive session expiration feature", + "type": "object", + "required": [ + "ttl" + ], + "properties": { + "ttl": { + "description": "Time after which an inactive session is automatically finished", + "type": "integer", + "format": "uint64", + "maximum": 7776000.0, + "minimum": 600.0 + }, + "expire_compat_sessions": { + "description": "Should compatibility sessions expire after inactivity", + "default": true, + "type": "boolean" + }, + "expire_oauth_sessions": { + "description": "Should OAuth 2.0 sessions expire after inactivity", + "default": true, + "type": "boolean" + }, + "expire_user_sessions": { + "description": "Should user sessions expire after inactivity", + "default": true, + "type": "boolean" } } } From 9fea06693bafb4dbc67ce2a8ac1c9ef2f30b90fc Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 17:17:26 +0100 Subject: [PATCH 6/8] Allow filtering sessions by client kind (dynamic or static) --- .../src/admin/v1/oauth2_sessions/list.rs | 31 ++++++++++++++++ crates/storage-pg/src/iden.rs | 9 +++++ crates/storage-pg/src/oauth2/session.rs | 22 +++++++++++- crates/storage/src/oauth2/session.rs | 35 +++++++++++++++++++ docs/api/spec.json | 23 ++++++++++++ 5 files changed, 119 insertions(+), 1 deletion(-) diff --git a/crates/handlers/src/admin/v1/oauth2_sessions/list.rs b/crates/handlers/src/admin/v1/oauth2_sessions/list.rs index 596361e0d..999584f0d 100644 --- a/crates/handlers/src/admin/v1/oauth2_sessions/list.rs +++ b/crates/handlers/src/admin/v1/oauth2_sessions/list.rs @@ -46,6 +46,22 @@ impl std::fmt::Display for OAuth2SessionStatus { } } +#[derive(Deserialize, JsonSchema, Clone, Copy)] +#[serde(rename_all = "snake_case")] +enum OAuth2ClientKind { + Dynamic, + Static, +} + +impl std::fmt::Display for OAuth2ClientKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Dynamic => write!(f, "dynamic"), + Self::Static => write!(f, "static"), + } + } +} + #[derive(FromRequestParts, Deserialize, JsonSchema, OperationIo)] #[serde(rename = "OAuth2SessionFilter")] #[aide(input_with = "Query")] @@ -61,6 +77,10 @@ pub struct FilterParams { #[schemars(with = "Option")] client: Option, + /// Retrieve the items only for a specific client kind + #[serde(rename = "filter[client-kind]")] + client_kind: Option, + /// Retrieve the items started from the given browser session #[serde(rename = "filter[user-session]")] #[schemars(with = "Option")] @@ -95,6 +115,11 @@ impl std::fmt::Display for FilterParams { sep = '&'; } + if let Some(client_kind) = self.client_kind { + write!(f, "{sep}filter[client-kind]={client_kind}")?; + sep = '&'; + } + if let Some(user_session) = self.user_session { write!(f, "{sep}filter[user-session]={user_session}")?; sep = '&'; @@ -232,6 +257,12 @@ pub async fn handler( None => filter, }; + let filter = match params.client_kind { + Some(OAuth2ClientKind::Dynamic) => filter.only_dynamic_clients(), + Some(OAuth2ClientKind::Static) => filter.only_static_clients(), + None => filter, + }; + let user_session = if let Some(user_session_id) = params.user_session { let user_session = repo .browser_session() diff --git a/crates/storage-pg/src/iden.rs b/crates/storage-pg/src/iden.rs index 951764806..841a4648e 100644 --- a/crates/storage-pg/src/iden.rs +++ b/crates/storage-pg/src/iden.rs @@ -83,6 +83,15 @@ pub enum OAuth2Sessions { LastActiveIp, } +#[derive(sea_query::Iden)] +#[iden = "oauth2_clients"] +pub enum OAuth2Clients { + Table, + #[iden = "oauth2_client_id"] + OAuth2ClientId, + IsStatic, +} + #[derive(sea_query::Iden)] #[iden = "upstream_oauth_providers"] pub enum UpstreamOAuthProviders { diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index 869e0c546..a81771e90 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -23,7 +23,7 @@ use uuid::Uuid; use crate::{ filter::{Filter, StatementExt}, - iden::OAuth2Sessions, + iden::{OAuth2Clients, OAuth2Sessions}, pagination::QueryBuilderExt, tracing::ExecuteExt, DatabaseError, DatabaseInconsistencyError, @@ -104,6 +104,26 @@ impl Filter for OAuth2SessionFilter<'_> { Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)) .eq(Uuid::from(client.id)) })) + .add_option(self.client_kind().map(|client_kind| { + // This builds either a: + // `WHERE oauth2_client_id = ANY(...)` + // or a `WHERE oauth2_client_id <> ALL(...)` + let static_clients = Query::select() + .expr(Expr::col(( + OAuth2Clients::Table, + OAuth2Clients::OAuth2ClientId, + ))) + .and_where(Expr::col((OAuth2Clients::Table, OAuth2Clients::IsStatic)).into()) + .from(OAuth2Clients::Table) + .take(); + if client_kind.is_static() { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)) + .eq(Expr::any(static_clients)) + } else { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)) + .ne(Expr::all(static_clients)) + } + })) .add_option(self.device().map(|device| { Expr::val(device.to_scope_token().to_string()).eq(PgFunc::any(Expr::col(( OAuth2Sessions::Table, diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 6c1214517..71cca8ef3 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -31,6 +31,18 @@ impl OAuth2SessionState { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum ClientKind { + Static, + Dynamic, +} + +impl ClientKind { + pub fn is_static(self) -> bool { + matches!(self, Self::Static) + } +} + /// Filter parameters for listing OAuth 2.0 sessions #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] pub struct OAuth2SessionFilter<'a> { @@ -39,6 +51,7 @@ pub struct OAuth2SessionFilter<'a> { browser_session: Option<&'a BrowserSession>, device: Option<&'a Device>, client: Option<&'a Client>, + client_kind: Option, state: Option, scope: Option<&'a Scope>, last_active_before: Option>, @@ -119,6 +132,28 @@ impl<'a> OAuth2SessionFilter<'a> { self.client } + /// List only static clients + #[must_use] + pub fn only_static_clients(mut self) -> Self { + self.client_kind = Some(ClientKind::Static); + self + } + + /// List only dynamic clients + #[must_use] + pub fn only_dynamic_clients(mut self) -> Self { + self.client_kind = Some(ClientKind::Dynamic); + self + } + + /// Get the client kind filter + /// + /// Returns [`None`] if no client kind filter was set + #[must_use] + pub fn client_kind(&self) -> Option { + self.client_kind + } + /// Only return sessions with a last active time before the given time #[must_use] pub fn with_last_active_before(mut self, last_active_before: DateTime) -> Self { diff --git a/docs/api/spec.json b/docs/api/spec.json index 258068bb0..100bff25a 100644 --- a/docs/api/spec.json +++ b/docs/api/spec.json @@ -357,6 +357,17 @@ }, "style": "form" }, + { + "in": "query", + "name": "filter[client-kind]", + "description": "Retrieve the items only for a specific client kind", + "schema": { + "description": "Retrieve the items only for a specific client kind", + "$ref": "#/components/schemas/OAuth2ClientKind", + "nullable": true + }, + "style": "form" + }, { "in": "query", "name": "filter[user-session]", @@ -2347,6 +2358,11 @@ "$ref": "#/components/schemas/ULID", "nullable": true }, + "filter[client-kind]": { + "description": "Retrieve the items only for a specific client kind", + "$ref": "#/components/schemas/OAuth2ClientKind", + "nullable": true + }, "filter[user-session]": { "description": "Retrieve the items started from the given browser session", "$ref": "#/components/schemas/ULID", @@ -2367,6 +2383,13 @@ } } }, + "OAuth2ClientKind": { + "type": "string", + "enum": [ + "dynamic", + "static" + ] + }, "OAuth2SessionStatus": { "type": "string", "enum": [ From c7d2ad7c5cc7a2554a9bd562c01be7f2415230be Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 17:17:48 +0100 Subject: [PATCH 7/8] Only expire sessions from dynamic clients --- crates/tasks/src/sessions.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs index fee7ca863..677e71b3c 100644 --- a/crates/tasks/src/sessions.rs +++ b/crates/tasks/src/sessions.rs @@ -90,6 +90,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { let filter = OAuth2SessionFilter::new() .with_last_active_before(self.threshold()) .for_any_user() + .only_dynamic_clients() .active_only(); let pagination = self.pagination(100); From 9ce746f97587831cb1bfa1c4d0a8b414d00d661c Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 12 Feb 2025 17:31:00 +0100 Subject: [PATCH 8/8] Add documentation for session timeout configuration --- docs/reference/configuration.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 40e7e06f5..2671fc807 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -711,4 +711,19 @@ experimental: # Time-to-live of compatibility access tokens in seconds, when refresh tokens are supported. Defaults to 300, 5 minutes. #compat_token_ttl: 300 + + # Experimental feature to automatically expire inactive sessions + # Disabled by default + #inactive_session_expiration: + # Time after which an inactive session is automatically finished in seconds + #ttl: 32400 + + # Should compatibility sessions expire after inactivity. Defaults to true. + #expire_compat_sessions: true + + # Should OAuth 2.0 sessions expire after inactivity. Defaults to true. + #expire_oauth_sessions: true + + # Should user sessions expire after inactivity. Defaults to true. + #expire_user_sessions: true ```