1use std::{
16 collections::BTreeMap,
17 sync::{Arc, Mutex},
18 time::Duration,
19};
20
21use futures_util::{pin_mut, StreamExt as _};
22use matrix_sdk::{
23 room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
24};
25use matrix_sdk_base::{deserialized_responses::TimelineEvent, RoomState, StoreError};
26use ruma::{
27 api::client::sync::sync_events::v5 as http,
28 assign,
29 directory::RoomTypeFilter,
30 events::{
31 room::{
32 join_rules::JoinRule,
33 member::{MembershipState, StrippedRoomMemberEvent},
34 message::{Relation, SyncRoomMessageEvent},
35 },
36 AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
37 AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
38 TimelineEventType,
39 },
40 html::RemoveReplyFallback,
41 push::Action,
42 serde::Raw,
43 uint, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
44};
45use thiserror::Error;
46use tokio::sync::Mutex as AsyncMutex;
47use tracing::{debug, info, instrument, trace, warn};
48
49use crate::{
50 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
51 sync_service::SyncService,
52 DEFAULT_SANITIZER_MODE,
53};
54
/// Describes how the process hosting the [`NotificationClient`] is set up.
///
/// The variant is consulted by `NotificationClient::retry_decryption` to
/// decide whether locking is requested for the encryption sync and where the
/// encryption sync permit comes from.
#[derive(Clone)]
pub enum NotificationProcessSetup {
    /// With this variant, `retry_decryption` requests locking
    /// (`WithLocking::from(true)`) and creates its own fresh
    /// `EncryptionSyncPermit`.
    MultipleProcesses,

    /// With this variant, `retry_decryption` does not request locking and
    /// tries to obtain the encryption sync permit from `sync_service`; when
    /// the permit is unavailable it waits and retries decryption instead of
    /// running its own encryption sync.
    SingleProcess { sync_service: Arc<SyncService> },
}
77
/// A client dedicated to resolving push notifications into displayable
/// [`NotificationItem`]s.
pub struct NotificationClient {
    /// Client obtained from `parent_client.notification_client(..)`; used for
    /// the sliding sync, event handlers and room lookups of the sync path.
    client: Client,

    /// The client this notification client was created from; used to look up
    /// the room in `get_notification_with_context`.
    parent_client: Client,

    /// Process setup, consulted when retrying decryption.
    process_setup: NotificationProcessSetup,

    /// Held for the whole duration of `try_sliding_sync`, so that only one
    /// notification sliding sync runs at a time on this instance.
    notification_sync_mutex: AsyncMutex<()>,

