Record the job result from within the job LogContext

This means we can log stats about the job when it finishes, and the log
line reporting its status will have the right log context attached to it.
This commit is contained in:
Quentin Gliech
2025-04-17 17:34:17 +02:00
parent 3a8d4a1e8a
commit 005c427c2f

View File

@@ -184,7 +184,7 @@ fn retry_delay(attempt: usize) -> Duration {
Duration::milliseconds(2_i64.saturating_pow(attempt) * 5_000)
}
type JobResult = Result<(), JobError>;
type JobResult = (std::time::Duration, Result<(), JobError>);
type JobFactory = Arc<dyn Fn(JobPayload) -> Box<dyn RunnableJob> + Send + Sync>;
struct ScheduleDefinition {
@@ -774,7 +774,7 @@ impl JobTracker {
fn spawn_job(&mut self, state: State, context: JobContext, payload: JobPayload) {
let factory = self.factories.get(context.queue_name.as_str()).cloned();
let task = {
let log_context = LogContext::new(format!("worker-job-{}", context.queue_name));
let log_context = LogContext::new(format!("job-{}", context.queue_name));
let context = context.clone();
let span = context.span();
log_context
@@ -788,7 +788,70 @@ impl JobTracker {
job.attempt = %context.attempt,
"Running job"
);
job.run(&state, context).await
let result = job.run(&state, context.clone()).await;
let Some(log_context) = LogContext::current() else {
// This should never happen, but panicking here is acceptable if it does:
// we already recover from panics in those tasks
panic!("Missing log context, this should never happen");
};
let context_stats = log_context.stats();
// We log the result here so that it's attached to the right span & log context
match &result {
Ok(()) => {
tracing::info!(
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
"Job completed [{context_stats}]"
);
}
Err(JobError {
decision: JobErrorDecision::Fail,
error,
}) => {
tracing::error!(
error = &**error as &dyn std::error::Error,
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
"Job failed, not retrying [{context_stats}]"
);
}
Err(JobError {
decision: JobErrorDecision::Retry,
error,
}) if context.attempt < MAX_ATTEMPTS => {
let delay = retry_delay(context.attempt);
tracing::warn!(
error = &**error as &dyn std::error::Error,
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
"Job failed, will retry in {}s [{context_stats}]",
delay.num_seconds()
);
}
Err(JobError {
decision: JobErrorDecision::Retry,
error,
}) => {
tracing::error!(
error = &**error as &dyn std::error::Error,
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
"Job failed too many times, abandonning [{context_stats}]"
);
}
}
(context_stats.elapsed, result)
})
.instrument(span)
};
@@ -847,15 +910,10 @@ impl JobTracker {
}
}
// XXX: the time measurement isn't accurate, as it would include the
// time spent between the task finishing, and us processing the result.
// It's fine for now, as it at least gives us an idea of how many tasks
// we run, and what their status is
while let Some(result) = self.last_join_result.take() {
match result {
// The job succeeded
Ok((id, Ok(()))) => {
// The job succeeded. The logging and time measurement are already done in the task
Ok((id, (elapsed, Ok(())))) => {
let context = self
.job_contexts
.remove(&id)
@@ -866,22 +924,9 @@ impl JobTracker {
&[KeyValue::new("job.queue.name", context.queue_name.clone())],
);
let elapsed = context
.start
.elapsed()
.as_millis()
.try_into()
.unwrap_or(u64::MAX);
tracing::info!(
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job completed"
);
let elapsed_ms = elapsed.as_millis().try_into().unwrap_or(u64::MAX);
self.job_processing_time.record(
elapsed,
elapsed_ms,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "success"),
@@ -893,8 +938,8 @@ impl JobTracker {
.await?;
}
// The job failed
Ok((id, Err(e))) => {
// The job failed. The logging and time measurement are already done in the task
Ok((id, (elapsed, Err(e)))) => {
let context = self
.job_contexts
.remove(&id)
@@ -910,26 +955,11 @@ impl JobTracker {
.mark_as_failed(clock, context.id, &reason)
.await?;
let elapsed = context
.start
.elapsed()
.as_millis()
.try_into()
.unwrap_or(u64::MAX);
let elapsed_ms = elapsed.as_millis().try_into().unwrap_or(u64::MAX);
match e.decision {
JobErrorDecision::Fail => {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job failed, not retrying"
);
self.job_processing_time.record(
elapsed,
elapsed_ms,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
@@ -938,50 +968,31 @@ impl JobTracker {
);
}
JobErrorDecision::Retry if context.attempt < MAX_ATTEMPTS => {
self.job_processing_time.record(
elapsed_ms,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
KeyValue::new("job.decision", "retry"),
],
);
let delay = retry_delay(context.attempt);
repo.queue_job()
.retry(&mut *rng, clock, context.id, delay)
.await?;
}
JobErrorDecision::Retry => {
if context.attempt < MAX_ATTEMPTS {
let delay = retry_delay(context.attempt);
tracing::warn!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job failed, will retry in {}s",
delay.num_seconds()
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
KeyValue::new("job.decision", "retry"),
],
);
repo.queue_job()
.retry(&mut *rng, clock, context.id, delay)
.await?;
} else {
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue.name = %context.queue_name,
job.attempt = %context.attempt,
job.elapsed = format!("{elapsed}ms"),
"Job failed too many times, abandonning"
);
self.job_processing_time.record(
elapsed,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
KeyValue::new("job.decision", "abandon"),
],
);
}
self.job_processing_time.record(
elapsed_ms,
&[
KeyValue::new("job.queue.name", context.queue_name),
KeyValue::new("job.result", "failed"),
KeyValue::new("job.decision", "abandon"),
],
);
}
}
}
@@ -999,6 +1010,8 @@ impl JobTracker {
&[KeyValue::new("job.queue.name", context.queue_name.clone())],
);
// This measurement is not accurate as it includes the time processing the jobs,
// but it's fine, it's only for panicked tasks
let elapsed = context
.start
.elapsed()
@@ -1013,7 +1026,7 @@ impl JobTracker {
if context.attempt < MAX_ATTEMPTS {
let delay = retry_delay(context.attempt);
tracing::warn!(
tracing::error!(
error = &e as &dyn std::error::Error,
job.id = %context.id,
job.queue.name = %context.queue_name,