Introduce a way to clear jobs from a deprecated queue

This commit is contained in:
Quentin Gliech
2026-01-09 12:08:20 +01:00
parent bf2ad55b5c
commit ad1910c22e

View File

@@ -120,7 +120,7 @@ where
}
#[async_trait]
pub trait RunnableJob: FromJob + Send + 'static {
pub trait RunnableJob: Send + 'static {
async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError>;
/// Allows the job to set a timeout for its execution. Jobs should then look
@@ -190,6 +190,22 @@ fn retry_delay(attempt: usize) -> Duration {
type JobResult = (std::time::Duration, Result<(), JobError>);
type JobFactory = Arc<dyn Fn(JobPayload) -> Box<dyn RunnableJob> + Send + Sync>;
/// This is a fake job we use to consume jobs from deprecated queues
struct DeprecatedJob;
#[async_trait]
impl RunnableJob for DeprecatedJob {
async fn run(&self, _state: &State, context: JobContext) -> Result<(), JobError> {
tracing::warn!(
job.id = %context.id,
job.queue.name = context.queue_name,
"Consumed a job from a deprecated queue, which can happen after version upgrades. This did nothing other than removing the job from the queue."
);
Ok(())
}
}
struct ScheduleDefinition {
schedule_name: &'static str,
expression: Schedule,
@@ -293,7 +309,9 @@ impl QueueWorker {
})
}
pub(crate) fn register_handler<T: RunnableJob + InsertableJob>(&mut self) -> &mut Self {
pub(crate) fn register_handler<T: RunnableJob + InsertableJob + FromJob>(
&mut self,
) -> &mut Self {
// There is a potential panic here, which is fine as it's going to be caught
// within the job task
let factory = |payload: JobPayload| {
@@ -306,6 +324,13 @@ impl QueueWorker {
self
}
/// Register a queue name as deprecated, which will consume leftover jobs
pub(crate) fn register_deprecated_queue(&mut self, queue_name: &'static str) -> &mut Self {
let factory = |_payload: JobPayload| box_runnable_job(DeprecatedJob);
self.tracker.factories.insert(queue_name, Arc::new(factory));
self
}
pub(crate) fn add_schedule<T: InsertableJob>(
&mut self,
schedule_name: &'static str,