From 005c427c2f2d7e98fa819a57e9be8abea4c09607 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 17 Apr 2025 17:34:17 +0200 Subject: [PATCH] Record the job result from within the job LogContext This means we can log stats about the job when it finishes, and its status will have the right log context attached to it. --- crates/tasks/src/new_queue.rs | 189 ++++++++++++++++++---------------- 1 file changed, 101 insertions(+), 88 deletions(-) diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index a9ba39dfb..376a2db81 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -184,7 +184,7 @@ fn retry_delay(attempt: usize) -> Duration { Duration::milliseconds(2_i64.saturating_pow(attempt) * 5_000) } -type JobResult = Result<(), JobError>; +type JobResult = (std::time::Duration, Result<(), JobError>); type JobFactory = Arc Box + Send + Sync>; struct ScheduleDefinition { @@ -774,7 +774,7 @@ impl JobTracker { fn spawn_job(&mut self, state: State, context: JobContext, payload: JobPayload) { let factory = self.factories.get(context.queue_name.as_str()).cloned(); let task = { - let log_context = LogContext::new(format!("worker-job-{}", context.queue_name)); + let log_context = LogContext::new(format!("job-{}", context.queue_name)); let context = context.clone(); let span = context.span(); log_context @@ -788,7 +788,70 @@ impl JobTracker { job.attempt = %context.attempt, "Running job" ); - job.run(&state, context).await + let result = job.run(&state, context.clone()).await; + + let Some(log_context) = LogContext::current() else { + // This should never happen, but if it does it's fine: we're recovering fine + // from panics in those tasks + panic!("Missing log context, this should never happen"); + }; + + let context_stats = log_context.stats(); + + // We log the result here so that it's attached to the right span & log context + match &result { + Ok(()) => { + tracing::info!( + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job completed [{context_stats}]" + ); + } + + Err(JobError { + decision: JobErrorDecision::Fail, + error, + }) => { + tracing::error!( + error = &**error as &dyn std::error::Error, + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed, not retrying [{context_stats}]" + ); + } + + Err(JobError { + decision: JobErrorDecision::Retry, + error, + }) if context.attempt < MAX_ATTEMPTS => { + let delay = retry_delay(context.attempt); + tracing::warn!( + error = &**error as &dyn std::error::Error, + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed, will retry in {}s [{context_stats}]", + delay.num_seconds() + ); + } + + Err(JobError { + decision: JobErrorDecision::Retry, + error, + }) => { + tracing::error!( + error = &**error as &dyn std::error::Error, + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed too many times, abandonning [{context_stats}]" + ); + } + } + + (context_stats.elapsed, result) }) .instrument(span) }; @@ -847,15 +910,10 @@ impl JobTracker { } } - // XXX: the time measurement isn't accurate, as it would include the - // time spent between the task finishing, and us processing the result. - // It's fine for now, as it at least gives us an idea of how many tasks - // we run, and what their status is - while let Some(result) = self.last_join_result.take() { match result { - // The job succeeded - Ok((id, Ok(()))) => { + // The job succeeded. The logging and time measurement is already done in the task + Ok((id, (elapsed, Ok(())))) => { let context = self .job_contexts .remove(&id) @@ -866,22 +924,9 @@ impl JobTracker { &[KeyValue::new("job.queue.name", context.queue_name.clone())], ); - let elapsed = context - .start - .elapsed() - .as_millis() - .try_into() - .unwrap_or(u64::MAX); - tracing::info!( - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job completed" - ); - + let elapsed_ms = elapsed.as_millis().try_into().unwrap_or(u64::MAX); self.job_processing_time.record( - elapsed, + elapsed_ms, &[ KeyValue::new("job.queue.name", context.queue_name), KeyValue::new("job.result", "success"), @@ -893,8 +938,8 @@ impl JobTracker { .await?; } - // The job failed - Ok((id, Err(e))) => { + // The job failed. The logging and time measurement is already done in the task + Ok((id, (elapsed, Err(e)))) => { let context = self .job_contexts .remove(&id) @@ -910,26 +955,11 @@ impl JobTracker { .mark_as_failed(clock, context.id, &reason) .await?; - let elapsed = context - .start - .elapsed() - .as_millis() - .try_into() - .unwrap_or(u64::MAX); - + let elapsed_ms = elapsed.as_millis().try_into().unwrap_or(u64::MAX); match e.decision { JobErrorDecision::Fail => { - tracing::error!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job failed, not retrying" - ); - self.job_processing_time.record( - elapsed, + elapsed_ms, &[ KeyValue::new("job.queue.name", context.queue_name), KeyValue::new("job.result", "failed"), @@ -938,50 +968,31 @@ impl JobTracker { ); } + JobErrorDecision::Retry if context.attempt < MAX_ATTEMPTS => { + self.job_processing_time.record( + elapsed_ms, + &[ + KeyValue::new("job.queue.name", context.queue_name), + KeyValue::new("job.result", "failed"), + KeyValue::new("job.decision", "retry"), + ], + ); + + let delay = retry_delay(context.attempt); + repo.queue_job() + .retry(&mut *rng, clock, context.id, delay) + .await?; + } + JobErrorDecision::Retry => { - if context.attempt < MAX_ATTEMPTS { - let delay = retry_delay(context.attempt); - tracing::warn!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job failed, will retry in {}s", - delay.num_seconds() - ); - - self.job_processing_time.record( - elapsed, - &[ - KeyValue::new("job.queue.name", context.queue_name), - KeyValue::new("job.result", "failed"), - KeyValue::new("job.decision", "retry"), - ], - ); - - repo.queue_job() - .retry(&mut *rng, clock, context.id, delay) - .await?; - } else { - tracing::error!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job failed too many times, abandonning" - ); - - self.job_processing_time.record( - elapsed, - &[ - KeyValue::new("job.queue.name", context.queue_name), - KeyValue::new("job.result", "failed"), - KeyValue::new("job.decision", "abandon"), - ], - ); - } + self.job_processing_time.record( + elapsed_ms, + &[ + KeyValue::new("job.queue.name", context.queue_name), + KeyValue::new("job.result", "failed"), + KeyValue::new("job.decision", "abandon"), + ], + ); } } } @@ -999,6 +1010,8 @@ impl JobTracker { &[KeyValue::new("job.queue.name", context.queue_name.clone())], ); + // This measurement is not accurate as it includes the time processing the jobs, + // but it's fine, it's only for panicked tasks let elapsed = context .start .elapsed() @@ -1013,7 +1026,7 @@ impl JobTracker { if context.attempt < MAX_ATTEMPTS { let delay = retry_delay(context.attempt); - tracing::warn!( + tracing::error!( error = &e as &dyn std::error::Error, job.id = %context.id, job.queue.name = %context.queue_name,