matrix_sdk_ui/
notification_client.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeMap,
17    sync::{Arc, Mutex},
18    time::Duration,
19};
20
21use futures_util::{pin_mut, StreamExt as _};
22use matrix_sdk::{
23    room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
24};
25use matrix_sdk_base::{deserialized_responses::TimelineEvent, RoomState, StoreError};
26use ruma::{
27    api::client::sync::sync_events::v5 as http,
28    assign,
29    directory::RoomTypeFilter,
30    events::{
31        room::{
32            join_rules::JoinRule,
33            member::{MembershipState, StrippedRoomMemberEvent},
34            message::{Relation, SyncRoomMessageEvent},
35        },
36        AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
37        AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
38        TimelineEventType,
39    },
40    html::RemoveReplyFallback,
41    push::Action,
42    serde::Raw,
43    uint, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
44};
45use thiserror::Error;
46use tokio::sync::Mutex as AsyncMutex;
47use tracing::{debug, info, instrument, trace, warn};
48
49use crate::{
50    encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
51    sync_service::SyncService,
52    DEFAULT_SANITIZER_MODE,
53};
54
55/// What kind of process setup do we have for this notification client?
56#[derive(Clone)]
57pub enum NotificationProcessSetup {
58    /// The notification client may run on a separate process than the rest of
59    /// the app.
60    ///
61    /// For instance, this is the case on iOS, where notifications are handled
62    /// in a separate process (the Notification Service Extension, aka NSE).
63    ///
64    /// In that case, a cross-process lock will be used to coordinate writes
65    /// into the stores handled by the SDK.
66    MultipleProcesses,
67
68    /// The notification client runs in the same process as the rest of the
69    /// `Client` performing syncs.
70    ///
71    /// For instance, this is the case on Android, where a notification will
72    /// wake up the main app process.
73    ///
74    /// In that case, a smart reference to the [`SyncService`] must be provided.
75    SingleProcess { sync_service: Arc<SyncService> },
76}
77
78/// A client specialized for handling push notifications received over the
79/// network, for an app.
80///
81/// In particular, it takes care of running a full decryption sync, in case the
82/// event in the notification was impossible to decrypt beforehand.
83pub struct NotificationClient {
84    /// SDK client that uses an in-memory state store.
85    client: Client,
86
87    /// SDK client that uses the same state store as the caller's context.
88    parent_client: Client,
89
90    /// Is the notification client running on its own process or not?
91    process_setup: NotificationProcessSetup,
92
93    /// A mutex to serialize requests to the notifications sliding sync.
94    ///
95    /// If several notifications come in at the same time (e.g. network was
96    /// unreachable because of airplane mode or something similar), then we
97    /// need to make sure that repeated calls to `get_notification` won't
98    /// cause multiple requests with the same `conn_id` we're using for
99    /// notifications. This mutex solves this by sequentializing the requests.
100    notification_sync_mutex: AsyncMutex<()>,
101
102    /// A mutex to serialize requests to the encryption sliding sync that's used
103    /// in case we didn't have the keys to decipher an event.
104    ///
105    /// Same reasoning as [`Self::notification_sync_mutex`].
106    encryption_sync_mutex: AsyncMutex<()>,
107}
108
109impl NotificationClient {
110    const CONNECTION_ID: &'static str = "notifications";
111    const LOCK_ID: &'static str = "notifications";
112
113    /// Create a new notification client.
114    pub async fn new(
115        parent_client: Client,
116        process_setup: NotificationProcessSetup,
117    ) -> Result<Self, Error> {
118        let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
119
120        Ok(NotificationClient {
121            client,
122            parent_client,
123            notification_sync_mutex: AsyncMutex::new(()),
124            encryption_sync_mutex: AsyncMutex::new(()),
125            process_setup,
126        })
127    }
128
129    /// Fetches a room by its ID using the in-memory state store backed client.
130    /// Useful to retrieve room information after running the limited
131    /// notification client sliding sync loop.
132    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
133        self.client.get_room(room_id)
134    }
135
136    /// Fetches the content of a notification.
137    ///
138    /// This will first try to get the notification using a short-lived sliding
139    /// sync, and if the sliding-sync can't find the event, then it'll use a
140    /// `/context` query to find the event with associated member information.
141    ///
142    /// An error result means that we couldn't resolve the notification; in that
143    /// case, a dummy notification may be displayed instead.
144    #[instrument(skip(self))]
145    pub async fn get_notification(
146        &self,
147        room_id: &RoomId,
148        event_id: &EventId,
149    ) -> Result<NotificationStatus, Error> {
150        let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
151        match status {
152            NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
153            NotificationStatus::EventNotFound => {
154                self.get_notification_with_context(room_id, event_id).await
155            }
156        }
157    }
158
159    /// Fetches the content of several notifications.
160    ///
161    /// This will first try to get the notifications using a short-lived sliding
162    /// sync, and if the sliding-sync can't find the events, then it'll use a
163    /// `/context` query to find the events with associated member information.
164    ///
165    /// An error result at the top level means that something failed when trying
166    /// to set up the notification fetching.
167    ///
168    /// For each notification item you can also receive an error, which means
169    /// something failed when trying to fetch that particular notification
170    /// (decryption, fetching push actions, etc.); in that case, a dummy
171    /// notification may be displayed instead.
172    pub async fn get_notifications(
173        &self,
174        requests: &[NotificationItemsRequest],
175    ) -> Result<BatchNotificationFetchingResult, Error> {
176        let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
177
178        for request in requests {
179            for event_id in &request.event_ids {
180                match notifications.get_mut(event_id) {
181                    // If the notification for a given event wasn't found with sliding sync, try
182                    // with a /context for each event.
183                    Some(Ok(NotificationStatus::EventNotFound)) | None => {
184                        notifications.insert(
185                            event_id.to_owned(),
186                            self.get_notification_with_context(&request.room_id, event_id).await,
187                        );
188                    }
189
190                    _ => {}
191                }
192            }
193        }
194
195        Ok(notifications)
196    }
197
198    /// Run an encryption sync loop, in case an event is still encrypted.
199    ///
200    /// Will return `Ok(Some)` if and only if:
201    /// - the event was encrypted,
202    /// - we successfully ran an encryption sync or waited long enough for an
203    ///   existing encryption sync to decrypt the event.
204    ///
205    /// Otherwise, if the event was not encrypted, or couldn't be decrypted
206    /// (without causing a fatal error), will return `Ok(None)`.
207    #[instrument(skip_all)]
208    async fn retry_decryption(
209        &self,
210        room: &Room,
211        raw_event: &Raw<AnySyncTimelineEvent>,
212    ) -> Result<Option<TimelineEvent>, Error> {
213        let event: AnySyncTimelineEvent =
214            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
215
216        if !is_event_encrypted(event.event_type()) {
217            return Ok(None);
218        }
219
220        // Serialize calls to this function.
221        let _guard = self.encryption_sync_mutex.lock().await;
222
223        // The message is still encrypted, and the client is configured to retry
224        // decryption.
225        //
226        // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
227        // - the first iteration allows to get SS events as well as send e2ee requests.
228        // - the second one let the SS homeserver forward events triggered by the
229        //   sending of e2ee requests.
230        //
231        // Keep timeouts small for both, since we might be short on time.
232
233        let with_locking = WithLocking::from(matches!(
234            self.process_setup,
235            NotificationProcessSetup::MultipleProcesses
236        ));
237
238        let push_ctx = room.push_context().await?;
239        let sync_permit_guard = match &self.process_setup {
240            NotificationProcessSetup::MultipleProcesses => {
241                // We're running on our own process, dedicated for notifications. In that case,
242                // create a dummy sync permit; we're guaranteed there's at most one since we've
243                // acquired the `encryption_sync_mutex' lock here.
244                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
245                sync_permit.lock_owned().await
246            }
247
248            NotificationProcessSetup::SingleProcess { sync_service } => {
249                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
250                    permit_guard
251                } else {
252                    // There's already a sync service active, thus the encryption sync is already
253                    // running elsewhere. As a matter of fact, if the event was encrypted, that
254                    // means we were racing against the encryption sync. Wait a bit, attempt to
255                    // decrypt, and carry on.
256
257                    // We repeat the sleep 3 times at most, each iteration we
258                    // double the amount of time waited, so overall we may wait up to 7 times this
259                    // amount.
260                    let mut wait = 200;
261
262                    debug!("Encryption sync running in background");
263                    for _ in 0..3 {
264                        trace!("waiting for decryption…");
265
266                        sleep(Duration::from_millis(wait)).await;
267
268                        let new_event =
269                            room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await?;
270
271                        match new_event.kind {
272                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
273                                utd_info, ..} => {
274                                if utd_info.reason.is_missing_room_key() {
275                                    // Decryption error that could be caused by a missing room
276                                    // key; retry in a few.
277                                    wait *= 2;
278                                } else {
279                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
280                                    return Ok(None);
281                                }
282                            }
283                            _ => {
284                                trace!("Waiting succeeded and event could be decrypted!");
285                                return Ok(Some(new_event));
286                            }
287                        }
288                    }
289
290                    // We couldn't decrypt the event after waiting a few times, abort.
291                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
292                    return Ok(None);
293                }
294            }
295        };
296
297        let encryption_sync = EncryptionSyncService::new(
298            self.client.clone(),
299            Some((Duration::from_secs(3), Duration::from_secs(4))),
300            with_locking,
301        )
302        .await;
303
304        // Just log out errors, but don't have them abort the notification processing:
305        // an undecrypted notification is still better than no
306        // notifications.
307
308        match encryption_sync {
309            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
310                Ok(()) => match room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await {
311                    Ok(new_event) => match new_event.kind {
312                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
313                            utd_info, ..
314                        } => {
315                            trace!(
316                                "Encryption sync failed to decrypt the event: {:?}",
317                                utd_info.reason
318                            );
319                            Ok(None)
320                        }
321                        _ => {
322                            trace!("Encryption sync managed to decrypt the event.");
323                            Ok(Some(new_event))
324                        }
325                    },
326                    Err(err) => {
327                        trace!("Encryption sync failed to decrypt the event: {err}");
328                        Ok(None)
329                    }
330                },
331                Err(err) => {
332                    warn!("Encryption sync error: {err:#}");
333                    Ok(None)
334                }
335            },
336            Err(err) => {
337                warn!("Encryption sync build error: {err:#}",);
338                Ok(None)
339            }
340        }
341    }
342
343    /// Try to run a sliding sync (without encryption) to retrieve the events
344    /// from the notification.
345    ///
346    /// An event can either be:
347    /// - an invite event,
348    /// - or a non-invite event.
349    ///
350    /// In case it's a non-invite event, it's rather easy: we'll request
351    /// explicit state that'll be useful for building the
352    /// `NotificationItem`, and subscribe to the room which the notification
353    /// relates to.
354    ///
355    /// In case it's an invite-event, it's trickier because the stripped event
356    /// may not contain the event id, so we can't just match on it. Rather,
357    /// we look at stripped room member events that may be fitting (i.e.
358    /// match the current user and are invites), and if the SDK concludes the
359    /// room was in the invited state, and we didn't find the event by id,
360    /// *then* we'll use that stripped room member event.
361    #[instrument(skip_all)]
362    async fn try_sliding_sync(
363        &self,
364        requests: &[NotificationItemsRequest],
365    ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
366        // Serialize all the calls to this method by taking a lock at the beginning,
367        // that will be dropped later.
368        let _guard = self.notification_sync_mutex.lock().await;
369
370        // Set up a sliding sync that only subscribes to the room that had the
371        // notification, so we can figure out the full event and associated
372        // information.
373
374        let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
375
376        let handler_raw_notification = raw_notifications.clone();
377
378        let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
379
380        let timeline_event_handler = self.client.add_event_handler({
381            let requests = requests.clone();
382            move |raw: Raw<AnySyncTimelineEvent>| async move {
383                match &raw.get_field::<OwnedEventId>("event_id") {
384                    Ok(Some(event_id)) => {
385                        let request =
386                            &requests.iter().find(|request| request.event_ids.contains(event_id));
387                        if request.is_none() {
388                            return;
389                        }
390                        let room_id = request.unwrap().room_id.clone();
391                        for request in requests.iter() {
392                            if request.event_ids.contains(event_id) {
393                                // found it! There shouldn't be a previous event before, but if
394                                // there is, that should be ok to
395                                // just replace it.
396                                handler_raw_notification.lock().unwrap().insert(
397                                    event_id.to_owned(),
398                                    (room_id, Some(RawNotificationEvent::Timeline(raw))),
399                                );
400                                return;
401                            }
402                        }
403                    }
404                    Ok(None) => {
405                        warn!("a sync event had no event id");
406                    }
407                    Err(err) => {
408                        warn!("failed to deserialize sync event id: {err}");
409                    }
410                }
411            }
412        });
413
414        // We'll only use this event if the room is in the invited state.
415        let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
416
417        let user_id = self.client.user_id().unwrap().to_owned();
418        let handler_raw_invites = raw_invites.clone();
419        let handler_raw_notifications = raw_notifications.clone();
420        let stripped_member_handler = self.client.add_event_handler({
421            let requests = requests.clone();
422            move |raw: Raw<StrippedRoomMemberEvent>| async move {
423                let deserialized = match raw.deserialize() {
424                    Ok(d) => d,
425                    Err(err) => {
426                        warn!("failed to deserialize raw stripped room member event: {err}");
427                        return;
428                    }
429                };
430
431                trace!("received a stripped room member event");
432
433                // Try to match the event by event_id, as it's the most precise. In theory, we
434                // shouldn't receive it, so that's a first attempt.
435                match &raw.get_field::<OwnedEventId>("event_id") {
436                    Ok(Some(event_id)) => {
437                        let request =
438                            &requests.iter().find(|request| request.event_ids.contains(event_id));
439                        if request.is_none() {
440                            return;
441                        }
442                        let room_id = request.unwrap().room_id.clone();
443
444                        // found it! There shouldn't be a previous event before, but if
445                        // there is, that should be ok to
446                        // just replace it.
447                        handler_raw_notifications.lock().unwrap().insert(
448                            event_id.to_owned(),
449                            (room_id, Some(RawNotificationEvent::Invite(raw))),
450                        );
451                        return;
452                    }
453                    Ok(None) => {
454                        warn!("a room member event had no id");
455                    }
456                    Err(err) => {
457                        warn!("failed to deserialize room member event id: {err}");
458                    }
459                }
460
461                // Try to match the event by membership and state_key for the current user.
462                if deserialized.content.membership == MembershipState::Invite
463                    && deserialized.state_key == user_id
464                {
465                    trace!("found an invite event for the current user");
466                    // This could be it! There might be several of these following each other, so
467                    // assume it's the latest one (in sync ordering), and override a previous one if
468                    // present.
469                    handler_raw_invites
470                        .lock()
471                        .unwrap()
472                        .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
473                } else {
474                    trace!("not an invite event, or not for the current user");
475                }
476            }
477        });
478
479        // Room power levels are necessary to build the push context.
480        let required_state = vec![
481            (StateEventType::RoomEncryption, "".to_owned()),
482            (StateEventType::RoomMember, "$LAZY".to_owned()),
483            (StateEventType::RoomMember, "$ME".to_owned()),
484            (StateEventType::RoomCanonicalAlias, "".to_owned()),
485            (StateEventType::RoomName, "".to_owned()),
486            (StateEventType::RoomPowerLevels, "".to_owned()),
487            (StateEventType::RoomJoinRules, "".to_owned()),
488            (StateEventType::CallMember, "*".to_owned()),
489        ];
490
491        let invites = SlidingSyncList::builder("invites")
492            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
493            .timeline_limit(8)
494            .required_state(required_state.clone())
495            .filters(Some(assign!(http::request::ListFilters::default(), {
496                is_invite: Some(true),
497                not_room_types: vec![RoomTypeFilter::Space],
498            })));
499
500        let sync = self
501            .client
502            .sliding_sync(Self::CONNECTION_ID)?
503            .poll_timeout(Duration::from_secs(1))
504            .network_timeout(Duration::from_secs(3))
505            .with_account_data_extension(
506                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
507            )
508            .add_list(invites)
509            .build()
510            .await?;
511
512        let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
513        sync.subscribe_to_rooms(
514            &room_ids,
515            Some(assign!(http::request::RoomSubscription::default(), {
516                required_state,
517                timeline_limit: uint!(16)
518            })),
519            true,
520        );
521
522        let mut remaining_attempts = 3;
523
524        let stream = sync.sync();
525        pin_mut!(stream);
526
527        // Sum the expected event count for each room
528        let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
529
530        loop {
531            if stream.next().await.is_none() {
532                // Sliding sync aborted early.
533                break;
534            }
535
536            if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
537                == expected_event_count
538            {
539                // We got the events.
540                break;
541            }
542
543            remaining_attempts -= 1;
544            if remaining_attempts == 0 {
545                // We're out of luck.
546                break;
547            }
548        }
549
550        self.client.remove_event_handler(stripped_member_handler);
551        self.client.remove_event_handler(timeline_event_handler);
552
553        let mut notifications = raw_notifications.clone().lock().unwrap().clone();
554        let mut missing_event_ids = Vec::new();
555
556        // Create the list of missing event ids after the syncs.
557        for request in requests.iter() {
558            for event_id in &request.event_ids {
559                if !notifications.contains_key(event_id) {
560                    missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
561                }
562            }
563        }
564
565        // Try checking if the missing notifications could be invites.
566        for (room_id, missing_event_id) in missing_event_ids {
567            trace!("we didn't have a non-invite event, looking for invited room now");
568            if let Some(room) = self.client.get_room(&room_id) {
569                if room.state() == RoomState::Invited {
570                    if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
571                        notifications.insert(
572                            missing_event_id.to_owned(),
573                            (room_id.to_owned(), stripped_event),
574                        );
575                    }
576                } else {
577                    debug!("the room isn't in the invited state");
578                }
579            } else {
580                warn!(%room_id, "unknown room, can't check for invite events");
581            }
582        }
583
584        let found = if notifications.len() == expected_event_count { "" } else { "not " };
585        trace!("all notification events have{found} been found");
586
587        Ok(notifications)
588    }
589
590    pub async fn get_notification_with_sliding_sync(
591        &self,
592        room_id: &RoomId,
593        event_id: &EventId,
594    ) -> Result<NotificationStatus, Error> {
595        info!("fetching notification event with a sliding sync");
596
597        let request = NotificationItemsRequest {
598            room_id: room_id.to_owned(),
599            event_ids: vec![event_id.to_owned()],
600        };
601
602        let mut get_notifications_result =
603            self.get_notifications_with_sliding_sync(&[request]).await?;
604
605        get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
606    }
607
608    /// Get a list of full notifications, given a room id and event ids.
609    ///
610    /// This will run a small sliding sync to retrieve the content of the
611    /// events, along with extra data to form a rich notification context.
612    pub async fn get_notifications_with_sliding_sync(
613        &self,
614        requests: &[NotificationItemsRequest],
615    ) -> Result<BatchNotificationFetchingResult, Error> {
616        let raw_events = self.try_sliding_sync(requests).await?;
617
618        let mut batch_result = BatchNotificationFetchingResult::new();
619
620        for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
621            // At this point it should have been added by the sync, if it's not, give up.
622            let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
623
624            let Some(raw_event) = raw_event else {
625                // The event was not found, so we can't build a notification.
626                batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
627                continue;
628            };
629
630            let (raw_event, push_actions) = match &raw_event {
631                RawNotificationEvent::Timeline(timeline_event) => {
632                    // Timeline events may be encrypted, so make sure they get decrypted first.
633                    match self.retry_decryption(&room, timeline_event).await {
634                        Ok(Some(timeline_event)) => {
635                            let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
636                            (
637                                RawNotificationEvent::Timeline(timeline_event.into_raw()),
638                                push_actions,
639                            )
640                        }
641
642                        Ok(None) => {
643                            // The event was either not encrypted in the first place, or we
644                            // couldn't decrypt it after retrying. Use the raw event as is.
645                            match room.event_push_actions(timeline_event).await {
646                                Ok(push_actions) => (raw_event.clone(), push_actions),
647                                Err(err) => {
648                                    // Could not get push actions.
649                                    batch_result.insert(event_id, Err(err.into()));
650                                    continue;
651                                }
652                            }
653                        }
654
655                        Err(err) => {
656                            batch_result.insert(event_id, Err(err));
657                            continue;
658                        }
659                    }
660                }
661
662                RawNotificationEvent::Invite(invite_event) => {
663                    // Invite events can't be encrypted, so they should be in clear text.
664                    match room.event_push_actions(invite_event).await {
665                        Ok(push_actions) => {
666                            (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
667                        }
668                        Err(err) => {
669                            batch_result.insert(event_id, Err(err.into()));
670                            continue;
671                        }
672                    }
673                }
674            };
675
676            let should_notify = push_actions
677                .as_ref()
678                .is_some_and(|actions| actions.iter().any(|a| a.should_notify()));
679
680            if !should_notify {
681                // The event has been filtered out by the user's push rules.
682                batch_result.insert(event_id, Ok(NotificationStatus::EventFilteredOut));
683                continue;
684            }
685
686            let status =
687                match NotificationItem::new(&room, raw_event, push_actions.as_deref(), Vec::new())
688                    .await
689                    .map(|event| NotificationStatus::Event(Box::new(event)))
690                {
691                    Ok(status) => status,
692                    Err(err) => {
693                        // Could not build the notification item, return an error.
694                        batch_result.insert(event_id, Err(err));
695                        continue;
696                    }
697                };
698
699            match status {
700                NotificationStatus::Event(event) => {
701                    if self.client.is_user_ignored(event.event.sender()).await {
702                        batch_result.insert(event_id, Ok(NotificationStatus::EventFilteredOut));
703                    } else {
704                        batch_result.insert(event_id, Ok(NotificationStatus::Event(event)));
705                    }
706                }
707                _ => {
708                    batch_result.insert(event_id, Ok(status));
709                }
710            }
711        }
712
713        Ok(batch_result)
714    }
715
716    /// Retrieve a notification using a `/context` query.
717    ///
718    /// This is for clients that are already running other sliding syncs in the
719    /// same process, so that most of the contextual information for the
720    /// notification should already be there. In particular, the room containing
721    /// the event MUST be known (via a sliding sync for invites, or another
722    /// sliding sync).
723    ///
724    /// An error result means that we couldn't resolve the notification; in that
725    /// case, a dummy notification may be displayed instead. A `None` result
726    /// means the notification has been filtered out by the user's push
727    /// rules.
728    pub async fn get_notification_with_context(
729        &self,
730        room_id: &RoomId,
731        event_id: &EventId,
732    ) -> Result<NotificationStatus, Error> {
733        info!("fetching notification event with a /context query");
734
735        // See above comment.
736        let Some(room) = self.parent_client.get_room(room_id) else {
737            return Err(Error::UnknownRoom);
738        };
739
740        let response = room.event_with_context(event_id, true, uint!(0), None).await?;
741
742        let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
743        let state_events = response.state;
744
745        if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
746            timeline_event = decrypted_event;
747        }
748
749        if let Some(actions) = timeline_event.push_actions() {
750            if !actions.iter().any(|a| a.should_notify()) {
751                return Ok(NotificationStatus::EventFilteredOut);
752            }
753        }
754
755        let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
756        let notification_item = NotificationItem::new(
757            &room,
758            RawNotificationEvent::Timeline(timeline_event.into_raw()),
759            push_actions.as_deref(),
760            state_events,
761        )
762        .await?;
763
764        if self.client.is_user_ignored(notification_item.event.sender()).await {
765            Ok(NotificationStatus::EventFilteredOut)
766        } else {
767            Ok(NotificationStatus::Event(Box::new(notification_item)))
768        }
769    }
770}
771
772fn is_event_encrypted(event_type: TimelineEventType) -> bool {
773    let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
774
775    #[cfg(feature = "unstable-msc3956")]
776    let is_still_encrypted =
777        is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
778
779    is_still_encrypted
780}
781
782#[derive(Debug)]
783pub enum NotificationStatus {
784    /// The event has been found and was not filtered out.
785    Event(Box<NotificationItem>),
786    /// The event couldn't be found in the network queries used to find it.
787    EventNotFound,
788    /// The event has been filtered out, either because of the user's push
789    /// rules, or because the user which triggered it is ignored by the
790    /// current user.
791    EventFilteredOut,
792}
793
794#[derive(Debug, Clone)]
795pub struct NotificationItemsRequest {
796    pub room_id: OwnedRoomId,
797    pub event_ids: Vec<OwnedEventId>,
798}
799
800type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
801
802/// The Notification event as it was fetched from remote for the
803/// given `event_id`, represented as Raw but decrypted, thus only
804/// whether it is an invite or regular Timeline event has been
805/// determined.
806#[derive(Debug, Clone)]
807pub enum RawNotificationEvent {
808    /// The raw event for a timeline event
809    Timeline(Raw<AnySyncTimelineEvent>),
810    /// The notification contains an invitation with the given
811    /// StrippedRoomMemberEvent (in raw here)
812    Invite(Raw<StrippedRoomMemberEvent>),
813}
814
815/// The deserialized Event as it was fetched from remote for the
816/// given `event_id` and after decryption (if possible).
817#[derive(Debug)]
818pub enum NotificationEvent {
819    /// The Notification was for a TimelineEvent
820    Timeline(Box<AnySyncTimelineEvent>),
821    /// The Notification is an invite with the given stripped room event data
822    Invite(Box<StrippedRoomMemberEvent>),
823}
824
825impl NotificationEvent {
826    pub fn sender(&self) -> &UserId {
827        match self {
828            NotificationEvent::Timeline(ev) => ev.sender(),
829            NotificationEvent::Invite(ev) => &ev.sender,
830        }
831    }
832
833    /// Returns the root event id of the thread the notification event is in, if
834    /// any.
835    fn thread_id(&self) -> Option<OwnedEventId> {
836        let NotificationEvent::Timeline(sync_timeline_event) = &self else {
837            return None;
838        };
839        let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
840            return None;
841        };
842        let content = event.original_content()?;
843        match content {
844            AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
845                Relation::Thread(thread) => Some(thread.event_id),
846                _ => None,
847            },
848            _ => None,
849        }
850    }
851}
852
853/// A notification with its full content.
854#[derive(Debug)]
855pub struct NotificationItem {
856    /// Underlying Ruma event.
857    pub event: NotificationEvent,
858
859    /// The raw of the underlying event.
860    pub raw_event: RawNotificationEvent,
861
862    /// Display name of the sender.
863    pub sender_display_name: Option<String>,
864    /// Avatar URL of the sender.
865    pub sender_avatar_url: Option<String>,
866    /// Is the sender's name ambiguous?
867    pub is_sender_name_ambiguous: bool,
868
869    /// Room computed display name.
870    pub room_computed_display_name: String,
871    /// Room avatar URL.
872    pub room_avatar_url: Option<String>,
873    /// Room canonical alias.
874    pub room_canonical_alias: Option<String>,
875    /// Room topic.
876    pub room_topic: Option<String>,
877    /// Room join rule.
878    ///
879    /// Set to `None` if the join rule for this room is not available.
880    pub room_join_rule: Option<JoinRule>,
881    /// Is this room encrypted?
882    pub is_room_encrypted: Option<bool>,
883    /// Is this room considered a direct message?
884    pub is_direct_message_room: bool,
885    /// Numbers of members who joined the room.
886    pub joined_members_count: u64,
887
888    /// Is it a noisy notification? (i.e. does any push action contain a sound
889    /// action)
890    ///
891    /// It is set if and only if the push actions could be determined.
892    pub is_noisy: Option<bool>,
893    pub has_mention: Option<bool>,
894    pub thread_id: Option<OwnedEventId>,
895}
896
897impl NotificationItem {
898    async fn new(
899        room: &Room,
900        raw_event: RawNotificationEvent,
901        push_actions: Option<&[Action]>,
902        state_events: Vec<Raw<AnyStateEvent>>,
903    ) -> Result<Self, Error> {
904        let event = match &raw_event {
905            RawNotificationEvent::Timeline(raw_event) => {
906                let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
907                if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
908                    SyncRoomMessageEvent::Original(ev),
909                )) = &mut event
910                {
911                    ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
912                }
913                NotificationEvent::Timeline(Box::new(event))
914            }
915            RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
916                raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
917            )),
918        };
919
920        let sender = match room.state() {
921            RoomState::Invited => room.invite_details().await?.inviter,
922            _ => room.get_member_no_sync(event.sender()).await?,
923        };
924
925        let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
926            match &sender {
927                Some(sender) => (
928                    sender.display_name().map(|s| s.to_owned()),
929                    sender.avatar_url().map(|s| s.to_string()),
930                    sender.name_ambiguous(),
931                ),
932                None => (None, None, false),
933            };
934
935        if sender_display_name.is_none() || sender_avatar_url.is_none() {
936            let sender_id = event.sender();
937            for ev in state_events {
938                let ev = match ev.deserialize() {
939                    Ok(ev) => ev,
940                    Err(err) => {
941                        warn!("Failed to deserialize a state event: {err}");
942                        continue;
943                    }
944                };
945                if ev.sender() != sender_id {
946                    continue;
947                }
948                if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
949                    content,
950                    ..
951                }) = ev.content()
952                {
953                    if sender_display_name.is_none() {
954                        sender_display_name = content.displayname;
955                    }
956                    if sender_avatar_url.is_none() {
957                        sender_avatar_url = content.avatar_url.map(|url| url.to_string());
958                    }
959                }
960            }
961        }
962
963        let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
964        let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
965        let thread_id = event.thread_id().clone();
966
967        let item = NotificationItem {
968            event,
969            raw_event,
970            sender_display_name,
971            sender_avatar_url,
972            is_sender_name_ambiguous,
973            room_computed_display_name: room.display_name().await?.to_string(),
974            room_avatar_url: room.avatar_url().map(|s| s.to_string()),
975            room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
976            room_topic: room.topic(),
977            room_join_rule: room.join_rule(),
978            is_direct_message_room: room.is_direct().await?,
979            is_room_encrypted: room
980                .latest_encryption_state()
981                .await
982                .map(|state| state.is_encrypted())
983                .ok(),
984            joined_members_count: room.joined_members_count(),
985            is_noisy,
986            has_mention,
987            thread_id,
988        };
989
990        Ok(item)
991    }
992
993    /// Returns whether this room is public or not, based on the join rule.
994    ///
995    /// Maybe return `None` if the join rule is not available.
996    pub fn is_public(&self) -> Option<bool> {
997        self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
998    }
999}
1000
1001/// An error for the [`NotificationClient`].
1002#[derive(Debug, Error)]
1003pub enum Error {
1004    #[error(transparent)]
1005    BuildingLocalClient(ClientBuildError),
1006
1007    /// The room associated to this event wasn't found.
1008    #[error("unknown room for a notification")]
1009    UnknownRoom,
1010
1011    /// The Ruma event contained within this notification couldn't be parsed.
1012    #[error("invalid ruma event")]
1013    InvalidRumaEvent,
1014
1015    /// When calling `get_notification_with_sliding_sync`, the room was missing
1016    /// in the response.
1017    #[error("the sliding sync response doesn't include the target room")]
1018    SlidingSyncEmptyRoom,
1019
1020    #[error("the event was missing in the `/context` query")]
1021    ContextMissingEvent,
1022
1023    /// An error forwarded from the client.
1024    #[error(transparent)]
1025    SdkError(#[from] matrix_sdk::Error),
1026
1027    /// An error forwarded from the underlying state store.
1028    #[error(transparent)]
1029    StoreError(#[from] StoreError),
1030}
1031
1032#[cfg(test)]
1033mod tests {
1034    use assert_matches2::assert_let;
1035    use matrix_sdk::test_utils::mocks::MatrixMockServer;
1036    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1037    use ruma::{event_id, room_id, user_id};
1038
1039    use crate::notification_client::{NotificationItem, RawNotificationEvent};
1040
1041    #[async_test]
1042    async fn test_notification_item_returns_thread_id() {
1043        let server = MatrixMockServer::new().await;
1044        let client = server.client_builder().build().await;
1045
1046        let room_id = room_id!("!a:b.c");
1047        let thread_root_event_id = event_id!("$root:b.c");
1048        let message = EventFactory::new()
1049            .room(room_id)
1050            .sender(user_id!("@sender:b.c"))
1051            .text_msg("Threaded")
1052            .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1053            .into_raw_sync();
1054        let room = server.sync_joined_room(&client, room_id).await;
1055
1056        let raw_notification_event = RawNotificationEvent::Timeline(message);
1057        let notification_item =
1058            NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1059                .await
1060                .expect("Could not create notification item");
1061
1062        assert_let!(Some(thread_id) = notification_item.thread_id);
1063        assert_eq!(thread_id, thread_root_event_id);
1064    }
1065}