From 120c8f7d23a53bbd949ca0177b0c62a94affc27b Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 22 Oct 2025 11:26:20 +0100 Subject: [PATCH 1/6] Add `revoke_bulk` for personal sessions storage --- crates/storage-pg/src/personal/mod.rs | 98 +++++++++++++++++++++++ crates/storage-pg/src/personal/session.rs | 62 ++++++++++++++ crates/storage/src/personal/session.rs | 26 ++++++ 3 files changed, 186 insertions(+) diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs index e6e20b698..aa4597fec 100644 --- a/crates/storage-pg/src/personal/mod.rs +++ b/crates/storage-pg/src/personal/mod.rs @@ -181,6 +181,104 @@ mod tests { assert!(session_lookup.is_revoked()); } + #[sqlx::test(migrator = "crate::MIGRATOR")] + async fn test_session_revoke_bulk(pool: PgPool) { + let mut rng = ChaChaRng::seed_from_u64(42); + let clock = MockClock::default(); + let mut repo = PgRepository::from_pool(&pool).await.unwrap(); + + let alice_user = repo + .user() + .add(&mut rng, &clock, "alice".to_owned()) + .await + .unwrap(); + let bob_user = repo + .user() + .add(&mut rng, &clock, "bob".to_owned()) + .await + .unwrap(); + + let session1 = repo + .personal_session() + .add( + &mut rng, + &clock, + (&alice_user).into(), + &bob_user, + "Test Personal Session".to_owned(), + "openid".parse().unwrap(), + ) + .await + .unwrap(); + repo.personal_access_token() + .add( + &mut rng, + &clock, + &session1, + "mpt_hiss", + Some(Duration::days(42)), + ) + .await + .unwrap(); + + let session2 = repo + .personal_session() + .add( + &mut rng, + &clock, + (&bob_user).into(), + &bob_user, + "Test Personal Session".to_owned(), + "openid".parse().unwrap(), + ) + .await + .unwrap(); + repo.personal_access_token() + .add( + &mut rng, &clock, &session2, "mpt_meow", // No expiry + None, + ) + .await + .unwrap(); + + // Just one session without a token expiry time + assert_eq!( + repo.personal_session() + .revoke_bulk( + &clock, + PersonalSessionFilter::new() + .active_only() + .with_expires(false) + ) + .await + .unwrap(), + 1 + ); + + // Just one session with a token expiry time + assert_eq!( + repo.personal_session() + .revoke_bulk( + &clock, + PersonalSessionFilter::new() + .active_only() + .with_expires(true) + ) + .await + .unwrap(), + 1 + ); + + // No active sessions left + assert_eq!( + repo.personal_session() + .revoke_bulk(&clock, PersonalSessionFilter::new().active_only()) + .await + .unwrap(), + 0 + ); + } + #[sqlx::test(migrator = "crate::MIGRATOR")] async fn test_access_token_repository(pool: PgPool) { const FIRST_TOKEN: &str = "first_access_token"; diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index 8b1723767..db8e46ff3 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -357,6 +357,68 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .map_err(DatabaseError::to_invalid_operation) } + #[tracing::instrument( + name = "db.personal_session.revoke_bulk", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result { + let revoked_at = clock.now(); + + let (sql, arguments) = Query::update() + .table(PersonalSessions::Table) + .value(PersonalSessions::RevokedAt, revoked_at) + .and_where( + Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) + .in_subquery( + Query::select() + .expr(Expr::col(( + PersonalSessions::Table, + PersonalSessions::PersonalSessionId, + ))) + .from(PersonalSessions::Table) + .left_join( + PersonalAccessTokens::Table, + Cond::all() + .add( + Expr::col(( + PersonalSessions::Table, + PersonalSessions::PersonalSessionId, + )) + .eq(Expr::col(( + PersonalAccessTokens::Table, + PersonalAccessTokens::PersonalSessionId, + ))), + ) + .add( + Expr::col(( + PersonalAccessTokens::Table, + PersonalAccessTokens::RevokedAt, + )) + .is_null(), + ), + ) + .apply_filter(filter) + .take(), + ), + ) + .build_sqlx(PostgresQueryBuilder); + + let res = sqlx::query_with(&sql, arguments) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(res.rows_affected().try_into().unwrap_or(usize::MAX)) + } + #[tracing::instrument( name = "db.personal_session.list", skip_all, diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs index 3515b1161..8e208a610 100644 --- a/crates/storage/src/personal/session.rs +++ b/crates/storage/src/personal/session.rs @@ -87,6 +87,26 @@ pub trait PersonalSessionRepository: Send + Sync { personal_session: PersonalSession, ) -> Result; + /// Revoke all the [`PersonalSession`]s matching the given filter. + /// + /// This will also revoke the relevant personal access tokens. + /// + /// Returns the number of sessions affected + /// + /// # Parameters + /// + /// * `clock`: The clock used to generate timestamps + /// * `filter`: The filter to apply + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result; + /// List [`PersonalSession`]s matching the given filter and pagination /// parameters /// @@ -150,6 +170,12 @@ repository_impl!(PersonalSessionRepository: personal_session: PersonalSession, ) -> Result; + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result; + async fn list( &mut self, filter: PersonalSessionFilter<'_>, From c94e4ea27b1dd6cf1fc0888528bf89d45268df1a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 22 Oct 2025 11:26:42 +0100 Subject: [PATCH 2/6] Revoke personal sessions on user deactivation --- crates/tasks/src/user.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/crates/tasks/src/user.rs b/crates/tasks/src/user.rs index ee60c6532..e605670cc 100644 --- a/crates/tasks/src/user.rs +++ b/crates/tasks/src/user.rs @@ -10,6 +10,7 @@ use mas_storage::{ RepositoryAccess, compat::CompatSessionFilter, oauth2::OAuth2SessionFilter, + personal::PersonalSessionFilter, queue::{DeactivateUserJob, ReactivateUserJob}, user::{BrowserSessionFilter, UserEmailFilter, UserRepository}, }; @@ -80,6 +81,36 @@ impl RunnableJob for DeactivateUserJob { .map_err(JobError::retry)?; info!(affected = n, "Killed all compatibility sessions for user"); + let n = repo + .personal_session() + .revoke_bulk( + clock, + PersonalSessionFilter::new() + .for_actor_user(&user) + .active_only(), + ) + .await + .map_err(JobError::retry)?; + info!( + affected = n, + "Killed all compatibility sessions acting as user" + ); + + let n = repo + .personal_session() + .revoke_bulk( + clock, + PersonalSessionFilter::new() + .for_owner_user(&user) + .active_only(), + ) + .await + .map_err(JobError::retry)?; + info!( + affected = n, + "Killed all compatibility sessions owned by user" + ); + // Delete all the email addresses for the user let n = repo .user_email() From 29c3da5d0e0c8ff29965b01736b2bade51bde74b Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 22 Oct 2025 11:31:17 +0100 Subject: [PATCH 3/6] Don't allow creating personal sessions for deactivated users --- crates/handlers/src/admin/v1/personal_sessions/add.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/handlers/src/admin/v1/personal_sessions/add.rs b/crates/handlers/src/admin/v1/personal_sessions/add.rs index 5a7bb5a0e..cc86807e9 100644 --- a/crates/handlers/src/admin/v1/personal_sessions/add.rs +++ b/crates/handlers/src/admin/v1/personal_sessions/add.rs @@ -32,6 +32,9 @@ pub enum RouteError { #[error("User not found")] UserNotFound, + #[error("User is not active")] + UserDeactivated, + #[error("Invalid scope")] InvalidScope, } @@ -46,6 +49,7 @@ impl IntoResponse for RouteError { let status = match self { Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::UserNotFound => StatusCode::NOT_FOUND, + Self::UserDeactivated => StatusCode::GONE, Self::InvalidScope => StatusCode::BAD_REQUEST, }; (status, sentry_event_id, Json(error)).into_response() @@ -114,6 +118,10 @@ pub async fn handler( .await? .ok_or(RouteError::UserNotFound)?; + if actor_user.deactivated_at.is_some() { + return Err(RouteError::UserDeactivated); + } + let scope: Scope = params.scope.parse().map_err(|_| RouteError::InvalidScope)?; // Create the personal session From 0ec91f5f4fb7a8789ffb22286617d83538c0d636 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 22 Oct 2025 14:20:55 +0100 Subject: [PATCH 4/6] Use `is_valid_actor` --- crates/handlers/src/admin/v1/personal_sessions/add.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/handlers/src/admin/v1/personal_sessions/add.rs b/crates/handlers/src/admin/v1/personal_sessions/add.rs index edf333b19..26e5ca896 100644 --- a/crates/handlers/src/admin/v1/personal_sessions/add.rs +++ b/crates/handlers/src/admin/v1/personal_sessions/add.rs @@ -113,7 +113,7 @@ pub async fn handler( .await? .ok_or(RouteError::UserNotFound)?; - if actor_user.deactivated_at.is_some() { + if !actor_user.is_valid_actor() { return Err(RouteError::UserDeactivated); } From 7e70afa6abd3b7d2f1a7a6086e2bf60b29987161 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 22 Oct 2025 14:22:10 +0100 Subject: [PATCH 5/6] Add comments for the filters --- crates/storage-pg/src/personal/session.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index db8e46ff3..b4c330ecb 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -377,6 +377,9 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .value(PersonalSessions::RevokedAt, revoked_at) .and_where( Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) + // Because filters apply to both the session and access token tables, + // Use a subquery to make it possible to use a JOIN + // onto the personal access token table. .in_subquery( Query::select() .expr(Expr::col(( @@ -387,6 +390,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .left_join( PersonalAccessTokens::Table, Cond::all() + // Match session ID .add( Expr::col(( PersonalSessions::Table, @@ -397,6 +401,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { PersonalAccessTokens::PersonalSessionId, ))), ) + // Only choose the active access token for each session .add( Expr::col(( PersonalAccessTokens::Table, @@ -495,6 +500,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .left_join( PersonalAccessTokens::Table, Cond::all() + // Match session ID .add( Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) .eq(Expr::col(( @@ -502,6 +508,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { PersonalAccessTokens::PersonalSessionId, ))), ) + // Only choose the active access token for each session .add( Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt)) .is_null(), @@ -539,6 +546,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .left_join( PersonalAccessTokens::Table, Cond::all() + // Match session ID .add( Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) .eq(Expr::col(( @@ -546,6 +554,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { PersonalAccessTokens::PersonalSessionId, ))), ) + // Only choose the active access token for each session .add( Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt)) .is_null(), From 676c594dc4d207ca0037a02103215264945a9fd6 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 22 Oct 2025 14:23:18 +0100 Subject: [PATCH 6/6] Remove stale comment --- crates/storage/src/personal/session.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs index 8e208a610..921c6df39 100644 --- a/crates/storage/src/personal/session.rs +++ b/crates/storage/src/personal/session.rs @@ -89,8 +89,6 @@ pub trait PersonalSessionRepository: Send + Sync { /// Revoke all the [`PersonalSession`]s matching the given filter. /// - /// This will also revoke the relevant personal access tokens. - /// /// Returns the number of sessions affected /// /// # Parameters