diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index a9ba39dfb..376a2db81 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -184,7 +184,7 @@ fn retry_delay(attempt: usize) -> Duration { Duration::milliseconds(2_i64.saturating_pow(attempt) * 5_000) } -type JobResult = Result<(), JobError>; +type JobResult = (std::time::Duration, Result<(), JobError>); type JobFactory = Arc Box + Send + Sync>; struct ScheduleDefinition { @@ -774,7 +774,7 @@ impl JobTracker { fn spawn_job(&mut self, state: State, context: JobContext, payload: JobPayload) { let factory = self.factories.get(context.queue_name.as_str()).cloned(); let task = { - let log_context = LogContext::new(format!("worker-job-{}", context.queue_name)); + let log_context = LogContext::new(format!("job-{}", context.queue_name)); let context = context.clone(); let span = context.span(); log_context @@ -788,7 +788,70 @@ impl JobTracker { job.attempt = %context.attempt, "Running job" ); - job.run(&state, context).await + let result = job.run(&state, context.clone()).await; + + let Some(log_context) = LogContext::current() else { + // This should never happen, but if it does it's fine: we're recovering fine + // from panics in those tasks + panic!("Missing log context, this should never happen"); + }; + + let context_stats = log_context.stats(); + + // We log the result here so that it's attached to the right span & log context + match &result { + Ok(()) => { + tracing::info!( + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job completed [{context_stats}]" + ); + } + + Err(JobError { + decision: JobErrorDecision::Fail, + error, + }) => { + tracing::error!( + error = &**error as &dyn std::error::Error, + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed, not retrying [{context_stats}]" + ); + } + + Err(JobError { + decision: JobErrorDecision::Retry, + error, + }) if context.attempt < MAX_ATTEMPTS => { + let delay = retry_delay(context.attempt); + tracing::warn!( + error = &**error as &dyn std::error::Error, + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed, will retry in {}s [{context_stats}]", + delay.num_seconds() + ); + } + + Err(JobError { + decision: JobErrorDecision::Retry, + error, + }) => { + tracing::error!( + error = &**error as &dyn std::error::Error, + job.id = %context.id, + job.queue.name = %context.queue_name, + job.attempt = %context.attempt, + "Job failed too many times, abandonning [{context_stats}]" + ); + } + } + + (context_stats.elapsed, result) }) .instrument(span) }; @@ -847,15 +910,10 @@ impl JobTracker { } } - // XXX: the time measurement isn't accurate, as it would include the - // time spent between the task finishing, and us processing the result. - // It's fine for now, as it at least gives us an idea of how many tasks - // we run, and what their status is - while let Some(result) = self.last_join_result.take() { match result { - // The job succeeded - Ok((id, Ok(()))) => { + // The job succeeded. The logging and time measurement is already done in the task + Ok((id, (elapsed, Ok(())))) => { let context = self .job_contexts .remove(&id) @@ -866,22 +924,9 @@ impl JobTracker { &[KeyValue::new("job.queue.name", context.queue_name.clone())], ); - let elapsed = context - .start - .elapsed() - .as_millis() - .try_into() - .unwrap_or(u64::MAX); - tracing::info!( - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job completed" - ); - + let elapsed_ms = elapsed.as_millis().try_into().unwrap_or(u64::MAX); self.job_processing_time.record( - elapsed, + elapsed_ms, &[ KeyValue::new("job.queue.name", context.queue_name), KeyValue::new("job.result", "success"), @@ -893,8 +938,8 @@ impl JobTracker { .await?; } - // The job failed - Ok((id, Err(e))) => { + // The job failed. The logging and time measurement is already done in the task + Ok((id, (elapsed, Err(e)))) => { let context = self .job_contexts .remove(&id) @@ -910,26 +955,11 @@ impl JobTracker { .mark_as_failed(clock, context.id, &reason) .await?; - let elapsed = context - .start - .elapsed() - .as_millis() - .try_into() - .unwrap_or(u64::MAX); - + let elapsed_ms = elapsed.as_millis().try_into().unwrap_or(u64::MAX); match e.decision { JobErrorDecision::Fail => { - tracing::error!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job failed, not retrying" - ); - self.job_processing_time.record( - elapsed, + elapsed_ms, &[ KeyValue::new("job.queue.name", context.queue_name), KeyValue::new("job.result", "failed"), @@ -938,50 +968,31 @@ impl JobTracker { ); } + JobErrorDecision::Retry if context.attempt < MAX_ATTEMPTS => { + self.job_processing_time.record( + elapsed_ms, + &[ + KeyValue::new("job.queue.name", context.queue_name), + KeyValue::new("job.result", "failed"), + KeyValue::new("job.decision", "retry"), + ], + ); + + let delay = retry_delay(context.attempt); + repo.queue_job() + .retry(&mut *rng, clock, context.id, delay) + .await?; + } + JobErrorDecision::Retry => { - if context.attempt < MAX_ATTEMPTS { - let delay = retry_delay(context.attempt); - tracing::warn!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job failed, will retry in {}s", - delay.num_seconds() - ); - - self.job_processing_time.record( - elapsed, - &[ - KeyValue::new("job.queue.name", context.queue_name), - KeyValue::new("job.result", "failed"), - KeyValue::new("job.decision", "retry"), - ], - ); - - repo.queue_job() - .retry(&mut *rng, clock, context.id, delay) - .await?; - } else { - tracing::error!( - error = &e as &dyn std::error::Error, - job.id = %context.id, - job.queue.name = %context.queue_name, - job.attempt = %context.attempt, - job.elapsed = format!("{elapsed}ms"), - "Job failed too many times, abandonning" - ); - - self.job_processing_time.record( - elapsed, - &[ - KeyValue::new("job.queue.name", context.queue_name), - KeyValue::new("job.result", "failed"), - KeyValue::new("job.decision", "abandon"), - ], - ); - } + self.job_processing_time.record( + elapsed_ms, + &[ + KeyValue::new("job.queue.name", context.queue_name), + KeyValue::new("job.result", "failed"), + KeyValue::new("job.decision", "abandon"), + ], + ); } } } @@ -999,6 +1010,8 @@ impl JobTracker { &[KeyValue::new("job.queue.name", context.queue_name.clone())], ); + // This measurement is not accurate as it includes the time processing the jobs, + // but it's fine, it's only for panicked tasks let elapsed = context .start .elapsed() @@ -1013,7 +1026,7 @@ impl JobTracker { if context.attempt < MAX_ATTEMPTS { let delay = retry_delay(context.attempt); - tracing::warn!( + tracing::error!( error = &e as &dyn std::error::Error, job.id = %context.id, job.queue.name = %context.queue_name,