diff --git a/crates/storage-pg/src/queue/worker.rs b/crates/storage-pg/src/queue/worker.rs index 2aaacc64b..5fa784cb1 100644 --- a/crates/storage-pg/src/queue/worker.rs +++ b/crates/storage-pg/src/queue/worker.rs @@ -79,11 +79,7 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { ), err, )] - async fn heartbeat( - &mut self, - clock: &dyn Clock, - worker: Worker, - ) -> Result { + async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error> { let now = clock.now(); let res = sqlx::query!( r#" @@ -101,7 +97,7 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { // If no row was updated, the worker was shutdown so we return an error DatabaseError::ensure_affected_rows(&res, 1)?; - Ok(worker) + Ok(()) } #[tracing::instrument( diff --git a/crates/storage/src/queue/worker.rs b/crates/storage/src/queue/worker.rs index dfb9699e6..19ceead88 100644 --- a/crates/storage/src/queue/worker.rs +++ b/crates/storage/src/queue/worker.rs @@ -40,14 +40,11 @@ pub trait QueueWorkerRepository: Send + Sync { /// Send a heartbeat for the given worker. /// - /// Returns the updated worker. - /// /// # Errors /// /// Returns an error if the underlying repository fails or if the worker was /// shutdown. - async fn heartbeat(&mut self, clock: &dyn Clock, worker: Worker) - -> Result; + async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>; /// Mark the given worker as shutdown. /// @@ -102,8 +99,8 @@ repository_impl!(QueueWorkerRepository: async fn heartbeat( &mut self, clock: &dyn Clock, - worker: Worker, - ) -> Result; + worker: &Worker, + ) -> Result<(), Self::Error>; async fn shutdown( &mut self, diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index eabf17aa6..4a8058b59 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -15,7 +15,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> { let mut rng = state.rng(); let clock = state.clock(); - let mut worker = repo.queue_worker().register(&mut rng, &clock).await?; + let worker = repo.queue_worker().register(&mut rng, &clock).await?; span.record("worker.id", tracing::field::display(worker.id)); repo.save().await?; @@ -44,7 +44,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> { // on a logged table if now - last_heartbeat >= chrono::Duration::minutes(1) { tracing::info!("Sending heartbeat"); - worker = repo.queue_worker().heartbeat(&clock, worker).await?; + repo.queue_worker().heartbeat(&clock, &worker).await?; last_heartbeat = now; }