    /// Held for the whole duration of the decryption retry in
    /// `retry_decryption`, so that only one retry runs at a time on this
    /// instance.
    encryption_sync_mutex: AsyncMutex<()>,
}
108
109impl NotificationClient {
110 const CONNECTION_ID: &'static str = "notifications";
111 const LOCK_ID: &'static str = "notifications";
112
113 pub async fn new(
115 parent_client: Client,
116 process_setup: NotificationProcessSetup,
117 ) -> Result<Self, Error> {
118 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
119
120 Ok(NotificationClient {
121 client,
122 parent_client,
123 notification_sync_mutex: AsyncMutex::new(()),
124 encryption_sync_mutex: AsyncMutex::new(()),
125 process_setup,
126 })
127 }
128
/// Looks up a room by id on the inner notification client (not on the
/// parent client).
///
/// Returns `None` when that client does not know the room.
pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
    self.client.get_room(room_id)
}
135
136 #[instrument(skip(self))]
145 pub async fn get_notification(
146 &self,
147 room_id: &RoomId,
148 event_id: &EventId,
149 ) -> Result<NotificationStatus, Error> {
150 let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
151 match status {
152 NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
153 NotificationStatus::EventNotFound => {
154 self.get_notification_with_context(room_id, event_id).await
155 }
156 }
157 }
158
159 pub async fn get_notifications(
173 &self,
174 requests: &[NotificationItemsRequest],
175 ) -> Result<BatchNotificationFetchingResult, Error> {
176 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
177
178 for request in requests {
179 for event_id in &request.event_ids {
180 match notifications.get_mut(event_id) {
181 Some(Ok(NotificationStatus::EventNotFound)) | None => {
184 notifications.insert(
185 event_id.to_owned(),
186 self.get_notification_with_context(&request.room_id, event_id).await,
187 );
188 }
189
190 _ => {}
191 }
192 }
193 }
194
195 Ok(notifications)
196 }
197
/// Tries to decrypt `raw_event` if it is still encrypted.
///
/// Returns:
/// - `Ok(None)` if the event is not of an encrypted type, or if it could
///   not be decrypted (including when the encryption sync could not be
///   built or failed to run; those failures are only logged);
/// - `Ok(Some(event))` with the decrypted event otherwise.
///
/// # Errors
///
/// Returns [`Error::InvalidRumaEvent`] if the raw event cannot be
/// deserialized, and propagates errors from `Room::push_context` and from
/// the `Room::decrypt_event` calls made while waiting in the
/// single-process branch.
#[instrument(skip_all)]
async fn retry_decryption(
    &self,
    room: &Room,
    raw_event: &Raw<AnySyncTimelineEvent>,
) -> Result<Option<TimelineEvent>, Error> {
    let event: AnySyncTimelineEvent =
        raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;

    // Nothing to do for events that aren't encrypted.
    if !is_event_encrypted(event.event_type()) {
        return Ok(None);
    }

    // Only one decryption retry at a time on this instance; the guard is
    // held until this function returns.
    let _guard = self.encryption_sync_mutex.lock().await;

    // Locking is requested only in the multiple-processes setup.
    let with_locking = WithLocking::from(matches!(
        self.process_setup,
        NotificationProcessSetup::MultipleProcesses
    ));

    let push_ctx = room.push_context().await?;
    let sync_permit_guard = match &self.process_setup {
        NotificationProcessSetup::MultipleProcesses => {
            // No sync service to borrow a permit from: create a fresh one.
            let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
            sync_permit.lock_owned().await
        }

        NotificationProcessSetup::SingleProcess { sync_service } => {
            if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
                permit_guard
            } else {
                // The permit is not available (presumably because the sync
                // service's own encryption sync holds it). Instead of
                // running another encryption sync, wait and retry
                // decryption up to 3 times, starting at 200ms and doubling
                // the delay after each missing-room-key failure.
                let mut wait = 200;

                debug!("Encryption sync running in background");
                for _ in 0..3 {
                    trace!("waiting for decryption…");

                    sleep(Duration::from_millis(wait)).await;

                    let new_event =
                        room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await?;

                    match new_event.kind {
                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
                            utd_info, ..} => {
                            if utd_info.reason.is_missing_room_key() {
                                // The key may still arrive: wait longer.
                                wait *= 2;
                            } else {
                                debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
                                return Ok(None);
                            }
                        }
                        _ => {
                            trace!("Waiting succeeded and event could be decrypted!");
                            return Ok(Some(new_event));
                        }
                    }
                }

                debug!("Timeout waiting for the encryption sync to decrypt notification.");
                return Ok(None);
            }
        }
    };

    // We own a permit: run our own short-lived encryption sync. The meaning
    // of the two durations is defined by `EncryptionSyncService::new`.
    let encryption_sync = EncryptionSyncService::new(
        self.client.clone(),
        Some((Duration::from_secs(3), Duration::from_secs(4))),
        with_locking,
    )
    .await;

    // Every failure below is logged and mapped to `Ok(None)`, so the caller
    // can still proceed with the undecrypted event.
    match encryption_sync {
        Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
            Ok(()) => match room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await {
                Ok(new_event) => match new_event.kind {
                    matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
                        utd_info, ..
                    } => {
                        trace!(
                            "Encryption sync failed to decrypt the event: {:?}",
                            utd_info.reason
                        );
                        Ok(None)
                    }
                    _ => {
                        trace!("Encryption sync managed to decrypt the event.");
                        Ok(Some(new_event))
                    }
                },
                Err(err) => {
                    trace!("Encryption sync failed to decrypt the event: {err}");
                    Ok(None)
                }
            },
            Err(err) => {
                warn!("Encryption sync error: {err:#}");
                Ok(None)
            }
        },
        Err(err) => {
            warn!("Encryption sync build error: {err:#}",);
            Ok(None)
        }
    }
}
342
343 #[instrument(skip_all)]
362 async fn try_sliding_sync(
363 &self,
364 requests: &[NotificationItemsRequest],
365 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
366 let _guard = self.notification_sync_mutex.lock().await;
369
370 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
375
376 let handler_raw_notification = raw_notifications.clone();
377
378 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
379
380 let timeline_event_handler = self.client.add_event_handler({
381 let requests = requests.clone();
382 move |raw: Raw<AnySyncTimelineEvent>| async move {
383 match &raw.get_field::<OwnedEventId>("event_id") {
384 Ok(Some(event_id)) => {
385 let request =
386 &requests.iter().find(|request| request.event_ids.contains(event_id));
387 if request.is_none() {
388 return;
389 }
390 let room_id = request.unwrap().room_id.clone();
391 for request in requests.iter() {
392 if request.event_ids.contains(event_id) {
393 handler_raw_notification.lock().unwrap().insert(
397 event_id.to_owned(),
398 (room_id, Some(RawNotificationEvent::Timeline(raw))),
399 );
400 return;
401 }
402 }
403 }
404 Ok(None) => {
405 warn!("a sync event had no event id");
406 }
407 Err(err) => {
408 warn!("failed to deserialize sync event id: {err}");
409 }
410 }
411 }
412 });
413
414 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
416
417 let user_id = self.client.user_id().unwrap().to_owned();
418 let handler_raw_invites = raw_invites.clone();
419 let handler_raw_notifications = raw_notifications.clone();
420 let stripped_member_handler = self.client.add_event_handler({
421 let requests = requests.clone();
422 move |raw: Raw<StrippedRoomMemberEvent>| async move {
423 let deserialized = match raw.deserialize() {
424 Ok(d) => d,
425 Err(err) => {
426 warn!("failed to deserialize raw stripped room member event: {err}");
427 return;
428 }
429 };
430
431 trace!("received a stripped room member event");
432
433 match &raw.get_field::<OwnedEventId>("event_id") {
436 Ok(Some(event_id)) => {
437 let request =
438 &requests.iter().find(|request| request.event_ids.contains(event_id));
439 if request.is_none() {
440 return;
441 }
442 let room_id = request.unwrap().room_id.clone();
443
444 handler_raw_notifications.lock().unwrap().insert(
448 event_id.to_owned(),
449 (room_id, Some(RawNotificationEvent::Invite(raw))),
450 );
451 return;
452 }
453 Ok(None) => {
454 warn!("a room member event had no id");
455 }
456 Err(err) => {
457 warn!("failed to deserialize room member event id: {err}");
458 }
459 }
460
461 if deserialized.content.membership == MembershipState::Invite
463 && deserialized.state_key == user_id
464 {
465 trace!("found an invite event for the current user");
466 handler_raw_invites
470 .lock()
471 .unwrap()
472 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
473 } else {
474 trace!("not an invite event, or not for the current user");
475 }
476 }
477 });
478
479 let required_state = vec![
481 (StateEventType::RoomEncryption, "".to_owned()),
482 (StateEventType::RoomMember, "$LAZY".to_owned()),
483 (StateEventType::RoomMember, "$ME".to_owned()),
484 (StateEventType::RoomCanonicalAlias, "".to_owned()),
485 (StateEventType::RoomName, "".to_owned()),
486 (StateEventType::RoomPowerLevels, "".to_owned()),
487 (StateEventType::RoomJoinRules, "".to_owned()),
488 (StateEventType::CallMember, "*".to_owned()),
489 ];
490
491 let invites = SlidingSyncList::builder("invites")
492 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
493 .timeline_limit(8)
494 .required_state(required_state.clone())
495 .filters(Some(assign!(http::request::ListFilters::default(), {
496 is_invite: Some(true),
497 not_room_types: vec![RoomTypeFilter::Space],
498 })));
499
500 let sync = self
501 .client
502 .sliding_sync(Self::CONNECTION_ID)?
503 .poll_timeout(Duration::from_secs(1))
504 .network_timeout(Duration::from_secs(3))
505 .with_account_data_extension(
506 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
507 )
508 .add_list(invites)
509 .build()
510 .await?;
511
512 let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
513 sync.subscribe_to_rooms(
514 &room_ids,
515 Some(assign!(http::request::RoomSubscription::default(), {
516 required_state,
517 timeline_limit: uint!(16)
518 })),
519 true,
520 );
521
522 let mut remaining_attempts = 3;
523
524 let stream = sync.sync();
525 pin_mut!(stream);
526
527 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
529
530 loop {
531 if stream.next().await.is_none() {
532 break;
534 }
535
536 if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
537 == expected_event_count
538 {
539 break;
541 }
542
543 remaining_attempts -= 1;
544 if remaining_attempts == 0 {
545 break;
547 }
548 }
549
550 self.client.remove_event_handler(stripped_member_handler);
551 self.client.remove_event_handler(timeline_event_handler);
552
553 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
554 let mut missing_event_ids = Vec::new();
555
556 for request in requests.iter() {
558 for event_id in &request.event_ids {
559 if !notifications.contains_key(event_id) {
560 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
561 }
562 }
563 }
564
565 for (room_id, missing_event_id) in missing_event_ids {
567 trace!("we didn't have a non-invite event, looking for invited room now");
568 if let Some(room) = self.client.get_room(&room_id) {
569 if room.state() == RoomState::Invited {
570 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
571 notifications.insert(
572 missing_event_id.to_owned(),
573 (room_id.to_owned(), stripped_event),
574 );
575 }
576 } else {
577 debug!("the room isn't in the invited state");
578 }
579 } else {
580 warn!(%room_id, "unknown room, can't check for invite events");
581 }
582 }
583
584 let found = if notifications.len() == expected_event_count { "" } else { "not " };
585 trace!("all notification events have{found} been found");
586
587 Ok(notifications)
588 }
589
590 pub async fn get_notification_with_sliding_sync(
591 &self,
592 room_id: &RoomId,
593 event_id: &EventId,
594 ) -> Result<NotificationStatus, Error> {
595 info!("fetching notification event with a sliding sync");
596
597 let request = NotificationItemsRequest {
598 room_id: room_id.to_owned(),
599 event_ids: vec![event_id.to_owned()],
600 };
601
602 let mut get_notifications_result =
603 self.get_notifications_with_sliding_sync(&[request]).await?;
604
605 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
606 }
607
608 pub async fn get_notifications_with_sliding_sync(
613 &self,
614 requests: &[NotificationItemsRequest],
615 ) -> Result<BatchNotificationFetchingResult, Error> {
616 let raw_events = self.try_sliding_sync(requests).await?;
617
618 let mut batch_result = BatchNotificationFetchingResult::new();
619
620 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
621 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
623
624 let Some(raw_event) = raw_event else {
625 batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
627 continue;
628 };
629
630 let (raw_event, push_actions) = match &raw_event {
631 RawNotificationEvent::Timeline(timeline_event) => {
632 match self.retry_decryption(&room, timeline_event).await {
634 Ok(Some(timeline_event)) => {
635 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
636 (
637 RawNotificationEvent::Timeline(timeline_event.into_raw()),
638 push_actions,
639 )
640 }
641
642 Ok(None) => {
643 match room.event_push_actions(timeline_event).await {
646 Ok(push_actions) => (raw_event.clone(), push_actions),
647 Err(err) => {
648 batch_result.insert(event_id, Err(err.into()));
650 continue;
651 }
652 }
653 }
654
655 Err(err) => {
656 batch_result.insert(event_id, Err(err));
657 continue;
658 }
659 }
660 }
661
662 RawNotificationEvent::Invite(invite_event) => {
663 match room.event_push_actions(invite_event).await {
665 Ok(push_actions) => {
666 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
667 }
668 Err(err) => {
669 batch_result.insert(event_id, Err(err.into()));
670 continue;
671 }
672 }
673 }
674 };
675
676 let should_notify = push_actions
677 .as_ref()
678 .is_some_and(|actions| actions.iter().any(|a| a.should_notify()));
679
680 if !should_notify {
681 batch_result.insert(event_id, Ok(NotificationStatus::EventFilteredOut));
683 continue;
684 }
685
686 let status =
687 match NotificationItem::new(&room, raw_event, push_actions.as_deref(), Vec::new())
688 .await
689 .map(|event| NotificationStatus::Event(Box::new(event)))
690 {
691 Ok(status) => status,
692 Err(err) => {
693 batch_result.insert(event_id, Err(err));
695 continue;
696 }
697 };
698
699 match status {
700 NotificationStatus::Event(event) => {
701 if self.client.is_user_ignored(event.event.sender()).await {
702 batch_result.insert(event_id, Ok(NotificationStatus::EventFilteredOut));
703 } else {
704 batch_result.insert(event_id, Ok(NotificationStatus::Event(event)));
705 }
706 }
707 _ => {
708 batch_result.insert(event_id, Ok(status));
709 }
710 }
711 }
712
713 Ok(batch_result)
714 }
715
716 pub async fn get_notification_with_context(
729 &self,
730 room_id: &RoomId,
731 event_id: &EventId,
732 ) -> Result<NotificationStatus, Error> {
733 info!("fetching notification event with a /context query");
734
735 let Some(room) = self.parent_client.get_room(room_id) else {
737 return Err(Error::UnknownRoom);
738 };
739
740 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
741
742 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
743 let state_events = response.state;
744
745 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
746 timeline_event = decrypted_event;
747 }
748
749 if let Some(actions) = timeline_event.push_actions() {
750 if !actions.iter().any(|a| a.should_notify()) {
751 return Ok(NotificationStatus::EventFilteredOut);
752 }
753 }
754
755 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
756 let notification_item = NotificationItem::new(
757 &room,
758 RawNotificationEvent::Timeline(timeline_event.into_raw()),
759 push_actions.as_deref(),
760 state_events,
761 )
762 .await?;
763
764 if self.client.is_user_ignored(notification_item.event.sender()).await {
765 Ok(NotificationStatus::EventFilteredOut)
766 } else {
767 Ok(NotificationStatus::Event(Box::new(notification_item)))
768 }
769 }
770}
771
772fn is_event_encrypted(event_type: TimelineEventType) -> bool {
773 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
774
775 #[cfg(feature = "unstable-msc3956")]
776 let is_still_encrypted =
777 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
778
779 is_still_encrypted
780}
781
/// Outcome of trying to resolve a notification for one event.
#[derive(Debug)]
pub enum NotificationStatus {
    /// The event was found and should be notified; carries the resolved item.
    Event(Box<NotificationItem>),
    /// The event could not be found.
    EventNotFound,
    /// The event was found but must not be notified: none of its push
    /// actions notifies, or its sender is ignored by the user.
    EventFilteredOut,
}
793
/// A request to resolve notifications for several events of one room.
#[derive(Debug, Clone)]
pub struct NotificationItemsRequest {
    /// The room the events belong to.
    pub room_id: OwnedRoomId,
    /// The ids of the events to resolve notifications for.
    pub event_ids: Vec<OwnedEventId>,
}
799
/// Result of a batch notification fetch: for each event id, either the
/// resolved status or the error that occurred for that event.
type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
801
/// The raw (not deserialized) event backing a notification.
#[derive(Debug, Clone)]
pub enum RawNotificationEvent {
    /// A raw timeline event.
    Timeline(Raw<AnySyncTimelineEvent>),
    /// A raw stripped room member event, used for invites.
    Invite(Raw<StrippedRoomMemberEvent>),
}
814
/// The deserialized event backing a notification.
#[derive(Debug)]
pub enum NotificationEvent {
    /// A deserialized timeline event.
    Timeline(Box<AnySyncTimelineEvent>),
    /// A deserialized stripped room member event, used for invites.
    Invite(Box<StrippedRoomMemberEvent>),
}
824
825impl NotificationEvent {
826 pub fn sender(&self) -> &UserId {
827 match self {
828 NotificationEvent::Timeline(ev) => ev.sender(),
829 NotificationEvent::Invite(ev) => &ev.sender,
830 }
831 }
832
833 fn thread_id(&self) -> Option<OwnedEventId> {
836 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
837 return None;
838 };
839 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
840 return None;
841 };
842 let content = event.original_content()?;
843 match content {
844 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
845 Relation::Thread(thread) => Some(thread.event_id),
846 _ => None,
847 },
848 _ => None,
849 }
850 }
851}
852
/// A notification resolved with the sender and room information needed to
/// display it.
#[derive(Debug)]
pub struct NotificationItem {
    /// The deserialized event that triggered the notification.
    pub event: NotificationEvent,

