Proactively provision devices & delete them when using the compat Matrix API

This commit is contained in:
Quentin Gliech
2023-04-04 18:15:02 +02:00
parent d943848d7d
commit d9f0fececd
5 changed files with 192 additions and 24 deletions

View File

@@ -21,8 +21,9 @@ use mas_storage::{
CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository,
CompatSsoLoginRepository,
},
job::{JobRepositoryExt, ProvisionDeviceJob},
user::{UserPasswordRepository, UserRepository},
BoxClock, BoxRepository, BoxRng, Clock,
BoxClock, BoxRepository, BoxRng, Clock, RepositoryAccess,
};
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
@@ -379,6 +380,10 @@ async fn user_password_login(
// Now that the user credentials have been verified, start a new compat session
let device = Device::generate(&mut rng);
repo.job()
.schedule_job(ProvisionDeviceJob::new(&user, &device))
.await?;
let session = repo
.compat_session()
.add(&mut rng, clock, &user, device)

View File

@@ -31,7 +31,8 @@ use mas_keystore::Encrypter;
use mas_router::{CompatLoginSsoAction, PostAuthAction, Route};
use mas_storage::{
compat::{CompatSessionRepository, CompatSsoLoginRepository},
BoxClock, BoxRepository, BoxRng, Clock,
job::{JobRepositoryExt, ProvisionDeviceJob},
BoxClock, BoxRepository, BoxRng, Clock, RepositoryAccess,
};
use mas_templates::{CompatSsoContext, ErrorContext, TemplateContext, Templates};
use serde::{Deserialize, Serialize};
@@ -193,6 +194,10 @@ pub async fn post(
};
let device = Device::generate(&mut rng);
repo.job()
.schedule_job(ProvisionDeviceJob::new(&session.user, &device))
.await?;
let compat_session = repo
.compat_session()
.add(&mut rng, &clock, &session.user, device)

View File

@@ -18,7 +18,8 @@ use hyper::StatusCode;
use mas_data_model::TokenType;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionRepository},
BoxClock, BoxRepository, Clock,
job::{DeleteDeviceJob, JobRepositoryExt},
BoxClock, BoxRepository, Clock, RepositoryAccess,
};
use thiserror::Error;
@@ -95,6 +96,17 @@ pub(crate) async fn post(
.filter(|s| s.is_valid())
.ok_or(RouteError::InvalidAuthorization)?;
let user = repo
.user()
.lookup(session.user_id)
.await?
// XXX: this is probably not the right error
.ok_or(RouteError::InvalidAuthorization)?;
repo.job()
.schedule_job(DeleteDeviceJob::new(&user, &session.device))
.await?;
repo.compat_session().finish(&clock, session).await?;
repo.save().await?;

View File

@@ -216,7 +216,7 @@ where
mod jobs {
// XXX: Move this somewhere else?
use apalis_core::job::Job;
use mas_data_model::{User, UserEmail};
use mas_data_model::{Device, User, UserEmail};
use serde::{Deserialize, Serialize};
use ulid::Ulid;
@@ -280,10 +280,10 @@ mod jobs {
impl ProvisionDeviceJob {
/// Create a new job to provision a device for a user on the homeserver.
#[must_use]
pub fn new(user: &User, device_id: &str) -> Self {
pub fn new(user: &User, device: &Device) -> Self {
Self {
user_id: user.id,
device_id: device_id.to_owned(),
device_id: device.as_str().to_owned(),
}
}
@@ -314,10 +314,10 @@ mod jobs {
impl DeleteDeviceJob {
/// Create a new job to delete a device for a user on the homeserver.
#[must_use]
pub fn new(user: &User, device_id: &str) -> Self {
pub fn new(user: &User, device: &Device) -> Self {
Self {
user_id: user.id,
device_id: device_id.to_owned(),
device_id: device.as_str().to_owned(),
}
}

View File

@@ -26,15 +26,15 @@ use mas_axum_utils::axum::{
headers::{Authorization, HeaderMapExt},
http::{Request, StatusCode},
};
use mas_http::HttpServiceExt;
use mas_http::{EmptyBody, HttpServiceExt};
use mas_storage::{
job::{JobWithSpanContext, ProvisionUserJob},
job::{DeleteDeviceJob, JobWithSpanContext, ProvisionDeviceJob, ProvisionUserJob},
user::{UserEmailRepository, UserRepository},
RepositoryAccess,
};
use serde::{Deserialize, Serialize};
use tower::{Service, ServiceExt};
use tracing::{info, info_span, Instrument};
use tracing::info;
use url::Url;
use crate::{layers::TracingLayer, JobContextExt, State};
@@ -85,6 +85,9 @@ struct UserRequest {
pub external_ids: Vec<ExternalID>,
}
/// Job to provision a user on the Matrix homeserver.
/// This works by doing a PUT request to the /_synapse/admin/v2/users/{user_id}
/// endpoint.
#[tracing::instrument(
name = "job.provision_user"
fields(user.id = %job.user_id()),
@@ -98,7 +101,7 @@ async fn provision_user(
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state
.http_client("provision-matrix-user")
.http_client("matrx.provision_user")
.await?
.request_bytes_to_body()
.json_request();
@@ -110,7 +113,12 @@ async fn provision_user(
.await?
.context("User not found")?;
let mxid = format!("@{}:{}", user.username, matrix.homeserver);
// XXX: there is a lot that could go wrong in terms of encoding here
let mxid = format!(
"@{localpart}:{homeserver}",
localpart = user.username,
homeserver = matrix.homeserver
);
let three_pids = repo
.user_email()
@@ -142,21 +150,15 @@ async fn provision_user(
repo.cancel().await?;
let mut req = Request::put(
matrix
.endpoint
.join("_synapse/admin/v2/users/")?
.join(&mxid)?
.as_str(),
);
let path = format!("_synapse/admin/v2/users/{user_id}", user_id = mxid,);
let mut req = Request::put(matrix.endpoint.join(&path)?.as_str());
req.headers_mut()
.context("Failed to get headers")?
.typed_insert(Authorization::bearer(&matrix.access_token)?);
let req = req.body(body).context("Failed to build request")?;
let span = info_span!("matrix.provision_user", %mxid);
let response = client.ready().await?.call(req).instrument(span).await?;
let response = client.ready().await?.call(req).await?;
match response.status() {
StatusCode::CREATED => info!(%user.id, %mxid, "User created"),
@@ -168,6 +170,130 @@ async fn provision_user(
Ok(())
}
#[derive(Serialize, Deserialize)]
struct DeviceRequest {
device_id: String,
}
/// Job to provision a device on the Matrix homeserver.
/// This works by doing a POST request to the
/// /_synapse/admin/v2/users/{user_id}/devices endpoint.
#[tracing::instrument(
name = "job.provision_device"
fields(
user.id = %job.user_id(),
device.id = %job.device_id(),
),
skip_all,
err(Debug),
)]
async fn provision_device(
job: JobWithSpanContext<ProvisionDeviceJob>,
ctx: JobContext,
) -> Result<(), anyhow::Error> {
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state
.http_client("matrix.provision_device")
.await?
.request_bytes_to_body()
.json_request();
let mut repo = state.repository().await?;
let user = repo
.user()
.lookup(job.user_id())
.await?
.context("User not found")?;
// XXX: there is a lot that could go wrong in terms of encoding here
let mxid = format!(
"@{localpart}:{homeserver}",
localpart = user.username,
homeserver = matrix.homeserver
);
let path = format!("_synapse/admin/v2/users/{user_id}/devices", user_id = mxid);
let mut req = Request::post(matrix.endpoint.join(&path)?.as_str());
req.headers_mut()
.context("Failed to get headers")?
.typed_insert(Authorization::bearer(&matrix.access_token)?);
let req = req
.body(DeviceRequest {
device_id: job.device_id().to_owned(),
})
.context("Failed to build request")?;
let response = client.ready().await?.call(req).await?;
match response.status() {
StatusCode::CREATED => {
info!(%user.id, %mxid, device.id = job.device_id(), "Device created")
}
code => anyhow::bail!("Failed to provision device. Status code: {code}"),
}
Ok(())
}
/// Job to delete a device from a user's account.
/// This works by doing a DELETE request to the
/// /_synapse/admin/v2/users/{user_id}/devices/{device_id} endpoint.
#[tracing::instrument(
name = "job.delete_device"
fields(
user.id = %job.user_id(),
device.id = %job.device_id(),
),
skip_all,
err(Debug),
)]
async fn delete_device(
job: JobWithSpanContext<DeleteDeviceJob>,
ctx: JobContext,
) -> Result<(), anyhow::Error> {
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state.http_client("matrix.delete_device").await?;
let mut repo = state.repository().await?;
let user = repo
.user()
.lookup(job.user_id())
.await?
.context("User not found")?;
// XXX: there is a lot that could go wrong in terms of encoding here
let mxid = format!(
"@{localpart}:{homeserver}",
localpart = user.username,
homeserver = matrix.homeserver
);
let path = format!(
"_synapse/admin/v2/users/{mxid}/devices/{device_id}",
device_id = job.device_id()
);
let mut req = Request::delete(matrix.endpoint.join(&path)?.as_str());
req.headers_mut()
.context("Failed to get headers")?
.typed_insert(Authorization::bearer(&matrix.access_token)?);
let req = req
.body(EmptyBody::new())
.context("Failed to build request")?;
let response = client.ready().await?.call(req).await?;
match response.status() {
StatusCode::OK => info!(%user.id, %mxid, "Device deleted"),
code => anyhow::bail!("Failed to delete device. Status code: {code}"),
};
Ok(())
}
pub(crate) fn register(
suffix: &str,
monitor: Monitor<TokioExecutor>,
@@ -175,10 +301,30 @@ pub(crate) fn register(
) -> Monitor<TokioExecutor> {
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = ProvisionUserJob::NAME);
let worker = WorkerBuilder::new(worker_name)
let provision_user_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(provision_user));
monitor.register(worker)
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = ProvisionDeviceJob::NAME);
let provision_device_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(provision_device));
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME);
let delete_device_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(delete_device));
monitor
.register(provision_user_worker)
.register(provision_device_worker)
.register(delete_device_worker)
}