Add metrics to the job queue

This adds:

 - a histogram of the time it takes to process a job for each queue,
   with the status of the job (success, failure, etc.)
 - a histogram which records the time it takes to do a "tick", fetch jobs
 - a counter of the number of jobs currently in-flight for each queue
 - a counter which tracks the reasons why the worker got worken up
This commit is contained in:
Quentin Gliech
2024-12-13 15:47:59 +01:00
parent 1abd57e9c9
commit 2dbfbfb03f
4 changed files with 198 additions and 19 deletions

1
Cargo.lock generated
View File

@@ -3696,6 +3696,7 @@ dependencies = [
"mas-templates",
"mas-tower",
"opentelemetry",
"opentelemetry-semantic-conventions",
"rand",
"rand_chacha",
"serde",

View File

@@ -29,6 +29,7 @@ tower.workspace = true
tracing.workspace = true
tracing-opentelemetry.workspace = true
opentelemetry.workspace = true
opentelemetry-semantic-conventions.workspace = true
ulid.workspace = true
url.workspace = true
serde.workspace = true

View File

@@ -4,9 +4,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
#![allow(dead_code)]
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use mas_email::Mailer;
use mas_matrix::HomeserverConnection;
@@ -14,6 +12,7 @@ use mas_router::UrlBuilder;
use mas_storage::{BoxClock, BoxRepository, RepositoryError, SystemClock};
use mas_storage_pg::PgRepository;
use new_queue::QueueRunnerError;
use opentelemetry::metrics::Meter;
use rand::SeedableRng;
use sqlx::{Pool, Postgres};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -25,6 +24,15 @@ mod new_queue;
mod recovery;
mod user;
static METER: LazyLock<Meter> = LazyLock::new(|| {
let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
.with_version(env!("CARGO_PKG_VERSION"))
.with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
.build();
opentelemetry::global::meter_with_scope(scope)
});
#[derive(Clone)]
struct State {
pool: Pool<Postgres>,

View File

@@ -13,6 +13,10 @@ use mas_storage::{
Clock, RepositoryAccess, RepositoryError,
};
use mas_storage_pg::{DatabaseError, PgRepository};
use opentelemetry::{
metrics::{Counter, Histogram, UpDownCounter},
KeyValue,
};
use rand::{distributions::Uniform, Rng, RngCore};
use rand_chacha::ChaChaRng;
use serde::de::DeserializeOwned;
@@ -21,13 +25,13 @@ use sqlx::{
Acquire, Either,
};
use thiserror::Error;
use tokio::task::JoinSet;
use tokio::{task::JoinSet, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument as _, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt as _;
use ulid::Ulid;
use crate::State;
use crate::{State, METER};
type JobPayload = serde_json::Value;
@@ -37,6 +41,12 @@ pub struct JobContext {
pub metadata: JobMetadata,
pub queue_name: String,
pub attempt: usize,
pub start: Instant,
#[expect(
dead_code,
reason = "we're not yet using this, but will be in the future"
)]
pub cancellation_token: CancellationToken,
}
@@ -46,7 +56,7 @@ impl JobContext {
parent: Span::none(),
"job.run",
job.id = %self.id,
job.queue_name = self.queue_name,
job.queue.name = self.queue_name,
job.attempt = self.attempt,
);
@@ -193,6 +203,8 @@ pub struct QueueWorker {
state: State,
schedules: Vec<ScheduleDefinition>,
tracker: JobTracker,
wakeup_reason: Counter<u64>,
tick_time: Histogram<u64>,
}
impl QueueWorker {
@@ -240,6 +252,23 @@ impl QueueWorker {
tracing::info!("Registered worker");
let now = clock.now();
let wakeup_reason = METER
.u64_counter("job.worker.wakeups")
.with_description("Counts how many time the worker has been woken up, for which reason")
.build();
// Pre-create the reasons on the counter
wakeup_reason.add(0, &[KeyValue::new("reason", "sleep")]);
wakeup_reason.add(0, &[KeyValue::new("reason", "task")]);
wakeup_reason.add(0, &[KeyValue::new("reason", "notification")]);
let tick_time = METER
.u64_histogram("job.worker.tick_duration")
.with_description(
"How much time the worker took to tick, including performing leader duties",
)
.build();
Ok(Self {
rng,
clock,
@@ -250,7 +279,9 @@ impl QueueWorker {
cancellation_token,
state,
schedules: Vec::new(),
tracker: JobTracker::default(),
tracker: JobTracker::new(),
wakeup_reason,
tick_time,
})
}
@@ -329,12 +360,16 @@ impl QueueWorker {
return Ok(());
}
let start = Instant::now();
self.tick().await?;
if self.am_i_leader {
self.perform_leader_duties().await?;
}
let elapsed = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
self.tick_time.record(elapsed, &[]);
Ok(())
}
@@ -393,8 +428,6 @@ impl QueueWorker {
.sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION));
let wakeup_sleep = tokio::time::sleep(sleep_duration);
// TODO: add metrics to track the wake up reasons
tokio::select! {
() = self.cancellation_token.cancelled() => {
tracing::debug!("Woke up from cancellation");
@@ -402,13 +435,16 @@ impl QueueWorker {
() = wakeup_sleep => {
tracing::debug!("Woke up from sleep");
self.wakeup_reason.add(1, &[KeyValue::new("reason", "sleep")]);
},
() = self.tracker.collect_next_job(), if self.tracker.has_jobs() => {
tracing::debug!("Joined job task");
self.wakeup_reason.add(1, &[KeyValue::new("reason", "task")]);
},
notification = self.listener.recv() => {
self.wakeup_reason.add(1, &[KeyValue::new("reason", "notification")]);
match notification {
Ok(notification) => {
tracing::debug!(
@@ -495,11 +531,13 @@ impl QueueWorker {
} in jobs
{
let cancellation_token = self.cancellation_token.child_token();
let start = Instant::now();
let context = JobContext {
id,
metadata,
queue_name,
attempt,
start,
cancellation_token,
};
@@ -639,8 +677,8 @@ impl QueueWorker {
.await?;
match scheduled {
0 => {}
1 => tracing::warn!("One scheduled job marked as available"),
n => tracing::warn!("{n} scheduled jobs marked as available"),
1 => tracing::info!("One scheduled job marked as available"),
n => tracing::info!("{n} scheduled jobs marked as available"),
}
// Release the leader lock
@@ -662,7 +700,6 @@ impl QueueWorker {
///
/// This is a separate structure to be able to borrow it mutably at the same
/// time as the connection to the database is borrowed
#[derive(Default)]
struct JobTracker {
/// Stores a mapping from the job queue name to the job factory
factories: HashMap<&'static str, JobFactory>,
@@ -676,9 +713,38 @@ struct JobTracker {
/// Stores the last `join_next_with_id` result for processing, in case we
/// got woken up in `collect_next_job`
last_join_result: Option<Result<(tokio::task::Id, JobResult), tokio::task::JoinError>>,
/// An histogram which records the time it takes to process a job
job_processing_time: Histogram<u64>,
/// A counter which records the number of jobs currently in flight
in_flight_jobs: UpDownCounter<i64>,
}
impl JobTracker {
fn new() -> Self {
let job_processing_time = METER
.u64_histogram("job.process.duration")
.with_description("The time it takes to process a job in milliseconds")
.with_unit("ms")
.build();
let in_flight_jobs = METER
.i64_up_down_counter("job.active_tasks")
.with_description("The number of jobs currently in flight")
.with_unit("{job}")
.build();
Self {
factories: HashMap::new(),
running_jobs: JoinSet::new(),
job_contexts: HashMap::new(),
last_join_result: None,
job_processing_time,
in_flight_jobs,
}
}
/// Returns the queue names that are currently being tracked
fn queues(&self) -> Vec<&'static str> {
self.factories.keys().copied().collect()
@@ -700,6 +766,11 @@ impl JobTracker {
.instrument(span)
};
self.in_flight_jobs.add(
1,
&[KeyValue::new("job.queue.name", context.queue_name.clone())],
);
let handle = self.running_jobs.spawn(task);
self.job_contexts.insert(handle.id(), context);
}
@@ -749,8 +820,12 @@ impl JobTracker {
}
}
// XXX: the time measurement isn't accurate, as it would include the
// time spent between the task finishing, and us processing the result.
// It's fine for now, as it at least gives us an idea of how many tasks
// we run, and what their status is
while let Some(result) = self.last_join_result.take() {
// TODO: add metrics to track the job status and the time it took
match result {
// The job succeeded
Ok((id, Ok(()))) => {
@@ -759,13 +834,33 @@ impl JobTracker {
.remove(&id)
.expect("Job context not found");
self.in_flight_jobs.add(
-1,
&[KeyValue::new("job.queue.name", context.queue_name.clone())],
);
let elapsed = context
.start
.elapsed()
.as_millis()
.try_into()
.unwrap_or(u64::MAX);
tracing::info!(
job.id = %context.id,
job.queue_name = %context.queue_name,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job completed"
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "success"),
],
);
repo.queue_job()
.mark_as_completed(clock, context.id)
.await?;
@@ -778,20 +873,42 @@ impl JobTracker {
.remove(&id)
.expect("Job context not found");
self.in_flight_jobs.add(
-1,
&[KeyValue::new("job.queue.name", context.queue_name.clone())],
);
let reason = format!("{:?}", e.error);
repo.queue_job()
.mark_as_failed(clock, context.id, &reason)
.await?;
let elapsed = context
.start
.elapsed()
.as_millis()
.try_into()
.unwrap_or(u64::MAX);
match e.decision {
JobErrorDecision::Fail => {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job failed, not retrying"
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
KeyValue::new("job.decision", "fail"),
],
);
}
JobErrorDecision::Retry => {
@@ -800,12 +917,22 @@ impl JobTracker {
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job failed, will retry in {}s",
delay.num_seconds()
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
KeyValue::new("job.decision", "retry"),
],
);
repo.queue_job()
.retry(&mut *rng, clock, context.id, delay)
.await?;
@@ -813,10 +940,20 @@ impl JobTracker {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job failed too many times, abandonning"
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
KeyValue::new("job.decision", "abandon"),
],
);
}
}
}
@@ -830,6 +967,18 @@ impl JobTracker {
.remove(&id)
.expect("Job context not found");
self.in_flight_jobs.add(
-1,
&[KeyValue::new("job.queue.name", context.queue_name.clone())],
);
let elapsed = context
.start
.elapsed()
.as_millis()
.try_into()
.unwrap_or(u64::MAX);
let reason = e.to_string();
repo.queue_job()
.mark_as_failed(clock, context.id, &reason)
@@ -840,12 +989,22 @@ impl JobTracker {
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job crashed, will retry in {}s",
delay.num_seconds()
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "crashed"),
KeyValue::new("job.decision", "retry"),
],
);
repo.queue_job()
.retry(&mut *rng, clock, context.id, delay)
.await?;
@@ -853,10 +1012,20 @@ impl JobTracker {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue_name = %context.queue_name,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job crashed too many times, abandonning"
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "crashed"),
KeyValue::new("job.decision", "abandon"),
],
);
}
}
};