    /// The raw event that triggered the notification.
    pub raw_event: RawNotificationEvent,

    /// Display name of the sender, if known.
    pub sender_display_name: Option<String>,
    /// Avatar URL of the sender, if known.
    pub sender_avatar_url: Option<String>,
    /// Whether the sender's display name is ambiguous; `false` when the
    /// sender's member information is unavailable.
    pub is_sender_name_ambiguous: bool,

    /// Computed display name of the room.
    pub room_computed_display_name: String,
    /// Avatar URL of the room, if any.
    pub room_avatar_url: Option<String>,
    /// Canonical alias of the room, if any.
    pub room_canonical_alias: Option<String>,
    /// Topic of the room, if any.
    pub room_topic: Option<String>,
    /// Join rule of the room, if known.
    pub room_join_rule: Option<JoinRule>,
    /// Whether the room is encrypted; `None` if the encryption state could
    /// not be obtained.
    pub is_room_encrypted: Option<bool>,
    /// Whether the room is a direct message room.
    pub is_direct_message_room: bool,
    /// Number of joined members in the room.
    pub joined_members_count: u64,

    /// Whether any push action carries a sound; `None` if the push actions
    /// are unknown.
    pub is_noisy: Option<bool>,
    /// Whether any push action is a highlight; `None` if the push actions
    /// are unknown.
    pub has_mention: Option<bool>,
    /// Id of the thread root the event belongs to, if any.
    pub thread_id: Option<OwnedEventId>,
}
896
897impl NotificationItem {
898 async fn new(
899 room: &Room,
900 raw_event: RawNotificationEvent,
901 push_actions: Option<&[Action]>,
902 state_events: Vec<Raw<AnyStateEvent>>,
903 ) -> Result<Self, Error> {
904 let event = match &raw_event {
905 RawNotificationEvent::Timeline(raw_event) => {
906 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
907 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
908 SyncRoomMessageEvent::Original(ev),
909 )) = &mut event
910 {
911 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
912 }
913 NotificationEvent::Timeline(Box::new(event))
914 }
915 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
916 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
917 )),
918 };
919
920 let sender = match room.state() {
921 RoomState::Invited => room.invite_details().await?.inviter,
922 _ => room.get_member_no_sync(event.sender()).await?,
923 };
924
925 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
926 match &sender {
927 Some(sender) => (
928 sender.display_name().map(|s| s.to_owned()),
929 sender.avatar_url().map(|s| s.to_string()),
930 sender.name_ambiguous(),
931 ),
932 None => (None, None, false),
933 };
934
935 if sender_display_name.is_none() || sender_avatar_url.is_none() {
936 let sender_id = event.sender();
937 for ev in state_events {
938 let ev = match ev.deserialize() {
939 Ok(ev) => ev,
940 Err(err) => {
941 warn!("Failed to deserialize a state event: {err}");
942 continue;
943 }
944 };
945 if ev.sender() != sender_id {
946 continue;
947 }
948 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
949 content,
950 ..
951 }) = ev.content()
952 {
953 if sender_display_name.is_none() {
954 sender_display_name = content.displayname;
955 }
956 if sender_avatar_url.is_none() {
957 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
958 }
959 }
960 }
961 }
962
963 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
964 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
965 let thread_id = event.thread_id().clone();
966
967 let item = NotificationItem {
968 event,
969 raw_event,
970 sender_display_name,
971 sender_avatar_url,
972 is_sender_name_ambiguous,
973 room_computed_display_name: room.display_name().await?.to_string(),
974 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
975 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
976 room_topic: room.topic(),
977 room_join_rule: room.join_rule(),
978 is_direct_message_room: room.is_direct().await?,
979 is_room_encrypted: room
980 .latest_encryption_state()
981 .await
982 .map(|state| state.is_encrypted())
983 .ok(),
984 joined_members_count: room.joined_members_count(),
985 is_noisy,
986 has_mention,
987 thread_id,
988 };
989
990 Ok(item)
991 }
992
993 pub fn is_public(&self) -> Option<bool> {
997 self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
998 }
999}
1000
/// Errors that can occur while resolving notifications.
#[derive(Debug, Error)]
pub enum Error {
    /// Wraps a client build error; displayed transparently.
    #[error(transparent)]
    BuildingLocalClient(ClientBuildError),

