Make the worker heartbeat take a worker reference
This commit is contained in:
@@ -79,11 +79,7 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> {
|
||||
),
|
||||
err,
|
||||
)]
|
||||
async fn heartbeat(
|
||||
&mut self,
|
||||
clock: &dyn Clock,
|
||||
worker: Worker,
|
||||
) -> Result<Worker, Self::Error> {
|
||||
async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error> {
|
||||
let now = clock.now();
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
@@ -101,7 +97,7 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> {
|
||||
// If no row was updated, the worker was shutdown so we return an error
|
||||
DatabaseError::ensure_affected_rows(&res, 1)?;
|
||||
|
||||
Ok(worker)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
|
||||
@@ -40,14 +40,11 @@ pub trait QueueWorkerRepository: Send + Sync {
|
||||
|
||||
/// Send a heartbeat for the given worker.
|
||||
///
|
||||
/// Returns the updated worker.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error if the underlying repository fails or if the worker was
|
||||
/// shutdown.
|
||||
async fn heartbeat(&mut self, clock: &dyn Clock, worker: Worker)
|
||||
-> Result<Worker, Self::Error>;
|
||||
async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
|
||||
|
||||
/// Mark the given worker as shutdown.
|
||||
///
|
||||
@@ -102,8 +99,8 @@ repository_impl!(QueueWorkerRepository:
|
||||
async fn heartbeat(
|
||||
&mut self,
|
||||
clock: &dyn Clock,
|
||||
worker: Worker,
|
||||
) -> Result<Worker, Self::Error>;
|
||||
worker: &Worker,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
async fn shutdown(
|
||||
&mut self,
|
||||
|
||||
@@ -15,7 +15,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> {
|
||||
let mut rng = state.rng();
|
||||
let clock = state.clock();
|
||||
|
||||
let mut worker = repo.queue_worker().register(&mut rng, &clock).await?;
|
||||
let worker = repo.queue_worker().register(&mut rng, &clock).await?;
|
||||
span.record("worker.id", tracing::field::display(worker.id));
|
||||
repo.save().await?;
|
||||
|
||||
@@ -44,7 +44,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> {
|
||||
// on a logged table
|
||||
if now - last_heartbeat >= chrono::Duration::minutes(1) {
|
||||
tracing::info!("Sending heartbeat");
|
||||
worker = repo.queue_worker().heartbeat(&clock, worker).await?;
|
||||
repo.queue_worker().heartbeat(&clock, &worker).await?;
|
||||
last_heartbeat = now;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user