Do not cleanup upstream OAuth sessions that may still be useful (#5437)

This commit is contained in:
Quentin Gliech
2026-01-21 13:20:38 +01:00
committed by GitHub
12 changed files with 107 additions and 49 deletions

View File

@@ -278,7 +278,7 @@ pub(crate) async fn get(
// user. Mark the session as consumed and renew the authentication.
let upstream_session = repo
.upstream_oauth_session()
.consume(&clock, upstream_session)
.consume(&clock, upstream_session, &session)
.await?;
repo.browser_session()
@@ -358,7 +358,7 @@ pub(crate) async fn get(
let upstream_session = repo
.upstream_oauth_session()
.consume(&clock, upstream_session)
.consume(&clock, upstream_session, &session)
.await?;
repo.browser_session()
@@ -697,7 +697,7 @@ pub(crate) async fn get(
let upstream_session = repo
.upstream_oauth_session()
.consume(&clock, upstream_session)
.consume(&clock, upstream_session, &session)
.await?;
repo.browser_session()
@@ -905,7 +905,7 @@ pub(crate) async fn post(
let upstream_session = repo
.upstream_oauth_session()
.consume(&clock, upstream_session)
.consume(&clock, upstream_session, &session)
.await?;
repo.browser_session()
@@ -1246,10 +1246,6 @@ async fn prepare_user_registration(
.set_upstream_oauth_authorization_session(registration, &upstream_session)
.await?;
repo.upstream_oauth_session()
.consume(clock, upstream_session)
.await?;
Ok(registration)
}

View File

@@ -221,6 +221,18 @@ pub(crate) async fn get(
.context("Authorization session has no upstream link associated with it")
.map_err(InternalError::from_anyhow)?;
if upstream_oauth_authorization_session.is_consumed() {
// This means the authorization session was already consumed, i.e. it
// is being used for more than one user registration. This can happen
// if the user goes back in their navigation history and registers a
// second time. We also used to consume the session earlier in the
// flow, so it can also happen during the rollout of the version that
// changed this. It should be rare enough that it does not warrant a
// dedicated error page.
return Err(InternalError::from_anyhow(anyhow::anyhow!(
"The upstream authorization session was already used. Try registering again"
)));
}
let upstream_oauth_link = repo
.upstream_oauth_link()
.lookup(link_id)
@@ -307,6 +319,11 @@ pub(crate) async fn get(
}
if let Some((upstream_session, upstream_link)) = upstream_oauth {
let upstream_session = repo
.upstream_oauth_session()
.consume(&clock, upstream_session, &user_session)
.await?;
repo.upstream_oauth_link()
.associate_to_user(&upstream_link, &user)
.await?;

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE upstream_oauth_authorization_sessions\n SET consumed_at = $1,\n user_session_id = $2\n WHERE upstream_oauth_authorization_session_id = $3\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Uuid",
"Uuid"
]
},
"nullable": []
},
"hash": "5a6b91660e4c12b4a1fe2cad08e727a305cbe4029cd4cebd5ecc274e3e32f533"
}

View File

@@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE upstream_oauth_authorization_sessions\n SET consumed_at = $1\n WHERE upstream_oauth_authorization_session_id = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "689ffbfc5137ec788e89062ad679bbe6b23a8861c09a7246dc1659c28f12bf8d"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH to_delete AS (\n SELECT upstream_oauth_authorization_session_id\n FROM upstream_oauth_authorization_sessions\n WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)\n AND upstream_oauth_authorization_session_id <= $2\n ORDER BY upstream_oauth_authorization_session_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_authorization_sessions\n USING to_delete\n WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id\n RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id\n ",
"query": "\n WITH to_delete AS (\n SELECT upstream_oauth_authorization_session_id\n FROM upstream_oauth_authorization_sessions\n WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)\n AND upstream_oauth_authorization_session_id <= $2\n AND user_session_id IS NULL\n ORDER BY upstream_oauth_authorization_session_id\n LIMIT $3\n )\n DELETE FROM upstream_oauth_authorization_sessions\n USING to_delete\n WHERE upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id = to_delete.upstream_oauth_authorization_session_id\n RETURNING upstream_oauth_authorization_sessions.upstream_oauth_authorization_session_id\n ",
"describe": {
"columns": [
{
@@ -20,5 +20,5 @@
false
]
},
"hash": "da6baa340eedfce8e965c9f3baa90f21f2331c3881c082f0157752d241403b35"
"hash": "6db23fc9c39c2c7d9224d4e1233205f636568c990ccb05cf9208750ad1330b9b"
}

View File

@@ -0,0 +1,11 @@
-- Copyright 2026 Element Creations Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
-- Please see LICENSE files in the repository root for full details.
-- Start tracking the associated `user_session` directly on the authorization session
-- This will be backfilled in a separate migration rolling in the next version
ALTER TABLE upstream_oauth_authorization_sessions
ADD COLUMN user_session_id UUID
REFERENCES user_sessions (user_session_id)
ON DELETE SET NULL;

View File

@@ -0,0 +1,10 @@
-- no-transaction
-- Copyright 2026 Element Creations Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
-- Please see LICENSE files in the repository root for full details.
-- Add partial index for cleanup of orphaned upstream OAuth sessions
CREATE INDEX CONCURRENTLY IF NOT EXISTS upstream_oauth_authorization_sessions_orphaned
ON upstream_oauth_authorization_sessions (upstream_oauth_authorization_session_id)
WHERE user_session_id IS NULL;

View File

@@ -167,11 +167,24 @@ mod tests {
assert!(!session.is_consumed());
assert_eq!(session.link_id(), Some(link.id));
let session = repo
.upstream_oauth_session()
.consume(&clock, session)
// We need to create a user and start a browser session to consume the session
let user = repo
.user()
.add(&mut rng, &clock, "john".to_owned())
.await
.unwrap();
let browser_session = repo
.browser_session()
.add(&mut rng, &clock, &user, None)
.await
.unwrap();
let session = repo
.upstream_oauth_session()
.consume(&clock, session, &browser_session)
.await
.unwrap();
// Reload the session
let session = repo
.upstream_oauth_session()
@@ -181,11 +194,6 @@ mod tests {
.expect("session to be found in the database");
assert!(session.is_consumed());
let user = repo
.user()
.add(&mut rng, &clock, "john".to_owned())
.await
.unwrap();
repo.upstream_oauth_link()
.associate_to_user(&link, &user)
.await

View File

@@ -8,8 +8,8 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use mas_data_model::{
Clock, UpstreamOAuthAuthorizationSession, UpstreamOAuthAuthorizationSessionState,
UpstreamOAuthLink, UpstreamOAuthProvider,
BrowserSession, Clock, UpstreamOAuthAuthorizationSession,
UpstreamOAuthAuthorizationSessionState, UpstreamOAuthLink, UpstreamOAuthProvider,
};
use mas_storage::{
Page, Pagination,
@@ -375,15 +375,18 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> {
&mut self,
clock: &dyn Clock,
upstream_oauth_authorization_session: UpstreamOAuthAuthorizationSession,
browser_session: &BrowserSession,
) -> Result<UpstreamOAuthAuthorizationSession, Self::Error> {
let consumed_at = clock.now();
sqlx::query!(
r#"
UPDATE upstream_oauth_authorization_sessions
SET consumed_at = $1
WHERE upstream_oauth_authorization_session_id = $2
SET consumed_at = $1,
user_session_id = $2
WHERE upstream_oauth_authorization_session_id = $3
"#,
consumed_at,
Uuid::from(browser_session.id),
Uuid::from(upstream_oauth_authorization_session.id),
)
.traced()
@@ -577,7 +580,7 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> {
),
err,
)]
async fn cleanup(
async fn cleanup_orphaned(
&mut self,
since: Option<Ulid>,
until: Ulid,
@@ -593,6 +596,7 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> {
FROM upstream_oauth_authorization_sessions
WHERE ($1::uuid IS NULL OR upstream_oauth_authorization_session_id > $1)
AND upstream_oauth_authorization_session_id <= $2
AND user_session_id IS NULL
ORDER BY upstream_oauth_authorization_session_id
LIMIT $3
)

View File

@@ -7,7 +7,8 @@
use async_trait::async_trait;
use mas_data_model::{
Clock, UpstreamOAuthAuthorizationSession, UpstreamOAuthLink, UpstreamOAuthProvider,
BrowserSession, Clock, UpstreamOAuthAuthorizationSession, UpstreamOAuthLink,
UpstreamOAuthProvider,
};
use rand_core::RngCore;
use ulid::Ulid;
@@ -167,6 +168,8 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync {
///
/// * `clock`: the clock source
/// * `upstream_oauth_authorization_session`: the session to consume
/// * `browser_session`: the browser session that was authenticated with
/// this authorization session
///
/// # Errors
///
@@ -175,6 +178,7 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync {
&mut self,
clock: &dyn Clock,
upstream_oauth_authorization_session: UpstreamOAuthAuthorizationSession,
browser_session: &BrowserSession,
) -> Result<UpstreamOAuthAuthorizationSession, Self::Error>;
/// List [`UpstreamOAuthAuthorizationSession`] with the given filter and
@@ -207,9 +211,11 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync {
async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>)
-> Result<usize, Self::Error>;
/// Cleanup old authorization sessions
/// Cleanup old authorization sessions that are not linked to a user session
///
/// This will delete sessions with IDs up to and including `until`.
/// Authorization sessions with a user session linked must be kept around to
/// avoid breaking features like OIDC Backchannel Logout.
///
/// Returns the number of sessions deleted and the cursor for the next batch
///
@@ -223,7 +229,7 @@ pub trait UpstreamOAuthSessionRepository: Send + Sync {
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn cleanup(
async fn cleanup_orphaned(
&mut self,
since: Option<Ulid>,
until: Ulid,
@@ -262,6 +268,7 @@ repository_impl!(UpstreamOAuthSessionRepository:
&mut self,
clock: &dyn Clock,
upstream_oauth_authorization_session: UpstreamOAuthAuthorizationSession,
browser_session: &BrowserSession,
) -> Result<UpstreamOAuthAuthorizationSession, Self::Error>;
async fn list(
@@ -272,7 +279,7 @@ repository_impl!(UpstreamOAuthSessionRepository:
async fn count(&mut self, filter: UpstreamOAuthSessionFilter<'_>) -> Result<usize, Self::Error>;
async fn cleanup(
async fn cleanup_orphaned(
&mut self,
since: Option<Ulid>,
until: Ulid,

View File

@@ -345,7 +345,7 @@ impl RunnableJob for CleanupUpstreamOAuthSessionsJob {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let (count, cursor) = repo
.upstream_oauth_session()
.cleanup(since, until, BATCH_SIZE)
.cleanup_orphaned(since, until, BATCH_SIZE)
.await
.map_err(JobError::retry)?;
repo.save().await.map_err(JobError::retry)?;

View File

@@ -211,12 +211,16 @@ pub async fn init(
"0 57 * * * *".parse()?,
mas_storage::queue::CleanupUserEmailAuthenticationsJob,
)
.add_schedule(
"cleanup-upstream-oauth-sessions",
// Run this job every hour
"0 58 * * * *".parse()?,
mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
)
// This job is currently disabled: it depends on a database backfill
// which will only happen in the next release. See this issue for context:
// https://github.com/element-hq/matrix-authentication-service/issues/5435
//
//.add_schedule(
// "cleanup-upstream-oauth-sessions",
// // Run this job every hour
// "0 58 * * * *".parse()?,
// mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
//)
.add_schedule(
"cleanup-upstream-oauth-links",
// Run this job every hour