From 2a86a446b2eb195fbe56b37a41b612a8c498bd86 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 6 Oct 2025 13:31:43 +0100 Subject: [PATCH] Add filters for personal sessions --- crates/storage-pg/src/iden.rs | 15 ++ crates/storage-pg/src/personal/session.rs | 170 +++++++++++++++++++- crates/storage/src/personal/mod.rs | 5 +- crates/storage/src/personal/session.rs | 181 +++++++++++++++++++++- 4 files changed, 366 insertions(+), 5 deletions(-) diff --git a/crates/storage-pg/src/iden.rs b/crates/storage-pg/src/iden.rs index a861f59c7..c2198434e 100644 --- a/crates/storage-pg/src/iden.rs +++ b/crates/storage-pg/src/iden.rs @@ -108,6 +108,21 @@ pub enum OAuth2Clients { IsStatic, } +#[derive(sea_query::Iden)] +#[iden = "personal_sessions"] +pub enum PersonalSessions { + Table, + PersonalSessionId, + OwnerUserId, + ActorUserId, + HumanName, + ScopeList, + CreatedAt, + RevokedAt, + LastActiveAt, + LastActiveIp, +} + #[derive(sea_query::Iden)] #[iden = "upstream_oauth_providers"] pub enum UpstreamOAuthProviders { diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index 514293ba9..40ed4f312 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -11,14 +11,29 @@ use mas_data_model::{ Clock, User, personal::session::{PersonalSession, SessionState}, }; -use mas_storage::personal::PersonalSessionRepository; +use mas_storage::{ + Page, Pagination, + personal::{PersonalSessionFilter, PersonalSessionRepository, PersonalSessionState}, +}; use oauth2_types::scope::Scope; use rand::RngCore; +use sea_query::{ + Condition, Expr, PgFunc, PostgresQueryBuilder, Query, SimpleExpr, enum_def, + extension::postgres::PgExpr as _, +}; +use sea_query_binder::SqlxBinder as _; use sqlx::PgConnection; use ulid::Ulid; use uuid::Uuid; -use crate::{DatabaseError, errors::DatabaseInconsistencyError, tracing::ExecuteExt as _}; +use crate::{ + DatabaseError, + errors::DatabaseInconsistencyError, + filter::{Filter, StatementExt as _}, + iden::PersonalSessions, + pagination::QueryBuilderExt as _, + tracing::ExecuteExt as _, +}; /// An implementation of [`PersonalSessionRepository`] for a PostgreSQL /// connection @@ -27,13 +42,15 @@ pub struct PgPersonalSessionRepository<'c> { } impl<'c> PgPersonalSessionRepository<'c> { - /// Create a new [`PgOAuth2SessionRepository`] from an active PostgreSQL + /// Create a new [`PgPersonalSessionRepository`] from an active PostgreSQL /// connection pub fn new(conn: &'c mut PgConnection) -> Self { Self { conn } } } +#[derive(sqlx::FromRow)] +#[enum_def] struct PersonalSessionLookup { personal_session_id: Uuid, owner_user_id: Uuid, @@ -215,4 +232,151 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .finish(finished_at) .map_err(DatabaseError::to_invalid_operation) } + + #[tracing::instrument( + name = "db.personal_session.list", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn list( + &mut self, + filter: PersonalSessionFilter<'_>, + pagination: Pagination, + ) -> Result, Self::Error> { + let (sql, arguments) = Query::select() + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)), + PersonalSessionLookupIden::PersonalSessionId, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::OwnerUserId)), + PersonalSessionLookupIden::OwnerUserId, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::ActorUserId)), + PersonalSessionLookupIden::ActorUserId, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::HumanName)), + PersonalSessionLookupIden::HumanName, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::ScopeList)), + PersonalSessionLookupIden::ScopeList, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::CreatedAt)), + PersonalSessionLookupIden::CreatedAt, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::RevokedAt)), + PersonalSessionLookupIden::RevokedAt, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt)), + PersonalSessionLookupIden::LastActiveAt, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveIp)), + PersonalSessionLookupIden::LastActiveIp, + ) + .from(PersonalSessions::Table) + .apply_filter(filter) + .generate_pagination( + (PersonalSessions::Table, PersonalSessions::PersonalSessionId), + pagination, + ) + .build_sqlx(PostgresQueryBuilder); + + let edges: Vec = sqlx::query_as_with(&sql, arguments) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let page = pagination + .process(edges) + .try_map(PersonalSession::try_from)?; + + Ok(page) + } + + #[tracing::instrument( + name = "db.personal_session.count", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result { + let (sql, arguments) = Query::select() + .expr(Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)).count()) + .from(PersonalSessions::Table) + .apply_filter(filter) + .build_sqlx(PostgresQueryBuilder); + + let count: i64 = sqlx::query_scalar_with(&sql, arguments) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + count + .try_into() + .map_err(DatabaseError::to_invalid_operation) + } +} + +impl Filter for PersonalSessionFilter<'_> { + fn generate_condition(&self, _has_joins: bool) -> impl sea_query::IntoCondition { + sea_query::Condition::all() + .add_option(self.owner_user().map(|user| { + Expr::col((PersonalSessions::Table, PersonalSessions::OwnerUserId)) + .eq(Uuid::from(user.id)) + })) + .add_option(self.actor_user().map(|user| { + Expr::col((PersonalSessions::Table, PersonalSessions::ActorUserId)) + .eq(Uuid::from(user.id)) + })) + .add_option(self.device().map(|device| -> SimpleExpr { + if let Ok([stable_scope_token, unstable_scope_token]) = device.to_scope_token() { + Condition::any() + .add( + Expr::val(stable_scope_token.to_string()).eq(PgFunc::any(Expr::col(( + PersonalSessions::Table, + PersonalSessions::ScopeList, + )))), + ) + .add(Expr::val(unstable_scope_token.to_string()).eq(PgFunc::any( + Expr::col((PersonalSessions::Table, PersonalSessions::ScopeList)), + ))) + .into() + } else { + // If the device ID can't be encoded as a scope token, match no rows + Expr::val(false).into() + } + })) + .add_option(self.state().map(|state| match state { + PersonalSessionState::Active => { + Expr::col((PersonalSessions::Table, PersonalSessions::RevokedAt)).is_null() + } + PersonalSessionState::Revoked => { + Expr::col((PersonalSessions::Table, PersonalSessions::RevokedAt)).is_not_null() + } + })) + .add_option(self.scope().map(|scope| { + let scope: Vec = scope.iter().map(|s| s.as_str().to_owned()).collect(); + Expr::col((PersonalSessions::Table, PersonalSessions::ScopeList)).contains(scope) + })) + .add_option(self.last_active_before().map(|last_active_before| { + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt)) + .lt(last_active_before) + })) + .add_option(self.last_active_after().map(|last_active_after| { + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt)) + .gt(last_active_after) + })) + } } diff --git a/crates/storage/src/personal/mod.rs b/crates/storage/src/personal/mod.rs index 28a33e1a0..3a9dfcd65 100644 --- a/crates/storage/src/personal/mod.rs +++ b/crates/storage/src/personal/mod.rs @@ -10,4 +10,7 @@ mod access_token; mod session; -pub use self::{access_token::PersonalAccessTokenRepository, session::PersonalSessionRepository}; +pub use self::{ + access_token::PersonalAccessTokenRepository, + session::{PersonalSessionFilter, PersonalSessionRepository, PersonalSessionState}, +}; diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs index 7d0a76a37..aedb939e0 100644 --- a/crates/storage/src/personal/session.rs +++ b/crates/storage/src/personal/session.rs @@ -10,7 +10,7 @@ use oauth2_types::scope::Scope; use rand_core::RngCore; use ulid::Ulid; -use crate::repository_impl; +use crate::{Page, Pagination, repository_impl}; /// A [`PersonalSessionRepository`] helps interacting with /// [`PersonalSession`] saved in the storage backend @@ -78,6 +78,34 @@ pub trait PersonalSessionRepository: Send + Sync { clock: &dyn Clock, personal_session: PersonalSession, ) -> Result; + + /// List [`PersonalSession`]s matching the given filter and pagination + /// parameters + /// + /// # Parameters + /// + /// * `filter`: The filter parameters + /// * `pagination`: The pagination parameters + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn list( + &mut self, + filter: PersonalSessionFilter<'_>, + pagination: Pagination, + ) -> Result, Self::Error>; + + /// Count [`PersonalSession`]s matching the given filter + /// + /// # Parameters + /// + /// * `filter`: The filter parameters + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result; } repository_impl!(PersonalSessionRepository: @@ -98,4 +126,155 @@ repository_impl!(PersonalSessionRepository: clock: &dyn Clock, personal_session: PersonalSession, ) -> Result; + + async fn list( + &mut self, + filter: PersonalSessionFilter<'_>, + pagination: Pagination, + ) -> Result, Self::Error>; + + async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result; ); + +/// Filter parameters for listing personal sessions +#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +pub struct PersonalSessionFilter<'a> { + owner_user: Option<&'a User>, + actor_user: Option<&'a User>, + device: Option<&'a Device>, + state: Option, + scope: Option<&'a Scope>, + last_active_before: Option>, + last_active_after: Option>, +} + +/// Filter for what state a personal session is in. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PersonalSessionState { + /// The personal session is active, which means it either + /// has active access tokens or can have new access tokens generated. + Active, + /// The personal session is revoked, which means no more access tokens + /// can be generated and none are active. + Revoked, +} + +impl<'a> PersonalSessionFilter<'a> { + /// Create a new [`PersonalSessionFilter`] with default values + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// List sessions owned by a specific user + #[must_use] + pub fn for_owner_user(mut self, user: &'a User) -> Self { + self.owner_user = Some(user); + self + } + + /// Get the owner user filter + /// + /// Returns [`None`] if no user filter was set + #[must_use] + pub fn owner_user(&self) -> Option<&'a User> { + self.owner_user + } + + /// List sessions acting as a specific user + #[must_use] + pub fn for_actor_user(mut self, user: &'a User) -> Self { + self.actor_user = Some(user); + self + } + + /// Get the actor user filter + /// + /// Returns [`None`] if no user filter was set + #[must_use] + pub fn actor_user(&self) -> Option<&'a User> { + self.actor_user + } + + /// Only return sessions with a last active time before the given time + #[must_use] + pub fn with_last_active_before(mut self, last_active_before: DateTime) -> Self { + self.last_active_before = Some(last_active_before); + self + } + + /// Only return sessions with a last active time after the given time + #[must_use] + pub fn with_last_active_after(mut self, last_active_after: DateTime) -> Self { + self.last_active_after = Some(last_active_after); + self + } + + /// Get the last active before filter + /// + /// Returns [`None`] if no client filter was set + #[must_use] + pub fn last_active_before(&self) -> Option> { + self.last_active_before + } + + /// Get the last active after filter + /// + /// Returns [`None`] if no client filter was set + #[must_use] + pub fn last_active_after(&self) -> Option> { + self.last_active_after + } + + /// Only return active sessions + #[must_use] + pub fn active_only(mut self) -> Self { + self.state = Some(PersonalSessionState::Active); + self + } + + /// Only return finished sessions + #[must_use] + pub fn finished_only(mut self) -> Self { + self.state = Some(PersonalSessionState::Revoked); + self + } + + /// Get the state filter + /// + /// Returns [`None`] if no state filter was set + #[must_use] + pub fn state(&self) -> Option { + self.state + } + + /// Only return sessions with the given scope + #[must_use] + pub fn with_scope(mut self, scope: &'a Scope) -> Self { + self.scope = Some(scope); + self + } + + /// Get the scope filter + /// + /// Returns [`None`] if no scope filter was set + #[must_use] + pub fn scope(&self) -> Option<&'a Scope> { + self.scope + } + + /// Only return sessions that have the given device in their scope + #[must_use] + pub fn for_device(mut self, device: &'a Device) -> Self { + self.device = Some(device); + self + } + + /// Get the device filter + /// + /// Returns [`None`] if no device filter was set + #[must_use] + pub fn device(&self) -> Option<&'a Device> { + self.device + } +}