1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{future::join_all, stream::FuturesUnordered};
30use http::StatusCode;
31#[cfg(feature = "e2e-encryption")]
32pub use identity_status_changes::IdentityStatusChanges;
33#[cfg(feature = "e2e-encryption")]
34use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
35#[cfg(feature = "e2e-encryption")]
36use matrix_sdk_base::{
37 crypto::{DecryptionSettings, RoomEventDecryptionResult},
38 deserialized_responses::EncryptionInfo,
39};
40use matrix_sdk_base::{
41 deserialized_responses::{
42 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
43 },
44 event_cache::store::media::IgnoreMediaRetentionPolicy,
45 media::MediaThumbnailSettings,
46 store::StateStoreExt,
47 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
48 StateChanges, StateStoreDataKey, StateStoreDataValue,
49};
50#[cfg(feature = "e2e-encryption")]
51use matrix_sdk_common::BoxFuture;
52use matrix_sdk_common::{
53 deserialized_responses::TimelineEvent,
54 executor::{spawn, JoinHandle},
55 timeout::timeout,
56};
57use mime::Mime;
58use reply::Reply;
59#[cfg(feature = "unstable-msc4274")]
60use ruma::events::room::message::GalleryItemType;
61#[cfg(feature = "e2e-encryption")]
62use ruma::events::{
63 room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
64 SyncMessageLikeEvent,
65};
66use ruma::{
67 api::client::{
68 config::{set_global_account_data, set_room_account_data},
69 context,
70 error::ErrorKind,
71 filter::LazyLoadOptions,
72 membership::{
73 ban_user, forget_room, get_member_events,
74 invite_user::{self, v3::InvitationRecipient},
75 kick_user, leave_room, unban_user, Invite3pid,
76 },
77 message::send_message_event,
78 read_marker::set_read_marker,
79 receipt::create_receipt,
80 redact::redact_event,
81 room::{get_room_event, report_content, report_room},
82 state::{get_state_events_for_key, send_state_event},
83 tag::{create_tag, delete_tag},
84 typing::create_typing_event::{self, v3::Typing},
85 },
86 assign,
87 events::{
88 beacon::BeaconEventContent,
89 beacon_info::BeaconInfoEventContent,
90 call::notify::{ApplicationType, CallNotifyEventContent, NotifyType},
91 direct::DirectEventContent,
92 marked_unread::MarkedUnreadEventContent,
93 receipt::{Receipt, ReceiptThread, ReceiptType},
94 room::{
95 avatar::{self, RoomAvatarEventContent},
96 encryption::RoomEncryptionEventContent,
97 history_visibility::HistoryVisibility,
98 member::{MembershipChange, SyncRoomMemberEvent},
99 message::{
100 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
101 FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
102 UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
103 VideoMessageEventContent,
104 },
105 name::RoomNameEventContent,
106 pinned_events::RoomPinnedEventsEventContent,
107 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
108 server_acl::RoomServerAclEventContent,
109 topic::RoomTopicEventContent,
110 ImageInfo, MediaSource, ThumbnailInfo,
111 },
112 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
113 tag::{TagInfo, TagName},
114 typing::SyncTypingEvent,
115 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
116 Mentions, MessageLikeEventContent, MessageLikeEventType, OriginalSyncStateEvent,
117 RedactContent, RedactedStateEventContent, RoomAccountDataEvent,
118 RoomAccountDataEventContent, RoomAccountDataEventType, StateEventContent, StateEventType,
119 StaticEventContent, StaticStateEventContent, SyncStateEvent,
120 },
121 push::{Action, PushConditionRoomCtx, Ruleset},
122 serde::Raw,
123 time::Instant,
124 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
125 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
126};
127use serde::de::DeserializeOwned;
128use thiserror::Error;
129use tokio::{join, sync::broadcast};
130use tokio_stream::StreamExt;
131use tracing::{debug, error, info, instrument, trace, warn};
132
133use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
134pub use self::{
135 member::{RoomMember, RoomMemberRole},
136 messages::{
137 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
138 Relations, RelationsOptions, ThreadRoots,
139 },
140};
141#[cfg(doc)]
142use crate::event_cache::EventCache;
143use crate::{
144 attachment::{AttachmentConfig, AttachmentInfo},
145 client::WeakClient,
146 config::RequestConfig,
147 error::{BeaconError, WrongRoomState},
148 event_cache::{self, EventCacheDropHandles, RoomEventCache},
149 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
150 live_location_share::ObservableLiveLocation,
151 media::{MediaFormat, MediaRequestParameters},
152 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
153 room::{
154 knock_requests::{KnockRequest, KnockRequestMemberInfo},
155 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
156 privacy_settings::RoomPrivacySettings,
157 },
158 sync::RoomUpdate,
159 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
160 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
161};
162#[cfg(feature = "e2e-encryption")]
163use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
164
165pub mod edit;
166pub mod futures;
167pub mod identity_status_changes;
168pub mod knock_requests;
170mod member;
171mod messages;
172pub mod power_levels;
173pub mod reply;
174
175pub mod privacy_settings;
177
178#[cfg(feature = "e2e-encryption")]
179mod shared_room_history;
180
181#[derive(Debug, Clone)]
184pub struct Room {
185 inner: BaseRoom,
186 pub(crate) client: Client,
187}
188
189impl Deref for Room {
190 type Target = BaseRoom;
191
192 fn deref(&self) -> &Self::Target {
193 &self.inner
194 }
195}
196
197const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
198const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
199
200#[derive(Debug)]
202pub struct PushContext {
203 push_condition_room_ctx: PushConditionRoomCtx,
205
206 push_rules: Ruleset,
209}
210
211impl PushContext {
212 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
214 Self { push_condition_room_ctx, push_rules }
215 }
216
217 pub fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
219 self.push_rules.get_actions(event, &self.push_condition_room_ctx).to_owned()
220 }
221}
222
223macro_rules! make_media_type {
224 ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $formatted_caption: ident, $info: ident, $thumbnail: ident) => {{
225 let (body, filename) = match $caption {
229 Some(caption) => (caption, Some($filename)),
230 None => ($filename, None),
231 };
232
233 let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
234
235 match $content_type.type_() {
236 mime::IMAGE => {
237 let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
238 mimetype: Some($content_type.as_ref().to_owned()),
239 thumbnail_source,
240 thumbnail_info
241 });
242 let content = assign!(ImageMessageEventContent::new(body, $source), {
243 info: Some(Box::new(info)),
244 formatted: $formatted_caption,
245 filename
246 });
247 <$t>::Image(content)
248 }
249
250 mime::AUDIO => {
251 let mut content = assign!(AudioMessageEventContent::new(body, $source), {
252 formatted: $formatted_caption,
253 filename
254 });
255
256 if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
257 &$info
258 {
259 if let Some(duration) = audio_info.duration {
260 let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
261 content.audio =
262 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
263 }
264 content.voice = Some(UnstableVoiceContentBlock::new());
265 }
266
267 let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
268 audio_info.mimetype = Some($content_type.as_ref().to_owned());
269 let content = content.info(Box::new(audio_info));
270
271 <$t>::Audio(content)
272 }
273
274 mime::VIDEO => {
275 let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
276 mimetype: Some($content_type.as_ref().to_owned()),
277 thumbnail_source,
278 thumbnail_info
279 });
280 let content = assign!(VideoMessageEventContent::new(body, $source), {
281 info: Some(Box::new(info)),
282 formatted: $formatted_caption,
283 filename
284 });
285 <$t>::Video(content)
286 }
287
288 _ => {
289 let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
290 mimetype: Some($content_type.as_ref().to_owned()),
291 thumbnail_source,
292 thumbnail_info
293 });
294 let content = assign!(FileMessageEventContent::new(body, $source), {
295 info: Some(Box::new(info)),
296 formatted: $formatted_caption,
297 filename,
298 });
299 <$t>::File(content)
300 }
301 }
302 }};
303}
304
305impl Room {
306 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
313 Self { inner: room, client }
314 }
315
316 #[doc(alias = "reject_invitation")]
322 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
323 pub async fn leave(&self) -> Result<()> {
324 let state = self.state();
325 if state == RoomState::Left {
326 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
327 "Joined or Invited",
328 state,
329 ))));
330 }
331
332 let should_forget = matches!(self.state(), RoomState::Invited);
335
336 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
337 let response = self.client.send(request).await;
338
339 if let Err(error) = response {
342 #[allow(clippy::collapsible_match)]
343 let ignore_error = if let Some(error) = error.client_api_error_kind() {
344 match error {
345 ErrorKind::Forbidden { .. } => true,
348 _ => false,
349 }
350 } else {
351 false
352 };
353
354 error!(?error, ignore_error, should_forget, "Failed to leave the room");
355
356 if !ignore_error {
357 return Err(error.into());
358 }
359 }
360
361 self.client.base_client().room_left(self.room_id()).await?;
362
363 if should_forget {
364 trace!("Trying to forget the room");
365
366 if let Err(error) = self.forget().await {
367 error!(?error, "Failed to forget the room");
368 }
369 }
370
371 Ok(())
372 }
373
374 #[doc(alias = "accept_invitation")]
378 pub async fn join(&self) -> Result<()> {
379 let prev_room_state = self.inner.state();
380
381 if prev_room_state == RoomState::Joined {
382 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
383 "Invited or Left",
384 prev_room_state,
385 ))));
386 }
387
388 let inviter = if prev_room_state == RoomState::Invited {
389 match self.invite_details().await {
390 Ok(details) => details.inviter,
391 Err(e) => {
392 warn!("No invite details were found, can't attempt to find a room key bundle to accept: {e:?}");
393 None
394 }
395 }
396 } else {
397 None
398 };
399
400 self.client.join_room_by_id(self.room_id()).await?;
401
402 #[cfg(feature = "e2e-encryption")]
403 if self.client.inner.enable_share_history_on_invite {
404 if let Some(inviter) = inviter {
405 shared_room_history::maybe_accept_key_bundle(self, inviter.user_id()).await?;
406 }
407 }
408
409 #[cfg(not(feature = "e2e-encryption"))]
410 let _inviter = inviter;
412
413 Ok(())
414 }
415
416 pub fn client(&self) -> Client {
420 self.client.clone()
421 }
422
423 pub fn is_synced(&self) -> bool {
426 self.inner.is_state_fully_synced()
427 }
428
429 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
459 let Some(url) = self.avatar_url() else { return Ok(None) };
460 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
461 Ok(Some(self.client.media().get_media_content(&request, true).await?))
462 }
463
464 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
493 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
494 let room_id = self.inner.room_id();
495 let request = options.into_request(room_id);
496 let http_response = self.client.send(request).await?;
497
498 let push_ctx = self.push_context().await?;
499 let chunk = join_all(
500 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
501 )
502 .await;
503
504 Ok(Messages {
505 start: http_response.start,
506 end: http_response.end,
507 chunk,
508 state: http_response.state,
509 })
510 }
511
512 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
522 where
523 Ev: SyncEvent + DeserializeOwned + Send + 'static,
524 H: EventHandler<Ev, Ctx>,
525 {
526 self.client.add_room_event_handler(self.room_id(), handler)
527 }
528
529 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
534 self.client.subscribe_to_room_updates(self.room_id())
535 }
536
537 pub fn subscribe_to_typing_notifications(
543 &self,
544 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
545 let (sender, receiver) = broadcast::channel(16);
546 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
547 let own_user_id = self.own_user_id().to_owned();
548 move |event: SyncTypingEvent| async move {
549 let typing_user_ids = event
551 .content
552 .user_ids
553 .into_iter()
554 .filter(|user_id| *user_id != own_user_id)
555 .collect();
556 let _ = sender.send(typing_user_ids);
558 }
559 });
560 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
561 (drop_guard, receiver)
562 }
563
564 #[cfg(feature = "e2e-encryption")]
587 pub async fn subscribe_to_identity_status_changes(
588 &self,
589 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
590 IdentityStatusChanges::create_stream(self.clone()).await
591 }
592
593 #[allow(clippy::unused_async)] async fn try_decrypt_event(
599 &self,
600 event: Raw<AnyTimelineEvent>,
601 push_ctx: Option<&PushContext>,
602 ) -> TimelineEvent {
603 #[cfg(feature = "e2e-encryption")]
604 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
605 SyncMessageLikeEvent::Original(_),
606 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
607 {
608 if let Ok(event) = self.decrypt_event(event.cast_ref(), push_ctx).await {
609 return event;
610 }
611 }
612
613 let mut event = TimelineEvent::from_plaintext(event.cast());
614 if let Some(push_ctx) = push_ctx {
615 event.set_push_actions(push_ctx.for_event(event.raw()));
616 }
617
618 event
619 }
620
621 pub async fn event(
626 &self,
627 event_id: &EventId,
628 request_config: Option<RequestConfig>,
629 ) -> Result<TimelineEvent> {
630 let request =
631 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
632
633 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
634 let push_ctx = self.push_context().await?;
635 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
636
637 if let Ok((cache, _handles)) = self.event_cache().await {
639 cache.save_events([event.clone()]).await;
640 }
641
642 Ok(event)
643 }
644
645 pub async fn load_or_fetch_event(
652 &self,
653 event_id: &EventId,
654 request_config: Option<RequestConfig>,
655 ) -> Result<TimelineEvent> {
656 match self.event_cache().await {
657 Ok((event_cache, _drop_handles)) => {
658 if let Some(event) = event_cache.event(event_id).await {
659 return Ok(event);
660 }
661 }
663 Err(err) => {
664 debug!("error when getting the event cache: {err}");
665 }
666 }
667 self.event(event_id, request_config).await
668 }
669
670 pub async fn event_with_context(
673 &self,
674 event_id: &EventId,
675 lazy_load_members: bool,
676 context_size: UInt,
677 request_config: Option<RequestConfig>,
678 ) -> Result<EventWithContextResponse> {
679 let mut request =
680 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
681
682 request.limit = context_size;
683
684 if lazy_load_members {
685 request.filter.lazy_load_options =
686 LazyLoadOptions::Enabled { include_redundant_members: false };
687 }
688
689 let response = self.client.send(request).with_request_config(request_config).await?;
690
691 let push_ctx = self.push_context().await?;
692 let push_ctx = push_ctx.as_ref();
693 let target_event = if let Some(event) = response.event {
694 Some(self.try_decrypt_event(event, push_ctx).await)
695 } else {
696 None
697 };
698
699 let (events_before, events_after) = join!(
703 join_all(
704 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
705 ),
706 join_all(
707 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
708 ),
709 );
710
711 if let Ok((cache, _handles)) = self.event_cache().await {
713 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
714 if let Some(event) = &target_event {
715 events_to_save.push(event.clone());
716 }
717
718 for event in &events_before {
719 events_to_save.push(event.clone());
720 }
721
722 for event in &events_after {
723 events_to_save.push(event.clone());
724 }
725
726 cache.save_events(events_to_save).await;
727 }
728
729 Ok(EventWithContextResponse {
730 event: target_event,
731 events_before,
732 events_after,
733 state: response.state,
734 prev_batch_token: response.start,
735 next_batch_token: response.end,
736 })
737 }
738
739 pub(crate) async fn request_members(&self) -> Result<()> {
740 self.client
741 .locks()
742 .members_request_deduplicated_handler
743 .run(self.room_id().to_owned(), async move {
744 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
745 let response = self
746 .client
747 .send(request.clone())
748 .with_request_config(
749 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
752 )
753 .await?;
754
755 Box::pin(self.client.base_client().receive_all_members(
757 self.room_id(),
758 &request,
759 &response,
760 ))
761 .await?;
762
763 Ok(())
764 })
765 .await
766 }
767
768 pub async fn request_encryption_state(&self) -> Result<()> {
773 if !self.inner.encryption_state().is_unknown() {
774 return Ok(());
775 }
776
777 self.client
778 .locks()
779 .encryption_state_deduplicated_handler
780 .run(self.room_id().to_owned(), async move {
781 let request = get_state_events_for_key::v3::Request::new(
783 self.room_id().to_owned(),
784 StateEventType::RoomEncryption,
785 "".to_owned(),
786 );
787 let response = match self.client.send(request).await {
788 Ok(response) => {
789 Some(response.content.deserialize_as::<RoomEncryptionEventContent>()?)
790 }
791 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
792 Err(err) => return Err(err.into()),
793 };
794
795 let _sync_lock = self.client.base_client().sync_lock().lock().await;
796
797 let mut room_info = self.clone_info();
800 room_info.mark_encryption_state_synced();
801 room_info.set_encryption_event(response.clone());
802 let mut changes = StateChanges::default();
803 changes.add_room(room_info.clone());
804
805 self.client.state_store().save_changes(&changes).await?;
806 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
807
808 Ok(())
809 })
810 .await
811 }
812
813 pub fn encryption_state(&self) -> EncryptionState {
818 self.inner.encryption_state()
819 }
820
821 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
827 self.request_encryption_state().await?;
828
829 Ok(self.encryption_state())
830 }
831
832 #[cfg(feature = "e2e-encryption")]
834 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
835 let encryption = self.client.encryption();
836
837 let this_device_is_verified = match encryption.get_own_device().await {
838 Ok(Some(device)) => device.is_verified_with_cross_signing(),
839
840 _ => true,
842 };
843
844 let backup_exists_on_server =
845 encryption.backups().exists_on_server().await.unwrap_or(false);
846
847 CryptoContextInfo {
848 device_creation_ts: encryption.device_creation_timestamp().await,
849 this_device_is_verified,
850 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
851 backup_exists_on_server,
852 }
853 }
854
855 fn are_events_visible(&self) -> bool {
856 if let RoomState::Invited = self.inner.state() {
857 return matches!(
858 self.inner.history_visibility_or_default(),
859 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
860 );
861 }
862
863 true
864 }
865
866 pub async fn sync_members(&self) -> Result<()> {
872 if !self.are_events_visible() {
873 return Ok(());
874 }
875
876 if !self.are_members_synced() {
877 self.request_members().await
878 } else {
879 Ok(())
880 }
881 }
882
883 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
897 self.sync_members().await?;
898 self.get_member_no_sync(user_id).await
899 }
900
901 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
915 Ok(self
916 .inner
917 .get_member(user_id)
918 .await?
919 .map(|member| RoomMember::new(self.client.clone(), member)))
920 }
921
922 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
931 self.sync_members().await?;
932 self.members_no_sync(memberships).await
933 }
934
935 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
944 Ok(self
945 .inner
946 .members(memberships)
947 .await?
948 .into_iter()
949 .map(|member| RoomMember::new(self.client.clone(), member))
950 .collect())
951 }
952
953 pub async fn get_state_events(
955 &self,
956 event_type: StateEventType,
957 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
958 self.client
959 .state_store()
960 .get_state_events(self.room_id(), event_type)
961 .await
962 .map_err(Into::into)
963 }
964
965 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
982 where
983 C: StaticEventContent + StaticStateEventContent + RedactContent,
984 C::Redacted: RedactedStateEventContent,
985 {
986 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
987 }
988
989 pub async fn get_state_events_for_keys(
992 &self,
993 event_type: StateEventType,
994 state_keys: &[&str],
995 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
996 self.client
997 .state_store()
998 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
999 .await
1000 .map_err(Into::into)
1001 }
1002
1003 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1023 &self,
1024 state_keys: I,
1025 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1026 where
1027 C: StaticEventContent + StaticStateEventContent + RedactContent,
1028 C::StateKey: Borrow<K>,
1029 C::Redacted: RedactedStateEventContent,
1030 K: AsRef<str> + Sized + Sync + 'a,
1031 I: IntoIterator<Item = &'a K> + Send,
1032 I::IntoIter: Send,
1033 {
1034 Ok(self
1035 .client
1036 .state_store()
1037 .get_state_events_for_keys_static(self.room_id(), state_keys)
1038 .await?)
1039 }
1040
1041 pub async fn get_state_event(
1043 &self,
1044 event_type: StateEventType,
1045 state_key: &str,
1046 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1047 self.client
1048 .state_store()
1049 .get_state_event(self.room_id(), event_type, state_key)
1050 .await
1051 .map_err(Into::into)
1052 }
1053
1054 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1073 where
1074 C: StaticEventContent + StaticStateEventContent<StateKey = EmptyStateKey> + RedactContent,
1075 C::Redacted: RedactedStateEventContent,
1076 {
1077 self.get_state_event_static_for_key(&EmptyStateKey).await
1078 }
1079
1080 pub async fn get_state_event_static_for_key<C, K>(
1100 &self,
1101 state_key: &K,
1102 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1103 where
1104 C: StaticEventContent + StaticStateEventContent + RedactContent,
1105 C::StateKey: Borrow<K>,
1106 C::Redacted: RedactedStateEventContent,
1107 K: AsRef<str> + ?Sized + Sync,
1108 {
1109 Ok(self
1110 .client
1111 .state_store()
1112 .get_state_event_static_for_key(self.room_id(), state_key)
1113 .await?)
1114 }
1115
1116 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1120 Ok(self
1125 .get_state_events_static::<SpaceParentEventContent>()
1126 .await?
1127 .into_iter()
1128 .flat_map(|parent_event| match parent_event.deserialize() {
1130 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1131 Some((e.state_key.to_owned(), e.sender))
1132 }
1133 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1134 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1135 Err(e) => {
1136 info!(room_id = ?self.room_id(), "Could not deserialize m.room.parent: {e}");
1137 None
1138 }
1139 })
1140 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1142 let Some(parent_room) = self.client.get_room(&state_key) else {
1143 return Ok(ParentSpace::Unverifiable(state_key));
1146 };
1147 if let Some(child_event) = parent_room
1150 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1151 .await?
1152 {
1153 match child_event.deserialize() {
1154 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1155 return Ok(ParentSpace::Reciprocal(parent_room));
1158 }
1159 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1160 Ok(SyncOrStrippedState::Stripped(_)) => {}
1161 Err(e) => {
1162 info!(
1163 room_id = ?self.room_id(), parent_room_id = ?state_key,
1164 "Could not deserialize m.room.child: {e}"
1165 );
1166 }
1167 }
1168 }
1173
1174 let Some(member) = parent_room.get_member(&sender).await? else {
1177 return Ok(ParentSpace::Illegitimate(parent_room));
1179 };
1180
1181 if member.can_send_state(StateEventType::SpaceChild) {
1182 Ok(ParentSpace::WithPowerlevel(parent_room))
1184 } else {
1185 Ok(ParentSpace::Illegitimate(parent_room))
1186 }
1187 })
1188 .collect::<FuturesUnordered<_>>())
1189 }
1190
1191 pub async fn account_data(
1193 &self,
1194 data_type: RoomAccountDataEventType,
1195 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1196 self.client
1197 .state_store()
1198 .get_room_account_data_event(self.room_id(), data_type)
1199 .await
1200 .map_err(Into::into)
1201 }
1202
1203 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1222 where
1223 C: StaticEventContent + RoomAccountDataEventContent,
1224 {
1225 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast))
1226 }
1227
1228 #[cfg(feature = "e2e-encryption")]
1233 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1234 let user_ids = self
1235 .client
1236 .state_store()
1237 .get_user_ids(self.room_id(), RoomMemberships::empty())
1238 .await?;
1239
1240 for user_id in user_ids {
1241 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1242 let any_unverified = devices.devices().any(|d| !d.is_verified());
1243
1244 if any_unverified {
1245 return Ok(false);
1246 }
1247 }
1248
1249 Ok(true)
1250 }
1251
1252 pub async fn set_account_data<T>(
1267 &self,
1268 content: T,
1269 ) -> Result<set_room_account_data::v3::Response>
1270 where
1271 T: RoomAccountDataEventContent,
1272 {
1273 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1274
1275 let request = set_room_account_data::v3::Request::new(
1276 own_user.to_owned(),
1277 self.room_id().to_owned(),
1278 &content,
1279 )?;
1280
1281 Ok(self.client.send(request).await?)
1282 }
1283
1284 pub async fn set_account_data_raw(
1309 &self,
1310 event_type: RoomAccountDataEventType,
1311 content: Raw<AnyRoomAccountDataEventContent>,
1312 ) -> Result<set_room_account_data::v3::Response> {
1313 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1314
1315 let request = set_room_account_data::v3::Request::new_raw(
1316 own_user.to_owned(),
1317 self.room_id().to_owned(),
1318 event_type,
1319 content,
1320 );
1321
1322 Ok(self.client.send(request).await?)
1323 }
1324
1325 pub async fn set_tag(
1356 &self,
1357 tag: TagName,
1358 tag_info: TagInfo,
1359 ) -> Result<create_tag::v3::Response> {
1360 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1361 let request = create_tag::v3::Request::new(
1362 user_id.to_owned(),
1363 self.inner.room_id().to_owned(),
1364 tag.to_string(),
1365 tag_info,
1366 );
1367 Ok(self.client.send(request).await?)
1368 }
1369
1370 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1377 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1378 let request = delete_tag::v3::Request::new(
1379 user_id.to_owned(),
1380 self.inner.room_id().to_owned(),
1381 tag.to_string(),
1382 );
1383 Ok(self.client.send(request).await?)
1384 }
1385
1386 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1396 if is_favourite {
1397 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1398
1399 self.set_tag(TagName::Favorite, tag_info).await?;
1400
1401 if self.is_low_priority() {
1402 self.remove_tag(TagName::LowPriority).await?;
1403 }
1404 } else {
1405 self.remove_tag(TagName::Favorite).await?;
1406 }
1407 Ok(())
1408 }
1409
1410 pub async fn set_is_low_priority(
1420 &self,
1421 is_low_priority: bool,
1422 tag_order: Option<f64>,
1423 ) -> Result<()> {
1424 if is_low_priority {
1425 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1426
1427 self.set_tag(TagName::LowPriority, tag_info).await?;
1428
1429 if self.is_favourite() {
1430 self.remove_tag(TagName::Favorite).await?;
1431 }
1432 } else {
1433 self.remove_tag(TagName::LowPriority).await?;
1434 }
1435 Ok(())
1436 }
1437
1438 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1447 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1448
1449 let mut content = self
1450 .client
1451 .account()
1452 .account_data::<DirectEventContent>()
1453 .await?
1454 .map(|c| c.deserialize())
1455 .transpose()?
1456 .unwrap_or_default();
1457
1458 let this_room_id = self.inner.room_id();
1459
1460 if is_direct {
1461 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1462 room_members.retain(|member| member.user_id() != self.own_user_id());
1463
1464 for member in room_members {
1465 let entry = content.entry(member.user_id().into()).or_default();
1466 if !entry.iter().any(|room_id| room_id == this_room_id) {
1467 entry.push(this_room_id.to_owned());
1468 }
1469 }
1470 } else {
1471 for (_, list) in content.iter_mut() {
1472 list.retain(|room_id| *room_id != this_room_id);
1473 }
1474
1475 content.retain(|_, list| !list.is_empty());
1477 }
1478
1479 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1480
1481 self.client.send(request).await?;
1482 Ok(())
1483 }
1484
1485 #[cfg(feature = "e2e-encryption")]
1493 pub async fn decrypt_event(
1494 &self,
1495 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1496 push_ctx: Option<&PushContext>,
1497 ) -> Result<TimelineEvent> {
1498 let machine = self.client.olm_machine().await;
1499 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1500
1501 let decryption_settings = DecryptionSettings {
1502 sender_device_trust_requirement: self.client.base_client().decryption_trust_requirement,
1503 };
1504
1505 match machine
1506 .try_decrypt_room_event(event.cast_ref(), self.inner.room_id(), &decryption_settings)
1507 .await?
1508 {
1509 RoomEventDecryptionResult::Decrypted(decrypted) => {
1510 let push_actions = push_ctx.map(|push_ctx| push_ctx.for_event(&decrypted.event));
1511 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1512 }
1513 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1514 self.client
1515 .encryption()
1516 .backups()
1517 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1518 Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1519 }
1520 }
1521 }
1522
1523 #[cfg(feature = "e2e-encryption")]
1536 pub async fn get_encryption_info(
1537 &self,
1538 session_id: &str,
1539 sender: &UserId,
1540 ) -> Option<Arc<EncryptionInfo>> {
1541 let machine = self.client.olm_machine().await;
1542 let machine = machine.as_ref()?;
1543 machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1544 }
1545
1546 #[cfg(feature = "e2e-encryption")]
1559 pub async fn discard_room_key(&self) -> Result<()> {
1560 let machine = self.client.olm_machine().await;
1561 if let Some(machine) = machine.as_ref() {
1562 machine.discard_room_key(self.inner.room_id()).await?;
1563 Ok(())
1564 } else {
1565 Err(Error::NoOlmMachine)
1566 }
1567 }
1568
1569 #[instrument(skip_all)]
1577 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1578 let request = assign!(
1579 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1580 { reason: reason.map(ToOwned::to_owned) }
1581 );
1582 self.client.send(request).await?;
1583 Ok(())
1584 }
1585
1586 #[instrument(skip_all)]
1594 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1595 let request = assign!(
1596 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1597 { reason: reason.map(ToOwned::to_owned) }
1598 );
1599 self.client.send(request).await?;
1600 Ok(())
1601 }
1602
1603 #[instrument(skip_all)]
1612 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1613 let request = assign!(
1614 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1615 { reason: reason.map(ToOwned::to_owned) }
1616 );
1617 self.client.send(request).await?;
1618 Ok(())
1619 }
1620
1621 #[instrument(skip_all)]
1627 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1628 #[cfg(feature = "e2e-encryption")]
1629 if self.client.inner.enable_share_history_on_invite {
1630 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1631 }
1632
1633 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1634 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1635 self.client.send(request).await?;
1636
1637 self.mark_members_missing();
1641
1642 Ok(())
1643 }
1644
1645 #[instrument(skip_all)]
1651 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1652 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1653 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1654 self.client.send(request).await?;
1655
1656 self.mark_members_missing();
1660
1661 Ok(())
1662 }
1663
1664 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1699 self.ensure_room_joined()?;
1700
1701 let send = if let Some(typing_time) =
1704 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1705 {
1706 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1707 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1711 } else {
1712 !typing
1714 }
1715 } else {
1716 typing
1719 };
1720
1721 if send {
1722 self.send_typing_notice(typing).await?;
1723 }
1724
1725 Ok(())
1726 }
1727
1728 #[instrument(name = "typing_notice", skip(self))]
1729 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1730 let typing = if typing {
1731 self.client
1732 .inner
1733 .typing_notice_times
1734 .write()
1735 .unwrap()
1736 .insert(self.room_id().to_owned(), Instant::now());
1737 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1738 } else {
1739 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1740 Typing::No
1741 };
1742
1743 let request = create_typing_event::v3::Request::new(
1744 self.own_user_id().to_owned(),
1745 self.room_id().to_owned(),
1746 typing,
1747 );
1748
1749 self.client.send(request).await?;
1750
1751 Ok(())
1752 }
1753
1754 #[instrument(skip_all)]
1771 pub async fn send_single_receipt(
1772 &self,
1773 receipt_type: create_receipt::v3::ReceiptType,
1774 thread: ReceiptThread,
1775 event_id: OwnedEventId,
1776 ) -> Result<()> {
1777 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1780
1781 self.client
1782 .inner
1783 .locks
1784 .read_receipt_deduplicated_handler
1785 .run((request_key, event_id.clone()), async {
1786 let is_unthreaded = thread == ReceiptThread::Unthreaded;
1788
1789 let mut request = create_receipt::v3::Request::new(
1790 self.room_id().to_owned(),
1791 receipt_type,
1792 event_id,
1793 );
1794 request.thread = thread;
1795
1796 self.client.send(request).await?;
1797
1798 if is_unthreaded {
1799 self.set_unread_flag(false).await?;
1800 }
1801
1802 Ok(())
1803 })
1804 .await
1805 }
1806
1807 #[instrument(skip_all)]
1817 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
1818 if receipts.is_empty() {
1819 return Ok(());
1820 }
1821
1822 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
1823 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
1824 fully_read,
1825 read_receipt: public_read_receipt,
1826 private_read_receipt,
1827 });
1828
1829 self.client.send(request).await?;
1830
1831 self.set_unread_flag(false).await?;
1832
1833 Ok(())
1834 }
1835
1836 #[instrument(skip_all)]
1868 pub async fn enable_encryption(&self) -> Result<()> {
1869 use ruma::{
1870 events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
1871 };
1872 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
1873
1874 if !self.latest_encryption_state().await?.is_encrypted() {
1875 let content =
1876 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
1877 self.send_state_event(content).await?;
1878
1879 _ = timeout(self.client.inner.sync_beat.listen(), SYNC_WAIT_TIME).await;
1883
1884 let _sync_lock = self.client.base_client().sync_lock().lock().await;
1889 if !self.inner.encryption_state().is_encrypted() {
1890 debug!("still not marked as encrypted, marking encryption state as missing");
1891
1892 let mut room_info = self.clone_info();
1893 room_info.mark_encryption_state_missing();
1894 let mut changes = StateChanges::default();
1895 changes.add_room(room_info.clone());
1896
1897 self.client.state_store().save_changes(&changes).await?;
1898 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1899 } else {
1900 debug!("room successfully marked as encrypted");
1901 }
1902 }
1903
1904 Ok(())
1905 }
1906
1907 #[cfg(feature = "e2e-encryption")]
1916 #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
1917 async fn preshare_room_key(&self) -> Result<()> {
1918 self.ensure_room_joined()?;
1919
1920 let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
1922 tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
1923
1924 self.client
1925 .locks()
1926 .group_session_deduplicated_handler
1927 .run(self.room_id().to_owned(), async move {
1928 {
1929 let members = self
1930 .client
1931 .state_store()
1932 .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
1933 .await?;
1934 self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
1935 };
1936
1937 let response = self.share_room_key().await;
1938
1939 if let Err(r) = response {
1943 let machine = self.client.olm_machine().await;
1944 if let Some(machine) = machine.as_ref() {
1945 machine.discard_room_key(self.room_id()).await?;
1946 }
1947 return Err(r);
1948 }
1949
1950 Ok(())
1951 })
1952 .await
1953 }
1954
1955 #[cfg(feature = "e2e-encryption")]
1961 #[instrument(skip_all)]
1962 async fn share_room_key(&self) -> Result<()> {
1963 self.ensure_room_joined()?;
1964
1965 let requests = self.client.base_client().share_room_key(self.room_id()).await?;
1966
1967 for request in requests {
1968 let response = self.client.send_to_device(&request).await?;
1969 self.client.mark_request_as_sent(&request.txn_id, &response).await?;
1970 }
1971
1972 Ok(())
1973 }
1974
1975 #[instrument(skip_all)]
1984 pub async fn sync_up(&self) {
1985 while !self.is_synced() && self.state() == RoomState::Joined {
1986 let wait_for_beat = self.client.inner.sync_beat.listen();
1987 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
1989 }
1990 }
1991
1992 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2062 SendMessageLikeEvent::new(self, content)
2063 }
2064
2065 #[cfg(feature = "e2e-encryption")]
2068 async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2069 let olm = self.client.olm_machine().await;
2070 let olm = olm.as_ref().expect("Olm machine wasn't started");
2071
2072 let members =
2073 self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2074
2075 let tracked: HashMap<_, _> = olm
2076 .store()
2077 .load_tracked_users()
2078 .await?
2079 .into_iter()
2080 .map(|tracked| (tracked.user_id, tracked.dirty))
2081 .collect();
2082
2083 let members_with_unknown_devices =
2086 members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2087
2088 let (req_id, request) =
2089 olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2090
2091 if !request.device_keys.is_empty() {
2092 self.client.keys_query(&req_id, request.device_keys).await?;
2093 }
2094
2095 Ok(())
2096 }
2097
2098 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2142 pub fn send_raw<'a>(
2143 &'a self,
2144 event_type: &'a str,
2145 content: impl IntoRawMessageLikeEventContent,
2146 ) -> SendRawMessageLikeEvent<'a> {
2147 SendRawMessageLikeEvent::new(self, event_type, content)
2150 }
2151
2152 #[instrument(skip_all)]
2200 pub fn send_attachment<'a>(
2201 &'a self,
2202 filename: impl Into<String>,
2203 content_type: &'a Mime,
2204 data: Vec<u8>,
2205 config: AttachmentConfig,
2206 ) -> SendAttachment<'a> {
2207 SendAttachment::new(self, filename.into(), content_type, data, config)
2208 }
2209
2210 #[instrument(skip_all)]
2238 pub(super) async fn prepare_and_send_attachment<'a>(
2239 &'a self,
2240 filename: String,
2241 content_type: &'a Mime,
2242 data: Vec<u8>,
2243 mut config: AttachmentConfig,
2244 send_progress: SharedObservable<TransmissionProgress>,
2245 store_in_cache: bool,
2246 ) -> Result<send_message_event::v3::Response> {
2247 self.ensure_room_joined()?;
2248
2249 let txn_id = config.txn_id.take();
2250 let mentions = config.mentions.take();
2251
2252 let thumbnail = config.thumbnail.take();
2253
2254 let thumbnail_cache_info = if store_in_cache {
2256 thumbnail
2257 .as_ref()
2258 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2259 } else {
2260 None
2261 };
2262
2263 #[cfg(feature = "e2e-encryption")]
2264 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2265 self.client
2266 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2267 .await?
2268 } else {
2269 self.client
2270 .media()
2271 .upload_plain_media_and_thumbnail(
2272 content_type,
2273 data.clone(),
2276 thumbnail,
2277 send_progress,
2278 )
2279 .await?
2280 };
2281
2282 #[cfg(not(feature = "e2e-encryption"))]
2283 let (media_source, thumbnail) = self
2284 .client
2285 .media()
2286 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2287 .await?;
2288
2289 if store_in_cache {
2290 let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
2291
2292 debug!("caching the media");
2296 let request =
2297 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2298
2299 if let Err(err) = cache_store_lock_guard
2300 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2301 .await
2302 {
2303 warn!("unable to cache the media after uploading it: {err}");
2304 }
2305
2306 if let Some(((data, height, width), source)) =
2307 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2308 {
2309 debug!("caching the thumbnail");
2310
2311 let request = MediaRequestParameters {
2312 source: source.clone(),
2313 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2314 };
2315
2316 if let Err(err) = cache_store_lock_guard
2317 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2318 .await
2319 {
2320 warn!("unable to cache the media after uploading it: {err}");
2321 }
2322 }
2323 }
2324
2325 let content = self
2326 .make_media_event(
2327 Room::make_attachment_type(
2328 content_type,
2329 filename,
2330 media_source,
2331 config.caption,
2332 config.formatted_caption,
2333 config.info,
2334 thumbnail,
2335 ),
2336 mentions,
2337 config.reply,
2338 )
2339 .await?;
2340
2341 let mut fut = self.send(content);
2342 if let Some(txn_id) = txn_id {
2343 fut = fut.with_transaction_id(txn_id);
2344 }
2345 fut.await
2346 }
2347
2348 #[allow(clippy::too_many_arguments)]
2351 pub(crate) fn make_attachment_type(
2352 content_type: &Mime,
2353 filename: String,
2354 source: MediaSource,
2355 caption: Option<String>,
2356 formatted_caption: Option<FormattedBody>,
2357 info: Option<AttachmentInfo>,
2358 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2359 ) -> MessageType {
2360 make_media_type!(
2361 MessageType,
2362 content_type,
2363 filename,
2364 source,
2365 caption,
2366 formatted_caption,
2367 info,
2368 thumbnail
2369 )
2370 }
2371
2372 pub(crate) async fn make_media_event(
2375 &self,
2376 msg_type: MessageType,
2377 mentions: Option<Mentions>,
2378 reply: Option<Reply>,
2379 ) -> Result<RoomMessageEventContent> {
2380 let mut content = RoomMessageEventContent::new(msg_type);
2381 if let Some(mentions) = mentions {
2382 content = content.add_mentions(mentions);
2383 }
2384 if let Some(reply) = reply {
2385 content = self.make_reply_event(content.into(), reply).await?;
2388 }
2389 Ok(content)
2390 }
2391
2392 #[cfg(feature = "unstable-msc4274")]
2395 #[allow(clippy::too_many_arguments)]
2396 pub(crate) fn make_gallery_item_type(
2397 content_type: &Mime,
2398 filename: String,
2399 source: MediaSource,
2400 caption: Option<String>,
2401 formatted_caption: Option<FormattedBody>,
2402 info: Option<AttachmentInfo>,
2403 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2404 ) -> GalleryItemType {
2405 make_media_type!(
2406 GalleryItemType,
2407 content_type,
2408 filename,
2409 source,
2410 caption,
2411 formatted_caption,
2412 info,
2413 thumbnail
2414 )
2415 }
2416
2417 pub async fn update_power_levels(
2426 &self,
2427 updates: Vec<(&UserId, Int)>,
2428 ) -> Result<send_state_event::v3::Response> {
2429 let mut power_levels = self.power_levels().await?;
2430
2431 for (user_id, new_level) in updates {
2432 if new_level == power_levels.users_default {
2433 power_levels.users.remove(user_id);
2434 } else {
2435 power_levels.users.insert(user_id.to_owned(), new_level);
2436 }
2437 }
2438
2439 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await
2440 }
2441
2442 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2447 let mut power_levels = self.power_levels().await?;
2448 power_levels.apply(changes)?;
2449 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await?;
2450 Ok(())
2451 }
2452
2453 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2457 let default_power_levels = RoomPowerLevels::from(RoomPowerLevelsEventContent::new());
2458 let changes = RoomPowerLevelChanges::from(default_power_levels);
2459 self.apply_power_level_changes(changes).await?;
2460 Ok(self.power_levels().await?)
2461 }
2462
2463 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2468 let power_level = self.get_user_power_level(user_id).await?;
2469 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2470 }
2471
2472 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<i64> {
2477 let event = self.power_levels().await?;
2478 Ok(event.for_user(user_id).into())
2479 }
2480
2481 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2484 let power_levels = self.power_levels().await.ok();
2485 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2486 if let Some(power_levels) = power_levels {
2487 for (id, level) in power_levels.users.into_iter() {
2488 user_power_levels.insert(id, level.into());
2489 }
2490 }
2491 user_power_levels
2492 }
2493
2494 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2496 self.send_state_event(RoomNameEventContent::new(name)).await
2497 }
2498
2499 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2501 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2502 }
2503
2504 pub async fn set_avatar_url(
2510 &self,
2511 url: &MxcUri,
2512 info: Option<avatar::ImageInfo>,
2513 ) -> Result<send_state_event::v3::Response> {
2514 self.ensure_room_joined()?;
2515
2516 let mut room_avatar_event = RoomAvatarEventContent::new();
2517 room_avatar_event.url = Some(url.to_owned());
2518 room_avatar_event.info = info.map(Box::new);
2519
2520 self.send_state_event(room_avatar_event).await
2521 }
2522
2523 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2525 self.send_state_event(RoomAvatarEventContent::new()).await
2526 }
2527
2528 pub async fn upload_avatar(
2536 &self,
2537 mime: &Mime,
2538 data: Vec<u8>,
2539 info: Option<avatar::ImageInfo>,
2540 ) -> Result<send_state_event::v3::Response> {
2541 self.ensure_room_joined()?;
2542
2543 let upload_response = self.client.media().upload(mime, data, None).await?;
2544 let mut info = info.unwrap_or_default();
2545 info.blurhash = upload_response.blurhash;
2546 info.mimetype = Some(mime.to_string());
2547
2548 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2549 }
2550
2551 #[instrument(skip_all)]
2595 pub async fn send_state_event(
2596 &self,
2597 content: impl StateEventContent<StateKey = EmptyStateKey>,
2598 ) -> Result<send_state_event::v3::Response> {
2599 self.send_state_event_for_key(&EmptyStateKey, content).await
2600 }
2601
2602 pub async fn send_state_event_for_key<C, K>(
2643 &self,
2644 state_key: &K,
2645 content: C,
2646 ) -> Result<send_state_event::v3::Response>
2647 where
2648 C: StateEventContent,
2649 C::StateKey: Borrow<K>,
2650 K: AsRef<str> + ?Sized,
2651 {
2652 self.ensure_room_joined()?;
2653 let request =
2654 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
2655 let response = self.client.send(request).await?;
2656 Ok(response)
2657 }
2658
2659 #[instrument(skip_all)]
2694 pub async fn send_state_event_raw(
2695 &self,
2696 event_type: &str,
2697 state_key: &str,
2698 content: impl IntoRawStateEventContent,
2699 ) -> Result<send_state_event::v3::Response> {
2700 self.ensure_room_joined()?;
2701
2702 let request = send_state_event::v3::Request::new_raw(
2703 self.room_id().to_owned(),
2704 event_type.into(),
2705 state_key.to_owned(),
2706 content.into_raw_state_event_content(),
2707 );
2708
2709 Ok(self.client.send(request).await?)
2710 }
2711
2712 #[instrument(skip_all)]
2747 pub async fn redact(
2748 &self,
2749 event_id: &EventId,
2750 reason: Option<&str>,
2751 txn_id: Option<OwnedTransactionId>,
2752 ) -> HttpResult<redact_event::v3::Response> {
2753 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
2754 let request = assign!(
2755 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
2756 { reason: reason.map(ToOwned::to_owned) }
2757 );
2758
2759 self.client.send(request).await
2760 }
2761
2762 pub async fn can_user_redact_own(&self, user_id: &UserId) -> Result<bool> {
2767 Ok(self.power_levels().await?.user_can_redact_own_event(user_id))
2768 }
2769
2770 pub async fn can_user_redact_other(&self, user_id: &UserId) -> Result<bool> {
2775 Ok(self.power_levels().await?.user_can_redact_event_of_other(user_id))
2776 }
2777
2778 pub async fn can_user_ban(&self, user_id: &UserId) -> Result<bool> {
2783 Ok(self.power_levels().await?.user_can_ban(user_id))
2784 }
2785
2786 pub async fn can_user_invite(&self, user_id: &UserId) -> Result<bool> {
2791 Ok(self.power_levels().await?.user_can_invite(user_id))
2792 }
2793
2794 pub async fn can_user_kick(&self, user_id: &UserId) -> Result<bool> {
2799 Ok(self.power_levels().await?.user_can_kick(user_id))
2800 }
2801
2802 pub async fn can_user_send_state(
2807 &self,
2808 user_id: &UserId,
2809 state_event: StateEventType,
2810 ) -> Result<bool> {
2811 Ok(self.power_levels().await?.user_can_send_state(user_id, state_event))
2812 }
2813
2814 pub async fn can_user_send_message(
2819 &self,
2820 user_id: &UserId,
2821 message: MessageLikeEventType,
2822 ) -> Result<bool> {
2823 Ok(self.power_levels().await?.user_can_send_message(user_id, message))
2824 }
2825
2826 pub async fn can_user_pin_unpin(&self, user_id: &UserId) -> Result<bool> {
2831 Ok(self
2832 .power_levels()
2833 .await?
2834 .user_can_send_state(user_id, StateEventType::RoomPinnedEvents))
2835 }
2836
2837 pub async fn can_user_trigger_room_notification(&self, user_id: &UserId) -> Result<bool> {
2842 Ok(self.power_levels().await?.user_can_trigger_room_notification(user_id))
2843 }
2844
2845 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
2854 let acl_ev = self
2855 .get_state_event_static::<RoomServerAclEventContent>()
2856 .await?
2857 .and_then(|ev| ev.deserialize().ok());
2858 let acl = acl_ev.as_ref().and_then(|ev| match ev {
2859 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
2860 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
2861 });
2862
2863 let members: Vec<_> = self
2867 .members_no_sync(RoomMemberships::JOIN)
2868 .await?
2869 .into_iter()
2870 .filter(|member| {
2871 let server = member.user_id().server_name();
2872 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
2873 })
2874 .collect();
2875
2876 let max = members
2879 .iter()
2880 .max_by_key(|member| member.power_level())
2881 .filter(|max| max.power_level() >= 50)
2882 .map(|member| member.user_id().server_name());
2883
2884 let servers = members
2886 .iter()
2887 .map(|member| member.user_id().server_name())
2888 .filter(|server| max.filter(|max| max == server).is_none())
2889 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
2890 *servers.entry(server).or_default() += 1;
2891 servers
2892 });
2893 let mut servers: Vec<_> = servers.into_iter().collect();
2894 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
2895
2896 Ok(max
2897 .into_iter()
2898 .chain(servers.into_iter().map(|(name, _)| name))
2899 .take(3)
2900 .map(ToOwned::to_owned)
2901 .collect())
2902 }
2903
2904 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
2911 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2912 return Ok(alias.matrix_to_uri());
2913 }
2914
2915 let via = self.route().await?;
2916 Ok(self.room_id().matrix_to_uri_via(via))
2917 }
2918
2919 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
2930 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2931 return Ok(alias.matrix_uri(join));
2932 }
2933
2934 let via = self.route().await?;
2935 Ok(self.room_id().matrix_uri_via(via, join))
2936 }
2937
2938 pub async fn matrix_to_event_permalink(
2952 &self,
2953 event_id: impl Into<OwnedEventId>,
2954 ) -> Result<MatrixToUri> {
2955 let via = self.route().await?;
2958 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
2959 }
2960
2961 pub async fn matrix_event_permalink(
2975 &self,
2976 event_id: impl Into<OwnedEventId>,
2977 ) -> Result<MatrixUri> {
2978 let via = self.route().await?;
2981 Ok(self.room_id().matrix_event_uri_via(event_id, via))
2982 }
2983
2984 pub async fn load_user_receipt(
2997 &self,
2998 receipt_type: ReceiptType,
2999 thread: ReceiptThread,
3000 user_id: &UserId,
3001 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3002 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3003 }
3004
3005 pub async fn load_event_receipts(
3018 &self,
3019 receipt_type: ReceiptType,
3020 thread: ReceiptThread,
3021 event_id: &EventId,
3022 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3023 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3024 }
3025
3026 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3031 let room_id = self.room_id();
3032 let user_id = self.own_user_id();
3033 let room_info = self.clone_info();
3034 let member_count = room_info.active_members_count();
3035
3036 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3037 member.name().to_owned()
3038 } else {
3039 return Ok(None);
3040 };
3041
3042 let power_levels = self
3043 .get_state_event_static::<RoomPowerLevelsEventContent>()
3044 .await?
3045 .and_then(|e| e.deserialize().ok())
3046 .map(|e| e.power_levels().into());
3047
3048 Ok(Some(PushConditionRoomCtx {
3049 user_id: user_id.to_owned(),
3050 room_id: room_id.to_owned(),
3051 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
3052 user_display_name,
3053 power_levels,
3054 }))
3055 }
3056
3057 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3060 let Some(push_condition_room_ctx) = self.push_condition_room_ctx().await? else {
3061 debug!("Could not aggregate push context");
3062 return Ok(None);
3063 };
3064 let push_rules = self.client().account().push_rules().await?;
3065 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3066 }
3067
3068 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3073 Ok(self.push_context().await?.map(|ctx| ctx.for_event(event)))
3074 }
3075
3076 pub async fn invite_details(&self) -> Result<Invite> {
3079 let state = self.state();
3080
3081 if state != RoomState::Invited {
3082 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3083 }
3084
3085 let invitee = self
3086 .get_member_no_sync(self.own_user_id())
3087 .await?
3088 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3089 let event = invitee.event();
3090 let inviter_id = event.sender();
3091 let inviter = self.get_member_no_sync(inviter_id).await?;
3092 Ok(Invite { invitee, inviter })
3093 }
3094
3095 pub async fn member_with_sender_info(
3103 &self,
3104 user_id: &UserId,
3105 ) -> Result<RoomMemberWithSenderInfo> {
3106 let Some(member) = self.get_member_no_sync(user_id).await? else {
3107 return Err(Error::InsufficientData);
3108 };
3109
3110 let sender_member =
3111 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3112 Some(member)
3114 } else if self.are_members_synced() {
3115 None
3117 } else if self.sync_members().await.is_ok() {
3118 self.get_member_no_sync(member.event().sender()).await?
3120 } else {
3121 None
3122 };
3123
3124 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3125 }
3126
3127 pub async fn forget(&self) -> Result<()> {
3133 let state = self.state();
3134 match state {
3135 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3136 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3137 "Left / Banned",
3138 state,
3139 ))));
3140 }
3141 RoomState::Left | RoomState::Banned => {}
3142 }
3143
3144 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3145 let _response = self.client.send(request).await?;
3146
3147 if self.inner.direct_targets_length() != 0 {
3149 if let Err(e) = self.set_is_direct(false).await {
3150 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3153 }
3154 }
3155
3156 self.client.base_client().forget_room(self.inner.room_id()).await?;
3157
3158 Ok(())
3159 }
3160
3161 fn ensure_room_joined(&self) -> Result<()> {
3162 let state = self.state();
3163 if state == RoomState::Joined {
3164 Ok(())
3165 } else {
3166 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3167 }
3168 }
3169
3170 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3172 if !matches!(self.state(), RoomState::Joined) {
3173 return None;
3174 }
3175
3176 let notification_settings = self.client().notification_settings().await;
3177
3178 let notification_mode =
3180 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3181
3182 if notification_mode.is_some() {
3183 notification_mode
3184 } else if let Ok(is_encrypted) =
3185 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3186 {
3187 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3192 let default_mode = notification_settings
3193 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3194 .await;
3195 Some(default_mode)
3196 } else {
3197 None
3198 }
3199 }
3200
3201 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3212 if !matches!(self.state(), RoomState::Joined) {
3213 return None;
3214 }
3215
3216 let notification_settings = self.client().notification_settings().await;
3217
3218 let mode =
3220 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3221
3222 if let Some(mode) = mode {
3223 self.update_cached_user_defined_notification_mode(mode);
3224 }
3225
3226 mode
3227 }
3228
3229 pub async fn report_content(
3242 &self,
3243 event_id: OwnedEventId,
3244 score: Option<ReportedContentScore>,
3245 reason: Option<String>,
3246 ) -> Result<report_content::v3::Response> {
3247 let state = self.state();
3248 if state != RoomState::Joined {
3249 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3250 }
3251
3252 let request = report_content::v3::Request::new(
3253 self.inner.room_id().to_owned(),
3254 event_id,
3255 score.map(Into::into),
3256 reason,
3257 );
3258 Ok(self.client.send(request).await?)
3259 }
3260
3261 pub async fn report_room(&self, reason: Option<String>) -> Result<report_room::v3::Response> {
3272 let mut request = report_room::v3::Request::new(self.inner.room_id().to_owned());
3273 request.reason = reason;
3274
3275 Ok(self.client.send(request).await?)
3276 }
3277
3278 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3284 if self.is_marked_unread() == unread {
3285 return Ok(());
3287 }
3288
3289 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3290
3291 let content = MarkedUnreadEventContent::new(unread);
3292
3293 let request = set_room_account_data::v3::Request::new(
3294 user_id.to_owned(),
3295 self.inner.room_id().to_owned(),
3296 &content,
3297 )?;
3298
3299 self.client.send(request).await?;
3300 Ok(())
3301 }
3302
3303 pub async fn event_cache(
3306 &self,
3307 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3308 self.client.event_cache().for_room(self.room_id()).await
3309 }
3310
3311 pub async fn send_call_notification_if_needed(&self) -> Result<bool> {
3326 debug!("Sending call notification for room {} if needed", self.inner.room_id());
3327
3328 if self.has_active_room_call() {
3329 warn!("Room {} has active room call, not sending a new notify event.", self.room_id());
3330 return Ok(false);
3331 }
3332
3333 if !self.can_user_trigger_room_notification(self.own_user_id()).await? {
3334 warn!(
3335 "User can't send notifications to everyone in the room {}. \
3336 Not sending a new notify event.",
3337 self.room_id()
3338 );
3339 return Ok(false);
3340 }
3341
3342 let notify_type = if self.is_direct().await.unwrap_or(false) {
3343 NotifyType::Ring
3344 } else {
3345 NotifyType::Notify
3346 };
3347
3348 debug!("Sending `m.call.notify` event with notify type: {notify_type:?}");
3349
3350 self.send_call_notification(
3351 self.room_id().to_string().to_owned(),
3352 ApplicationType::Call,
3353 notify_type,
3354 Mentions::with_room_mention(),
3355 )
3356 .await?;
3357
3358 Ok(true)
3359 }
3360
3361 pub(crate) async fn get_user_beacon_info(
3368 &self,
3369 user_id: &UserId,
3370 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3371 let raw_event = self
3372 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3373 .await?
3374 .ok_or(BeaconError::NotFound)?;
3375
3376 match raw_event.deserialize()? {
3377 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3378 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3379 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3380 }
3381 }
3382
3383 pub async fn start_live_location_share(
3396 &self,
3397 duration_millis: u64,
3398 description: Option<String>,
3399 ) -> Result<send_state_event::v3::Response> {
3400 self.ensure_room_joined()?;
3401
3402 self.send_state_event_for_key(
3403 self.own_user_id(),
3404 BeaconInfoEventContent::new(
3405 description,
3406 Duration::from_millis(duration_millis),
3407 true,
3408 None,
3409 ),
3410 )
3411 .await
3412 }
3413
3414 pub async fn stop_live_location_share(
3421 &self,
3422 ) -> Result<send_state_event::v3::Response, BeaconError> {
3423 self.ensure_room_joined()?;
3424
3425 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3426 beacon_info_event.content.stop();
3427 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3428 }
3429
3430 pub async fn send_location_beacon(
3442 &self,
3443 geo_uri: String,
3444 ) -> Result<send_message_event::v3::Response, BeaconError> {
3445 self.ensure_room_joined()?;
3446
3447 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3448
3449 if beacon_info_event.content.is_live() {
3450 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3451 Ok(self.send(content).await?)
3452 } else {
3453 Err(BeaconError::NotLive)
3454 }
3455 }
3456
3457 pub async fn send_call_notification(
3469 &self,
3470 call_id: String,
3471 application: ApplicationType,
3472 notify_type: NotifyType,
3473 mentions: Mentions,
3474 ) -> Result<()> {
3475 let call_notify_event_content =
3476 CallNotifyEventContent::new(call_id, application, notify_type, mentions);
3477 self.send(call_notify_event_content).await?;
3478 Ok(())
3479 }
3480
3481 pub async fn save_composer_draft(&self, draft: ComposerDraft) -> Result<()> {
3484 self.client
3485 .state_store()
3486 .set_kv_data(
3487 StateStoreDataKey::ComposerDraft(self.room_id()),
3488 StateStoreDataValue::ComposerDraft(draft),
3489 )
3490 .await?;
3491 Ok(())
3492 }
3493
3494 pub async fn load_composer_draft(&self) -> Result<Option<ComposerDraft>> {
3496 let data = self
3497 .client
3498 .state_store()
3499 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3500 .await?;
3501 Ok(data.and_then(|d| d.into_composer_draft()))
3502 }
3503
3504 pub async fn clear_composer_draft(&self) -> Result<()> {
3506 self.client
3507 .state_store()
3508 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3509 .await?;
3510 Ok(())
3511 }
3512
3513 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3516 let response = self
3517 .client
3518 .send(get_state_events_for_key::v3::Request::new(
3519 self.room_id().to_owned(),
3520 StateEventType::RoomPinnedEvents,
3521 "".to_owned(),
3522 ))
3523 .await;
3524
3525 match response {
3526 Ok(response) => {
3527 Ok(Some(response.content.deserialize_as::<RoomPinnedEventsEventContent>()?.pinned))
3528 }
3529 Err(http_error) => match http_error.as_client_api_error() {
3530 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3531 _ => Err(http_error.into()),
3532 },
3533 }
3534 }
3535
3536 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3544 ObservableLiveLocation::new(&self.client, self.room_id())
3545 }
3546
3547 pub async fn subscribe_to_knock_requests(
3561 &self,
3562 ) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
3563 let this = Arc::new(self.clone());
3564
3565 let room_member_events_observer =
3566 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3567
3568 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3569 let mut seen_request_ids_stream = self
3570 .seen_knock_request_ids_map
3571 .subscribe()
3572 .await
3573 .map(|values| values.unwrap_or_default());
3574
3575 let mut room_info_stream = self.subscribe_info();
3576
3577 let clear_seen_ids_handle = spawn({
3580 let this = self.clone();
3581 async move {
3582 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3583 while member_updates_stream.recv().await.is_ok() {
3584 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3586 warn!("Failed to remove seen knock requests: {err}")
3587 }
3588 }
3589 }
3590 });
3591
3592 let combined_stream = stream! {
3593 match this.get_current_join_requests(¤t_seen_ids).await {
3595 Ok(initial_requests) => yield initial_requests,
3596 Err(err) => warn!("Failed to get initial requests to join: {err}")
3597 }
3598
3599 let mut requests_stream = room_member_events_observer.subscribe();
3600 let mut seen_ids = current_seen_ids.clone();
3601
3602 loop {
3603 tokio::select! {
3606 Some((event, _)) = requests_stream.next() => {
3607 if let Some(event) = event.as_original() {
3608 let emit = if event.prev_content().is_some() {
3610 matches!(event.membership_change(),
3611 MembershipChange::Banned |
3612 MembershipChange::Knocked |
3613 MembershipChange::KnockAccepted |
3614 MembershipChange::KnockDenied |
3615 MembershipChange::KnockRetracted
3616 )
3617 } else {
3618 true
3621 };
3622
3623 if emit {
3624 match this.get_current_join_requests(&seen_ids).await {
3625 Ok(requests) => yield requests,
3626 Err(err) => {
3627 warn!("Failed to get updated knock requests on new member event: {err}")
3628 }
3629 }
3630 }
3631 }
3632 }
3633
3634 Some(new_seen_ids) = seen_request_ids_stream.next() => {
3635 seen_ids = new_seen_ids;
3637
3638 match this.get_current_join_requests(&seen_ids).await {
3641 Ok(requests) => yield requests,
3642 Err(err) => {
3643 warn!("Failed to get updated knock requests on seen ids changed: {err}")
3644 }
3645 }
3646 }
3647
3648 Some(room_info) = room_info_stream.next() => {
3649 if !room_info.are_members_synced() {
3652 match this.get_current_join_requests(&seen_ids).await {
3653 Ok(requests) => yield requests,
3654 Err(err) => {
3655 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
3656 }
3657 }
3658 }
3659 }
3660 else => break,
3662 }
3663 }
3664 };
3665
3666 Ok((combined_stream, clear_seen_ids_handle))
3667 }
3668
3669 async fn get_current_join_requests(
3670 &self,
3671 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
3672 ) -> Result<Vec<KnockRequest>> {
3673 Ok(self
3674 .members(RoomMemberships::KNOCK)
3675 .await?
3676 .into_iter()
3677 .filter_map(|member| {
3678 let event_id = member.event().event_id()?;
3679 Some(KnockRequest::new(
3680 self,
3681 event_id,
3682 member.event().timestamp(),
3683 KnockRequestMemberInfo::from_member(&member),
3684 seen_request_ids.contains_key(event_id),
3685 ))
3686 })
3687 .collect())
3688 }
3689
3690 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
3692 RoomPrivacySettings::new(&self.inner, &self.client)
3693 }
3694
3695 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
3703 let request = opts.into_request(self.room_id());
3704
3705 let response = self.client.send(request).await?;
3706
3707 let push_ctx = self.push_context().await?;
3708 let chunk = join_all(
3709 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
3710 )
3711 .await;
3712
3713 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
3714 }
3715
3716 pub async fn relations(
3730 &self,
3731 event_id: OwnedEventId,
3732 opts: RelationsOptions,
3733 ) -> Result<Relations> {
3734 opts.send(self, event_id).await
3735 }
3736}
3737
3738#[cfg(feature = "e2e-encryption")]
3739impl RoomIdentityProvider for Room {
3740 fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
3741 Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
3742 }
3743
3744 fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
3745 Box::pin(async {
3746 let members = self
3747 .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
3748 .await
3749 .unwrap_or_else(|_| Default::default());
3750
3751 let mut ret: Vec<UserIdentity> = Vec::new();
3752 for member in members {
3753 if let Some(i) = self.user_identity(member.user_id()).await {
3754 ret.push(i);
3755 }
3756 }
3757 ret
3758 })
3759 }
3760
3761 fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
3762 Box::pin(async {
3763 self.client
3764 .encryption()
3765 .get_user_identity(user_id)
3766 .await
3767 .unwrap_or(None)
3768 .map(|u| u.underlying_identity())
3769 })
3770 }
3771}
3772
3773#[derive(Clone)]
3776pub(crate) struct WeakRoom {
3777 client: WeakClient,
3778 room_id: OwnedRoomId,
3779}
3780
3781impl WeakRoom {
3782 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
3784 Self { client, room_id }
3785 }
3786
3787 pub fn get(&self) -> Option<Room> {
3789 self.client.get().and_then(|client| client.get_room(&self.room_id))
3790 }
3791
3792 pub fn room_id(&self) -> &RoomId {
3794 &self.room_id
3795 }
3796}
3797
3798#[derive(Debug, Clone)]
3800pub struct Invite {
3801 pub invitee: RoomMember,
3803 pub inviter: Option<RoomMember>,
3805}
3806
3807#[derive(Error, Debug)]
3808enum InvitationError {
3809 #[error("No membership event found")]
3810 EventMissing,
3811}
3812
3813#[derive(Debug, Clone, Default)]
3815#[non_exhaustive]
3816pub struct Receipts {
3817 pub fully_read: Option<OwnedEventId>,
3819 pub public_read_receipt: Option<OwnedEventId>,
3821 pub private_read_receipt: Option<OwnedEventId>,
3823}
3824
3825impl Receipts {
3826 pub fn new() -> Self {
3828 Self::default()
3829 }
3830
3831 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3840 self.fully_read = event_id.into();
3841 self
3842 }
3843
3844 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3850 self.public_read_receipt = event_id.into();
3851 self
3852 }
3853
3854 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3858 self.private_read_receipt = event_id.into();
3859 self
3860 }
3861
3862 pub fn is_empty(&self) -> bool {
3864 self.fully_read.is_none()
3865 && self.public_read_receipt.is_none()
3866 && self.private_read_receipt.is_none()
3867 }
3868}
3869
3870#[derive(Debug)]
3873pub enum ParentSpace {
3874 Reciprocal(Room),
3877 WithPowerlevel(Room),
3882 Illegitimate(Room),
3885 Unverifiable(OwnedRoomId),
3888}
3889
3890#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
3894pub struct ReportedContentScore(i8);
3895
3896impl ReportedContentScore {
3897 pub const MIN: Self = Self(-100);
3901
3902 pub const MAX: Self = Self(0);
3906
3907 pub fn new(value: i8) -> Option<Self> {
3916 value.try_into().ok()
3917 }
3918
3919 pub fn new_saturating(value: i8) -> Self {
3925 if value > Self::MAX {
3926 Self::MAX
3927 } else if value < Self::MIN {
3928 Self::MIN
3929 } else {
3930 Self(value)
3931 }
3932 }
3933
3934 pub fn value(&self) -> i8 {
3936 self.0
3937 }
3938}
3939
3940impl PartialEq<i8> for ReportedContentScore {
3941 fn eq(&self, other: &i8) -> bool {
3942 self.0.eq(other)
3943 }
3944}
3945
3946impl PartialEq<ReportedContentScore> for i8 {
3947 fn eq(&self, other: &ReportedContentScore) -> bool {
3948 self.eq(&other.0)
3949 }
3950}
3951
3952impl PartialOrd<i8> for ReportedContentScore {
3953 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
3954 self.0.partial_cmp(other)
3955 }
3956}
3957
3958impl PartialOrd<ReportedContentScore> for i8 {
3959 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
3960 self.partial_cmp(&other.0)
3961 }
3962}
3963
3964impl From<ReportedContentScore> for Int {
3965 fn from(value: ReportedContentScore) -> Self {
3966 value.0.into()
3967 }
3968}
3969
3970impl TryFrom<i8> for ReportedContentScore {
3971 type Error = TryFromReportedContentScoreError;
3972
3973 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
3974 if value > Self::MAX || value < Self::MIN {
3975 Err(TryFromReportedContentScoreError(()))
3976 } else {
3977 Ok(Self(value))
3978 }
3979 }
3980}
3981
3982impl TryFrom<i16> for ReportedContentScore {
3983 type Error = TryFromReportedContentScoreError;
3984
3985 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
3986 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3987 value.try_into()
3988 }
3989}
3990
3991impl TryFrom<i32> for ReportedContentScore {
3992 type Error = TryFromReportedContentScoreError;
3993
3994 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
3995 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3996 value.try_into()
3997 }
3998}
3999
4000impl TryFrom<i64> for ReportedContentScore {
4001 type Error = TryFromReportedContentScoreError;
4002
4003 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4004 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4005 value.try_into()
4006 }
4007}
4008
4009impl TryFrom<Int> for ReportedContentScore {
4010 type Error = TryFromReportedContentScoreError;
4011
4012 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4013 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4014 value.try_into()
4015 }
4016}
4017
4018trait EventSource {
4019 fn get_event(
4020 &self,
4021 event_id: &EventId,
4022 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4023}
4024
4025impl EventSource for &Room {
4026 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4027 self.load_or_fetch_event(event_id, None).await
4028 }
4029}
4030
4031#[derive(Debug, Clone, Error)]
4034#[error("out of range conversion attempted")]
4035pub struct TryFromReportedContentScoreError(());
4036
4037#[derive(Debug)]
4040pub struct RoomMemberWithSenderInfo {
4041 pub room_member: RoomMember,
4043 pub sender_info: Option<RoomMember>,
4046}
4047
4048#[cfg(all(test, not(target_family = "wasm")))]
4049mod tests {
4050 use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft};
4051 use matrix_sdk_test::{
4052 async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
4053 SyncResponseBuilder,
4054 };
4055 use ruma::{
4056 event_id,
4057 events::{relation::RelationType, room::member::MembershipState},
4058 int, owned_event_id, room_id, user_id,
4059 };
4060 use wiremock::{
4061 matchers::{header, method, path_regex},
4062 Mock, MockServer, ResponseTemplate,
4063 };
4064
4065 use super::ReportedContentScore;
4066 use crate::{
4067 config::RequestConfig,
4068 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4069 test_utils::{
4070 client::mock_matrix_session,
4071 logged_in_client,
4072 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4073 },
4074 Client,
4075 };
4076
4077 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4078 #[async_test]
4079 async fn test_cache_invalidation_while_encrypt() {
4080 use matrix_sdk_base::store::RoomLoadSettings;
4081 use matrix_sdk_test::{message_like_event_content, DEFAULT_TEST_ROOM_ID};
4082
4083 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4084 let session = mock_matrix_session();
4085
4086 let client = Client::builder()
4087 .homeserver_url("http://localhost:1234")
4088 .request_config(RequestConfig::new().disable_retry())
4089 .sqlite_store(&sqlite_path, None)
4090 .build()
4091 .await
4092 .unwrap();
4093 client
4094 .matrix_auth()
4095 .restore_session(session.clone(), RoomLoadSettings::default())
4096 .await
4097 .unwrap();
4098
4099 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4100
4101 let server = MockServer::start().await;
4103 {
4104 Mock::given(method("GET"))
4105 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4106 .and(header("authorization", "Bearer 1234"))
4107 .respond_with(
4108 ResponseTemplate::new(200)
4109 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4110 )
4111 .mount(&server)
4112 .await;
4113 let response = SyncResponseBuilder::default()
4114 .add_joined_room(
4115 JoinedRoomBuilder::default()
4116 .add_state_event(StateTestEvent::Member)
4117 .add_state_event(StateTestEvent::PowerLevels)
4118 .add_state_event(StateTestEvent::Encryption),
4119 )
4120 .build_sync_response();
4121 client.base_client().receive_sync_response(response).await.unwrap();
4122 }
4123
4124 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4125
4126 room.preshare_room_key().await.unwrap();
4128
4129 {
4132 let client = Client::builder()
4133 .homeserver_url("http://localhost:1234")
4134 .request_config(RequestConfig::new().disable_retry())
4135 .sqlite_store(&sqlite_path, None)
4136 .build()
4137 .await
4138 .unwrap();
4139 client
4140 .matrix_auth()
4141 .restore_session(session.clone(), RoomLoadSettings::default())
4142 .await
4143 .unwrap();
4144 client
4145 .encryption()
4146 .enable_cross_process_store_lock("client2".to_owned())
4147 .await
4148 .unwrap();
4149
4150 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4151 assert!(guard.is_some());
4152 }
4153
4154 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4156 assert!(guard.is_some());
4157
4158 let olm = client.olm_machine().await;
4160 let olm = olm.as_ref().expect("Olm machine wasn't started");
4161
4162 let _encrypted_content = olm
4165 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4166 .await
4167 .unwrap();
4168 }
4169
4170 #[test]
4171 fn reported_content_score() {
4172 let score = ReportedContentScore::new(0).unwrap();
4174 assert_eq!(score.value(), 0);
4175 let score = ReportedContentScore::new(-50).unwrap();
4176 assert_eq!(score.value(), -50);
4177 let score = ReportedContentScore::new(-100).unwrap();
4178 assert_eq!(score.value(), -100);
4179 assert_eq!(ReportedContentScore::new(10), None);
4180 assert_eq!(ReportedContentScore::new(-110), None);
4181
4182 let score = ReportedContentScore::new_saturating(0);
4183 assert_eq!(score.value(), 0);
4184 let score = ReportedContentScore::new_saturating(-50);
4185 assert_eq!(score.value(), -50);
4186 let score = ReportedContentScore::new_saturating(-100);
4187 assert_eq!(score.value(), -100);
4188 let score = ReportedContentScore::new_saturating(10);
4189 assert_eq!(score, ReportedContentScore::MAX);
4190 let score = ReportedContentScore::new_saturating(-110);
4191 assert_eq!(score, ReportedContentScore::MIN);
4192
4193 let score = ReportedContentScore::try_from(0i16).unwrap();
4195 assert_eq!(score.value(), 0);
4196 let score = ReportedContentScore::try_from(-100i16).unwrap();
4197 assert_eq!(score.value(), -100);
4198 ReportedContentScore::try_from(10i16).unwrap_err();
4199 ReportedContentScore::try_from(-110i16).unwrap_err();
4200
4201 let score = ReportedContentScore::try_from(0i32).unwrap();
4203 assert_eq!(score.value(), 0);
4204 let score = ReportedContentScore::try_from(-100i32).unwrap();
4205 assert_eq!(score.value(), -100);
4206 ReportedContentScore::try_from(10i32).unwrap_err();
4207 ReportedContentScore::try_from(-110i32).unwrap_err();
4208
4209 let score = ReportedContentScore::try_from(0i64).unwrap();
4211 assert_eq!(score.value(), 0);
4212 let score = ReportedContentScore::try_from(-100i64).unwrap();
4213 assert_eq!(score.value(), -100);
4214 ReportedContentScore::try_from(10i64).unwrap_err();
4215 ReportedContentScore::try_from(-110i64).unwrap_err();
4216
4217 let score = ReportedContentScore::try_from(int!(0)).unwrap();
4219 assert_eq!(score.value(), 0);
4220 let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4221 assert_eq!(score.value(), -100);
4222 ReportedContentScore::try_from(int!(10)).unwrap_err();
4223 ReportedContentScore::try_from(int!(-110)).unwrap_err();
4224 }
4225
4226 #[async_test]
4227 async fn test_composer_draft() {
4228 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4229
4230 let client = logged_in_client(None).await;
4231
4232 let response = SyncResponseBuilder::default()
4233 .add_joined_room(JoinedRoomBuilder::default())
4234 .build_sync_response();
4235 client.base_client().receive_sync_response(response).await.unwrap();
4236 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4237
4238 assert_eq!(room.load_composer_draft().await.unwrap(), None);
4239
4240 let draft = ComposerDraft {
4241 plain_text: "Hello, world!".to_owned(),
4242 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4243 draft_type: ComposerDraftType::NewMessage,
4244 };
4245 room.save_composer_draft(draft.clone()).await.unwrap();
4246 assert_eq!(room.load_composer_draft().await.unwrap(), Some(draft));
4247
4248 room.clear_composer_draft().await.unwrap();
4249 assert_eq!(room.load_composer_draft().await.unwrap(), None);
4250 }
4251
4252 #[async_test]
4253 async fn test_mark_join_requests_as_seen() {
4254 let server = MatrixMockServer::new().await;
4255 let client = server.client_builder().build().await;
4256 let event_id = event_id!("$a:b.c");
4257 let room_id = room_id!("!a:b.c");
4258 let user_id = user_id!("@alice:b.c");
4259
4260 let f = EventFactory::new().room(room_id);
4261 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f
4262 .member(user_id)
4263 .membership(MembershipState::Knock)
4264 .event_id(event_id)
4265 .into_raw_timeline()
4266 .cast()]);
4267 let room = server.sync_room(&client, joined_room_builder).await;
4268
4269 let seen_ids =
4271 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4272 assert!(seen_ids.is_empty());
4273
4274 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4276 .await
4277 .expect("Couldn't mark join request as seen");
4278
4279 let seen_ids =
4281 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4282 assert_eq!(seen_ids.len(), 1);
4283 assert_eq!(
4284 seen_ids.into_iter().next().expect("No next value"),
4285 (event_id.to_owned(), user_id.to_owned())
4286 )
4287 }
4288
4289 #[async_test]
4290 async fn test_own_room_membership_with_no_own_member_event() {
4291 let server = MatrixMockServer::new().await;
4292 let client = server.client_builder().build().await;
4293 let room_id = room_id!("!a:b.c");
4294
4295 let room = server.sync_joined_room(&client, room_id).await;
4296
4297 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4300 assert!(error.is_some());
4301 }
4302
4303 #[async_test]
4304 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4305 let server = MatrixMockServer::new().await;
4306 let client = server.client_builder().build().await;
4307 let room_id = room_id!("!a:b.c");
4308 let user_id = user_id!("@example:localhost");
4309
4310 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4311 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4312 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4313 let room = server.sync_room(&client, joined_room_builder).await;
4314
4315 let ret = room
4317 .member_with_sender_info(client.user_id().unwrap())
4318 .await
4319 .expect("Room member info should be available");
4320
4321 assert_eq!(ret.room_member.event().user_id(), user_id);
4323
4324 assert!(ret.sender_info.is_none());
4326 }
4327
4328 #[async_test]
4329 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4330 let server = MatrixMockServer::new().await;
4331 let client = server.client_builder().build().await;
4332 let room_id = room_id!("!a:b.c");
4333 let user_id = user_id!("@example:localhost");
4334
4335 let f = EventFactory::new().room(room_id).sender(user_id);
4336 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4337 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4338 let room = server.sync_room(&client, joined_room_builder).await;
4339
4340 let ret = room
4342 .member_with_sender_info(client.user_id().unwrap())
4343 .await
4344 .expect("Room member info should be available");
4345
4346 assert_eq!(ret.room_member.event().user_id(), user_id);
4348
4349 assert!(ret.sender_info.is_some());
4351 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
4352 }
4353
4354 #[async_test]
4355 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4356 let server = MatrixMockServer::new().await;
4357 let client = server.client_builder().build().await;
4358 let room_id = room_id!("!a:b.c");
4359 let user_id = user_id!("@example:localhost");
4360 let sender_id = user_id!("@alice:b.c");
4361
4362 let f = EventFactory::new().room(room_id).sender(sender_id);
4363 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4364 f.member(user_id).into_raw_sync().cast(),
4365 f.member(sender_id).into_raw_sync().cast(),
4367 ]);
4368 let room = server.sync_room(&client, joined_room_builder).await;
4369
4370 let ret = room
4372 .member_with_sender_info(client.user_id().unwrap())
4373 .await
4374 .expect("Room member info should be available");
4375
4376 assert_eq!(ret.room_member.event().user_id(), user_id);
4378
4379 assert!(ret.sender_info.is_some());
4381 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4382 }
4383
4384 #[async_test]
4385 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
4386 let server = MatrixMockServer::new().await;
4387 let client = server.client_builder().build().await;
4388 let room_id = room_id!("!a:b.c");
4389 let user_id = user_id!("@example:localhost");
4390 let sender_id = user_id!("@alice:b.c");
4391
4392 let f = EventFactory::new().room(room_id).sender(sender_id);
4393 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4394 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4395 let room = server.sync_room(&client, joined_room_builder).await;
4396
4397 server
4399 .mock_get_members()
4400 .ok(vec![f.member(sender_id).into_raw_timeline().cast()])
4401 .mock_once()
4402 .mount()
4403 .await;
4404
4405 let ret = room
4407 .member_with_sender_info(client.user_id().unwrap())
4408 .await
4409 .expect("Room member info should be available");
4410
4411 assert_eq!(ret.room_member.event().user_id(), user_id);
4413
4414 assert!(ret.sender_info.is_some());
4416 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4417 }
4418
4419 #[async_test]
4420 async fn test_list_threads() {
4421 let server = MatrixMockServer::new().await;
4422 let client = server.client_builder().build().await;
4423
4424 let room_id = room_id!("!a:b.c");
4425 let sender_id = user_id!("@alice:b.c");
4426 let f = EventFactory::new().room(room_id).sender(sender_id);
4427
4428 let eid1 = event_id!("$1");
4429 let eid2 = event_id!("$2");
4430 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw_sync().cast()];
4431 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw_sync().cast()];
4432
4433 server
4434 .mock_room_threads()
4435 .ok(batch1.clone(), Some("prev_batch".to_owned()))
4436 .mock_once()
4437 .mount()
4438 .await;
4439 server
4440 .mock_room_threads()
4441 .match_from("prev_batch")
4442 .ok(batch2, None)
4443 .mock_once()
4444 .mount()
4445 .await;
4446
4447 let room = server.sync_joined_room(&client, room_id).await;
4448 let result =
4449 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
4450 assert_eq!(result.chunk.len(), 1);
4451 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
4452 assert!(result.prev_batch_token.is_some());
4453
4454 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
4455 let result = room.list_threads(opts).await.expect("Failed to list threads");
4456 assert_eq!(result.chunk.len(), 1);
4457 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
4458 assert!(result.prev_batch_token.is_none());
4459 }
4460
4461 #[async_test]
4462 async fn test_relations() {
4463 let server = MatrixMockServer::new().await;
4464 let client = server.client_builder().build().await;
4465
4466 let room_id = room_id!("!a:b.c");
4467 let sender_id = user_id!("@alice:b.c");
4468 let f = EventFactory::new().room(room_id).sender(sender_id);
4469
4470 let target_event_id = owned_event_id!("$target");
4471 let eid1 = event_id!("$1");
4472 let eid2 = event_id!("$2");
4473 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw_sync().cast()];
4474 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw_sync().cast()];
4475
4476 server
4477 .mock_room_relations()
4478 .match_target_event(target_event_id.clone())
4479 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
4480 .mock_once()
4481 .mount()
4482 .await;
4483
4484 server
4485 .mock_room_relations()
4486 .match_target_event(target_event_id.clone())
4487 .match_from("next_batch")
4488 .ok(RoomRelationsResponseTemplate::default().events(batch2))
4489 .mock_once()
4490 .mount()
4491 .await;
4492
4493 let room = server.sync_joined_room(&client, room_id).await;
4494
4495 let mut opts = RelationsOptions {
4497 include_relations: IncludeRelations::AllRelations,
4498 ..Default::default()
4499 };
4500 let result = room
4501 .relations(target_event_id.clone(), opts.clone())
4502 .await
4503 .expect("Failed to list relations the first time");
4504 assert_eq!(result.chunk.len(), 1);
4505 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
4506 assert!(result.prev_batch_token.is_none());
4507 assert!(result.next_batch_token.is_some());
4508 assert!(result.recursion_depth.is_none());
4509
4510 opts.from = result.next_batch_token;
4511 let result = room
4512 .relations(target_event_id, opts)
4513 .await
4514 .expect("Failed to list relations the second time");
4515 assert_eq!(result.chunk.len(), 1);
4516 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
4517 assert!(result.prev_batch_token.is_none());
4518 assert!(result.next_batch_token.is_none());
4519 assert!(result.recursion_depth.is_none());
4520 }
4521
4522 #[async_test]
4523 async fn test_relations_with_reltype() {
4524 let server = MatrixMockServer::new().await;
4525 let client = server.client_builder().build().await;
4526
4527 let room_id = room_id!("!a:b.c");
4528 let sender_id = user_id!("@alice:b.c");
4529 let f = EventFactory::new().room(room_id).sender(sender_id);
4530
4531 let target_event_id = owned_event_id!("$target");
4532 let eid1 = event_id!("$1");
4533 let eid2 = event_id!("$2");
4534 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw_sync().cast()];
4535 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw_sync().cast()];
4536
4537 server
4538 .mock_room_relations()
4539 .match_target_event(target_event_id.clone())
4540 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
4541 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
4542 .mock_once()
4543 .mount()
4544 .await;
4545
4546 server
4547 .mock_room_relations()
4548 .match_target_event(target_event_id.clone())
4549 .match_from("next_batch")
4550 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
4551 .ok(RoomRelationsResponseTemplate::default().events(batch2))
4552 .mock_once()
4553 .mount()
4554 .await;
4555
4556 let room = server.sync_joined_room(&client, room_id).await;
4557
4558 let mut opts = RelationsOptions {
4560 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
4561 ..Default::default()
4562 };
4563 let result = room
4564 .relations(target_event_id.clone(), opts.clone())
4565 .await
4566 .expect("Failed to list relations the first time");
4567 assert_eq!(result.chunk.len(), 1);
4568 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
4569 assert!(result.prev_batch_token.is_none());
4570 assert!(result.next_batch_token.is_some());
4571 assert!(result.recursion_depth.is_none());
4572
4573 opts.from = result.next_batch_token;
4574 let result = room
4575 .relations(target_event_id, opts)
4576 .await
4577 .expect("Failed to list relations the second time");
4578 assert_eq!(result.chunk.len(), 1);
4579 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
4580 assert!(result.prev_batch_token.is_none());
4581 assert!(result.next_batch_token.is_none());
4582 assert!(result.recursion_depth.is_none());
4583 }
4584}