matrix_sdk/
sync.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The SDK's representation of the result of a `/sync` request.
16
17use std::{
18    collections::{btree_map, BTreeMap},
19    fmt,
20    time::Duration,
21};
22
23pub use matrix_sdk_base::sync::*;
24use matrix_sdk_base::{
25    debug::{
26        DebugInvitedRoom, DebugKnockedRoom, DebugListOfProcessedToDeviceEvents,
27        DebugListOfRawEventsNoId,
28    },
29    sleep::sleep,
30    sync::SyncResponse as BaseSyncResponse,
31};
32use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
33use ruma::{
34    api::client::sync::sync_events::{
35        self,
36        v3::{InvitedRoom, KnockedRoom},
37    },
38    events::{presence::PresenceEvent, AnyGlobalAccountDataEvent},
39    serde::Raw,
40    time::Instant,
41    OwnedRoomId, RoomId,
42};
43use tracing::{debug, error, warn};
44
45use crate::{event_handler::HandlerKind, Client, Result, Room};
46
47/// The processed response of a `/sync` request.
48#[derive(Clone, Default)]
49pub struct SyncResponse {
50    /// The batch token to supply in the `since` param of the next `/sync`
51    /// request.
52    pub next_batch: String,
53    /// Updates to rooms.
54    pub rooms: RoomUpdates,
55    /// Updates to the presence status of other users.
56    pub presence: Vec<Raw<PresenceEvent>>,
57    /// The global private data created by this user.
58    pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
59    /// Messages sent directly between devices.
60    pub to_device: Vec<ProcessedToDeviceEvent>,
61    /// New notifications per room.
62    pub notifications: BTreeMap<OwnedRoomId, Vec<Notification>>,
63}
64
65impl SyncResponse {
66    pub(crate) fn new(next_batch: String, base_response: BaseSyncResponse) -> Self {
67        let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } =
68            base_response;
69
70        Self { next_batch, rooms, presence, account_data, to_device, notifications }
71    }
72}
73
74#[cfg(not(tarpaulin_include))]
75impl fmt::Debug for SyncResponse {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        f.debug_struct("SyncResponse")
78            .field("next_batch", &self.next_batch)
79            .field("rooms", &self.rooms)
80            .field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
81            .field("to_device", &DebugListOfProcessedToDeviceEvents(&self.to_device))
82            .field("notifications", &self.notifications)
83            .finish_non_exhaustive()
84    }
85}
86
87/// A batch of updates to a room.
88#[derive(Clone)]
89pub enum RoomUpdate {
90    /// Updates to a room the user is no longer in.
91    Left {
92        /// Room object with general information on the room.
93        room: Room,
94        /// Updates to the room.
95        updates: LeftRoomUpdate,
96    },
97    /// Updates to a room the user is currently in.
98    Joined {
99        /// Room object with general information on the room.
100        room: Room,
101        /// Updates to the room.
102        updates: JoinedRoomUpdate,
103    },
104    /// Updates to a room the user is invited to.
105    Invited {
106        /// Room object with general information on the room.
107        room: Room,
108        /// Updates to the room.
109        updates: InvitedRoom,
110    },
111    /// Updates to a room the user knocked on.
112    Knocked {
113        /// Room object with general information on the room.
114        room: Room,
115        /// Updates to the room.
116        updates: KnockedRoom,
117    },
118}
119
120#[cfg(not(tarpaulin_include))]
121impl fmt::Debug for RoomUpdate {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        match self {
124            Self::Left { room, updates } => {
125                f.debug_struct("Left").field("room", room).field("updates", updates).finish()
126            }
127            Self::Joined { room, updates } => {
128                f.debug_struct("Joined").field("room", room).field("updates", updates).finish()
129            }
130            Self::Invited { room, updates } => f
131                .debug_struct("Invited")
132                .field("room", room)
133                .field("updates", &DebugInvitedRoom(updates))
134                .finish(),
135            Self::Knocked { room, updates } => f
136                .debug_struct("Knocked")
137                .field("room", room)
138                .field("updates", &DebugKnockedRoom(updates))
139                .finish(),
140        }
141    }
142}
143
144/// Internal functionality related to getting events from the server
145/// (`sync_events` endpoint)
146impl Client {
147    /// Receive a sync response, compute extra information out of it and store
148    /// the interesting bits in the database, then call all the handlers.
149    pub(crate) async fn process_sync(
150        &self,
151        response: sync_events::v3::Response,
152    ) -> Result<BaseSyncResponse> {
153        let response = Box::pin(self.base_client().receive_sync_response(response)).await?;
154
155        // Some new keys might have been received, so trigger a backup if needed.
156        #[cfg(feature = "e2e-encryption")]
157        self.encryption().backups().maybe_trigger_backup();
158
159        self.call_sync_response_handlers(&response).await?;
160
161        Ok(response)
162    }
163
164    /// Calls event handlers and notification handlers after a sync response has
165    /// been processed.
166    ///
167    /// At this point, the sync response's data has been taken into account and
168    /// persisted in the store, if needs be. This function is only calling
169    /// the event, room update and notification handlers.
170    #[tracing::instrument(skip(self, response))]
171    pub(crate) async fn call_sync_response_handlers(
172        &self,
173        response: &BaseSyncResponse,
174    ) -> Result<()> {
175        let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } = response;
176
177        let now = Instant::now();
178        self.handle_sync_events(HandlerKind::GlobalAccountData, None, account_data).await?;
179        self.handle_sync_events(HandlerKind::Presence, None, presence).await?;
180        self.handle_sync_to_device_events(to_device).await?;
181
182        // Ignore errors when there are no receivers.
183        let _ = self.inner.room_updates_sender.send(rooms.clone());
184
185        for (room_id, room_info) in &rooms.joined {
186            let Some(room) = self.get_room(room_id) else {
187                error!(?room_id, "Can't call event handler, room not found");
188                continue;
189            };
190
191            self.send_room_update(room_id, || RoomUpdate::Joined {
192                room: room.clone(),
193                updates: room_info.clone(),
194            });
195
196            let JoinedRoomUpdate {
197                unread_notifications: _,
198                timeline,
199                state,
200                account_data,
201                ephemeral,
202                ambiguity_changes: _,
203            } = room_info;
204
205            let room = Some(&room);
206            self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
207            self.handle_sync_state_events(room, state).await?;
208            self.handle_sync_timeline_events(room, &timeline.events).await?;
209            // Handle ephemeral events after timeline, read receipts in here
210            // could refer to timeline events from the same response.
211            self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?;
212        }
213
214        for (room_id, room_info) in &rooms.left {
215            let Some(room) = self.get_room(room_id) else {
216                error!(?room_id, "Can't call event handler, room not found");
217                continue;
218            };
219
220            self.send_room_update(room_id, || RoomUpdate::Left {
221                room: room.clone(),
222                updates: room_info.clone(),
223            });
224
225            let LeftRoomUpdate { timeline, state, account_data, ambiguity_changes: _ } = room_info;
226
227            let room = Some(&room);
228            self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
229            self.handle_sync_state_events(room, state).await?;
230            self.handle_sync_timeline_events(room, &timeline.events).await?;
231        }
232
233        for (room_id, room_info) in &rooms.invited {
234            let Some(room) = self.get_room(room_id) else {
235                error!(?room_id, "Can't call event handler, room not found");
236                continue;
237            };
238
239            self.send_room_update(room_id, || RoomUpdate::Invited {
240                room: room.clone(),
241                updates: room_info.clone(),
242            });
243
244            let invite_state = &room_info.invite_state.events;
245            self.handle_sync_events(HandlerKind::StrippedState, Some(&room), invite_state).await?;
246        }
247
248        for (room_id, room_info) in &rooms.knocked {
249            let Some(room) = self.get_room(room_id) else {
250                error!(?room_id, "Can't call event handler, room not found");
251                continue;
252            };
253
254            self.send_room_update(room_id, || RoomUpdate::Knocked {
255                room: room.clone(),
256                updates: room_info.clone(),
257            });
258
259            let knock_state = &room_info.knock_state.events;
260            self.handle_sync_events(HandlerKind::StrippedState, Some(&room), knock_state).await?;
261        }
262
263        debug!("Ran event handlers in {:?}", now.elapsed());
264
265        let now = Instant::now();
266
267        // Construct notification event handler futures
268        let mut futures = Vec::new();
269        for handler in &*self.notification_handlers().await {
270            for (room_id, room_notifications) in notifications {
271                let Some(room) = self.get_room(room_id) else {
272                    warn!(?room_id, "Can't call notification handler, room not found");
273                    continue;
274                };
275
276                futures.extend(room_notifications.iter().map(|notification| {
277                    (handler)(notification.clone(), room.clone(), self.clone())
278                }));
279            }
280        }
281
282        // Run the notification handler futures with the
283        // `self.notification_handlers` lock no longer being held, in order.
284        for fut in futures {
285            fut.await;
286        }
287
288        debug!("Ran notification handlers in {:?}", now.elapsed());
289
290        Ok(())
291    }
292
293    fn send_room_update(&self, room_id: &RoomId, make_msg: impl FnOnce() -> RoomUpdate) {
294        if let btree_map::Entry::Occupied(entry) =
295            self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned())
296        {
297            let tx = entry.get();
298            if tx.receiver_count() == 0 {
299                entry.remove();
300            } else {
301                _ = tx.send(make_msg());
302            }
303        }
304    }
305
306    async fn sleep() {
307        sleep(Duration::from_secs(1)).await;
308    }
309
310    pub(crate) async fn sync_loop_helper(
311        &self,
312        sync_settings: &mut crate::config::SyncSettings,
313    ) -> Result<SyncResponse> {
314        let response = self.sync_once(sync_settings.clone()).await;
315
316        match response {
317            Ok(r) => {
318                sync_settings.token = Some(r.next_batch.clone());
319                Ok(r)
320            }
321            Err(e) => {
322                error!("Received an invalid response: {e}");
323                Err(e)
324            }
325        }
326    }
327
328    pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
329        let now = Instant::now();
330
331        // If the last sync happened less than a second ago, sleep for a
332        // while to not hammer out requests if the server doesn't respect
333        // the sync timeout.
334        if let Some(t) = last_sync_time {
335            if now - *t <= Duration::from_secs(1) {
336                Self::sleep().await;
337            }
338        }
339
340        *last_sync_time = Some(now);
341    }
342}