diff --git a/crates/handlers/src/upstream_oauth2/link.rs b/crates/handlers/src/upstream_oauth2/link.rs index ba24ed311..357a7a72f 100644 --- a/crates/handlers/src/upstream_oauth2/link.rs +++ b/crates/handlers/src/upstream_oauth2/link.rs @@ -278,7 +278,7 @@ pub(crate) async fn get( // user. Mark the session as consumed and renew the authentication. let upstream_session = repo .upstream_oauth_session() - .consume(&clock, upstream_session) + .consume(&clock, upstream_session, &session) .await?; repo.browser_session() @@ -358,7 +358,7 @@ pub(crate) async fn get( let upstream_session = repo .upstream_oauth_session() - .consume(&clock, upstream_session) + .consume(&clock, upstream_session, &session) .await?; repo.browser_session() @@ -697,7 +697,7 @@ pub(crate) async fn get( let upstream_session = repo .upstream_oauth_session() - .consume(&clock, upstream_session) + .consume(&clock, upstream_session, &session) .await?; repo.browser_session() @@ -905,7 +905,7 @@ pub(crate) async fn post( let upstream_session = repo .upstream_oauth_session() - .consume(&clock, upstream_session) + .consume(&clock, upstream_session, &session) .await?; repo.browser_session() @@ -1246,10 +1246,6 @@ async fn prepare_user_registration( .set_upstream_oauth_authorization_session(registration, &upstream_session) .await?; - repo.upstream_oauth_session() - .consume(clock, upstream_session) - .await?; - Ok(registration) } diff --git a/crates/handlers/src/views/register/steps/finish.rs b/crates/handlers/src/views/register/steps/finish.rs index 6b7b5bfc5..af0b8ef9f 100644 --- a/crates/handlers/src/views/register/steps/finish.rs +++ b/crates/handlers/src/views/register/steps/finish.rs @@ -221,6 +221,18 @@ pub(crate) async fn get( .context("Authorization session has no upstream link associated with it") .map_err(InternalError::from_anyhow)?; + if upstream_oauth_authorization_session.is_consumed() { + // This means an authorization session was used to create multiple + // user registrations. This can happen if the user goes back in + // their navigation history and basically registers twice. We also + // used to consume the session earlier in the flow, so it's also + // possible that it happens during the rollout of that version. This + // is not going to happen often enough to have a dedicated page + return Err(InternalError::from_anyhow(anyhow::anyhow!( + "The upstream authorization session was already used. Try registering again" + ))); + } + let upstream_oauth_link = repo .upstream_oauth_link() .lookup(link_id) @@ -307,6 +319,11 @@ pub(crate) async fn get( } if let Some((upstream_session, upstream_link)) = upstream_oauth { + let upstream_session = repo + .upstream_oauth_session() + .consume(&clock, upstream_session, &user_session) + .await?; + repo.upstream_oauth_link() .associate_to_user(&upstream_link, &user) .await?; diff --git a/crates/storage-pg/.sqlx/query-5a6b91660e4c12b4a1fe2cad08e727a305cbe4029cd4cebd5ecc274e3e32f533.json b/crates/storage-pg/.sqlx/query-5a6b91660e4c12b4a1fe2cad08e727a305cbe4029cd4cebd5ecc274e3e32f533.json new file mode 100644 index 000000000..e1aa9740e --- /dev/null +++ b/crates/storage-pg/.sqlx/query-5a6b91660e4c12b4a1fe2cad08e727a305cbe4029cd4cebd5ecc274e3e32f533.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE upstream_oauth_authorization_sessions\n SET consumed_at = $1,\n user_session_id = $2\n WHERE upstream_oauth_authorization_session_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Uuid", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "5a6b91660e4c12b4a1fe2cad08e727a305cbe4029cd4cebd5ecc274e3e32f533" +} diff --git a/crates/storage-pg/.sqlx/query-689ffbfc5137ec788e89062ad679bbe6b23a8861c09a7246dc1659c28f12bf8d.json b/crates/storage-pg/.sqlx/query-689ffbfc5137ec788e89062ad679bbe6b23a8861c09a7246dc1659c28f12bf8d.json deleted file mode 100644 index b122c5a40..000000000 --- a/crates/storage-pg/.sqlx/query-689ffbfc5137ec788e89062ad679bbe6b23a8861c09a7246dc1659c28f12bf8d.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE upstream_oauth_authorization_sessions\n SET consumed_at = $1\n WHERE upstream_oauth_authorization_session_id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Timestamptz", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "689ffbfc5137ec788e89062ad679bbe6b23a8861c09a7246dc1659c28f12bf8d" -} diff --git a/crates/storage-pg/.sqlx/query-da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35.json b/crates/storage-pg/.sqlx/query-6db23fc9c39c2c7d9224d4e1233205f636568c990ccb05cf9208750ad1330b9b.json similarity index 50% rename from crates/storage-pg/.sqlx/query-da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35.json rename to crates/storage-pg/.sqlx/query-6db23fc9c39c2c7d9224d4e1233205f636568c990ccb05cf9208750ad1330b9b.json index 2ced2b555..7de368daa 100644 --- a/crates/storage-pg/.sqlx/query-da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35.json +++ b/crates/storage-pg/.sqlx/query-6db23fc9c39c2c7d9224d4e1233205f636568c990ccb05cf9208750ad1330b9b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH to_delete AS (\n SELECT upstream_oauth_authorization_session_id\n FROM upstream_oauth_authorization_sessions\n WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)\n AND upstream_oauth_authorization_session_id <= $2\n ORDER BY upstream_oauth_authorization_session_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_authorization_sessions\n USING to_delete\n WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id\n RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id\n ", + "query": "\n WITH to_delete AS (\n SELECT upstream_oauth_authorization_session_id\n FROM upstream_oauth_authorization_sessions\n WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)\n AND upstream_oauth_authorization_session_id <= $2\n AND user_session_id IS NULL\n ORDER BY upstream_oauth_authorization_session_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_authorization_sessions\n USING to_delete\n WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id\n RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id\n ", "describe": { "columns": [ { @@ -20,5 +20,5 @@ false ] }, - "hash": "da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35" + "hash": "6db23fc9c39c2c7d9224d4e1233205f636568c990ccb05cf9208750ad1330b9b" } diff --git a/crates/storage-pg/migrations/20260121103025_upstream_oauth_track_user_session.sql b/crates/storage-pg/migrations/20260121103025_upstream_oauth_track_user_session.sql new file mode 100644 index 000000000..f71d3280e --- /dev/null +++ b/crates/storage-pg/migrations/20260121103025_upstream_oauth_track_user_session.sql @@ -0,0 +1,11 @@ +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Start tracking the associated `user_session` directly on the authorization session +-- This will be backfilled in a separate migration rolling in the next version +ALTER TABLE upstream_oauth_authorization_sessions + ADD COLUMN user_session_id UUID + REFERENCES user_sessions (user_session_id) + ON DELETE SET NULL; diff --git a/crates/storage-pg/migrations/20260121112201_upstream_oauth_sessions_orphan_index.sql b/crates/storage-pg/migrations/20260121112201_upstream_oauth_sessions_orphan_index.sql new file mode 100644 index 000000000..9efad01d5 --- /dev/null +++ b/crates/storage-pg/migrations/20260121112201_upstream_oauth_sessions_orphan_index.sql @@ -0,0 +1,10 @@ +-- no-transaction +-- Copyright 2026 Element Creations Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE files in the repository root for full details. + +-- Add partial index for cleanup of orphaned upstream OAuth sessions +CREATE INDEX CONCURRENTLY IF NOT EXISTS upstream_oauth_authorization_sessions_orphaned + ON upstream_oauth_authorization_sessions (upstream_oauth_authorization_session_id) + WHERE user_session_id IS NULL; diff --git a/crates/storage-pg/src/upstream_oauth2/mod.rs b/crates/storage-pg/src/upstream_oauth2/mod.rs index d98e840b6..12df9d5f0 100644 --- a/crates/storage-pg/src/upstream_oauth2/mod.rs +++ b/crates/storage-pg/src/upstream_oauth2/mod.rs @@ -167,11 +167,24 @@ mod tests { assert!(!session.is_consumed()); assert_eq!(session.link_id(), Some(link.id)); - let session = repo - .upstream_oauth_session() - .consume(&clock, session) + // We need to create a user and start a browser session to consume the session + let user = repo + .user() + .add(&mut rng, &clock, "john".to_owned()) .await .unwrap(); + let browser_session = repo + .browser_session() + .add(&mut rng, &clock, &user, None) + .await + .unwrap(); + + let session = repo + .upstream_oauth_session() + .consume(&clock, session, &browser_session) + .await + .unwrap(); + // Reload the session let session = repo .upstream_oauth_session() @@ -181,11 +194,6 @@ mod tests { .expect("session to be found in the database"); assert!(session.is_consumed()); - let user = repo - .user() - .add(&mut rng, &clock, "john".to_owned()) - .await - .unwrap(); repo.upstream_oauth_link() .associate_to_user(&link, &user) .await diff --git a/crates/storage-pg/src/upstream_oauth2/session.rs b/crates/storage-pg/src/upstream_oauth2/session.rs index ebd45cb45..8d7e18550 100644 --- a/crates/storage-pg/src/upstream_oauth2/session.rs +++ b/crates/storage-pg/src/upstream_oauth2/session.rs @@ -8,8 +8,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use mas_data_model::{ - Clock, UpstreamOAuthAuthorizationSession, UpstreamOAuthAuthorizationSessionState, - UpstreamOAuthLink, UpstreamOAuthProvider, + BrowserSession, Clock, UpstreamOAuthAuthorizationSession, + UpstreamOAuthAuthorizationSessionState, UpstreamOAuthLink, UpstreamOAuthProvider, }; use mas_storage::{ Page, Pagination, @@ -375,15 +375,18 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> { &mut self, clock: &dyn Clock, upstream_oauth_authorization_session: UpstreamOAuthAuthorizationSession, + browser_session: &BrowserSession, ) -> Result { let consumed_at = clock.now(); sqlx::query!( r#" UPDATE upstream_oauth_authorization_sessions - SET consumed_at = $1 - WHERE upstream_oauth_authorization_session_id = $2 + SET consumed_at = $1, + user_session_id = $2 + WHERE upstream_oauth_authorization_session_id = $3 "#, consumed_at, + Uuid::from(browser_session.id), Uuid::from(upstream_oauth_authorization_session.id), ) .traced() @@ -577,7 +580,7 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> { ), err, )] - async fn cleanup( + async fn cleanup_orphaned( &mut self, since: Option, until: Ulid, @@ -593,6 +596,7 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> { FROM upstream_oauth_authorization_sessions WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1) AND upstream_oauth_authorization_session_id <= $2 + AND user_session_id IS NULL ORDER BY upstream_oauth_authorization_session_id LIMIT $3 ) diff --git a/crates/storage/src/upstream_oauth2/session.rs b/crates/storage/src/upstream_oauth2/session.rs index 04c5a90f1..6e017ca45 100644 --- a/crates/storage/src/upstream_oauth2/session.rs +++ b/crates/storage/src/upstream_oauth2/session.rs @@ -7,7 +7,8 @@ use async_trait::async_trait; use mas_data_model::{ - Clock, UpstreamOAuthAuthorizationSession, UpstreamOAuthLink, UpstreamOAuthProvider, + BrowserSession, Clock, UpstreamOAuthAuthorizationSession, UpstreamOAuthLink, + UpstreamOAuthProvider, }; use rand_core::RngCore; use ulid::Ulid; @@ -167,6 +168,8 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync { /// /// * `clock`: the clock source /// * `upstream_oauth_authorization_session`: the session to consume + /// * `browser_session`: the browser session that was authenticated with + /// this authorization session /// /// # Errors /// @@ -175,6 +178,7 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync { &mut self, clock: &dyn Clock, upstream_oauth_authorization_session: UpstreamOAuthAuthorizationSession, + browser_session: &BrowserSession, ) -> Result; /// List [`UpstreamOAuthAuthorizationSession`] with the given filter and @@ -207,9 +211,11 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync { async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>) -> Result; - /// Cleanup old authorization sessions + /// Cleanup old authorization sessions that are not linked to a user session /// /// This will delete sessions with IDs up to and including `until`. + /// Authorization sessions with a user session linked must be kept around to + /// avoid breaking features like OIDC Backchannel Logout. /// /// Returns the number of sessions deleted and the cursor for the next batch /// @@ -223,7 +229,7 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync { /// # Errors /// /// Returns [`Self::Error`] if the underlying repository fails - async fn cleanup( + async fn cleanup_orphaned( &mut self, since: Option, until: Ulid, @@ -262,6 +268,7 @@ repository_impl!(UpstreamOAuthSessionRepository: &mut self, clock: &dyn Clock, upstream_oauth_authorization_session: UpstreamOAuthAuthorizationSession, + browser_session: &BrowserSession, ) -> Result; async fn list( @@ -272,7 +279,7 @@ repository_impl!(UpstreamOAuthSessionRepository: async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>) -> Result; - async fn cleanup( + async fn cleanup_orphaned( &mut self, since: Option, until: Ulid, diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index c3876631f..1d1f4e314 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -345,7 +345,7 @@ impl RunnableJob for CleanupUpstreamOAuthSessionsJob { let mut repo = state.repository().await.map_err(JobError::retry)?; let (count, cursor) = repo .upstream_oauth_session() - .cleanup(since, until, BATCH_SIZE) + .cleanup_orphaned(since, until, BATCH_SIZE) .await .map_err(JobError::retry)?; repo.save().await.map_err(JobError::retry)?; diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 54dc8d713..f238773c8 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -211,12 +211,16 @@ pub async fn init( "0 57 * * * *".parse()?, mas_storage::queue::CleanupUserEmailAuthenticationsJob, ) - .add_schedule( - "cleanup-upstream-oauth-sessions", - // Run this job every hour - "0 58 * * * *".parse()?, - mas_storage::queue::CleanupUpstreamOAuthSessionsJob, - ) + // This job is currently disabled, as it needs a database backfill to + // happen, which will happen in the next release. Some context in + // https://github.com/element-hq/matrix-authentication-service/issues/5435 + // + //.add_schedule( + // "cleanup-upstream-oauth-sessions", + // // Run this job every hour + // "0 58 * * * *".parse()?, + // mas_storage::queue::CleanupUpstreamOAuthSessionsJob, + //) .add_schedule( "cleanup-upstream-oauth-links", // Run this job every hour