Replace Combine throttle with backpressure-aware location send loop.
This commit is contained in:
@@ -21,9 +21,16 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
|||||||
/// Cached joined room proxies keyed by room ID, kept in sync with the active sessions dictionary.
|
/// Cached joined room proxies keyed by room ID, kept in sync with the active sessions dictionary.
|
||||||
private var activeRoomProxies = [String: JoinedRoomProxyProtocol]()
|
private var activeRoomProxies = [String: JoinedRoomProxyProtocol]()
|
||||||
|
|
||||||
/// Subject used to pipe location updates through Combine's throttle operator.
|
/// Subject used to pipe location updates into the backpressure-aware processing loop.
|
||||||
private let locationUpdateSubject = PassthroughSubject<CLLocationCoordinate2D, Never>()
|
private let locationUpdateSubject = PassthroughSubject<CLLocationCoordinate2D, Never>()
|
||||||
|
|
||||||
|
/// The most recent location update waiting to be sent. When a send is already in progress,
|
||||||
|
/// new updates overwrite this value so only the latest is sent once the current send completes.
|
||||||
|
private var latestPendingLocation: CLLocationCoordinate2D?
|
||||||
|
|
||||||
|
/// Whether a location send cycle (send + minimum delay) is currently in progress.
|
||||||
|
private var isProcessingLocationUpdate = false
|
||||||
|
|
||||||
private var cancellables = Set<AnyCancellable>()
|
private var cancellables = Set<AnyCancellable>()
|
||||||
|
|
||||||
private var isUpdatingLocation = false
|
private var isUpdatingLocation = false
|
||||||
@@ -172,13 +179,10 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
|||||||
|
|
||||||
private func setupSubscriptions() {
|
private func setupSubscriptions() {
|
||||||
locationUpdateSubject
|
locationUpdateSubject
|
||||||
.throttle(for: .seconds(3), scheduler: DispatchQueue.main, latest: true)
|
|
||||||
.sink { [weak self] update in
|
.sink { [weak self] update in
|
||||||
guard let self else { return }
|
guard let self else { return }
|
||||||
guard !appSettings.liveLocationSharingTimeoutDatesByRoomID.isEmpty else { return }
|
latestPendingLocation = update
|
||||||
Task { [weak self] in
|
processLocationUpdateIfNeeded()
|
||||||
await self?.sendLocationToActiveRooms(update)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
.store(in: &cancellables)
|
.store(in: &cancellables)
|
||||||
|
|
||||||
@@ -247,6 +251,38 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
|||||||
lastLocation = nil
|
lastLocation = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Kicks off a send cycle if one isn't already running. Each cycle:
|
||||||
|
/// 1. Takes the latest pending location and clears it.
|
||||||
|
/// 2. Sends the location to all active rooms **and** waits a minimum 3-second delay (in parallel).
|
||||||
|
/// 3. After both complete, checks for a new pending location and loops if one exists.
|
||||||
|
/// This ensures at least 3 seconds or the send duration itself between consecutive sends,
|
||||||
|
/// discarding any intermediate updates while always keeping the last one.
|
||||||
|
private func processLocationUpdateIfNeeded() {
|
||||||
|
guard !isProcessingLocationUpdate, let location = latestPendingLocation else { return }
|
||||||
|
guard !appSettings.liveLocationSharingTimeoutDatesByRoomID.isEmpty else { return }
|
||||||
|
|
||||||
|
latestPendingLocation = nil
|
||||||
|
isProcessingLocationUpdate = true
|
||||||
|
|
||||||
|
Task { @MainActor [weak self] in
|
||||||
|
guard let self else { return }
|
||||||
|
|
||||||
|
// Wait for both the send and the minimum throttle interval.
|
||||||
|
// This guarantees at least 3 seconds between sends, plus the full send duration.
|
||||||
|
await withTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask { [weak self] in
|
||||||
|
await self?.sendLocationToActiveRooms(location)
|
||||||
|
}
|
||||||
|
group.addTask {
|
||||||
|
try? await Task.sleep(for: .seconds(3))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
isProcessingLocationUpdate = false
|
||||||
|
processLocationUpdateIfNeeded()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private func sendLocationToActiveRooms(_ coordinate: CLLocationCoordinate2D) async {
|
private func sendLocationToActiveRooms(_ coordinate: CLLocationCoordinate2D) async {
|
||||||
let sessions = appSettings.liveLocationSharingTimeoutDatesByRoomID
|
let sessions = appSettings.liveLocationSharingTimeoutDatesByRoomID
|
||||||
let geoURI = GeoURI(coordinate: coordinate, uncertainty: nil)
|
let geoURI = GeoURI(coordinate: coordinate, uncertainty: nil)
|
||||||
|
|||||||
Reference in New Issue
Block a user