Experimental feature to automatically expire inactive sessions (#4022)

Fixes #1875 

This adds an experimental feature which allows expiring sessions that
are inactive for a certain amount of time.

It runs as a scheduled task every 15 minutes, checking for the 'last
activity' on each session type.
It processes sessions by batches of 100 at a time, to avoid overloading
Synapse when syncing back the database.

It expires:

 - all user (browser) sessions
 - all compatibility sessions
 - oauth sessions which are:
   - for a user
   - using a 'dynamic' client (so the sessions started from clients defined
      in the config are excluded)
This commit is contained in:
Quentin Gliech
2025-02-13 10:33:00 +01:00
committed by GitHub
18 changed files with 725 additions and 7 deletions

View File

@@ -166,6 +166,7 @@ impl Options {
&mailer,
homeserver_connection.clone(),
url_builder.clone(),
&site_config,
shutdown.soft_shutdown_token(),
shutdown.task_tracker(),
)

View File

@@ -73,6 +73,7 @@ impl Options {
&mailer,
conn,
url_builder,
&site_config,
shutdown.soft_shutdown_token(),
shutdown.task_tracker(),
)

View File

@@ -12,7 +12,7 @@ use mas_config::{
EmailTransportKind, ExperimentalConfig, MatrixConfig, PasswordsConfig, PolicyConfig,
TemplatesConfig,
};
use mas_data_model::SiteConfig;
use mas_data_model::{SessionExpirationConfig, SiteConfig};
use mas_email::{MailTransport, Mailer};
use mas_handlers::passwords::PasswordManager;
use mas_policy::PolicyFactory;
@@ -180,6 +180,15 @@ pub fn site_config_from_config(
captcha_config: &CaptchaConfig,
) -> Result<SiteConfig, anyhow::Error> {
let captcha = captcha_config_from_config(captcha_config)?;
let session_expiration = experimental_config
.inactive_session_expiration
.as_ref()
.map(|c| SessionExpirationConfig {
oauth_session_inactivity_ttl: c.expire_oauth_sessions.then_some(c.ttl),
compat_session_inactivity_ttl: c.expire_compat_sessions.then_some(c.ttl),
user_session_inactivity_ttl: c.expire_user_sessions.then_some(c.ttl),
});
Ok(SiteConfig {
access_token_ttl: experimental_config.access_token_ttl,
compat_token_ttl: experimental_config.compat_token_ttl,
@@ -198,6 +207,7 @@ pub fn site_config_from_config(
&& account_config.password_recovery_enabled,
captcha,
minimum_password_complexity: password_config.minimum_complexity(),
session_expiration,
})
}

View File

@@ -11,6 +11,10 @@ use serde_with::serde_as;
use crate::ConfigurationSection;
fn default_true() -> bool {
true
}
fn default_token_ttl() -> Duration {
Duration::microseconds(5 * 60 * 1000 * 1000)
}
@@ -19,11 +23,32 @@ fn is_default_token_ttl(value: &Duration) -> bool {
*value == default_token_ttl()
}
/// Configuration options for the inactive session expiration feature
#[serde_as]
#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)]
pub struct InactiveSessionExpirationConfig {
/// Time after which an inactive session is automatically finished
#[schemars(with = "u64", range(min = 600, max = 7_776_000))]
#[serde_as(as = "serde_with::DurationSeconds<i64>")]
pub ttl: Duration,
/// Should compatibility sessions expire after inactivity
#[serde(default = "default_true")]
pub expire_compat_sessions: bool,
/// Should OAuth 2.0 sessions expire after inactivity
#[serde(default = "default_true")]
pub expire_oauth_sessions: bool,
/// Should user sessions expire after inactivity
#[serde(default = "default_true")]
pub expire_user_sessions: bool,
}
/// Configuration sections for experimental options
///
/// Do not change these options unless you know what you are doing.
#[serde_as]
#[allow(clippy::struct_excessive_bools)]
#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)]
pub struct ExperimentalConfig {
/// Time-to-live of access tokens in seconds. Defaults to 5 minutes.
@@ -44,6 +69,12 @@ pub struct ExperimentalConfig {
)]
#[serde_as(as = "serde_with::DurationSeconds<i64>")]
pub compat_token_ttl: Duration,
/// Experimetal feature to automatically expire inactive sessions
///
/// Disabled by default
#[serde(skip_serializing_if = "Option::is_none")]
pub inactive_session_expiration: Option<InactiveSessionExpirationConfig>,
}
impl Default for ExperimentalConfig {
@@ -51,13 +82,16 @@ impl Default for ExperimentalConfig {
Self {
access_token_ttl: default_token_ttl(),
compat_token_ttl: default_token_ttl(),
inactive_session_expiration: None,
}
}
}
impl ExperimentalConfig {
pub(crate) fn is_default(&self) -> bool {
is_default_token_ttl(&self.access_token_ttl) && is_default_token_ttl(&self.compat_token_ttl)
is_default_token_ttl(&self.access_token_ttl)
&& is_default_token_ttl(&self.compat_token_ttl)
&& self.inactive_session_expiration.is_none()
}
}

View File

@@ -32,7 +32,7 @@ pub use self::{
AuthorizationCode, AuthorizationGrant, AuthorizationGrantStage, Client, DeviceCodeGrant,
DeviceCodeGrantState, InvalidRedirectUriError, JwksOrJwksUri, Pkce, Session, SessionState,
},
site_config::{CaptchaConfig, CaptchaService, SiteConfig},
site_config::{CaptchaConfig, CaptchaService, SessionExpirationConfig, SiteConfig},
tokens::{
AccessToken, AccessTokenState, RefreshToken, RefreshTokenState, TokenFormatError, TokenType,
},

View File

@@ -28,6 +28,14 @@ pub struct CaptchaConfig {
pub secret_key: String,
}
/// Automatic session expiration configuration
#[derive(Debug, Clone)]
pub struct SessionExpirationConfig {
pub user_session_inactivity_ttl: Option<Duration>,
pub oauth_session_inactivity_ttl: Option<Duration>,
pub compat_session_inactivity_ttl: Option<Duration>,
}
/// Random site configuration we want accessible in various places.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Clone)]
@@ -74,4 +82,6 @@ pub struct SiteConfig {
/// Minimum password complexity, between 0 and 4.
/// This is a score from zxcvbn.
pub minimum_password_complexity: u8,
pub session_expiration: Option<SessionExpirationConfig>,
}

View File

@@ -46,6 +46,22 @@ impl std::fmt::Display for OAuth2SessionStatus {
}
}
#[derive(Deserialize, JsonSchema, Clone, Copy)]
#[serde(rename_all = "snake_case")]
enum OAuth2ClientKind {
Dynamic,
Static,
}
impl std::fmt::Display for OAuth2ClientKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Dynamic => write!(f, "dynamic"),
Self::Static => write!(f, "static"),
}
}
}
#[derive(FromRequestParts, Deserialize, JsonSchema, OperationIo)]
#[serde(rename = "OAuth2SessionFilter")]
#[aide(input_with = "Query<FilterParams>")]
@@ -61,6 +77,10 @@ pub struct FilterParams {
#[schemars(with = "Option<crate::admin::schema::Ulid>")]
client: Option<Ulid>,
/// Retrieve the items only for a specific client kind
#[serde(rename = "filter[client-kind]")]
client_kind: Option<OAuth2ClientKind>,
/// Retrieve the items started from the given browser session
#[serde(rename = "filter[user-session]")]
#[schemars(with = "Option<crate::admin::schema::Ulid>")]
@@ -95,6 +115,11 @@ impl std::fmt::Display for FilterParams {
sep = '&';
}
if let Some(client_kind) = self.client_kind {
write!(f, "{sep}filter[client-kind]={client_kind}")?;
sep = '&';
}
if let Some(user_session) = self.user_session {
write!(f, "{sep}filter[user-session]={user_session}")?;
sep = '&';
@@ -232,6 +257,12 @@ pub async fn handler(
None => filter,
};
let filter = match params.client_kind {
Some(OAuth2ClientKind::Dynamic) => filter.only_dynamic_clients(),
Some(OAuth2ClientKind::Static) => filter.only_static_clients(),
None => filter,
};
let user_session = if let Some(user_session_id) = params.user_session {
let user_session = repo
.browser_session()

View File

@@ -139,6 +139,7 @@ pub fn test_site_config() -> SiteConfig {
account_recovery_allowed: true,
captcha: None,
minimum_password_complexity: 1,
session_expiration: None,
}
}

View File

@@ -83,6 +83,15 @@ pub enum OAuth2Sessions {
LastActiveIp,
}
#[derive(sea_query::Iden)]
#[iden = "oauth2_clients"]
pub enum OAuth2Clients {
Table,
#[iden = "oauth2_client_id"]
OAuth2ClientId,
IsStatic,
}
#[derive(sea_query::Iden)]
#[iden = "upstream_oauth_providers"]
pub enum UpstreamOAuthProviders {

View File

@@ -525,7 +525,7 @@ mod tests {
let pagination = Pagination::first(10);
// First, list all the sessions
let filter = OAuth2SessionFilter::new();
let filter = OAuth2SessionFilter::new().for_any_user();
let list = repo
.oauth2_session()
.list(filter, pagination)

View File

@@ -23,7 +23,7 @@ use uuid::Uuid;
use crate::{
filter::{Filter, StatementExt},
iden::OAuth2Sessions,
iden::{OAuth2Clients, OAuth2Sessions},
pagination::QueryBuilderExt,
tracing::ExecuteExt,
DatabaseError, DatabaseInconsistencyError,
@@ -104,6 +104,26 @@ impl Filter for OAuth2SessionFilter<'_> {
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId))
.eq(Uuid::from(client.id))
}))
.add_option(self.client_kind().map(|client_kind| {
// This builds either a:
// `WHERE oauth2_client_id = ANY(...)`
// or a `WHERE oauth2_client_id <> ALL(...)`
let static_clients = Query::select()
.expr(Expr::col((
OAuth2Clients::Table,
OAuth2Clients::OAuth2ClientId,
)))
.and_where(Expr::col((OAuth2Clients::Table, OAuth2Clients::IsStatic)).into())
.from(OAuth2Clients::Table)
.take();
if client_kind.is_static() {
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId))
.eq(Expr::any(static_clients))
} else {
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId))
.ne(Expr::all(static_clients))
}
}))
.add_option(self.device().map(|device| {
Expr::val(device.to_scope_token().to_string()).eq(PgFunc::any(Expr::col((
OAuth2Sessions::Table,
@@ -125,6 +145,13 @@ impl Filter for OAuth2SessionFilter<'_> {
let scope: Vec<String> = scope.iter().map(|s| s.as_str().to_owned()).collect();
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)).contains(scope)
}))
.add_option(self.any_user().map(|any_user| {
if any_user {
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)).is_not_null()
} else {
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)).is_null()
}
}))
.add_option(self.last_active_after().map(|last_active_after| {
Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt))
.gt(last_active_after)

View File

@@ -31,13 +31,27 @@ impl OAuth2SessionState {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ClientKind {
Static,
Dynamic,
}
impl ClientKind {
pub fn is_static(self) -> bool {
matches!(self, Self::Static)
}
}
/// Filter parameters for listing OAuth 2.0 sessions
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct OAuth2SessionFilter<'a> {
user: Option<&'a User>,
any_user: Option<bool>,
browser_session: Option<&'a BrowserSession>,
device: Option<&'a Device>,
client: Option<&'a Client>,
client_kind: Option<ClientKind>,
state: Option<OAuth2SessionState>,
scope: Option<&'a Scope>,
last_active_before: Option<DateTime<Utc>>,
@@ -66,6 +80,28 @@ impl<'a> OAuth2SessionFilter<'a> {
self.user
}
/// List sessions which belong to any user
#[must_use]
pub fn for_any_user(mut self) -> Self {
self.any_user = Some(true);
self
}
/// List sessions which belong to no user
#[must_use]
pub fn for_no_user(mut self) -> Self {
self.any_user = Some(false);
self
}
/// Get the 'any user' filter
///
/// Returns [`None`] if no 'any user' filter was set
#[must_use]
pub fn any_user(&self) -> Option<bool> {
self.any_user
}
/// List sessions started by a specific browser session
#[must_use]
pub fn for_browser_session(mut self, browser_session: &'a BrowserSession) -> Self {
@@ -96,6 +132,28 @@ impl<'a> OAuth2SessionFilter<'a> {
self.client
}
/// List only static clients
#[must_use]
pub fn only_static_clients(mut self) -> Self {
self.client_kind = Some(ClientKind::Static);
self
}
/// List only dynamic clients
#[must_use]
pub fn only_dynamic_clients(mut self) -> Self {
self.client_kind = Some(ClientKind::Dynamic);
self
}
/// Get the client kind filter
///
/// Returns [`None`] if no client kind filter was set
#[must_use]
pub fn client_kind(&self) -> Option<ClientKind> {
self.client_kind
}
/// Only return sessions with a last active time before the given time
#[must_use]
pub fn with_last_active_before(mut self, last_active_before: DateTime<Utc>) -> Self {

View File

@@ -3,11 +3,16 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use mas_data_model::{Device, User, UserEmailAuthentication, UserRecoverySession};
use chrono::{DateTime, Utc};
use mas_data_model::{
BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication,
UserRecoverySession,
};
use serde::{Deserialize, Serialize};
use ulid::Ulid;
use super::InsertableJob;
use crate::{Page, Pagination};
/// This is the previous iteration of the email verification job. It has been
/// replaced by [`SendEmailAuthenticationCodeJob`]. This struct is kept to be
@@ -193,6 +198,15 @@ impl SyncDevicesJob {
Self { user_id: user.id }
}
/// Create a new job to sync the list of devices of a user with the
/// homeserver for the given user ID
///
/// This is useful to use in cases where the [`User`] object isn't loaded
#[must_use]
pub fn new_for_id(user_id: Ulid) -> Self {
Self { user_id }
}
/// The ID of the user to sync the devices for
#[must_use]
pub fn user_id(&self) -> Ulid {
@@ -310,3 +324,185 @@ pub struct CleanupExpiredTokensJob;
impl InsertableJob for CleanupExpiredTokensJob {
const QUEUE_NAME: &'static str = "cleanup-expired-tokens";
}
/// Scheduled job to expire inactive sessions
///
/// This job will trigger jobs to expire inactive compat, oauth and user
/// sessions.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExpireInactiveSessionsJob;
impl InsertableJob for ExpireInactiveSessionsJob {
const QUEUE_NAME: &'static str = "expire-inactive-sessions";
}
/// Expire inactive OAuth 2.0 sessions
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExpireInactiveOAuthSessionsJob {
threshold: DateTime<Utc>,
after: Option<Ulid>,
}
impl ExpireInactiveOAuthSessionsJob {
/// Create a new job to expire inactive OAuth 2.0 sessions
///
/// # Parameters
///
/// * `threshold` - The threshold to expire sessions at
#[must_use]
pub fn new(threshold: DateTime<Utc>) -> Self {
Self {
threshold,
after: None,
}
}
/// Get the threshold to expire sessions at
#[must_use]
pub fn threshold(&self) -> DateTime<Utc> {
self.threshold
}
/// Get the pagination cursor
#[must_use]
pub fn pagination(&self, batch_size: usize) -> Pagination {
let pagination = Pagination::first(batch_size);
if let Some(after) = self.after {
pagination.after(after)
} else {
pagination
}
}
/// Get the next job given the page returned by the database
#[must_use]
pub fn next(&self, page: &Page<Session>) -> Option<Self> {
if !page.has_next_page {
return None;
}
let last_edge = page.edges.last()?;
Some(Self {
threshold: self.threshold,
after: Some(last_edge.id),
})
}
}
impl InsertableJob for ExpireInactiveOAuthSessionsJob {
const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions";
}
/// Expire inactive compatibility sessions
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExpireInactiveCompatSessionsJob {
threshold: DateTime<Utc>,
after: Option<Ulid>,
}
impl ExpireInactiveCompatSessionsJob {
/// Create a new job to expire inactive compatibility sessions
///
/// # Parameters
///
/// * `threshold` - The threshold to expire sessions at
#[must_use]
pub fn new(threshold: DateTime<Utc>) -> Self {
Self {
threshold,
after: None,
}
}
/// Get the threshold to expire sessions at
#[must_use]
pub fn threshold(&self) -> DateTime<Utc> {
self.threshold
}
/// Get the pagination cursor
#[must_use]
pub fn pagination(&self, batch_size: usize) -> Pagination {
let pagination = Pagination::first(batch_size);
if let Some(after) = self.after {
pagination.after(after)
} else {
pagination
}
}
/// Get the next job given the page returned by the database
#[must_use]
pub fn next(&self, page: &Page<CompatSession>) -> Option<Self> {
if !page.has_next_page {
return None;
}
let last_edge = page.edges.last()?;
Some(Self {
threshold: self.threshold,
after: Some(last_edge.id),
})
}
}
impl InsertableJob for ExpireInactiveCompatSessionsJob {
const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions";
}
/// Expire inactive user sessions
#[derive(Debug, Serialize, Deserialize)]
pub struct ExpireInactiveUserSessionsJob {
threshold: DateTime<Utc>,
after: Option<Ulid>,
}
impl ExpireInactiveUserSessionsJob {
/// Create a new job to expire inactive user/browser sessions
///
/// # Parameters
///
/// * `threshold` - The threshold to expire sessions at
#[must_use]
pub fn new(threshold: DateTime<Utc>) -> Self {
Self {
threshold,
after: None,
}
}
/// Get the threshold to expire sessions at
#[must_use]
pub fn threshold(&self) -> DateTime<Utc> {
self.threshold
}
/// Get the pagination cursor
#[must_use]
pub fn pagination(&self, batch_size: usize) -> Pagination {
let pagination = Pagination::first(batch_size);
if let Some(after) = self.after {
pagination.after(after)
} else {
pagination
}
}
/// Get the next job given the page returned by the database
#[must_use]
pub fn next(&self, page: &Page<BrowserSession>) -> Option<Self> {
if !page.has_next_page {
return None;
}
let last_edge = page.edges.last()?;
Some(Self {
threshold: self.threshold,
after: Some(last_edge.id),
})
}
}
impl InsertableJob for ExpireInactiveUserSessionsJob {
const QUEUE_NAME: &'static str = "expire-inactive-user-sessions";
}

View File

@@ -6,6 +6,7 @@
use std::sync::{Arc, LazyLock};
use mas_data_model::SiteConfig;
use mas_email::Mailer;
use mas_matrix::HomeserverConnection;
use mas_router::UrlBuilder;
@@ -22,6 +23,7 @@ mod email;
mod matrix;
mod new_queue;
mod recovery;
mod sessions;
mod user;
static METER: LazyLock<Meter> = LazyLock::new(|| {
@@ -40,6 +42,7 @@ struct State {
clock: SystemClock,
homeserver: Arc<dyn HomeserverConnection<Error = anyhow::Error>>,
url_builder: UrlBuilder,
site_config: SiteConfig,
}
impl State {
@@ -49,6 +52,7 @@ impl State {
mailer: Mailer,
homeserver: impl HomeserverConnection<Error = anyhow::Error> + 'static,
url_builder: UrlBuilder,
site_config: SiteConfig,
) -> Self {
Self {
pool,
@@ -56,6 +60,7 @@ impl State {
clock,
homeserver: Arc::new(homeserver),
url_builder,
site_config,
}
}
@@ -93,6 +98,10 @@ impl State {
pub fn url_builder(&self) -> &UrlBuilder {
&self.url_builder
}
pub fn site_config(&self) -> &SiteConfig {
&self.site_config
}
}
/// Initialise the workers.
@@ -105,6 +114,7 @@ pub async fn init(
mailer: &Mailer,
homeserver: impl HomeserverConnection<Error = anyhow::Error> + 'static,
url_builder: UrlBuilder,
site_config: &SiteConfig,
cancellation_token: CancellationToken,
task_tracker: &TaskTracker,
) -> Result<(), QueueRunnerError> {
@@ -114,6 +124,7 @@ pub async fn init(
mailer.clone(),
homeserver,
url_builder,
site_config.clone(),
);
let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?;
@@ -128,10 +139,20 @@ pub async fn init(
.register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
.register_handler::<mas_storage::queue::SyncDevicesJob>()
.register_handler::<mas_storage::queue::VerifyEmailJob>()
.register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
.register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
.register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
.register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
.add_schedule(
"cleanup-expired-tokens",
"0 0 * * * *".parse()?,
mas_storage::queue::CleanupExpiredTokensJob,
)
.add_schedule(
"expire-inactive-sessions",
// Run this job every 15 minutes
"30 */15 * * * *".parse()?,
mas_storage::queue::ExpireInactiveSessionsJob,
);
task_tracker.spawn(worker.run());

View File

@@ -0,0 +1,242 @@
// Copyright 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
use std::collections::HashSet;
use async_trait::async_trait;
use chrono::Duration;
use mas_storage::{
compat::CompatSessionFilter,
oauth2::OAuth2SessionFilter,
queue::{
ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, ExpireInactiveSessionsJob,
ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob,
},
user::BrowserSessionFilter,
};
use crate::{
new_queue::{JobContext, JobError, RunnableJob},
State,
};
#[async_trait]
impl RunnableJob for ExpireInactiveSessionsJob {
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
let Some(config) = state.site_config().session_expiration.as_ref() else {
// Automatic session expiration is disabled
return Ok(());
};
let clock = state.clock();
let mut rng = state.rng();
let now = clock.now();
let mut repo = state.repository().await.map_err(JobError::retry)?;
if let Some(ttl) = config.oauth_session_inactivity_ttl {
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
ExpireInactiveOAuthSessionsJob::new(now - ttl),
)
.await
.map_err(JobError::retry)?;
}
if let Some(ttl) = config.compat_session_inactivity_ttl {
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
ExpireInactiveCompatSessionsJob::new(now - ttl),
)
.await
.map_err(JobError::retry)?;
}
if let Some(ttl) = config.user_session_inactivity_ttl {
repo.queue_job()
.schedule_job(
&mut rng,
&clock,
ExpireInactiveUserSessionsJob::new(now - ttl),
)
.await
.map_err(JobError::retry)?;
}
repo.save().await.map_err(JobError::retry)?;
Ok(())
}
}
#[async_trait]
impl RunnableJob for ExpireInactiveOAuthSessionsJob {
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let clock = state.clock();
let mut rng = state.rng();
let mut users_synced = HashSet::new();
// This delay is used to space out the device sync jobs
// We add 10 seconds between each device sync, meaning that it will spread out
// the syncs over ~16 minutes max if we get a full batch of 100 users
let mut delay = Duration::minutes(1);
let filter = OAuth2SessionFilter::new()
.with_last_active_before(self.threshold())
.for_any_user()
.only_dynamic_clients()
.active_only();
let pagination = self.pagination(100);
let page = repo
.oauth2_session()
.list(filter, pagination)
.await
.map_err(JobError::retry)?;
if let Some(job) = self.next(&page) {
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
repo.queue_job()
.schedule_job(&mut rng, &clock, job)
.await
.map_err(JobError::retry)?;
}
for edge in page.edges {
if let Some(user_id) = edge.user_id {
let inserted = users_synced.insert(user_id);
if inserted {
tracing::info!(user.id = %user_id, "Scheduling devices sync for user");
repo.queue_job()
.schedule_job_later(
&mut rng,
&clock,
SyncDevicesJob::new_for_id(user_id),
clock.now() + delay,
)
.await
.map_err(JobError::retry)?;
delay += Duration::seconds(10);
}
}
repo.oauth2_session()
.finish(&clock, edge)
.await
.map_err(JobError::retry)?;
}
repo.save().await.map_err(JobError::retry)?;
Ok(())
}
}
#[async_trait]
impl RunnableJob for ExpireInactiveCompatSessionsJob {
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let clock = state.clock();
let mut rng = state.rng();
let mut users_synced = HashSet::new();
// This delay is used to space out the device sync jobs
// We add 10 seconds between each device sync, meaning that it will spread out
// the syncs over ~16 minutes max if we get a full batch of 100 users
let mut delay = Duration::minutes(1);
let filter = CompatSessionFilter::new()
.with_last_active_before(self.threshold())
.active_only();
let pagination = self.pagination(100);
let page = repo
.compat_session()
.list(filter, pagination)
.await
.map_err(JobError::retry)?
.map(|(c, _)| c);
if let Some(job) = self.next(&page) {
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
repo.queue_job()
.schedule_job(&mut rng, &clock, job)
.await
.map_err(JobError::retry)?;
}
for edge in page.edges {
let inserted = users_synced.insert(edge.user_id);
if inserted {
tracing::info!(user.id = %edge.user_id, "Scheduling devices sync for user");
repo.queue_job()
.schedule_job_later(
&mut rng,
&clock,
SyncDevicesJob::new_for_id(edge.user_id),
clock.now() + delay,
)
.await
.map_err(JobError::retry)?;
delay += Duration::seconds(10);
}
repo.compat_session()
.finish(&clock, edge)
.await
.map_err(JobError::retry)?;
}
repo.save().await.map_err(JobError::retry)?;
Ok(())
}
}
#[async_trait]
impl RunnableJob for ExpireInactiveUserSessionsJob {
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
let mut repo = state.repository().await.map_err(JobError::retry)?;
let clock = state.clock();
let mut rng = state.rng();
let filter = BrowserSessionFilter::new()
.with_last_active_before(self.threshold())
.active_only();
let pagination = self.pagination(100);
let page = repo
.browser_session()
.list(filter, pagination)
.await
.map_err(JobError::retry)?;
if let Some(job) = self.next(&page) {
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
repo.queue_job()
.schedule_job(&mut rng, &clock, job)
.await
.map_err(JobError::retry)?;
}
for edge in page.edges {
repo.browser_session()
.finish(&clock, edge)
.await
.map_err(JobError::retry)?;
}
repo.save().await.map_err(JobError::retry)?;
Ok(())
}
}

View File

@@ -357,6 +357,17 @@
},
"style": "form"
},
{
"in": "query",
"name": "filter[client-kind]",
"description": "Retrieve the items only for a specific client kind",
"schema": {
"description": "Retrieve the items only for a specific client kind",
"$ref": "#/components/schemas/OAuth2ClientKind",
"nullable": true
},
"style": "form"
},
{
"in": "query",
"name": "filter[user-session]",
@@ -2347,6 +2358,11 @@
"$ref": "#/components/schemas/ULID",
"nullable": true
},
"filter[client-kind]": {
"description": "Retrieve the items only for a specific client kind",
"$ref": "#/components/schemas/OAuth2ClientKind",
"nullable": true
},
"filter[user-session]": {
"description": "Retrieve the items started from the given browser session",
"$ref": "#/components/schemas/ULID",
@@ -2367,6 +2383,13 @@
}
}
},
"OAuth2ClientKind": {
"type": "string",
"enum": [
"dynamic",
"static"
]
},
"OAuth2SessionStatus": {
"type": "string",
"enum": [

View File

@@ -2457,6 +2457,45 @@
"format": "uint64",
"maximum": 86400.0,
"minimum": 60.0
},
"inactive_session_expiration": {
"description": "Experimetal feature to automatically expire inactive sessions\n\nDisabled by default",
"allOf": [
{
"$ref": "#/definitions/InactiveSessionExpirationConfig"
}
]
}
}
},
"InactiveSessionExpirationConfig": {
"description": "Configuration options for the inactive session expiration feature",
"type": "object",
"required": [
"ttl"
],
"properties": {
"ttl": {
"description": "Time after which an inactive session is automatically finished",
"type": "integer",
"format": "uint64",
"maximum": 7776000.0,
"minimum": 600.0
},
"expire_compat_sessions": {
"description": "Should compatibility sessions expire after inactivity",
"default": true,
"type": "boolean"
},
"expire_oauth_sessions": {
"description": "Should OAuth 2.0 sessions expire after inactivity",
"default": true,
"type": "boolean"
},
"expire_user_sessions": {
"description": "Should user sessions expire after inactivity",
"default": true,
"type": "boolean"
}
}
}

View File

@@ -711,4 +711,19 @@ experimental:
# Time-to-live of compatibility access tokens in seconds, when refresh tokens are supported. Defaults to 300, 5 minutes.
#compat_token_ttl: 300
# Experimental feature to automatically expire inactive sessions
# Disabled by default
#inactive_session_expiration:
# Time after which an inactive session is automatically finished in seconds
#ttl: 32400
# Should compatibility sessions expire after inactivity. Defaults to true.
#expire_compat_sessions: true
# Should OAuth 2.0 sessions expire after inactivity. Defaults to true.
#expire_oauth_sessions: true
# Should user sessions expire after inactivity. Defaults to true.
#expire_user_sessions: true
```