From d9f0fececd98f6131e327ccd2f8b4e939ca0233a Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 4 Apr 2023 18:15:02 +0200 Subject: [PATCH] Proactively provision devices & delete them when using the compat Matrix API --- crates/handlers/src/compat/login.rs | 7 +- .../handlers/src/compat/login_sso_complete.rs | 7 +- crates/handlers/src/compat/logout.rs | 14 +- crates/storage/src/job.rs | 10 +- crates/tasks/src/matrix.rs | 178 ++++++++++++++++-- 5 files changed, 192 insertions(+), 24 deletions(-) diff --git a/crates/handlers/src/compat/login.rs b/crates/handlers/src/compat/login.rs index c3d303b06..8dd19307d 100644 --- a/crates/handlers/src/compat/login.rs +++ b/crates/handlers/src/compat/login.rs @@ -21,8 +21,9 @@ use mas_storage::{ CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository, CompatSsoLoginRepository, }, + job::{JobRepositoryExt, ProvisionDeviceJob}, user::{UserPasswordRepository, UserRepository}, - BoxClock, BoxRepository, BoxRng, Clock, + BoxClock, BoxRepository, BoxRng, Clock, RepositoryAccess, }; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; @@ -379,6 +380,10 @@ async fn user_password_login( // Now that the user credentials have been verified, start a new compat session let device = Device::generate(&mut rng); + repo.job() + .schedule_job(ProvisionDeviceJob::new(&user, &device)) + .await?; + let session = repo .compat_session() .add(&mut rng, clock, &user, device) diff --git a/crates/handlers/src/compat/login_sso_complete.rs b/crates/handlers/src/compat/login_sso_complete.rs index da846eda4..b571d5f40 100644 --- a/crates/handlers/src/compat/login_sso_complete.rs +++ b/crates/handlers/src/compat/login_sso_complete.rs @@ -31,7 +31,8 @@ use mas_keystore::Encrypter; use mas_router::{CompatLoginSsoAction, PostAuthAction, Route}; use mas_storage::{ compat::{CompatSessionRepository, CompatSsoLoginRepository}, - BoxClock, BoxRepository, BoxRng, Clock, + job::{JobRepositoryExt, ProvisionDeviceJob}, + BoxClock, BoxRepository, BoxRng, Clock, RepositoryAccess, }; use mas_templates::{CompatSsoContext, ErrorContext, TemplateContext, Templates}; use serde::{Deserialize, Serialize}; @@ -193,6 +194,10 @@ pub async fn post( }; let device = Device::generate(&mut rng); + repo.job() + .schedule_job(ProvisionDeviceJob::new(&session.user, &device)) + .await?; + let compat_session = repo .compat_session() .add(&mut rng, &clock, &session.user, device) diff --git a/crates/handlers/src/compat/logout.rs b/crates/handlers/src/compat/logout.rs index 7875e48b3..4f91f3631 100644 --- a/crates/handlers/src/compat/logout.rs +++ b/crates/handlers/src/compat/logout.rs @@ -18,7 +18,8 @@ use hyper::StatusCode; use mas_data_model::TokenType; use mas_storage::{ compat::{CompatAccessTokenRepository, CompatSessionRepository}, - BoxClock, BoxRepository, Clock, + job::{DeleteDeviceJob, JobRepositoryExt}, + BoxClock, BoxRepository, Clock, RepositoryAccess, }; use thiserror::Error; @@ -95,6 +96,17 @@ pub(crate) async fn post( .filter(|s| s.is_valid()) .ok_or(RouteError::InvalidAuthorization)?; + let user = repo + .user() + .lookup(session.user_id) + .await? + // XXX: this is probably not the right error + .ok_or(RouteError::InvalidAuthorization)?; + + repo.job() + .schedule_job(DeleteDeviceJob::new(&user, &session.device)) + .await?; + repo.compat_session().finish(&clock, session).await?; repo.save().await?; diff --git a/crates/storage/src/job.rs b/crates/storage/src/job.rs index 3eb246a4a..3fdcc4eaf 100644 --- a/crates/storage/src/job.rs +++ b/crates/storage/src/job.rs @@ -216,7 +216,7 @@ where mod jobs { // XXX: Move this somewhere else? use apalis_core::job::Job; - use mas_data_model::{User, UserEmail}; + use mas_data_model::{Device, User, UserEmail}; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -280,10 +280,10 @@ mod jobs { impl ProvisionDeviceJob { /// Create a new job to provision a device for a user on the homeserver. #[must_use] - pub fn new(user: &User, device_id: &str) -> Self { + pub fn new(user: &User, device: &Device) -> Self { Self { user_id: user.id, - device_id: device_id.to_owned(), + device_id: device.as_str().to_owned(), } } @@ -314,10 +314,10 @@ mod jobs { impl DeleteDeviceJob { /// Create a new job to delete a device for a user on the homeserver. #[must_use] - pub fn new(user: &User, device_id: &str) -> Self { + pub fn new(user: &User, device: &Device) -> Self { Self { user_id: user.id, - device_id: device_id.to_owned(), + device_id: device.as_str().to_owned(), } } diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs index d3c91665f..5590b02c1 100644 --- a/crates/tasks/src/matrix.rs +++ b/crates/tasks/src/matrix.rs @@ -26,15 +26,15 @@ use mas_axum_utils::axum::{ headers::{Authorization, HeaderMapExt}, http::{Request, StatusCode}, }; -use mas_http::HttpServiceExt; +use mas_http::{EmptyBody, HttpServiceExt}; use mas_storage::{ - job::{JobWithSpanContext, ProvisionUserJob}, + job::{DeleteDeviceJob, JobWithSpanContext, ProvisionDeviceJob, ProvisionUserJob}, user::{UserEmailRepository, UserRepository}, RepositoryAccess, }; use serde::{Deserialize, Serialize}; use tower::{Service, ServiceExt}; -use tracing::{info, info_span, Instrument}; +use tracing::info; use url::Url; use crate::{layers::TracingLayer, JobContextExt, State}; @@ -85,6 +85,9 @@ struct UserRequest { pub external_ids: Vec, } +/// Job to provision a user on the Matrix homeserver. +/// This works by doing a PUT request to the /_synapse/admin/v2/users/{user_id} +/// endpoint. #[tracing::instrument( name = "job.provision_user" fields(user.id = %job.user_id()), @@ -98,7 +101,7 @@ async fn provision_user( let state = ctx.state(); let matrix = state.matrix_connection(); let mut client = state - .http_client("provision-matrix-user") + .http_client("matrx.provision_user") .await? .request_bytes_to_body() .json_request(); @@ -110,7 +113,12 @@ async fn provision_user( .await? .context("User not found")?; - let mxid = format!("@{}:{}", user.username, matrix.homeserver); + // XXX: there is a lot that could go wrong in terms of encoding here + let mxid = format!( + "@{localpart}:{homeserver}", + localpart = user.username, + homeserver = matrix.homeserver + ); let three_pids = repo .user_email() @@ -142,21 +150,15 @@ async fn provision_user( repo.cancel().await?; - let mut req = Request::put( - matrix - .endpoint - .join("_synapse/admin/v2/users/")? - .join(&mxid)? - .as_str(), - ); + let path = format!("_synapse/admin/v2/users/{user_id}", user_id = mxid,); + let mut req = Request::put(matrix.endpoint.join(&path)?.as_str()); req.headers_mut() .context("Failed to get headers")? .typed_insert(Authorization::bearer(&matrix.access_token)?); let req = req.body(body).context("Failed to build request")?; - let span = info_span!("matrix.provision_user", %mxid); - let response = client.ready().await?.call(req).instrument(span).await?; + let response = client.ready().await?.call(req).await?; match response.status() { StatusCode::CREATED => info!(%user.id, %mxid, "User created"), @@ -168,6 +170,130 @@ async fn provision_user( Ok(()) } +#[derive(Serialize, Deserialize)] +struct DeviceRequest { + device_id: String, +} + +/// Job to provision a device on the Matrix homeserver. +/// This works by doing a POST request to the +/// /_synapse/admin/v2/users/{user_id}/devices endpoint. +#[tracing::instrument( + name = "job.provision_device" + fields( + user.id = %job.user_id(), + device.id = %job.device_id(), + ), + skip_all, + err(Debug), +)] +async fn provision_device( + job: JobWithSpanContext, + ctx: JobContext, +) -> Result<(), anyhow::Error> { + let state = ctx.state(); + let matrix = state.matrix_connection(); + let mut client = state + .http_client("matrix.provision_device") + .await? + .request_bytes_to_body() + .json_request(); + let mut repo = state.repository().await?; + + let user = repo + .user() + .lookup(job.user_id()) + .await? + .context("User not found")?; + + // XXX: there is a lot that could go wrong in terms of encoding here + let mxid = format!( + "@{localpart}:{homeserver}", + localpart = user.username, + homeserver = matrix.homeserver + ); + + let path = format!("_synapse/admin/v2/users/{user_id}/devices", user_id = mxid); + let mut req = Request::post(matrix.endpoint.join(&path)?.as_str()); + req.headers_mut() + .context("Failed to get headers")? + .typed_insert(Authorization::bearer(&matrix.access_token)?); + + let req = req + .body(DeviceRequest { + device_id: job.device_id().to_owned(), + }) + .context("Failed to build request")?; + + let response = client.ready().await?.call(req).await?; + + match response.status() { + StatusCode::CREATED => { + info!(%user.id, %mxid, device.id = job.device_id(), "Device created") + } + code => anyhow::bail!("Failed to provision device. Status code: {code}"), + } + + Ok(()) +} + +/// Job to delete a device from a user's account. +/// This works by doing a DELETE request to the +/// /_synapse/admin/v2/users/{user_id}/devices/{device_id} endpoint. +#[tracing::instrument( + name = "job.delete_device" + fields( + user.id = %job.user_id(), + device.id = %job.device_id(), + ), + skip_all, + err(Debug), +)] +async fn delete_device( + job: JobWithSpanContext, + ctx: JobContext, +) -> Result<(), anyhow::Error> { + let state = ctx.state(); + let matrix = state.matrix_connection(); + let mut client = state.http_client("matrix.delete_device").await?; + let mut repo = state.repository().await?; + + let user = repo + .user() + .lookup(job.user_id()) + .await? + .context("User not found")?; + + // XXX: there is a lot that could go wrong in terms of encoding here + let mxid = format!( + "@{localpart}:{homeserver}", + localpart = user.username, + homeserver = matrix.homeserver + ); + + let path = format!( + "_synapse/admin/v2/users/{mxid}/devices/{device_id}", + device_id = job.device_id() + ); + + let mut req = Request::delete(matrix.endpoint.join(&path)?.as_str()); + req.headers_mut() + .context("Failed to get headers")? + .typed_insert(Authorization::bearer(&matrix.access_token)?); + let req = req + .body(EmptyBody::new()) + .context("Failed to build request")?; + + let response = client.ready().await?.call(req).await?; + + match response.status() { + StatusCode::OK => info!(%user.id, %mxid, "Device deleted"), + code => anyhow::bail!("Failed to delete device. Status code: {code}"), + }; + + Ok(()) +} + pub(crate) fn register( suffix: &str, monitor: Monitor, @@ -175,10 +301,30 @@ pub(crate) fn register( ) -> Monitor { let storage = state.store(); let worker_name = format!("{job}-{suffix}", job = ProvisionUserJob::NAME); - let worker = WorkerBuilder::new(worker_name) + let provision_user_worker = WorkerBuilder::new(worker_name) .layer(state.inject()) .layer(TracingLayer::new()) .with_storage(storage) .build(job_fn(provision_user)); - monitor.register(worker) + + let storage = state.store(); + let worker_name = format!("{job}-{suffix}", job = ProvisionDeviceJob::NAME); + let provision_device_worker = WorkerBuilder::new(worker_name) + .layer(state.inject()) + .layer(TracingLayer::new()) + .with_storage(storage) + .build(job_fn(provision_device)); + + let storage = state.store(); + let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME); + let delete_device_worker = WorkerBuilder::new(worker_name) + .layer(state.inject()) + .layer(TracingLayer::new()) + .with_storage(storage) + .build(job_fn(delete_device)); + + monitor + .register(provision_user_worker) + .register(provision_device_worker) + .register(delete_device_worker) }