Live Location Sharing - handle server echoes (#5514)
* Track active live location sessions by ID instead of timeout. # Conflicts: # ElementX/Sources/Services/Location/LiveLocationManager.swift * implemented a system to promote starting session to active sesessions to send locations at the right time, and a system to remove a local session if it's handled by an external device. * pr suggestions --------- Co-authored-by: Doug <douglase@element.io>
This commit is contained in:
@@ -352,8 +352,8 @@ final class AppSettings {
|
||||
|
||||
// MARK: - Live Location
|
||||
|
||||
@UserPreference(key: UserDefaultsKeys.liveLocationSharingTimeoutDatesByRoomID, defaultValue: [String: Date](), storageType: .userDefaults(store))
|
||||
var liveLocationSharingTimeoutDatesByRoomID
|
||||
@UserPreference(key: UserDefaultsKeys.liveLocationSharingTimeoutDatesByRoomID, defaultValue: [String: LiveLocationSession](), storageType: .userDefaults(store))
|
||||
var liveLocationSharingSessionsByRoomID
|
||||
|
||||
@UserPreference(key: UserDefaultsKeys.liveLocationMinimumDistanceUpdate, defaultValue: 10, storageType: .userDefaults(store))
|
||||
var liveLocationMinimumDistanceUpdate
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
//
|
||||
// Copyright 2026 Element Creations Ltd.
|
||||
//
|
||||
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
|
||||
// Please see LICENSE files in the repository root for full details.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
struct LiveLocationSession: Codable, Equatable {
|
||||
let eventID: String
|
||||
let expirationDate: Date
|
||||
}
|
||||
@@ -159,6 +159,8 @@ extension ClientProxyMock {
|
||||
|
||||
underlyingTimelineMediaVisibilityPublisher = CurrentValueSubject<TimelineMediaVisibility, Never>(configuration.timelineMediaVisibility).asCurrentValuePublisher()
|
||||
underlyingHideInviteAvatarsPublisher = CurrentValueSubject<Bool, Never>(configuration.hideInviteAvatars).asCurrentValuePublisher()
|
||||
|
||||
liveLocationOwnInfoUpdatesPublisher = PassthroughSubject<LiveLocationOwnInfoUpdate, Never>().eraseToAnyPublisher()
|
||||
|
||||
underlyingMaxMediaUploadSize = .success(configuration.maxMediaUploadSize)
|
||||
|
||||
|
||||
@@ -2854,6 +2854,11 @@ class ClientProxyMock: ClientProxyProtocol, @unchecked Sendable {
|
||||
}
|
||||
var underlyingMaxMediaUploadSize: Result<UInt, ClientProxyError>!
|
||||
var maxMediaUploadSizeClosure: (() async -> Result<UInt, ClientProxyError>)?
|
||||
var liveLocationOwnInfoUpdatesPublisher: AnyPublisher<LiveLocationOwnInfoUpdate, Never> {
|
||||
get { return underlyingLiveLocationOwnInfoUpdatesPublisher }
|
||||
set(value) { underlyingLiveLocationOwnInfoUpdatesPublisher = value }
|
||||
}
|
||||
var underlyingLiveLocationOwnInfoUpdatesPublisher: AnyPublisher<LiveLocationOwnInfoUpdate, Never>!
|
||||
|
||||
//MARK: - isOnlyDeviceLeft
|
||||
|
||||
@@ -10766,13 +10771,13 @@ class JoinedRoomProxyMock: JoinedRoomProxyProtocol, @unchecked Sendable {
|
||||
var startLiveLocationShareDurationReceivedDuration: Duration?
|
||||
var startLiveLocationShareDurationReceivedInvocations: [Duration] = []
|
||||
|
||||
var startLiveLocationShareDurationUnderlyingReturnValue: Result<Void, RoomProxyError>!
|
||||
var startLiveLocationShareDurationReturnValue: Result<Void, RoomProxyError>! {
|
||||
var startLiveLocationShareDurationUnderlyingReturnValue: Result<String, RoomProxyError>!
|
||||
var startLiveLocationShareDurationReturnValue: Result<String, RoomProxyError>! {
|
||||
get {
|
||||
if Thread.isMainThread {
|
||||
return startLiveLocationShareDurationUnderlyingReturnValue
|
||||
} else {
|
||||
var returnValue: Result<Void, RoomProxyError>? = nil
|
||||
var returnValue: Result<String, RoomProxyError>? = nil
|
||||
DispatchQueue.main.sync {
|
||||
returnValue = startLiveLocationShareDurationUnderlyingReturnValue
|
||||
}
|
||||
@@ -10790,9 +10795,9 @@ class JoinedRoomProxyMock: JoinedRoomProxyProtocol, @unchecked Sendable {
|
||||
}
|
||||
}
|
||||
}
|
||||
var startLiveLocationShareDurationClosure: ((Duration) async -> Result<Void, RoomProxyError>)?
|
||||
var startLiveLocationShareDurationClosure: ((Duration) async -> Result<String, RoomProxyError>)?
|
||||
|
||||
func startLiveLocationShare(duration: Duration) async -> Result<Void, RoomProxyError> {
|
||||
func startLiveLocationShare(duration: Duration) async -> Result<String, RoomProxyError> {
|
||||
startLiveLocationShareDurationCallsCount += 1
|
||||
startLiveLocationShareDurationReceivedDuration = duration
|
||||
DispatchQueue.main.async {
|
||||
|
||||
@@ -203,6 +203,12 @@ extension SDKListener: LiveLocationsListener where T == [LiveLocationShareUpdate
|
||||
}
|
||||
}
|
||||
|
||||
extension SDKListener: BeaconInfoListener where T == BeaconInfoUpdate {
|
||||
func onUpdate(update: BeaconInfoUpdate) {
|
||||
onUpdateClosure(update)
|
||||
}
|
||||
}
|
||||
|
||||
extension SDKListener: ThreadListEntriesListener where T == [ThreadListUpdate] {
|
||||
func onUpdate(diff: [ThreadListUpdate]) {
|
||||
onUpdateClosure(diff)
|
||||
|
||||
@@ -171,11 +171,11 @@ class RoomScreenViewModel: RoomScreenViewModelType, RoomScreenViewModelProtocol
|
||||
.weakAssign(to: \.state.isKnockingEnabled, on: self)
|
||||
.store(in: &cancellables)
|
||||
|
||||
appSettings.$liveLocationSharingTimeoutDatesByRoomID
|
||||
appSettings.$liveLocationSharingSessionsByRoomID
|
||||
.receive(on: DispatchQueue.main)
|
||||
.sink { [weak self] timeoutDatesByRoomID in
|
||||
.sink { [weak self] sessionsByRoomID in
|
||||
guard let self else { return }
|
||||
state.isSharingLiveLocation = timeoutDatesByRoomID.keys.contains(roomProxy.id)
|
||||
state.isSharingLiveLocation = sessionsByRoomID.keys.contains(roomProxy.id)
|
||||
}
|
||||
.store(in: &cancellables)
|
||||
|
||||
|
||||
@@ -46,6 +46,9 @@ class ClientProxy: ClientProxyProtocol {
|
||||
|
||||
// periphery:ignore - required for instance retention in the rust codebase
|
||||
private var mediaPreviewConfigListenerTaskHandle: TaskHandle?
|
||||
|
||||
// periphery:ignore - required for instance retention in the rust codebase
|
||||
private var liveLocationOwnInfoUpdatesListenerTaskHandle: TaskHandle?
|
||||
|
||||
private var delegateHandle: TaskHandle?
|
||||
|
||||
@@ -188,7 +191,12 @@ class ClientProxy: ClientProxyProtocol {
|
||||
}
|
||||
|
||||
var roomsToAwait: Set<String> = []
|
||||
|
||||
|
||||
private let liveLocationOwnInfoUpdatesSubject = PassthroughSubject<LiveLocationOwnInfoUpdate, Never>()
|
||||
var liveLocationOwnInfoUpdatesPublisher: AnyPublisher<LiveLocationOwnInfoUpdate, Never> {
|
||||
liveLocationOwnInfoUpdatesSubject.eraseToAnyPublisher()
|
||||
}
|
||||
|
||||
private let sendQueueStatusSubject = CurrentValueSubject<Bool, Never>(false)
|
||||
|
||||
init(client: ClientProtocol,
|
||||
@@ -270,6 +278,8 @@ class ClientProxy: ClientProxyProtocol {
|
||||
Task {
|
||||
mediaPreviewConfigListenerTaskHandle = await createMediaPreviewConfigObserver()
|
||||
}
|
||||
|
||||
liveLocationOwnInfoUpdatesListenerTaskHandle = createLiveLocationOwnInfoUpdatesObserver()
|
||||
}
|
||||
|
||||
var userID: String {
|
||||
@@ -1136,6 +1146,21 @@ class ClientProxy: ClientProxyProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
private func createLiveLocationOwnInfoUpdatesObserver() -> TaskHandle? {
|
||||
do {
|
||||
return try client.subscribeToOwnBeaconInfoUpdates(listener: SDKListener { [weak self] update in
|
||||
guard let self else { return }
|
||||
let appUpdate = LiveLocationOwnInfoUpdate(roomID: update.roomId,
|
||||
eventID: update.eventId,
|
||||
isLive: update.live)
|
||||
liveLocationOwnInfoUpdatesSubject.send(appUpdate)
|
||||
})
|
||||
} catch {
|
||||
MXLog.error("Failed creating own beacon info updates observer: \(error)")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
private func createRoomListServiceObserver(_ roomListService: RoomListService) -> TaskHandle {
|
||||
roomListService.state(listener: SDKListener { [weak self] state in
|
||||
guard let self else { return }
|
||||
|
||||
@@ -90,6 +90,16 @@ enum TimelineMediaVisibility: Decodable {
|
||||
case never
|
||||
}
|
||||
|
||||
/// Represents a server-echoed update about the current user's own beacon info state in a room.
|
||||
struct LiveLocationOwnInfoUpdate: Equatable {
|
||||
/// The room where the beacon info event was sent.
|
||||
let roomID: String
|
||||
/// The event ID of the beacon info state event.
|
||||
let eventID: String
|
||||
/// Whether the beacon is currently active (live) or has been stopped.
|
||||
let isLive: Bool
|
||||
}
|
||||
|
||||
// sourcery: AutoMockable
|
||||
protocol ClientProxyProtocol: AnyObject {
|
||||
var actionsPublisher: AnyPublisher<ClientProxyAction, Never> { get }
|
||||
@@ -264,6 +274,11 @@ protocol ClientProxyProtocol: AnyObject {
|
||||
|
||||
func userIdentity(for userID: String, fallBackToServer: Bool) async -> Result<UserIdentityProxyProtocol?, ClientProxyError>
|
||||
|
||||
// MARK: - Live Location
|
||||
|
||||
/// Publishes updates about the current user's own live location beacon info state changes (start/stop) as echoed by the server.
|
||||
var liveLocationOwnInfoUpdatesPublisher: AnyPublisher<LiveLocationOwnInfoUpdate, Never> { get }
|
||||
|
||||
// MARK: - Moderation & Safety
|
||||
|
||||
func setTimelineMediaVisibility(_ value: TimelineMediaVisibility) async -> Result<Void, ClientProxyError>
|
||||
|
||||
@@ -21,16 +21,20 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
/// Cached joined room proxies keyed by room ID, kept in sync with the active sessions dictionary.
|
||||
private var activeRoomProxies = [String: JoinedRoomProxyProtocol]()
|
||||
|
||||
/// Sessions that have been requested but not yet confirmed by the server echo.
|
||||
/// Once the server acknowledges the beacon info, sessions are promoted to the persistent store.
|
||||
private var startingLiveLocationSharingSessionsByRoomID = [String: LiveLocationSession]()
|
||||
|
||||
/// Subject used to pipe location updates into the backpressure-aware processing loop.
|
||||
private let locationUpdateSubject = PassthroughSubject<CLLocationCoordinate2D, Never>()
|
||||
|
||||
|
||||
/// The most recent location update waiting to be sent. When a send is already in progress,
|
||||
/// new updates overwrite this value so only the latest is sent once the current send completes.
|
||||
private var latestPendingLocation: CLLocationCoordinate2D?
|
||||
|
||||
|
||||
/// Whether a location send cycle (send + minimum delay) is currently in progress.
|
||||
private var isProcessingLocationUpdate = false
|
||||
|
||||
|
||||
private var cancellables = Set<AnyCancellable>()
|
||||
|
||||
private var isUpdatingLocation = false
|
||||
@@ -64,7 +68,7 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
setupMinimumDistanceUpdatesAndAccuracy(minimumDistance: appSettings.liveLocationMinimumDistanceUpdate)
|
||||
setupSubscriptions()
|
||||
}
|
||||
|
||||
|
||||
// MARK: - LiveLocationManagerProtocol
|
||||
|
||||
var hasDisplayedLiveLocationDisclaimer: Bool {
|
||||
@@ -87,7 +91,8 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
func startLiveLocation(roomID: String, duration: Duration) async -> Result<Void, LiveLocationManagerError> {
|
||||
// Stop any existing session for this room first
|
||||
var didAlreadyStopLocalSession = false
|
||||
if appSettings.liveLocationSharingTimeoutDatesByRoomID[roomID] != nil {
|
||||
if appSettings.liveLocationSharingSessionsByRoomID[roomID] != nil
|
||||
|| startingLiveLocationSharingSessionsByRoomID[roomID] != nil {
|
||||
await stopLiveLocation(roomID: roomID)
|
||||
didAlreadyStopLocalSession = true
|
||||
}
|
||||
@@ -104,20 +109,13 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
}
|
||||
let result = await roomProxy.startLiveLocationShare(duration: duration)
|
||||
|
||||
guard case .success = result else {
|
||||
guard case .success(let eventID) = result else {
|
||||
MXLog.error("Failed to start live location share in room: \(roomID)")
|
||||
return .failure(.startFailed)
|
||||
}
|
||||
|
||||
let timeoutDate = Date().addingTimeInterval(TimeInterval(duration.seconds))
|
||||
appSettings.liveLocationSharingTimeoutDatesByRoomID[roomID] = timeoutDate
|
||||
|
||||
if isUpdatingLocation, let lastLocation {
|
||||
// To make sure the newly started session is in sync with the existing ones,
|
||||
// we re-send the last location received by the manager.
|
||||
// Otherwise we would need to wait a distance filtered update.
|
||||
locationUpdateSubject.send(lastLocation)
|
||||
}
|
||||
let expirationDate = Date().addingTimeInterval(TimeInterval(duration.seconds))
|
||||
startingLiveLocationSharingSessionsByRoomID[roomID] = LiveLocationSession(eventID: eventID, expirationDate: expirationDate)
|
||||
|
||||
return .success(())
|
||||
}
|
||||
@@ -125,7 +123,8 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
func stopLiveLocation(roomID: String) async {
|
||||
var roomProxy: JoinedRoomProxyProtocol?
|
||||
let cachedRoomProxy = activeRoomProxies[roomID]
|
||||
appSettings.liveLocationSharingTimeoutDatesByRoomID.removeValue(forKey: roomID)
|
||||
startingLiveLocationSharingSessionsByRoomID.removeValue(forKey: roomID)
|
||||
appSettings.liveLocationSharingSessionsByRoomID.removeValue(forKey: roomID)
|
||||
|
||||
if let cachedRoomProxy {
|
||||
roomProxy = cachedRoomProxy
|
||||
@@ -186,12 +185,20 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
}
|
||||
.store(in: &cancellables)
|
||||
|
||||
appSettings.$liveLocationSharingTimeoutDatesByRoomID
|
||||
clientProxy.liveLocationOwnInfoUpdatesPublisher
|
||||
.receive(on: DispatchQueue.main)
|
||||
.sink { [weak self] update in
|
||||
guard let self else { return }
|
||||
handleBeaconInfoUpdate(update)
|
||||
}
|
||||
.store(in: &cancellables)
|
||||
|
||||
appSettings.$liveLocationSharingSessionsByRoomID
|
||||
.removeDuplicates()
|
||||
.sink { [weak self] sessions in
|
||||
guard let self else { return }
|
||||
syncActiveRoomProxies(with: sessions)
|
||||
|
||||
|
||||
if sessions.isEmpty {
|
||||
self.stopUpdatingLocation()
|
||||
} else {
|
||||
@@ -209,7 +216,32 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
.store(in: &cancellables)
|
||||
}
|
||||
|
||||
private func syncActiveRoomProxies(with sessions: [String: Date]) {
|
||||
private func handleBeaconInfoUpdate(_ update: LiveLocationOwnInfoUpdate) {
|
||||
// A new beaconInfo has been received in a room with existing active session.
|
||||
// This is either a new start or a new stop from a different device, so we
|
||||
// should remove the session from the current local one.
|
||||
appSettings.liveLocationSharingSessionsByRoomID.removeValue(forKey: update.roomID)
|
||||
|
||||
// Instead if we receive a new isLiveUpdate
|
||||
guard update.isLive else { return }
|
||||
|
||||
// That belongs to a session that is starting in a room and matches the eventID
|
||||
guard let session = startingLiveLocationSharingSessionsByRoomID[update.roomID],
|
||||
session.eventID == update.eventID else {
|
||||
return
|
||||
}
|
||||
|
||||
// This means the server has echoed the start of the session and we can safely promote it
|
||||
// to a started session and start sending live locations.
|
||||
startingLiveLocationSharingSessionsByRoomID.removeValue(forKey: update.roomID)
|
||||
appSettings.liveLocationSharingSessionsByRoomID[update.roomID] = session
|
||||
|
||||
if isUpdatingLocation, let lastLocation {
|
||||
locationUpdateSubject.send(lastLocation)
|
||||
}
|
||||
}
|
||||
|
||||
private func syncActiveRoomProxies(with sessions: [String: LiveLocationSession]) {
|
||||
// Remove proxies for rooms no longer in the dictionary.
|
||||
let activeRoomIDs = Set(sessions.keys)
|
||||
for roomID in activeRoomProxies.keys where !activeRoomIDs.contains(roomID) {
|
||||
@@ -259,11 +291,11 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
/// discarding any intermediate updates while always keeping the last one.
|
||||
private func processLocationUpdateIfNeeded() {
|
||||
guard !isProcessingLocationUpdate, let location = latestPendingLocation else { return }
|
||||
guard !appSettings.liveLocationSharingTimeoutDatesByRoomID.isEmpty else { return }
|
||||
|
||||
guard !appSettings.liveLocationSharingSessionsByRoomID.isEmpty else { return }
|
||||
|
||||
latestPendingLocation = nil
|
||||
isProcessingLocationUpdate = true
|
||||
|
||||
|
||||
Task { @MainActor [weak self] in
|
||||
guard let self else { return }
|
||||
|
||||
@@ -282,13 +314,13 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
processLocationUpdateIfNeeded()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private func sendLocationToActiveRooms(_ coordinate: CLLocationCoordinate2D) async {
|
||||
let sessions = appSettings.liveLocationSharingTimeoutDatesByRoomID
|
||||
let sessions = appSettings.liveLocationSharingSessionsByRoomID
|
||||
let geoURI = GeoURI(coordinate: coordinate, uncertainty: nil)
|
||||
|
||||
for (roomID, timeoutDate) in sessions {
|
||||
if Date() >= timeoutDate {
|
||||
for (roomID, session) in sessions {
|
||||
if Date() >= session.expirationDate {
|
||||
MXLog.info("Live location session expired for room: \(roomID)")
|
||||
await stopLiveLocation(roomID: roomID)
|
||||
continue
|
||||
@@ -306,7 +338,7 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
case .failure(let error):
|
||||
switch error {
|
||||
case .liveLocationSessionIsNotActive:
|
||||
MXLog.error("Failed to send live locatio update to room \(roomID): session not active")
|
||||
MXLog.error("Failed to send live location update to room \(roomID): session not active")
|
||||
await stopLiveLocation(roomID: roomID)
|
||||
default:
|
||||
MXLog.error("Failed to send live location update to room \(roomID): \(error)")
|
||||
@@ -329,7 +361,8 @@ class LiveLocationManager: NSObject, LiveLocationManagerProtocol, CLLocationMana
|
||||
}
|
||||
|
||||
private func stopAllSessions() {
|
||||
let roomIDs = Array(appSettings.liveLocationSharingTimeoutDatesByRoomID.keys)
|
||||
let roomIDs = Array(Set(appSettings.liveLocationSharingSessionsByRoomID.keys)
|
||||
.union(startingLiveLocationSharingSessionsByRoomID.keys))
|
||||
Task { [weak self] in
|
||||
guard let self else { return }
|
||||
for roomID in roomIDs {
|
||||
|
||||
@@ -758,10 +758,10 @@ class JoinedRoomProxy: JoinedRoomProxyProtocol {
|
||||
await RoomLiveLocationService(liveLocationsObserver: room.liveLocationsObserver())
|
||||
}
|
||||
|
||||
func startLiveLocationShare(duration: Duration) async -> Result<Void, RoomProxyError> {
|
||||
func startLiveLocationShare(duration: Duration) async -> Result<String, RoomProxyError> {
|
||||
do {
|
||||
try await room.startLiveLocationShare(durationMillis: UInt64(duration.seconds * 1000))
|
||||
return .success(())
|
||||
let eventID = try await room.startLiveLocationShare(durationMillis: UInt64(duration.seconds * 1000))
|
||||
return .success(eventID)
|
||||
} catch {
|
||||
MXLog.error("Failed starting live location share with error: \(error)")
|
||||
return .failure(.sdkError(error))
|
||||
|
||||
@@ -200,7 +200,7 @@ protocol JoinedRoomProxyProtocol: RoomProxyProtocol {
|
||||
|
||||
func makeLiveLocationService() async -> RoomLiveLocationServiceProtocol
|
||||
|
||||
func startLiveLocationShare(duration: Duration) async -> Result<Void, RoomProxyError>
|
||||
func startLiveLocationShare(duration: Duration) async -> Result<String, RoomProxyError>
|
||||
func sendLiveLocation(geoURI: GeoURI) async -> Result<Void, RoomProxyError>
|
||||
func stopLiveLocationShare() async -> Result<Void, RoomProxyError>
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user