Admin API for adding and removing upstream oauth links (#4255)

This commit is contained in:
Quentin Gliech
2025-04-09 13:33:16 +02:00
committed by GitHub
13 changed files with 1027 additions and 15 deletions

View File

@@ -30,6 +30,12 @@ pub enum UpstreamOAuthAuthorizationSessionState {
extra_callback_parameters: Option<serde_json::Value>,
userinfo: Option<serde_json::Value>,
},
Unlinked {
completed_at: DateTime<Utc>,
consumed_at: Option<DateTime<Utc>>,
unlinked_at: DateTime<Utc>,
id_token: Option<String>,
},
}
impl UpstreamOAuthAuthorizationSessionState {
@@ -57,7 +63,9 @@ impl UpstreamOAuthAuthorizationSessionState {
extra_callback_parameters,
userinfo,
}),
Self::Completed { .. } | Self::Consumed { .. } => Err(InvalidTransitionError),
Self::Completed { .. } | Self::Consumed { .. } | Self::Unlinked { .. } => {
Err(InvalidTransitionError)
}
}
}
@@ -85,7 +93,9 @@ impl UpstreamOAuthAuthorizationSessionState {
extra_callback_parameters,
userinfo,
}),
Self::Pending | Self::Consumed { .. } => Err(InvalidTransitionError),
Self::Pending | Self::Consumed { .. } | Self::Unlinked { .. } => {
Err(InvalidTransitionError)
}
}
}
@@ -98,7 +108,7 @@ impl UpstreamOAuthAuthorizationSessionState {
#[must_use]
pub fn link_id(&self) -> Option<Ulid> {
match self {
Self::Pending => None,
Self::Pending | Self::Unlinked { .. } => None,
Self::Completed { link_id, .. } | Self::Consumed { link_id, .. } => Some(*link_id),
}
}
@@ -114,9 +124,9 @@ impl UpstreamOAuthAuthorizationSessionState {
pub fn completed_at(&self) -> Option<DateTime<Utc>> {
match self {
Self::Pending => None,
Self::Completed { completed_at, .. } | Self::Consumed { completed_at, .. } => {
Some(*completed_at)
}
Self::Completed { completed_at, .. }
| Self::Consumed { completed_at, .. }
| Self::Unlinked { completed_at, .. } => Some(*completed_at),
}
}
@@ -130,9 +140,9 @@ impl UpstreamOAuthAuthorizationSessionState {
pub fn id_token(&self) -> Option<&str> {
match self {
Self::Pending => None,
Self::Completed { id_token, .. } | Self::Consumed { id_token, .. } => {
id_token.as_deref()
}
Self::Completed { id_token, .. }
| Self::Consumed { id_token, .. }
| Self::Unlinked { id_token, .. } => id_token.as_deref(),
}
}
@@ -145,7 +155,7 @@ impl UpstreamOAuthAuthorizationSessionState {
#[must_use]
pub fn extra_callback_parameters(&self) -> Option<&serde_json::Value> {
match self {
Self::Pending => None,
Self::Pending | Self::Unlinked { .. } => None,
Self::Completed {
extra_callback_parameters,
..
@@ -160,7 +170,7 @@ impl UpstreamOAuthAuthorizationSessionState {
#[must_use]
pub fn userinfo(&self) -> Option<&serde_json::Value> {
match self {
Self::Pending => None,
Self::Pending | Self::Unlinked { .. } => None,
Self::Completed { userinfo, .. } | Self::Consumed { userinfo, .. } => userinfo.as_ref(),
}
}
@@ -177,6 +187,22 @@ impl UpstreamOAuthAuthorizationSessionState {
match self {
Self::Pending | Self::Completed { .. } => None,
Self::Consumed { consumed_at, .. } => Some(*consumed_at),
Self::Unlinked { consumed_at, .. } => *consumed_at,
}
}
/// Get the time at which the upstream OAuth 2.0 authorization session was
/// unlinked.
///
/// Returns `None` if the upstream OAuth 2.0 authorization session state is
/// not [`Unlinked`].
///
/// [`Unlinked`]: UpstreamOAuthAuthorizationSessionState::Unlinked
#[must_use]
pub fn unlinked_at(&self) -> Option<DateTime<Utc>> {
match self {
Self::Pending | Self::Completed { .. } | Self::Consumed { .. } => None,
Self::Unlinked { unlinked_at, .. } => Some(*unlinked_at),
}
}
@@ -206,6 +232,15 @@ impl UpstreamOAuthAuthorizationSessionState {
pub fn is_consumed(&self) -> bool {
matches!(self, Self::Consumed { .. })
}
/// Returns `true` if the upstream OAuth 2.0 authorization session state is
/// [`Unlinked`].
///
/// [`Unlinked`]: UpstreamOAuthAuthorizationSessionState::Unlinked
#[must_use]
pub fn is_unlinked(&self) -> bool {
matches!(self, Self::Unlinked { .. })
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]

View File

@@ -26,6 +26,7 @@ mod user_emails;
mod user_sessions;
mod users;
#[allow(clippy::too_many_lines)]
pub fn router<S>() -> ApiRouter<S>
where
S: Clone + Send + Sync + 'static,
@@ -123,6 +124,10 @@ where
get_with(
self::upstream_oauth_links::list,
self::upstream_oauth_links::list_doc,
)
.post_with(
self::upstream_oauth_links::add,
self::upstream_oauth_links::add_doc,
),
)
.api_route(
@@ -130,6 +135,10 @@ where
get_with(
self::upstream_oauth_links::get,
self::upstream_oauth_links::get_doc,
)
.delete_with(
self::upstream_oauth_links::delete,
self::upstream_oauth_links::delete_doc,
),
)
}

View File

@@ -0,0 +1,463 @@
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use aide::{NoApi, OperationIo, transform::TransformOperation};
use axum::{Json, response::IntoResponse};
use hyper::StatusCode;
use mas_storage::BoxRng;
use schemars::JsonSchema;
use serde::Deserialize;
use ulid::Ulid;
use crate::{
admin::{
call_context::CallContext,
model::{Resource, UpstreamOAuthLink},
response::{ErrorResponse, SingleResponse},
},
impl_from_error_for_route,
};
#[derive(Debug, thiserror::Error, OperationIo)]
#[aide(output_with = "Json<ErrorResponse>")]
pub enum RouteError {
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("Upstream Oauth 2.0 Provider ID {0} with subject {1} is already linked to a user")]
LinkAlreadyExists(Ulid, String),
#[error("User ID {0} not found")]
UserNotFound(Ulid),
#[error("Upstream OAuth 2.0 Provider ID {0} not found")]
ProviderNotFound(Ulid),
}
impl_from_error_for_route!(mas_storage::RepositoryError);
impl IntoResponse for RouteError {
fn into_response(self) -> axum::response::Response {
let error = ErrorResponse::from_error(&self);
let status = match self {
Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::LinkAlreadyExists(_, _) => StatusCode::CONFLICT,
Self::UserNotFound(_) | Self::ProviderNotFound(_) => StatusCode::NOT_FOUND,
};
(status, Json(error)).into_response()
}
}
/// # JSON payload for the `POST /api/admin/v1/upstream-oauth-links`
#[derive(Deserialize, JsonSchema)]
#[serde(rename = "AddUpstreamOauthLinkRequest")]
pub struct Request {
/// The ID of the user to which the link should be added.
#[schemars(with = "crate::admin::schema::Ulid")]
user_id: Ulid,
/// The ID of the upstream provider to which the link is for.
#[schemars(with = "crate::admin::schema::Ulid")]
provider_id: Ulid,
/// The subject (sub) claim of the user on the provider.
subject: String,
/// A human readable account name.
human_account_name: Option<String>,
}
pub fn doc(operation: TransformOperation) -> TransformOperation {
operation
.id("addUpstreamOAuthLink")
.summary("Add an upstream OAuth 2.0 link")
.tag("upstream-oauth-link")
.response_with::<200, Json<SingleResponse<UpstreamOAuthLink>>, _>(|t| {
let [sample, ..] = UpstreamOAuthLink::samples();
let response = SingleResponse::new_canonical(sample);
t.description("An existing Upstream OAuth 2.0 link was associated to a user")
.example(response)
})
.response_with::<201, Json<SingleResponse<UpstreamOAuthLink>>, _>(|t| {
let [sample, ..] = UpstreamOAuthLink::samples();
let response = SingleResponse::new_canonical(sample);
t.description("A new Upstream OAuth 2.0 link was created")
.example(response)
})
.response_with::<409, RouteError, _>(|t| {
let [provider_sample, ..] = UpstreamOAuthLink::samples();
let response = ErrorResponse::from_error(&RouteError::LinkAlreadyExists(
provider_sample.id(),
String::from("subject1"),
));
t.description("The subject from the provider is already linked to another user")
.example(response)
})
.response_with::<404, RouteError, _>(|t| {
let response = ErrorResponse::from_error(&RouteError::UserNotFound(Ulid::nil()));
t.description("User or provider was not found")
.example(response)
})
}
#[tracing::instrument(name = "handler.admin.v1.upstream_oauth_links.post", skip_all, err)]
pub async fn handler(
CallContext {
mut repo, clock, ..
}: CallContext,
NoApi(mut rng): NoApi<BoxRng>,
Json(params): Json<Request>,
) -> Result<(StatusCode, Json<SingleResponse<UpstreamOAuthLink>>), RouteError> {
// Find the user
let user = repo
.user()
.lookup(params.user_id)
.await?
.ok_or(RouteError::UserNotFound(params.user_id))?;
// Find the provider
let provider = repo
.upstream_oauth_provider()
.lookup(params.provider_id)
.await?
.ok_or(RouteError::ProviderNotFound(params.provider_id))?;
let maybe_link = repo
.upstream_oauth_link()
.find_by_subject(&provider, &params.subject)
.await?;
if let Some(mut link) = maybe_link {
if link.user_id.is_some() {
return Err(RouteError::LinkAlreadyExists(
link.provider_id,
link.subject,
));
}
repo.upstream_oauth_link()
.associate_to_user(&link, &user)
.await?;
link.user_id = Some(user.id);
repo.save().await?;
return Ok((
StatusCode::OK,
Json(SingleResponse::new_canonical(link.into())),
));
}
let mut link = repo
.upstream_oauth_link()
.add(
&mut rng,
&clock,
&provider,
params.subject,
params.human_account_name,
)
.await?;
repo.upstream_oauth_link()
.associate_to_user(&link, &user)
.await?;
link.user_id = Some(user.id);
repo.save().await?;
Ok((
StatusCode::CREATED,
Json(SingleResponse::new_canonical(link.into())),
))
}
#[cfg(test)]
mod tests {
use hyper::{Request, StatusCode};
use insta::assert_json_snapshot;
use sqlx::PgPool;
use ulid::Ulid;
use super::super::test_utils;
use crate::test_utils::{RequestBuilderExt, ResponseExt, TestState, setup};
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_create(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();
let mut repo = state.repository().await.unwrap();
let alice = repo
.user()
.add(&mut rng, &state.clock, "alice".to_owned())
.await
.unwrap();
let provider = repo
.upstream_oauth_provider()
.add(
&mut rng,
&state.clock,
test_utils::oidc_provider_params("provider1"),
)
.await
.unwrap();
repo.save().await.unwrap();
let request = Request::post("/api/admin/v1/upstream-oauth-links")
.bearer(&token)
.json(serde_json::json!({
"user_id": alice.id,
"provider_id": provider.id,
"subject": "subject1"
}));
let response = state.request(request).await;
response.assert_status(StatusCode::CREATED);
let body: serde_json::Value = response.json();
assert_json_snapshot!(body, @r###"
{
"data": {
"type": "upstream-oauth-link",
"id": "01FSHN9AG07HNEZXNQM2KNBNF6",
"attributes": {
"created_at": "2022-01-16T14:40:00Z",
"provider_id": "01FSHN9AG0AJ6AC5HQ9X6H4RP4",
"subject": "subject1",
"user_id": "01FSHN9AG0MZAA6S4AF7CTV32E",
"human_account_name": null
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01FSHN9AG07HNEZXNQM2KNBNF6"
}
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01FSHN9AG07HNEZXNQM2KNBNF6"
}
}
"###);
}
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_association(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();
let mut repo = state.repository().await.unwrap();
let alice = repo
.user()
.add(&mut rng, &state.clock, "alice".to_owned())
.await
.unwrap();
let provider = repo
.upstream_oauth_provider()
.add(
&mut rng,
&state.clock,
test_utils::oidc_provider_params("provider1"),
)
.await
.unwrap();
// Existing unfinished link
repo.upstream_oauth_link()
.add(
&mut rng,
&state.clock,
&provider,
String::from("subject1"),
None,
)
.await
.unwrap();
repo.save().await.unwrap();
let request = Request::post("/api/admin/v1/upstream-oauth-links")
.bearer(&token)
.json(serde_json::json!({
"user_id": alice.id,
"provider_id": provider.id,
"subject": "subject1"
}));
let response = state.request(request).await;
response.assert_status(StatusCode::OK);
let body: serde_json::Value = response.json();
assert_json_snapshot!(body, @r###"
{
"data": {
"type": "upstream-oauth-link",
"id": "01FSHN9AG09NMZYX8MFYH578R9",
"attributes": {
"created_at": "2022-01-16T14:40:00Z",
"provider_id": "01FSHN9AG0AJ6AC5HQ9X6H4RP4",
"subject": "subject1",
"user_id": "01FSHN9AG0MZAA6S4AF7CTV32E",
"human_account_name": null
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01FSHN9AG09NMZYX8MFYH578R9"
}
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01FSHN9AG09NMZYX8MFYH578R9"
}
}
"###);
}
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_link_already_exists(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();
let mut repo = state.repository().await.unwrap();
let alice = repo
.user()
.add(&mut rng, &state.clock, "alice".to_owned())
.await
.unwrap();
let bob = repo
.user()
.add(&mut rng, &state.clock, "bob".to_owned())
.await
.unwrap();
let provider = repo
.upstream_oauth_provider()
.add(
&mut rng,
&state.clock,
test_utils::oidc_provider_params("provider1"),
)
.await
.unwrap();
let link = repo
.upstream_oauth_link()
.add(
&mut rng,
&state.clock,
&provider,
String::from("subject1"),
None,
)
.await
.unwrap();
repo.upstream_oauth_link()
.associate_to_user(&link, &alice)
.await
.unwrap();
repo.save().await.unwrap();
let request = Request::post("/api/admin/v1/upstream-oauth-links")
.bearer(&token)
.json(serde_json::json!({
"user_id": bob.id,
"provider_id": provider.id,
"subject": "subject1"
}));
let response = state.request(request).await;
response.assert_status(StatusCode::CONFLICT);
let body: serde_json::Value = response.json();
assert_json_snapshot!(body, @r###"
{
"errors": [
{
"title": "Upstream Oauth 2.0 Provider ID 01FSHN9AG09NMZYX8MFYH578R9 with subject subject1 is already linked to a user"
}
]
}
"###);
}
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_user_not_found(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();
let mut repo = state.repository().await.unwrap();
let provider = repo
.upstream_oauth_provider()
.add(
&mut rng,
&state.clock,
test_utils::oidc_provider_params("provider1"),
)
.await
.unwrap();
repo.save().await.unwrap();
let request = Request::post("/api/admin/v1/upstream-oauth-links")
.bearer(&token)
.json(serde_json::json!({
"user_id": Ulid::nil(),
"provider_id": provider.id,
"subject": "subject1"
}));
let response = state.request(request).await;
response.assert_status(StatusCode::NOT_FOUND);
let body: serde_json::Value = response.json();
assert_json_snapshot!(body, @r###"
{
"errors": [
{
"title": "User ID 00000000000000000000000000 not found"
}
]
}
"###);
}
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_provider_not_found(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();
let mut repo = state.repository().await.unwrap();
let alice = repo
.user()
.add(&mut rng, &state.clock, "alice".to_owned())
.await
.unwrap();
repo.save().await.unwrap();
let request = Request::post("/api/admin/v1/upstream-oauth-links")
.bearer(&token)
.json(serde_json::json!({
"user_id": alice.id,
"provider_id": Ulid::nil(),
"subject": "subject1"
}));
let response = state.request(request).await;
response.assert_status(StatusCode::NOT_FOUND);
let body: serde_json::Value = response.json();
assert_json_snapshot!(body, @r###"
{
"errors": [
{
"title": "Upstream OAuth 2.0 Provider ID 00000000000000000000000000 not found"
}
]
}
"###);
}
}

View File

@@ -0,0 +1,185 @@
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use aide::{OperationIo, transform::TransformOperation};
use axum::{Json, response::IntoResponse};
use hyper::StatusCode;
use ulid::Ulid;
use crate::{
admin::{call_context::CallContext, params::UlidPathParam, response::ErrorResponse},
impl_from_error_for_route,
};
#[derive(Debug, thiserror::Error, OperationIo)]
#[aide(output_with = "Json<ErrorResponse>")]
pub enum RouteError {
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("Upstream OAuth 2.0 Link ID {0} not found")]
NotFound(Ulid),
}
impl_from_error_for_route!(mas_storage::RepositoryError);
impl IntoResponse for RouteError {
fn into_response(self) -> axum::response::Response {
let error = ErrorResponse::from_error(&self);
let status = match self {
Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::NotFound(_) => StatusCode::NOT_FOUND,
};
(status, Json(error)).into_response()
}
}
pub fn doc(operation: TransformOperation) -> TransformOperation {
operation
.id("deleteUpstreamOAuthLink")
.summary("Delete an upstream OAuth 2.0 link")
.tag("upstream-oauth-link")
.response_with::<204, (), _>(|t| t.description("Upstream OAuth 2.0 link was deleted"))
.response_with::<404, RouteError, _>(|t| {
let response = ErrorResponse::from_error(&RouteError::NotFound(Ulid::nil()));
t.description("Upstream OAuth 2.0 link was not found")
.example(response)
})
}
#[tracing::instrument(name = "handler.admin.v1.upstream_oauth_links.delete", skip_all, err)]
pub async fn handler(
CallContext {
mut repo, clock, ..
}: CallContext,
id: UlidPathParam,
) -> Result<StatusCode, RouteError> {
let link = repo
.upstream_oauth_link()
.lookup(*id)
.await?
.ok_or(RouteError::NotFound(*id))?;
repo.upstream_oauth_link().remove(&clock, link).await?;
repo.save().await?;
Ok(StatusCode::NO_CONTENT)
}
#[cfg(test)]
mod tests {
use hyper::{Request, StatusCode};
use mas_data_model::UpstreamOAuthAuthorizationSessionState;
use sqlx::PgPool;
use ulid::Ulid;
use super::super::test_utils;
use crate::test_utils::{RequestBuilderExt, ResponseExt, TestState, setup};
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_delete(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let mut rng = state.rng();
let mut repo = state.repository().await.unwrap();
let alice = repo
.user()
.add(&mut rng, &state.clock, "alice".to_owned())
.await
.unwrap();
let provider = repo
.upstream_oauth_provider()
.add(
&mut rng,
&state.clock,
test_utils::oidc_provider_params("provider1"),
)
.await
.unwrap();
// Pretend it was linked by an authorization session
let session = repo
.upstream_oauth_session()
.add(
&mut rng,
&state.clock,
&provider,
String::new(),
None,
String::new(),
)
.await
.unwrap();
let link = repo
.upstream_oauth_link()
.add(
&mut rng,
&state.clock,
&provider,
String::from("subject1"),
None,
)
.await
.unwrap();
let session = repo
.upstream_oauth_session()
.complete_with_link(&state.clock, session, &link, None, None, None)
.await
.unwrap();
repo.upstream_oauth_link()
.associate_to_user(&link, &alice)
.await
.unwrap();
repo.save().await.unwrap();
let request = Request::delete(format!("/api/admin/v1/upstream-oauth-links/{}", link.id))
.bearer(&token)
.empty();
let response = state.request(request).await;
response.assert_status(StatusCode::NO_CONTENT);
// Verify that the link was deleted
let request = Request::get(format!("/api/admin/v1/upstream-oauth-links/{}", link.id))
.bearer(&token)
.empty();
let response = state.request(request).await;
response.assert_status(StatusCode::NOT_FOUND);
// Verify that the session was marked as unlinked
let mut repo = state.repository().await.unwrap();
let session = repo
.upstream_oauth_session()
.lookup(session.id)
.await
.unwrap()
.unwrap();
assert!(matches!(
session.state,
UpstreamOAuthAuthorizationSessionState::Unlinked { .. }
));
}
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
async fn test_not_found(pool: PgPool) {
setup();
let mut state = TestState::from_pool(pool).await.unwrap();
let token = state.token_with_scope("urn:mas:admin").await;
let link_id = Ulid::nil();
let request = Request::delete(format!("/api/admin/v1/upstream-oauth-links/{link_id}"))
.bearer(&token)
.empty();
let response = state.request(request).await;
response.assert_status(StatusCode::NOT_FOUND);
}
}

View File

@@ -3,10 +3,14 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
mod add;
mod delete;
mod get;
mod list;
pub use self::{
add::{doc as add_doc, handler as add},
delete::{doc as delete_doc, handler as delete},
get::{doc as get_doc, handler as get},
list::{doc as list_doc, handler as list},
};

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n upstream_oauth_authorization_session_id,\n upstream_oauth_provider_id,\n upstream_oauth_link_id,\n state,\n code_challenge_verifier,\n nonce,\n id_token,\n extra_callback_parameters,\n userinfo,\n created_at,\n completed_at,\n consumed_at\n FROM upstream_oauth_authorization_sessions\n WHERE upstream_oauth_authorization_session_id = $1\n ",
"query": "\n SELECT\n upstream_oauth_authorization_session_id,\n upstream_oauth_provider_id,\n upstream_oauth_link_id,\n state,\n code_challenge_verifier,\n nonce,\n id_token,\n extra_callback_parameters,\n userinfo,\n created_at,\n completed_at,\n consumed_at,\n unlinked_at\n FROM upstream_oauth_authorization_sessions\n WHERE upstream_oauth_authorization_session_id = $1\n ",
"describe": {
"columns": [
{
@@ -62,6 +62,11 @@
"ordinal": 11,
"name": "consumed_at",
"type_info": "Timestamptz"
},
{
"ordinal": 12,
"name": "unlinked_at",
"type_info": "Timestamptz"
}
],
"parameters": {
@@ -81,8 +86,9 @@
true,
false,
true,
true,
true
]
},
"hash": "ea30b3809fd7c1d4e9983909c0219f343953a89f2a43f6b8c4ab4fbea7645ccc"
"hash": "37a124678323380357fa9d1375fd125fb35476ac3008e5adbd04a761d5edcd42"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE upstream_oauth_authorization_sessions SET\n upstream_oauth_link_id = NULL,\n unlinked_at = $2\n WHERE upstream_oauth_link_id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz"
]
},
"nullable": []
},
"hash": "3ed73cfce8ef6a1108f454e18b1668f64b76975dba07e67d04ed7a52e2e8107f"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM upstream_oauth_links\n WHERE upstream_oauth_link_id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "cc60ad934d347fb4546205d1fe07e9d2f127cb15b1bb650d1ea3805a4c55b196"
}

View File

@@ -0,0 +1,7 @@
-- Copyright 2025 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.
ALTER TABLE upstream_oauth_authorization_sessions
ADD COLUMN unlinked_at TIMESTAMP WITH TIME ZONE;

View File

@@ -11,10 +11,12 @@ use mas_storage::{
Clock, Page, Pagination,
upstream_oauth2::{UpstreamOAuthLinkFilter, UpstreamOAuthLinkRepository},
};
use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
use rand::RngCore;
use sea_query::{Expr, PostgresQueryBuilder, Query, enum_def};
use sea_query_binder::SqlxBinder;
use sqlx::PgConnection;
use tracing::Instrument;
use ulid::Ulid;
use uuid::Uuid;
@@ -374,4 +376,63 @@ impl UpstreamOAuthLinkRepository for PgUpstreamOAuthLinkRepository<'_> {
.try_into()
.map_err(DatabaseError::to_invalid_operation)
}
#[tracing::instrument(
name = "db.upstream_oauth_link.remove",
skip_all,
fields(
db.query.text,
upstream_oauth_link.id,
upstream_oauth_link.provider_id,
%upstream_oauth_link.subject,
),
err,
)]
async fn remove(
&mut self,
clock: &dyn Clock,
upstream_oauth_link: UpstreamOAuthLink,
) -> Result<(), Self::Error> {
// Unlink the authorization sessions first, as they have a foreign key
// constraint on the links.
let span = tracing::info_span!(
"db.upstream_oauth_link.remove.unlink",
{ DB_QUERY_TEXT } = tracing::field::Empty
);
sqlx::query!(
r#"
UPDATE upstream_oauth_authorization_sessions SET
upstream_oauth_link_id = NULL,
unlinked_at = $2
WHERE upstream_oauth_link_id = $1
"#,
Uuid::from(upstream_oauth_link.id),
clock.now()
)
.record(&span)
.execute(&mut *self.conn)
.instrument(span)
.await?;
// Then delete the link itself
let span = tracing::info_span!(
"db.upstream_oauth_link.remove.delete",
{ DB_QUERY_TEXT } = tracing::field::Empty
);
let res = sqlx::query!(
r#"
DELETE FROM upstream_oauth_links
WHERE upstream_oauth_link_id = $1
"#,
Uuid::from(upstream_oauth_link.id),
)
.record(&span)
.execute(&mut *self.conn)
.instrument(span)
.await?;
DatabaseError::ensure_affected_rows(&res, 1)?;
Ok(())
}
}

View File

@@ -45,6 +45,7 @@ struct SessionLookup {
completed_at: Option<DateTime<Utc>>,
consumed_at: Option<DateTime<Utc>>,
extra_callback_parameters: Option<serde_json::Value>,
unlinked_at: Option<DateTime<Utc>>,
}
impl TryFrom<SessionLookup> for UpstreamOAuthAuthorizationSession {
@@ -59,8 +60,11 @@ impl TryFrom<SessionLookup> for UpstreamOAuthAuthorizationSession {
value.userinfo,
value.completed_at,
value.consumed_at,
value.unlinked_at,
) {
(None, None, None, None, None, None) => UpstreamOAuthAuthorizationSessionState::Pending,
(None, None, None, None, None, None, None) => {
UpstreamOAuthAuthorizationSessionState::Pending
}
(
Some(link_id),
id_token,
@@ -68,6 +72,7 @@ impl TryFrom<SessionLookup> for UpstreamOAuthAuthorizationSession {
userinfo,
Some(completed_at),
None,
None,
) => UpstreamOAuthAuthorizationSessionState::Completed {
completed_at,
link_id: link_id.into(),
@@ -82,6 +87,7 @@ impl TryFrom<SessionLookup> for UpstreamOAuthAuthorizationSession {
userinfo,
Some(completed_at),
Some(consumed_at),
None,
) => UpstreamOAuthAuthorizationSessionState::Consumed {
completed_at,
link_id: link_id.into(),
@@ -90,6 +96,14 @@ impl TryFrom<SessionLookup> for UpstreamOAuthAuthorizationSession {
userinfo,
consumed_at,
},
(_, id_token, _, _, Some(completed_at), consumed_at, Some(unlinked_at)) => {
UpstreamOAuthAuthorizationSessionState::Unlinked {
completed_at,
id_token,
consumed_at,
unlinked_at,
}
}
_ => {
return Err(DatabaseInconsistencyError::on(
"upstream_oauth_authorization_sessions",
@@ -142,7 +156,8 @@ impl UpstreamOAuthSessionRepository for PgUpstreamOAuthSessionRepository<'_> {
userinfo,
created_at,
completed_at,
consumed_at
consumed_at,
unlinked_at
FROM upstream_oauth_authorization_sessions
WHERE upstream_oauth_authorization_session_id = $1
"#,

View File

@@ -200,6 +200,22 @@ pub trait UpstreamOAuthLinkRepository: Send + Sync {
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn count(&mut self, filter: UpstreamOAuthLinkFilter<'_>) -> Result<usize, Self::Error>;
/// Delete a [`UpstreamOAuthLink`]
///
/// # Parameters
///
/// * `clock`: The clock used to generate timestamps
/// * `upstream_oauth_link`: The [`UpstreamOAuthLink`] to delete
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn remove(
&mut self,
clock: &dyn Clock,
upstream_oauth_link: UpstreamOAuthLink,
) -> Result<(), Self::Error>;
}
repository_impl!(UpstreamOAuthLinkRepository:
@@ -233,4 +249,6 @@ repository_impl!(UpstreamOAuthLinkRepository:
) -> Result<Page<UpstreamOAuthLink>, Self::Error>;
async fn count(&mut self, filter: UpstreamOAuthLinkFilter<'_>) -> Result<usize, Self::Error>;
async fn remove(&mut self, clock: &dyn Clock, upstream_oauth_link: UpstreamOAuthLink) -> Result<(), Self::Error>;
);

View File

@@ -2289,6 +2289,117 @@
}
}
}
},
"post": {
"tags": [
"upstream-oauth-link"
],
"summary": "Add an upstream OAuth 2.0 link",
"operationId": "addUpstreamOAuthLink",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/AddUpstreamOauthLinkRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "An existing Upstream OAuth 2.0 link was associated to a user",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/SingleResponse_for_UpstreamOAuthLink"
},
"example": {
"data": {
"type": "upstream-oauth-link",
"id": "01040G2081040G2081040G2081",
"attributes": {
"created_at": "1970-01-01T00:00:00Z",
"provider_id": "02081040G2081040G2081040G2",
"subject": "john-42",
"user_id": "030C1G60R30C1G60R30C1G60R3",
"human_account_name": "john.doe@example.com"
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01040G2081040G2081040G2081"
}
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01040G2081040G2081040G2081"
}
}
}
}
},
"201": {
"description": "A new Upstream OAuth 2.0 link was created",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/SingleResponse_for_UpstreamOAuthLink"
},
"example": {
"data": {
"type": "upstream-oauth-link",
"id": "01040G2081040G2081040G2081",
"attributes": {
"created_at": "1970-01-01T00:00:00Z",
"provider_id": "02081040G2081040G2081040G2",
"subject": "john-42",
"user_id": "030C1G60R30C1G60R30C1G60R3",
"human_account_name": "john.doe@example.com"
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01040G2081040G2081040G2081"
}
},
"links": {
"self": "/api/admin/v1/upstream-oauth-links/01040G2081040G2081040G2081"
}
}
}
}
},
"409": {
"description": "The subject from the provider is already linked to another user",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
},
"example": {
"errors": [
{
"title": "Upstream Oauth 2.0 Provider ID 01040G2081040G2081040G2081 with subject subject1 is already linked to a user"
}
]
}
}
}
},
"404": {
"description": "User or provider was not found",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
},
"example": {
"errors": [
{
"title": "User ID 00000000000000000000000000 not found"
}
]
}
}
}
}
}
}
},
"/api/admin/v1/upstream-oauth-links/{id}": {
@@ -2358,6 +2469,47 @@
}
}
}
},
"delete": {
"tags": [
"upstream-oauth-link"
],
"summary": "Delete an upstream OAuth 2.0 link",
"operationId": "deleteUpstreamOAuthLink",
"parameters": [
{
"in": "path",
"name": "id",
"required": true,
"schema": {
"title": "The ID of the resource",
"$ref": "#/components/schemas/ULID"
},
"style": "simple"
}
],
"responses": {
"204": {
"description": "Upstream OAuth 2.0 link was deleted"
},
"404": {
"description": "Upstream OAuth 2.0 link was not found",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
},
"example": {
"errors": [
{
"title": "Upstream OAuth 2.0 Link ID 00000000000000000000000000 not found"
}
]
}
}
}
}
}
}
}
},
@@ -3504,6 +3656,34 @@
}
}
},
"AddUpstreamOauthLinkRequest": {
"title": "JSON payload for the `POST /api/admin/v1/upstream-oauth-links`",
"type": "object",
"required": [
"provider_id",
"subject",
"user_id"
],
"properties": {
"user_id": {
"description": "The ID of the user to which the link should be added.",
"$ref": "#/components/schemas/ULID"
},
"provider_id": {
"description": "The ID of the upstream provider to which the link is for.",
"$ref": "#/components/schemas/ULID"
},
"subject": {
"description": "The subject (sub) claim of the user on the provider.",
"type": "string"
},
"human_account_name": {
"description": "A human readable account name.",
"type": "string",
"nullable": true
}
}
},
"SingleResponse_for_UpstreamOAuthLink": {
"description": "A top-level response with a single resource",
"type": "object",