    /// The room of the notification is not known to the client.
    #[error("unknown room for a notification")]
    UnknownRoom,

    /// A raw event could not be deserialized.
    #[error("invalid ruma event")]
    InvalidRumaEvent,

    /// The sliding sync response did not include the target room.
    #[error("the sliding sync response doesn't include the target room")]
    SlidingSyncEmptyRoom,

    /// The `/context` response did not contain the requested event.
    #[error("the event was missing in the `/context` query")]
    ContextMissingEvent,

    /// An error coming from the SDK; displayed transparently.
    #[error(transparent)]
    SdkError(#[from] matrix_sdk::Error),

    /// An error coming from the store; displayed transparently.
    #[error(transparent)]
    StoreError(#[from] StoreError),
}
1031
#[cfg(test)]
mod tests {
    use assert_matches2::assert_let;
    use matrix_sdk::test_utils::mocks::MatrixMockServer;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, user_id};

    use crate::notification_client::{NotificationItem, RawNotificationEvent};

    /// A notification item built from a threaded message must expose the id
    /// of the thread root.
    #[async_test]
    async fn test_notification_item_returns_thread_id() {
        let mock_server = MatrixMockServer::new().await;
        let client = mock_server.client_builder().build().await;

        let room_id = room_id!("!a:b.c");
        let thread_root = event_id!("$root:b.c");

        // A text message replying inside the thread rooted at `thread_root`.
        let threaded_message = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@sender:b.c"))
            .text_msg("Threaded")
            .in_thread(thread_root, event_id!("$prev:b.c"))
            .into_raw_sync();

        let joined_room = mock_server.sync_joined_room(&client, room_id).await;

        let item = NotificationItem::new(
            &joined_room,
            RawNotificationEvent::Timeline(threaded_message),
            None,
            Vec::new(),
        )
        .await
        .expect("Could not create notification item");

        assert_let!(Some(thread_id) = item.thread_id);
        assert_eq!(thread_id, thread_root);
    }
}