1use std::{
18 collections::{btree_map, BTreeMap},
19 fmt,
20 time::Duration,
21};
22
23pub use matrix_sdk_base::sync::*;
24use matrix_sdk_base::{
25 debug::{
26 DebugInvitedRoom, DebugKnockedRoom, DebugListOfProcessedToDeviceEvents,
27 DebugListOfRawEventsNoId,
28 },
29 sleep::sleep,
30 sync::SyncResponse as BaseSyncResponse,
31};
32use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
33use ruma::{
34 api::client::sync::sync_events::{
35 self,
36 v3::{InvitedRoom, KnockedRoom},
37 },
38 events::{presence::PresenceEvent, AnyGlobalAccountDataEvent},
39 serde::Raw,
40 time::Instant,
41 OwnedRoomId, RoomId,
42};
43use tracing::{debug, error, warn};
44
45use crate::{event_handler::HandlerKind, Client, Result, Room};
46
47#[derive(Clone, Default)]
49pub struct SyncResponse {
50 pub next_batch: String,
53 pub rooms: RoomUpdates,
55 pub presence: Vec<Raw<PresenceEvent>>,
57 pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
59 pub to_device: Vec<ProcessedToDeviceEvent>,
61 pub notifications: BTreeMap<OwnedRoomId, Vec<Notification>>,
63}
64
65impl SyncResponse {
66 pub(crate) fn new(next_batch: String, base_response: BaseSyncResponse) -> Self {
67 let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } =
68 base_response;
69
70 Self { next_batch, rooms, presence, account_data, to_device, notifications }
71 }
72}
73
74#[cfg(not(tarpaulin_include))]
75impl fmt::Debug for SyncResponse {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 f.debug_struct("SyncResponse")
78 .field("next_batch", &self.next_batch)
79 .field("rooms", &self.rooms)
80 .field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
81 .field("to_device", &DebugListOfProcessedToDeviceEvents(&self.to_device))
82 .field("notifications", &self.notifications)
83 .finish_non_exhaustive()
84 }
85}
86
87#[derive(Clone)]
89pub enum RoomUpdate {
90 Left {
92 room: Room,
94 updates: LeftRoomUpdate,
96 },
97 Joined {
99 room: Room,
101 updates: JoinedRoomUpdate,
103 },
104 Invited {
106 room: Room,
108 updates: InvitedRoom,
110 },
111 Knocked {
113 room: Room,
115 updates: KnockedRoom,
117 },
118}
119
120#[cfg(not(tarpaulin_include))]
121impl fmt::Debug for RoomUpdate {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 match self {
124 Self::Left { room, updates } => {
125 f.debug_struct("Left").field("room", room).field("updates", updates).finish()
126 }
127 Self::Joined { room, updates } => {
128 f.debug_struct("Joined").field("room", room).field("updates", updates).finish()
129 }
130 Self::Invited { room, updates } => f
131 .debug_struct("Invited")
132 .field("room", room)
133 .field("updates", &DebugInvitedRoom(updates))
134 .finish(),
135 Self::Knocked { room, updates } => f
136 .debug_struct("Knocked")
137 .field("room", room)
138 .field("updates", &DebugKnockedRoom(updates))
139 .finish(),
140 }
141 }
142}
143
144impl Client {
147 pub(crate) async fn process_sync(
150 &self,
151 response: sync_events::v3::Response,
152 ) -> Result<BaseSyncResponse> {
153 let response = Box::pin(self.base_client().receive_sync_response(response)).await?;
154
155 #[cfg(feature = "e2e-encryption")]
157 self.encryption().backups().maybe_trigger_backup();
158
159 self.call_sync_response_handlers(&response).await?;
160
161 Ok(response)
162 }
163
164 #[tracing::instrument(skip(self, response))]
171 pub(crate) async fn call_sync_response_handlers(
172 &self,
173 response: &BaseSyncResponse,
174 ) -> Result<()> {
175 let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } = response;
176
177 let now = Instant::now();
178 self.handle_sync_events(HandlerKind::GlobalAccountData, None, account_data).await?;
179 self.handle_sync_events(HandlerKind::Presence, None, presence).await?;
180 self.handle_sync_to_device_events(to_device).await?;
181
182 let _ = self.inner.room_updates_sender.send(rooms.clone());
184
185 for (room_id, room_info) in &rooms.joined {
186 let Some(room) = self.get_room(room_id) else {
187 error!(?room_id, "Can't call event handler, room not found");
188 continue;
189 };
190
191 self.send_room_update(room_id, || RoomUpdate::Joined {
192 room: room.clone(),
193 updates: room_info.clone(),
194 });
195
196 let JoinedRoomUpdate {
197 unread_notifications: _,
198 timeline,
199 state,
200 account_data,
201 ephemeral,
202 ambiguity_changes: _,
203 } = room_info;
204
205 let room = Some(&room);
206 self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
207 self.handle_sync_state_events(room, state).await?;
208 self.handle_sync_timeline_events(room, &timeline.events).await?;
209 self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?;
212 }
213
214 for (room_id, room_info) in &rooms.left {
215 let Some(room) = self.get_room(room_id) else {
216 error!(?room_id, "Can't call event handler, room not found");
217 continue;
218 };
219
220 self.send_room_update(room_id, || RoomUpdate::Left {
221 room: room.clone(),
222 updates: room_info.clone(),
223 });
224
225 let LeftRoomUpdate { timeline, state, account_data, ambiguity_changes: _ } = room_info;
226
227 let room = Some(&room);
228 self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
229 self.handle_sync_state_events(room, state).await?;
230 self.handle_sync_timeline_events(room, &timeline.events).await?;
231 }
232
233 for (room_id, room_info) in &rooms.invited {
234 let Some(room) = self.get_room(room_id) else {
235 error!(?room_id, "Can't call event handler, room not found");
236 continue;
237 };
238
239 self.send_room_update(room_id, || RoomUpdate::Invited {
240 room: room.clone(),
241 updates: room_info.clone(),
242 });
243
244 let invite_state = &room_info.invite_state.events;
245 self.handle_sync_events(HandlerKind::StrippedState, Some(&room), invite_state).await?;
246 }
247
248 for (room_id, room_info) in &rooms.knocked {
249 let Some(room) = self.get_room(room_id) else {
250 error!(?room_id, "Can't call event handler, room not found");
251 continue;
252 };
253
254 self.send_room_update(room_id, || RoomUpdate::Knocked {
255 room: room.clone(),
256 updates: room_info.clone(),
257 });
258
259 let knock_state = &room_info.knock_state.events;
260 self.handle_sync_events(HandlerKind::StrippedState, Some(&room), knock_state).await?;
261 }
262
263 debug!("Ran event handlers in {:?}", now.elapsed());
264
265 let now = Instant::now();
266
267 let mut futures = Vec::new();
269 for handler in &*self.notification_handlers().await {
270 for (room_id, room_notifications) in notifications {
271 let Some(room) = self.get_room(room_id) else {
272 warn!(?room_id, "Can't call notification handler, room not found");
273 continue;
274 };
275
276 futures.extend(room_notifications.iter().map(|notification| {
277 (handler)(notification.clone(), room.clone(), self.clone())
278 }));
279 }
280 }
281
282 for fut in futures {
285 fut.await;
286 }
287
288 debug!("Ran notification handlers in {:?}", now.elapsed());
289
290 Ok(())
291 }
292
293 fn send_room_update(&self, room_id: &RoomId, make_msg: impl FnOnce() -> RoomUpdate) {
294 if let btree_map::Entry::Occupied(entry) =
295 self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned())
296 {
297 let tx = entry.get();
298 if tx.receiver_count() == 0 {
299 entry.remove();
300 } else {
301 _ = tx.send(make_msg());
302 }
303 }
304 }
305
306 async fn sleep() {
307 sleep(Duration::from_secs(1)).await;
308 }
309
310 pub(crate) async fn sync_loop_helper(
311 &self,
312 sync_settings: &mut crate::config::SyncSettings,
313 ) -> Result<SyncResponse> {
314 let response = self.sync_once(sync_settings.clone()).await;
315
316 match response {
317 Ok(r) => {
318 sync_settings.token = Some(r.next_batch.clone());
319 Ok(r)
320 }
321 Err(e) => {
322 error!("Received an invalid response: {e}");
323 Err(e)
324 }
325 }
326 }
327
328 pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
329 let now = Instant::now();
330
331 if let Some(t) = last_sync_time {
335 if now - *t <= Duration::from_secs(1) {
336 Self::sleep().await;
337 }
338 }
339
340 *last_sync_time = Some(now);
341 }
342}