matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
36 event_cache::store::EventCacheStoreLock,
37 media::store::MediaStoreLock,
38 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39 sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47 api::{
48 FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
49 client::{
50 account::whoami,
51 alias::{create_alias, delete_alias, get_alias},
52 authenticated_media,
53 device::{delete_devices, get_devices, update_device},
54 directory::{get_public_rooms, get_public_rooms_filtered},
55 discovery::{
56 discover_homeserver::{self, RtcFocusInfo},
57 get_capabilities::{self, v3::Capabilities},
58 get_supported_versions,
59 },
60 error::ErrorKind,
61 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
62 knock::knock_room,
63 media,
64 membership::{join_room_by_id, join_room_by_id_or_alias},
65 room::create_room,
66 session::login::v3::DiscoveryInfo,
67 sync::sync_events,
68 threads::get_thread_subscriptions_changes,
69 uiaa,
70 user_directory::search_users,
71 },
72 error::FromHttpResponseError,
73 federation::discovery::get_server_version,
74 },
75 assign,
76 push::Ruleset,
77 time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, instrument, trace, warn};
82use url::Url;
83
84use self::futures::SendRequest;
85use crate::{
86 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
87 Room, SessionTokens, TransmissionProgress,
88 authentication::{
89 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
90 oauth::OAuth,
91 },
92 client::thread_subscriptions::ThreadSubscriptionCatchup,
93 config::{RequestConfig, SyncToken},
94 deduplicating_handler::DeduplicatingHandler,
95 error::HttpResult,
96 event_cache::EventCache,
97 event_handler::{
98 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
99 EventHandlerStore, ObservableEventHandler, SyncEvent,
100 },
101 http_client::HttpClient,
102 latest_events::LatestEvents,
103 media::MediaError,
104 notification_settings::NotificationSettings,
105 room::RoomMember,
106 room_preview::RoomPreview,
107 send_queue::{SendQueue, SendQueueData},
108 sliding_sync::Version as SlidingSyncVersion,
109 sync::{RoomUpdate, SyncResponse},
110};
111#[cfg(feature = "e2e-encryption")]
112use crate::{
113 cross_process_lock::CrossProcessLock,
114 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
115};
116
117mod builder;
118pub(crate) mod caches;
119pub(crate) mod futures;
120pub(crate) mod thread_subscriptions;
121
122pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
123#[cfg(feature = "experimental-search")]
124use crate::search_index::SearchIndex;
125
/// Boxed, pinned future returned by a notification handler (see
/// `NotificationHandlerFn`). It resolves to `()`.
///
/// Outside of wasm targets the future must additionally be `Send`.
126#[cfg(not(target_family = "wasm"))]
127type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler. On wasm targets
/// no `Send` bound is required.
128#[cfg(target_family = "wasm")]
129type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
130
/// Type-erased notification handler: a callable receiving the
/// [`Notification`], the [`Room`] and the [`Client`], and returning a
/// `NotificationHandlerFut`.
///
/// Outside of wasm targets the handler must additionally be `Send + Sync`.
131#[cfg(not(target_family = "wasm"))]
132type NotificationHandlerFn =
133 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Type-erased notification handler. On wasm targets no `Send`/`Sync` bounds
/// are required.
134#[cfg(target_family = "wasm")]
135type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
136
137/// Enum controlling whether a loop running callbacks should continue or
/// abort.
138///
139/// This is mainly used in the [`sync_with_callback`] method: the return value
140/// of the provided callback decides whether the sync loop is exited.
141///
142/// [`sync_with_callback`]: #method.sync_with_callback
143#[derive(Debug, Clone, Copy, PartialEq, Eq)]
144pub enum LoopCtrl {
145 /// Continue running the loop.
146 Continue,
147 /// Break out of the loop.
148 Break,
149}
150
/// Represents changes that can occur to a `Client`'s `Session`.
///
/// The type implements both `PartialEq` and `Eq`: every variant only carries
/// data with a total equality, so values can be compared freely and used
/// wherever an `Eq` bound is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
162
163/// Information about the server vendor obtained from the federation API.
///
/// `Client::server_vendor_info` builds values of this type and fills in the
/// string `"unknown"` for any field the server response did not contain.
164#[derive(Debug, Clone, PartialEq, Eq)]
165#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
166pub struct ServerVendorInfo {
167 /// The server name, as reported in the `name` field of the server's
 /// version response.
168 pub server_name: String,
169 /// The server version, as reported in the `version` field of the server's
 /// version response.
170 pub version: String,
171}
172
173/// An async/await enabled Matrix client.
174///
175/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
/// Clones share the same inner state.
176#[derive(Clone)]
177pub struct Client {
 /// The shared state of the client; cloning the `Client` only clones this
 /// `Arc`.
178 pub(crate) inner: Arc<ClientInner>,
179}
180
/// Locks and request-deduplication handlers used by the methods of a
/// `Client`.
///
/// The struct derives `Default`, which is how `ClientInner::new` creates it.
181#[derive(Default)]
182pub(crate) struct ClientLocks {
183 /// Lock ensuring that only a single room may be marked as a DM at once.
184 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
185 /// explanation.
186 pub(crate) mark_as_dm_lock: Mutex<()>,
187
188 /// Lock ensuring that only a single secret store is getting opened at the
189 /// same time.
190 ///
191 /// This is important so we don't accidentally create multiple different new
192 /// default secret storage keys.
193 #[cfg(feature = "e2e-encryption")]
194 pub(crate) open_secret_store_lock: Mutex<()>,
195
196 /// Lock ensuring that we're only storing a single secret at a time.
197 ///
198 /// Take a look at the [`SecretStore::put_secret`] method for a more
199 /// detailed explanation.
200 ///
201 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
202 #[cfg(feature = "e2e-encryption")]
203 pub(crate) store_secret_lock: Mutex<()>,
204
205 /// Lock ensuring that only one method at a time might modify our backup.
206 #[cfg(feature = "e2e-encryption")]
207 pub(crate) backup_modify_lock: Mutex<()>,
208
209 /// Lock ensuring that we're going to attempt to upload backups for a single
210 /// requester.
211 #[cfg(feature = "e2e-encryption")]
212 pub(crate) backup_upload_lock: Mutex<()>,
213
214 /// Handler making sure we only have one group session sharing request in
215 /// flight per room.
216 #[cfg(feature = "e2e-encryption")]
217 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
218
219 /// Lock making sure we're only doing one key claim request at a time.
220 #[cfg(feature = "e2e-encryption")]
221 pub(crate) key_claim_lock: Mutex<()>,
222
223 /// Handler to ensure that only one members request is running at a time,
224 /// given a room.
225 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
226
227 /// Handler to ensure that only one encryption state request is running at a
228 /// time, given a room.
229 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
230
231 /// Deduplicating handler for sending read receipts. The string is an
232 /// internal implementation detail, see [`Self::send_single_receipt`].
 // NOTE(review): `Self` is `ClientLocks` here and no `send_single_receipt`
 // is visible on it; the link presumably targets a method on another type —
 // confirm and adjust the path.
233 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
234
 /// Cross-process lock wrapping the lockable crypto store. Held in a
 /// `OnceCell`, so it can be set at most once after construction.
235 #[cfg(feature = "e2e-encryption")]
236 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
237
238 /// Latest "generation" of data known by the crypto store.
239 ///
240 /// This is a counter that only increments, set in the database (and can
241 /// wrap). It's incremented whenever some process acquires a lock for the
242 /// first time. *This assumes the crypto store lock is being held, to
243 /// avoid data races on writing to this value in the store*.
244 ///
245 /// The current process will maintain this value in local memory and in the
246 /// DB over time. Observing a different value than the one read in
247 /// memory, when reading from the store indicates that somebody else has
248 /// written into the database under our feet.
249 ///
250 /// TODO: this should live in the `OlmMachine`, since it's information
251 /// related to the lock. As of today (2023-07-28), we blow up the entire
252 /// olm machine when there's a generation mismatch. So storing the
253 /// generation in the olm machine would make the client think there's
254 /// *always* a mismatch, and that's why we need to store the generation
255 /// outside the `OlmMachine`.
256 #[cfg(feature = "e2e-encryption")]
257 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
258}
259
/// The state shared by all clones of a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`; everything the client knows
/// or owns lives in the fields below.
260pub(crate) struct ClientInner {
261 /// All the data related to authentication and authorization.
262 pub(crate) auth_ctx: Arc<AuthCtx>,
263
264 /// The URL of the server.
265 ///
266 /// Not to be confused with the `Self::homeserver`. `server` is usually
267 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
268 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
269 /// homeserver (at the time of writing — 2024-08-28).
270 ///
271 /// This value is optional depending on how the `Client` has been built.
272 /// If it's been built from a homeserver URL directly, we don't know the
273 /// server. However, if the `Client` has been built from a server URL or
274 /// name, then the homeserver has been discovered, and we know both.
275 server: Option<Url>,
276
277 /// The URL of the homeserver to connect to.
278 ///
279 /// This is the URL for the client-server Matrix API.
 ///
 /// Kept behind a lock because `Client::set_homeserver` can replace it.
280 homeserver: StdRwLock<Url>,
281
282 /// The sliding sync version.
 ///
 /// Kept behind a lock because `Client::set_sliding_sync_version` can
 /// override it.
283 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
284
285 /// The underlying HTTP client.
286 pub(crate) http_client: HttpClient,
287
288 /// User session data.
289 pub(super) base_client: BaseClient,
290
291 /// Collection of in-memory caches for the [`Client`].
292 pub(crate) caches: ClientCaches,
293
294 /// Collection of locks individual client methods might want to use, either
295 /// to ensure that only a single call to a method happens at once or to
296 /// deduplicate multiple calls to a method.
297 pub(crate) locks: ClientLocks,
298
299 /// The cross-process store locks holder name.
300 ///
301 /// The SDK provides cross-process store locks (see
302 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
303 /// `holder_name` is the value used for all cross-process store locks
304 /// used by this `Client`.
305 ///
306 /// If multiple `Client`s are running in different processes, this
307 /// value MUST be different for each `Client`.
308 cross_process_store_locks_holder_name: String,
309
310 /// A mapping of the times at which the current user sent typing notices,
311 /// keyed by room.
312 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
313
314 /// Event handlers. See `add_event_handler`.
315 pub(crate) event_handlers: EventHandlerStore,
316
317 /// Notification handlers. See `register_notification_handler`.
318 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
319
320 /// The sender-side of channels used to receive room updates.
 ///
 /// There is one broadcast channel per room, keyed by room ID.
321 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
322
323 /// The sender-side of a channel used to observe all the room updates of a
324 /// sync response.
325 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
326
327 /// Whether the client should update its homeserver URL with the discovery
328 /// information present in the login response.
329 respect_login_well_known: bool,
330
331 /// An event that can be listened on to wait for a successful sync. The
332 /// event will only be fired if a sync loop is running. Can be used for
333 /// synchronization, e.g. if we send out a request to create a room, we can
334 /// wait for the sync to get the data to fetch a room object from the state
335 /// store.
336 pub(crate) sync_beat: event_listener::Event,
337
338 /// A central cache for events, inactive first.
339 ///
340 /// It becomes active when [`EventCache::subscribe`] is called.
341 pub(crate) event_cache: OnceCell<EventCache>,
342
343 /// End-to-end encryption related state.
344 #[cfg(feature = "e2e-encryption")]
345 pub(crate) e2ee: EncryptionData,
346
347 /// The verification state of our own device.
348 #[cfg(feature = "e2e-encryption")]
349 pub(crate) verification_state: SharedObservable<VerificationState>,
350
351 /// Whether to enable the experimental support for sending and receiving
352 /// encrypted room history on invite, per [MSC4268].
353 ///
354 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
355 #[cfg(feature = "e2e-encryption")]
356 pub(crate) enable_share_history_on_invite: bool,
357
358 /// Data related to the [`SendQueue`].
359 ///
360 /// [`SendQueue`]: crate::send_queue::SendQueue
361 pub(crate) send_queue_data: Arc<SendQueueData>,
362
363 /// The `max_upload_size` value of the homeserver, it contains the max
364 /// request size you can send.
365 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
366
367 /// The entry point to get the [`LatestEvent`] of rooms and threads.
368 ///
369 /// [`LatestEvent`]: crate::latest_event::LatestEvent
370 latest_events: OnceCell<LatestEvents>,
371
372 /// Service handling the catching up of thread subscriptions in the
373 /// background.
374 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
375
376 #[cfg(feature = "experimental-search")]
377 /// Handler for the [`RoomIndex`] of each room.
378 search_index: SearchIndex,
379}
380
381impl ClientInner {
382 /// Create a new `ClientInner`.
383 ///
384 /// All the fields passed as parameters here are those that must be cloned
385 /// upon instantiation of a sub-client, e.g. a client specialized for
386 /// notifications.
 ///
 /// Every other field is created fresh (mostly via `Default::default()`).
 /// The result is returned already wrapped in an `Arc`, because the
 /// initialisation steps at the end of this function need a handle to the
 /// finished value.
387 #[allow(clippy::too_many_arguments)]
388 async fn new(
389 auth_ctx: Arc<AuthCtx>,
390 server: Option<Url>,
391 homeserver: Url,
392 sliding_sync_version: SlidingSyncVersion,
393 http_client: HttpClient,
394 base_client: BaseClient,
395 server_info: ClientServerInfo,
396 respect_login_well_known: bool,
397 event_cache: OnceCell<EventCache>,
398 send_queue: Arc<SendQueueData>,
399 latest_events: OnceCell<LatestEvents>,
400 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
401 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
402 cross_process_store_locks_holder_name: String,
403 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
404 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
405 ) -> Arc<Self> {
 // The caches start from the server info handed in by the caller and a
 // newly created `TtlCache` for the server metadata.
406 let caches = ClientCaches {
407 server_info: server_info.into(),
408 server_metadata: Mutex::new(TtlCache::new()),
409 };
410
411 let client = Self {
412 server,
413 homeserver: StdRwLock::new(homeserver),
414 auth_ctx,
415 sliding_sync_version: StdRwLock::new(sliding_sync_version),
416 http_client,
417 base_client,
418 caches,
419 locks: Default::default(),
420 cross_process_store_locks_holder_name,
421 typing_notice_times: Default::default(),
422 event_handlers: Default::default(),
423 notification_handlers: Default::default(),
424 room_update_channels: Default::default(),
425 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
426 // ballast for all observers to catch up.
427 room_updates_sender: broadcast::Sender::new(32),
428 respect_login_well_known,
429 sync_beat: event_listener::Event::new(),
430 event_cache,
431 send_queue_data: send_queue,
432 latest_events,
433 #[cfg(feature = "e2e-encryption")]
434 e2ee: EncryptionData::new(encryption_settings),
435 #[cfg(feature = "e2e-encryption")]
436 verification_state: SharedObservable::new(VerificationState::Unknown),
437 #[cfg(feature = "e2e-encryption")]
438 enable_share_history_on_invite,
439 server_max_upload_size: Mutex::new(OnceCell::new()),
440 #[cfg(feature = "experimental-search")]
441 search_index: search_index_handler,
442 thread_subscription_catchup,
443 };
444
 // From here on the value is shared: the steps below take references to
 // (or clones of) the `Arc`.
445 #[allow(clippy::let_and_return)]
446 let client = Arc::new(client);
447
448 #[cfg(feature = "e2e-encryption")]
449 client.e2ee.initialize_tasks(&client);
450
 // Fill the event cache cell unless the caller passed one that is already
 // initialized. The new cache is handed a `WeakClient` built from the
 // inner state and a clone of the event cache store.
451 let _ = client
452 .event_cache
453 .get_or_init(|| async {
454 EventCache::new(
455 WeakClient::from_inner(&client),
456 client.base_client.event_cache_store().clone(),
457 )
458 })
459 .await;
460
 // Same for the thread subscription catch-up service.
 // NOTE(review): this hands a strong `Client` (a clone of the `Arc`) to a
 // value that ends up stored inside `ClientInner`; presumably
 // `ThreadSubscriptionCatchup::new` does not keep it strongly — confirm,
 // otherwise this is a reference cycle.
461 let _ = client
462 .thread_subscription_catchup
463 .get_or_init(|| async {
464 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
465 })
466 .await;
467
468 client
469 }
470}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475 write!(fmt, "Client")
476 }
477}
478
479impl Client {
480 /// Create a new [`Client`] that will use the given homeserver.
481 ///
482 /// # Arguments
483 ///
484 /// * `homeserver_url` - The homeserver that the client should connect to.
485 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486 Self::builder().homeserver_url(homeserver_url).build().await
487 }
488
489 /// Returns a subscriber that publishes an event every time the ignore user
490 /// list changes.
491 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492 self.inner.base_client.subscribe_to_ignore_user_list_changes()
493 }
494
495 /// Create a new [`ClientBuilder`].
 ///
 /// This is the same as calling `ClientBuilder::new()`; the builder is then
 /// used to configure and build a [`Client`].
496 pub fn builder() -> ClientBuilder {
497 ClientBuilder::new()
498 }
499
500 pub(crate) fn base_client(&self) -> &BaseClient {
501 &self.inner.base_client
502 }
503
504 /// The underlying HTTP client.
505 pub fn http_client(&self) -> &reqwest::Client {
506 &self.inner.http_client.inner
507 }
508
509 pub(crate) fn locks(&self) -> &ClientLocks {
510 &self.inner.locks
511 }
512
513 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
514 &self.inner.auth_ctx
515 }
516
517 /// The cross-process store locks holder name.
518 ///
519 /// The SDK provides cross-process store locks (see
520 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
521 /// `holder_name` is the value used for all cross-process store locks
522 /// used by this `Client`.
523 pub fn cross_process_store_locks_holder_name(&self) -> &str {
524 &self.inner.cross_process_store_locks_holder_name
525 }
526
527 /// Change the homeserver URL used by this client.
528 ///
529 /// # Arguments
530 ///
531 /// * `homeserver_url` - The new URL to use.
532 fn set_homeserver(&self, homeserver_url: Url) {
533 *self.inner.homeserver.write().unwrap() = homeserver_url;
534 }
535
536 /// Get the capabilities of the homeserver.
537 ///
538 /// This method should be used to check what features are supported by the
539 /// homeserver.
540 ///
541 /// # Examples
542 ///
543 /// ```no_run
544 /// # use matrix_sdk::Client;
545 /// # use url::Url;
546 /// # async {
547 /// # let homeserver = Url::parse("http://example.com")?;
548 /// let client = Client::new(homeserver).await?;
549 ///
550 /// let capabilities = client.get_capabilities().await?;
551 ///
552 /// if capabilities.change_password.enabled {
553 /// // Change password
554 /// }
555 /// # anyhow::Ok(()) };
556 /// ```
557 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
558 let res = self.send(get_capabilities::v3::Request::new()).await?;
559 Ok(res.capabilities)
560 }
561
562 /// Get the server vendor information from the federation API.
563 ///
564 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
565 /// both the server's software name and version.
566 ///
567 /// # Examples
568 ///
569 /// ```no_run
570 /// # use matrix_sdk::Client;
571 /// # use url::Url;
572 /// # async {
573 /// # let homeserver = Url::parse("http://example.com")?;
574 /// let client = Client::new(homeserver).await?;
575 ///
576 /// let server_info = client.server_vendor_info(None).await?;
577 /// println!(
578 /// "Server: {}, Version: {}",
579 /// server_info.server_name, server_info.version
580 /// );
581 /// # anyhow::Ok(()) };
582 /// ```
583 pub async fn server_vendor_info(
584 &self,
585 request_config: Option<RequestConfig>,
586 ) -> HttpResult<ServerVendorInfo> {
587 let res = self
588 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
589 .await?;
590
591 // Extract server info, using defaults if fields are missing.
592 let server = res.server.unwrap_or_default();
593 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
594 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
595
596 Ok(ServerVendorInfo { server_name: server_name_str, version })
597 }
598
599 /// Get a copy of the default request config.
600 ///
601 /// The default request config is what's used when sending requests if no
602 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
603 /// function with such a parameter.
604 ///
605 /// If the default request config was not customized through
606 /// [`ClientBuilder`] when creating this `Client`, the returned value will
607 /// be equivalent to [`RequestConfig::default()`].
608 pub fn request_config(&self) -> RequestConfig {
609 self.inner.http_client.request_config
610 }
611
612 /// Check whether the client has been activated.
613 ///
614 /// A client is considered active when:
615 ///
616 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
617 /// is logged in,
618 /// 2. Has loaded cached data from storage,
619 /// 3. If encryption is enabled, it also initialized or restored its
620 /// `OlmMachine`.
621 pub fn is_active(&self) -> bool {
622 self.inner.base_client.is_active()
623 }
624
625 /// The server used by the client.
626 ///
627 /// See `Self::server` to learn more.
628 pub fn server(&self) -> Option<&Url> {
629 self.inner.server.as_ref()
630 }
631
632 /// The homeserver of the client.
633 pub fn homeserver(&self) -> Url {
634 self.inner.homeserver.read().unwrap().clone()
635 }
636
637 /// Get the sliding sync version.
638 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
639 self.inner.sliding_sync_version.read().unwrap().clone()
640 }
641
642 /// Override the sliding sync version.
643 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
644 let mut lock = self.inner.sliding_sync_version.write().unwrap();
645 *lock = version;
646 }
647
648 /// Get the Matrix user session meta information.
649 ///
650 /// If the client is currently logged in, this will return a
651 /// [`SessionMeta`] object which contains the user ID and device ID.
652 /// Otherwise it returns `None`.
653 pub fn session_meta(&self) -> Option<&SessionMeta> {
654 self.base_client().session_meta()
655 }
656
657 /// Returns a receiver that gets events for each room info update. To watch
658 /// for new events, use `receiver.resubscribe()`.
659 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
660 self.base_client().room_info_notable_update_receiver()
661 }
662
663 /// Performs a search for users.
664 /// The search is performed case-insensitively on user IDs and display names
665 ///
666 /// # Arguments
667 ///
668 /// * `search_term` - The search term for the search
669 /// * `limit` - The maximum number of results to return. Defaults to 10.
670 ///
671 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
672 pub async fn search_users(
673 &self,
674 search_term: &str,
675 limit: u64,
676 ) -> HttpResult<search_users::v3::Response> {
677 let mut request = search_users::v3::Request::new(search_term.to_owned());
678
679 if let Some(limit) = UInt::new(limit) {
680 request.limit = limit;
681 }
682
683 self.send(request).await
684 }
685
686 /// Get the user id of the current owner of the client.
687 pub fn user_id(&self) -> Option<&UserId> {
688 self.session_meta().map(|s| s.user_id.as_ref())
689 }
690
691 /// Get the device ID that identifies the current session.
692 pub fn device_id(&self) -> Option<&DeviceId> {
693 self.session_meta().map(|s| s.device_id.as_ref())
694 }
695
696 /// Get the current access token for this session.
697 ///
698 /// Will be `None` if the client has not been logged in.
699 pub fn access_token(&self) -> Option<String> {
700 self.auth_ctx().access_token()
701 }
702
703 /// Get the current tokens for this session.
704 ///
705 /// To be notified of changes in the session tokens, use
706 /// [`Client::subscribe_to_session_changes()`] or
707 /// [`Client::set_session_callbacks()`].
708 ///
709 /// Returns `None` if the client has not been logged in.
710 pub fn session_tokens(&self) -> Option<SessionTokens> {
711 self.auth_ctx().session_tokens()
712 }
713
714 /// Access the authentication API used to log in this client.
715 ///
716 /// Will be `None` if the client has not been logged in.
717 pub fn auth_api(&self) -> Option<AuthApi> {
718 match self.auth_ctx().auth_data.get()? {
719 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
720 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
721 }
722 }
723
724 /// Get the whole session info of this client.
725 ///
726 /// Will be `None` if the client has not been logged in.
727 ///
728 /// Can be used with [`Client::restore_session`] to restore a previously
729 /// logged-in session.
730 pub fn session(&self) -> Option<AuthSession> {
731 match self.auth_api()? {
732 AuthApi::Matrix(api) => api.session().map(Into::into),
733 AuthApi::OAuth(api) => api.full_session().map(Into::into),
734 }
735 }
736
737 /// Get a reference to the state store.
738 pub fn state_store(&self) -> &DynStateStore {
739 self.base_client().state_store()
740 }
741
742 /// Get a reference to the event cache store.
743 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
744 self.base_client().event_cache_store()
745 }
746
747 /// Get a reference to the media store.
748 pub fn media_store(&self) -> &MediaStoreLock {
749 self.base_client().media_store()
750 }
751
752 /// Access the native Matrix authentication API with this client.
753 pub fn matrix_auth(&self) -> MatrixAuth {
754 MatrixAuth::new(self.clone())
755 }
756
757 /// Get the account of the current owner of the client.
758 pub fn account(&self) -> Account {
759 Account::new(self.clone())
760 }
761
/// Get the encryption manager of the client.
///
/// The returned manager owns a clone of this client. Only available with
/// the `e2e-encryption` feature.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    let client = self.clone();
    Encryption::new(client)
}
767
768 /// Get the media manager of the client.
769 pub fn media(&self) -> Media {
770 Media::new(self.clone())
771 }
772
773 /// Get the pusher manager of the client.
774 pub fn pusher(&self) -> Pusher {
775 Pusher::new(self.clone())
776 }
777
778 /// Access the OAuth 2.0 API of the client.
779 pub fn oauth(&self) -> OAuth {
780 OAuth::new(self.clone())
781 }
782
783 /// Register a handler for a specific event type.
784 ///
785 /// The handler is a function or closure with one or more arguments. The
786 /// first argument is the event itself. All additional arguments are
787 /// "context" arguments: They have to implement [`EventHandlerContext`].
788 /// This trait is named that way because most of the types implementing it
789 /// give additional context about an event: The room it was in, its raw form
790 /// and other similar things. As two exceptions to this,
791 /// [`Client`] and [`EventHandlerHandle`] also implement the
792 /// `EventHandlerContext` trait so you don't have to clone your client
793 /// into the event handler manually and a handler can decide to remove
794 /// itself.
795 ///
796 /// Some context arguments are not universally applicable. A context
797 /// argument that isn't available for the given event type will result in
798 /// the event handler being skipped and an error being logged. The following
799 /// context argument types are only available for a subset of event types:
800 ///
801 /// * [`Room`] is only available for room-specific events, i.e. not for
802 /// events like global account data events or presence events.
803 ///
804 /// You can provide custom context via
805 /// [`add_event_handler_context`](Client::add_event_handler_context) and
806 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
807 /// into the event handler.
808 ///
809 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
810 ///
811 /// # Examples
812 ///
813 /// ```no_run
814 /// use matrix_sdk::{
815 /// deserialized_responses::EncryptionInfo,
816 /// event_handler::Ctx,
817 /// ruma::{
818 /// events::{
819 /// macros::EventContent,
820 /// push_rules::PushRulesEvent,
821 /// room::{
822 /// message::SyncRoomMessageEvent,
823 /// topic::SyncRoomTopicEvent,
824 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
825 /// },
826 /// },
827 /// push::Action,
828 /// Int, MilliSecondsSinceUnixEpoch,
829 /// },
830 /// Client, Room,
831 /// };
832 /// use serde::{Deserialize, Serialize};
833 ///
834 /// # async fn example(client: Client) {
835 /// client.add_event_handler(
836 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
837 /// // Common usage: Room event plus room and client.
838 /// },
839 /// );
840 /// client.add_event_handler(
841 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
842 /// async move {
843 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
844 /// // unencrypted events and events that were decrypted by the SDK.
845 /// }
846 /// },
847 /// );
848 /// client.add_event_handler(
849 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
850 /// async move {
851 /// // A `Vec<Action>` parameter allows you to know which push actions
852 /// // are applicable for an event. For example, an event with
853 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
854 /// // in the timeline.
855 /// }
856 /// },
857 /// );
858 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
859 /// // You can omit any or all arguments after the first.
860 /// });
861 ///
862 /// // Registering a temporary event handler:
863 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
864 /// /* Event handler */
865 /// });
866 /// client.remove_event_handler(handle);
867 ///
868 /// // Registering custom event handler context:
869 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
870 /// struct MyContext {
871 /// number: usize,
872 /// }
873 /// client.add_event_handler_context(MyContext { number: 5 });
874 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
875 /// // Use the context
876 /// });
877 ///
878 /// // This will handle membership events in joined rooms. Invites are special, see below.
879 /// client.add_event_handler(
880 /// |ev: SyncRoomMemberEvent| async move {},
881 /// );
882 ///
883 /// // To handle state events in invited rooms (including invite membership events),
884 /// // `StrippedRoomMemberEvent` should be used.
885 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
886 /// client.add_event_handler(
887 /// |ev: StrippedRoomMemberEvent| async move {},
888 /// );
889 ///
890 /// // Custom events work exactly the same way, you just need to declare
891 /// // the content struct and use the EventContent derive macro on it.
892 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
893 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
894 /// struct TokenEventContent {
895 /// token: String,
896 /// #[serde(rename = "exp")]
897 /// expires_at: MilliSecondsSinceUnixEpoch,
898 /// }
899 ///
900 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
901 /// todo!("Display the token");
902 /// });
903 ///
904 /// // Event handler closures can also capture local variables.
905 /// // Make sure they are cheap to clone though, because they will be cloned
906 /// // every time the closure is called.
907 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
908 ///
909 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
910 /// println!("Calling the handler with identifier {data}");
911 /// });
912 /// # }
913 /// ```
914 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
915 where
916 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
917 H: EventHandler<Ev, Ctx>,
918 {
919 self.add_event_handler_impl(handler, None)
920 }
921
922 /// Register a handler for a specific room, and event type.
923 ///
924 /// This method works the same way as
925 /// [`add_event_handler`][Self::add_event_handler], except that the handler
926 /// will only be called for events in the room with the specified ID. See
927 /// that method for more details on event handler functions.
928 ///
929 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
930 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
931 /// your use case.
932 pub fn add_room_event_handler<Ev, Ctx, H>(
933 &self,
934 room_id: &RoomId,
935 handler: H,
936 ) -> EventHandlerHandle
937 where
938 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
939 H: EventHandler<Ev, Ctx>,
940 {
941 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
942 }
943
944 /// Observe a specific event type.
945 ///
946 /// `Ev` represents the kind of event that will be observed. `Ctx`
947 /// represents the context that will come with the event. It relies on the
948 /// same mechanism as [`Client::add_event_handler`]. The main difference is
949 /// that it returns an [`ObservableEventHandler`] and doesn't require a
950 /// user-defined closure. It is possible to subscribe to the
951 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
952 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
953 /// Ctx)`.
954 ///
955 /// Be careful that only the most recent value can be observed. Subscribers
956 /// are notified when a new value is sent, but there is no guarantee
957 /// that they will see all values.
958 ///
959 /// # Example
960 ///
961 /// Let's see a classical usage:
962 ///
963 /// ```
964 /// use futures_util::StreamExt as _;
965 /// use matrix_sdk::{
966 /// Client, Room,
967 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
968 /// };
969 ///
970 /// # async fn example(client: Client) -> Option<()> {
971 /// let observer =
972 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
973 ///
974 /// let mut subscriber = observer.subscribe();
975 ///
976 /// let (event, (room, push_actions)) = subscriber.next().await?;
977 /// # Some(())
978 /// # }
979 /// ```
980 ///
981 /// Now let's see how to get several contexts that can be useful for you:
982 ///
983 /// ```
984 /// use matrix_sdk::{
985 /// Client, Room,
986 /// deserialized_responses::EncryptionInfo,
987 /// ruma::{
988 /// events::room::{
989 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
990 /// },
991 /// push::Action,
992 /// },
993 /// };
994 ///
995 /// # async fn example(client: Client) {
996 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
997 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
998 ///
999 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1000 /// // to distinguish between unencrypted events and events that were decrypted
1001 /// // by the SDK.
1002 /// let _ = client
1003 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1004 /// );
1005 ///
1006 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1007 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1008 /// // should be highlighted in the timeline.
1009 /// let _ =
1010 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1011 ///
1012 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1013 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1014 /// # }
1015 /// ```
1016 ///
1017 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1018 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1019 where
1020 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1021 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1022 {
1023 self.observe_room_events_impl(None)
1024 }
1025
1026 /// Observe a specific room, and event type.
1027 ///
1028 /// This method works the same way as [`Client::observe_events`], except
1029 /// that the observability will only be applied for events in the room with
1030 /// the specified ID. See that method for more details.
1031 ///
1032 /// Be careful that only the most recent value can be observed. Subscribers
1033 /// are notified when a new value is sent, but there is no guarantee
1034 /// that they will see all values.
1035 pub fn observe_room_events<Ev, Ctx>(
1036 &self,
1037 room_id: &RoomId,
1038 ) -> ObservableEventHandler<(Ev, Ctx)>
1039 where
1040 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1041 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1042 {
1043 self.observe_room_events_impl(Some(room_id.to_owned()))
1044 }
1045
1046 /// Shared implementation for `Client::observe_events` and
1047 /// `Client::observe_room_events`.
1048 fn observe_room_events_impl<Ev, Ctx>(
1049 &self,
1050 room_id: Option<OwnedRoomId>,
1051 ) -> ObservableEventHandler<(Ev, Ctx)>
1052 where
1053 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1054 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1055 {
1056 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1057 // new value.
1058 let shared_observable = SharedObservable::new(None);
1059
1060 ObservableEventHandler::new(
1061 shared_observable.clone(),
1062 self.event_handler_drop_guard(self.add_event_handler_impl(
1063 move |event: Ev, context: Ctx| {
1064 shared_observable.set(Some((event, context)));
1065
1066 ready(())
1067 },
1068 room_id,
1069 )),
1070 )
1071 }
1072
1073 /// Remove the event handler associated with the handle.
1074 ///
1075 /// Note that you **must not** call `remove_event_handler` from the
1076 /// non-async part of an event handler, that is:
1077 ///
1078 /// ```ignore
1079 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1080 /// // ⚠ this will cause a deadlock ⚠
1081 /// client.remove_event_handler(handle);
1082 ///
1083 /// async move {
1084 /// // removing the event handler here is fine
1085 /// client.remove_event_handler(handle);
1086 /// }
1087 /// })
1088 /// ```
1089 ///
1090 /// Note also that handlers that remove themselves will still execute with
1091 /// events received in the same sync cycle.
1092 ///
1093 /// # Arguments
1094 ///
1095 /// `handle` - The [`EventHandlerHandle`] that is returned when
1096 /// registering the event handler with [`Client::add_event_handler`].
1097 ///
1098 /// # Examples
1099 ///
1100 /// ```no_run
1101 /// # use url::Url;
1102 /// # use tokio::sync::mpsc;
1103 /// #
1104 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1105 /// #
1106 /// use matrix_sdk::{
1107 /// Client, event_handler::EventHandlerHandle,
1108 /// ruma::events::room::member::SyncRoomMemberEvent,
1109 /// };
1110 /// #
1111 /// # futures_executor::block_on(async {
1112 /// # let client = matrix_sdk::Client::builder()
1113 /// # .homeserver_url(homeserver)
1114 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1115 /// # .build()
1116 /// # .await
1117 /// # .unwrap();
1118 ///
1119 /// client.add_event_handler(
1120 /// |ev: SyncRoomMemberEvent,
1121 /// client: Client,
1122 /// handle: EventHandlerHandle| async move {
1123 /// // Common usage: Check arriving Event is the expected one
1124 /// println!("Expected RoomMemberEvent received!");
1125 /// client.remove_event_handler(handle);
1126 /// },
1127 /// );
1128 /// # });
1129 /// ```
1130 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1131 self.inner.event_handlers.remove(handle);
1132 }
1133
1134 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1135 /// the given handle.
1136 ///
1137 /// When the returned value is dropped, the event handler will be removed.
1138 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1139 EventHandlerDropGuard::new(handle, self.clone())
1140 }
1141
1142 /// Add an arbitrary value for use as event handler context.
1143 ///
1144 /// The value can be obtained in an event handler by adding an argument of
1145 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1146 ///
1147 /// If a value of the same type has been added before, it will be
1148 /// overwritten.
1149 ///
1150 /// # Examples
1151 ///
1152 /// ```no_run
1153 /// use matrix_sdk::{
1154 /// Room, event_handler::Ctx,
1155 /// ruma::events::room::message::SyncRoomMessageEvent,
1156 /// };
1157 /// # #[derive(Clone)]
1158 /// # struct SomeType;
1159 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1160 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1161 /// # futures_executor::block_on(async {
1162 /// # let client = matrix_sdk::Client::builder()
1163 /// # .homeserver_url(homeserver)
1164 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1165 /// # .build()
1166 /// # .await
1167 /// # .unwrap();
1168 ///
1169 /// // Handle used to send messages to the UI part of the app
1170 /// let my_gui_handle: SomeType = obtain_gui_handle();
1171 ///
1172 /// client.add_event_handler_context(my_gui_handle.clone());
1173 /// client.add_event_handler(
1174 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1175 /// async move {
1176 /// // gui_handle.send(DisplayMessage { message: ev });
1177 /// }
1178 /// },
1179 /// );
1180 /// # });
1181 /// ```
1182 pub fn add_event_handler_context<T>(&self, ctx: T)
1183 where
1184 T: Clone + Send + Sync + 'static,
1185 {
1186 self.inner.event_handlers.add_context(ctx);
1187 }
1188
1189 /// Register a handler for a notification.
1190 ///
1191 /// Similar to [`Client::add_event_handler`], but only allows functions
1192 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1193 /// [`Client`] for now.
1194 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1195 where
1196 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1197 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1198 {
1199 self.inner.notification_handlers.write().await.push(Box::new(
1200 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1201 ));
1202
1203 self
1204 }
1205
1206 /// Subscribe to all updates for the room with the given ID.
1207 ///
1208 /// The returned receiver will receive a new message for each sync response
1209 /// that contains updates for that room.
1210 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1211 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1212 btree_map::Entry::Vacant(entry) => {
1213 let (tx, rx) = broadcast::channel(8);
1214 entry.insert(tx);
1215 rx
1216 }
1217 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1218 }
1219 }
1220
1221 /// Subscribe to all updates to all rooms, whenever any has been received in
1222 /// a sync response.
1223 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1224 self.inner.room_updates_sender.subscribe()
1225 }
1226
1227 pub(crate) async fn notification_handlers(
1228 &self,
1229 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1230 self.inner.notification_handlers.read().await
1231 }
1232
1233 /// Get all the rooms the client knows about.
1234 ///
1235 /// This will return the list of joined, invited, and left rooms.
1236 pub fn rooms(&self) -> Vec<Room> {
1237 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1238 }
1239
1240 /// Get all the rooms the client knows about, filtered by room state.
1241 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1242 self.base_client()
1243 .rooms_filtered(filter)
1244 .into_iter()
1245 .map(|room| Room::new(self.clone(), room))
1246 .collect()
1247 }
1248
1249 /// Get a stream of all the rooms, in addition to the existing rooms.
1250 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1251 let (rooms, stream) = self.base_client().rooms_stream();
1252
1253 let map_room = |room| Room::new(self.clone(), room);
1254
1255 (
1256 rooms.into_iter().map(map_room).collect(),
1257 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1258 )
1259 }
1260
1261 /// Returns the joined rooms this client knows about.
1262 pub fn joined_rooms(&self) -> Vec<Room> {
1263 self.rooms_filtered(RoomStateFilter::JOINED)
1264 }
1265
1266 /// Returns the invited rooms this client knows about.
1267 pub fn invited_rooms(&self) -> Vec<Room> {
1268 self.rooms_filtered(RoomStateFilter::INVITED)
1269 }
1270
1271 /// Returns the left rooms this client knows about.
1272 pub fn left_rooms(&self) -> Vec<Room> {
1273 self.rooms_filtered(RoomStateFilter::LEFT)
1274 }
1275
1276 /// Returns the joined space rooms this client knows about.
1277 pub fn joined_space_rooms(&self) -> Vec<Room> {
1278 self.base_client()
1279 .rooms_filtered(RoomStateFilter::JOINED)
1280 .into_iter()
1281 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1282 .collect()
1283 }
1284
1285 /// Get a room with the given room id.
1286 ///
1287 /// # Arguments
1288 ///
1289 /// `room_id` - The unique id of the room that should be fetched.
1290 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1291 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1292 }
1293
1294 /// Gets the preview of a room, whether the current user has joined it or
1295 /// not.
1296 pub async fn get_room_preview(
1297 &self,
1298 room_or_alias_id: &RoomOrAliasId,
1299 via: Vec<OwnedServerName>,
1300 ) -> Result<RoomPreview> {
1301 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1302 Ok(room_id) => room_id.to_owned(),
1303 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1304 };
1305
1306 if let Some(room) = self.get_room(&room_id) {
1307 // The cached data can only be trusted if the room state is joined or
1308 // banned: for invite and knock rooms, no updates will be received
1309 // for the rooms after the invite/knock action took place so we may
1310 // have very out to date data for important fields such as
1311 // `join_rule`. For left rooms, the homeserver should return the latest info.
1312 match room.state() {
1313 RoomState::Joined | RoomState::Banned => {
1314 return Ok(RoomPreview::from_known_room(&room).await);
1315 }
1316 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1317 }
1318 }
1319
1320 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1321 }
1322
1323 /// Resolve a room alias to a room id and a list of servers which know
1324 /// about it.
1325 ///
1326 /// # Arguments
1327 ///
1328 /// `room_alias` - The room alias to be resolved.
1329 pub async fn resolve_room_alias(
1330 &self,
1331 room_alias: &RoomAliasId,
1332 ) -> HttpResult<get_alias::v3::Response> {
1333 let request = get_alias::v3::Request::new(room_alias.to_owned());
1334 self.send(request).await
1335 }
1336
1337 /// Checks if a room alias is not in use yet.
1338 ///
1339 /// Returns:
1340 /// - `Ok(true)` if the room alias is available.
1341 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1342 /// status code).
1343 /// - An `Err` otherwise.
1344 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1345 match self.resolve_room_alias(alias).await {
1346 // The room alias was resolved, so it's already in use.
1347 Ok(_) => Ok(false),
1348 Err(error) => {
1349 match error.client_api_error_kind() {
1350 // The room alias wasn't found, so it's available.
1351 Some(ErrorKind::NotFound) => Ok(true),
1352 _ => Err(error),
1353 }
1354 }
1355 }
1356 }
1357
1358 /// Adds a new room alias associated with a room to the room directory.
1359 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1360 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1361 self.send(request).await?;
1362 Ok(())
1363 }
1364
1365 /// Removes a room alias from the room directory.
1366 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1367 let request = delete_alias::v3::Request::new(alias.to_owned());
1368 self.send(request).await?;
1369 Ok(())
1370 }
1371
1372 /// Update the homeserver from the login response well-known if needed.
1373 ///
1374 /// # Arguments
1375 ///
1376 /// * `login_well_known` - The `well_known` field from a successful login
1377 /// response.
1378 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1379 if self.inner.respect_login_well_known
1380 && let Some(well_known) = login_well_known
1381 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1382 {
1383 self.set_homeserver(homeserver);
1384 }
1385 }
1386
1387 /// Similar to [`Client::restore_session_with`], with
1388 /// [`RoomLoadSettings::default()`].
1389 ///
1390 /// # Panics
1391 ///
1392 /// Panics if a session was already restored or logged in.
1393 #[instrument(skip_all)]
1394 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1395 self.restore_session_with(session, RoomLoadSettings::default()).await
1396 }
1397
1398 /// Restore a session previously logged-in using one of the available
1399 /// authentication APIs. The number of rooms to restore is controlled by
1400 /// [`RoomLoadSettings`].
1401 ///
1402 /// See the documentation of the corresponding authentication API's
1403 /// `restore_session` method for more information.
1404 ///
1405 /// # Panics
1406 ///
1407 /// Panics if a session was already restored or logged in.
1408 #[instrument(skip_all)]
1409 pub async fn restore_session_with(
1410 &self,
1411 session: impl Into<AuthSession>,
1412 room_load_settings: RoomLoadSettings,
1413 ) -> Result<()> {
1414 let session = session.into();
1415 match session {
1416 AuthSession::Matrix(session) => {
1417 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1418 }
1419 AuthSession::OAuth(session) => {
1420 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1421 }
1422 }
1423 }
1424
1425 /// Refresh the access token using the authentication API used to log into
1426 /// this session.
1427 ///
1428 /// See the documentation of the authentication API's `refresh_access_token`
1429 /// method for more information.
1430 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1431 let Some(auth_api) = self.auth_api() else {
1432 return Err(RefreshTokenError::RefreshTokenRequired);
1433 };
1434
1435 match auth_api {
1436 AuthApi::Matrix(api) => {
1437 trace!("Token refresh: Using the homeserver.");
1438 Box::pin(api.refresh_access_token()).await?;
1439 }
1440 AuthApi::OAuth(api) => {
1441 trace!("Token refresh: Using OAuth 2.0.");
1442 Box::pin(api.refresh_access_token()).await?;
1443 }
1444 }
1445
1446 Ok(())
1447 }
1448
1449 /// Log out the current session using the proper authentication API.
1450 ///
1451 /// # Errors
1452 ///
1453 /// Returns an error if the session is not authenticated or if an error
1454 /// occurred while making the request to the server.
1455 pub async fn logout(&self) -> Result<(), Error> {
1456 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1457 match auth_api {
1458 AuthApi::Matrix(matrix_auth) => {
1459 matrix_auth.logout().await?;
1460 Ok(())
1461 }
1462 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1463 }
1464 }
1465
1466 /// Get or upload a sync filter.
1467 ///
1468 /// This method will either get a filter ID from the store or upload the
1469 /// filter definition to the homeserver and return the new filter ID.
1470 ///
1471 /// # Arguments
1472 ///
1473 /// * `filter_name` - The unique name of the filter, this name will be used
1474 /// locally to store and identify the filter ID returned by the server.
1475 ///
1476 /// * `definition` - The filter definition that should be uploaded to the
1477 /// server if no filter ID can be found in the store.
1478 ///
1479 /// # Examples
1480 ///
1481 /// ```no_run
1482 /// # use matrix_sdk::{
1483 /// # Client, config::SyncSettings,
1484 /// # ruma::api::client::{
1485 /// # filter::{
1486 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1487 /// # },
1488 /// # sync::sync_events::v3::Filter,
1489 /// # }
1490 /// # };
1491 /// # use url::Url;
1492 /// # async {
1493 /// # let homeserver = Url::parse("http://example.com").unwrap();
1494 /// # let client = Client::new(homeserver).await.unwrap();
1495 /// let mut filter = FilterDefinition::default();
1496 ///
1497 /// // Let's enable member lazy loading.
1498 /// filter.room.state.lazy_load_options =
1499 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1500 ///
1501 /// let filter_id = client
1502 /// .get_or_upload_filter("sync", filter)
1503 /// .await
1504 /// .unwrap();
1505 ///
1506 /// let sync_settings = SyncSettings::new()
1507 /// .filter(Filter::FilterId(filter_id));
1508 ///
1509 /// let response = client.sync_once(sync_settings).await.unwrap();
1510 /// # };
1511 #[instrument(skip(self, definition))]
1512 pub async fn get_or_upload_filter(
1513 &self,
1514 filter_name: &str,
1515 definition: FilterDefinition,
1516 ) -> Result<String> {
1517 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1518 debug!("Found filter locally");
1519 Ok(filter)
1520 } else {
1521 debug!("Didn't find filter locally");
1522 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1523 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1524 let response = self.send(request).await?;
1525
1526 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1527
1528 Ok(response.filter_id)
1529 }
1530 }
1531
1532 /// Prepare to join a room by ID, by getting the current details about it
1533 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1534 let room = self.get_room(room_id)?;
1535
1536 let inviter = match room.invite_details().await {
1537 Ok(details) => details.inviter,
1538 Err(Error::WrongRoomState(_)) => None,
1539 Err(e) => {
1540 warn!("Error fetching invite details for room: {e:?}");
1541 None
1542 }
1543 };
1544
1545 Some(PreJoinRoomInfo { inviter })
1546 }
1547
1548 /// Finish joining a room.
1549 ///
1550 /// If the room was an invite that should be marked as a DM, will include it
1551 /// in the DM event after creating the joined room.
1552 ///
1553 /// If encrypted history sharing is enabled, will check to see if we have a
1554 /// key bundle, and import it if so.
1555 ///
1556 /// # Arguments
1557 ///
1558 /// * `room_id` - The `RoomId` of the room that was joined.
1559 /// * `pre_join_room_info` - Information about the room before we joined.
1560 async fn finish_join_room(
1561 &self,
1562 room_id: &RoomId,
1563 pre_join_room_info: Option<PreJoinRoomInfo>,
1564 ) -> Result<Room> {
1565 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1566 room.state() == RoomState::Invited
1567 && room.is_direct().await.unwrap_or_else(|e| {
1568 warn!(%room_id, "is_direct() failed: {e}");
1569 false
1570 })
1571 } else {
1572 false
1573 };
1574
1575 let base_room = self
1576 .base_client()
1577 .room_joined(
1578 room_id,
1579 pre_join_room_info
1580 .as_ref()
1581 .and_then(|info| info.inviter.as_ref())
1582 .map(|i| i.user_id().to_owned()),
1583 )
1584 .await?;
1585 let room = Room::new(self.clone(), base_room);
1586
1587 if mark_as_dm {
1588 room.set_is_direct(true).await?;
1589 }
1590
1591 #[cfg(feature = "e2e-encryption")]
1592 if self.inner.enable_share_history_on_invite
1593 && let Some(inviter) =
1594 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1595 {
1596 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1597 .await?;
1598 }
1599
1600 // Suppress "unused variable" and "unused field" lints
1601 #[cfg(not(feature = "e2e-encryption"))]
1602 let _ = pre_join_room_info.map(|i| i.inviter);
1603
1604 Ok(room)
1605 }
1606
1607 /// Join a room by `RoomId`.
1608 ///
1609 /// Returns the `Room` in the joined state.
1610 ///
1611 /// # Arguments
1612 ///
1613 /// * `room_id` - The `RoomId` of the room to be joined.
1614 #[instrument(skip(self))]
1615 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1616 // See who invited us to this room, if anyone. Note we have to do this before
1617 // making the `/join` request, otherwise we could race against the sync.
1618 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1619
1620 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1621 let response = self.send(request).await?;
1622 self.finish_join_room(&response.room_id, pre_join_info).await
1623 }
1624
1625 /// Join a room by `RoomOrAliasId`.
1626 ///
1627 /// Returns the `Room` in the joined state.
1628 ///
1629 /// # Arguments
1630 ///
1631 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1632 /// alias looks like `#name:example.com`.
1633 /// * `server_names` - The server names to be used for resolving the alias,
1634 /// if needs be.
1635 pub async fn join_room_by_id_or_alias(
1636 &self,
1637 alias: &RoomOrAliasId,
1638 server_names: &[OwnedServerName],
1639 ) -> Result<Room> {
1640 let pre_join_info = {
1641 match alias.try_into() {
1642 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1643 Err(_) => {
1644 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1645 // responding to an invitation to the room, and therefore don't need to handle
1646 // things that happen as a result of invites.
1647 None
1648 }
1649 }
1650 };
1651 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1652 via: server_names.to_owned(),
1653 });
1654 let response = self.send(request).await?;
1655 self.finish_join_room(&response.room_id, pre_join_info).await
1656 }
1657
1658 /// Search the homeserver's directory of public rooms.
1659 ///
1660 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1661 /// a `get_public_rooms::Response`.
1662 ///
1663 /// # Arguments
1664 ///
1665 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1666 ///
1667 /// * `since` - Pagination token from a previous request.
1668 ///
1669 /// * `server` - The name of the server, if `None` the requested server is
1670 /// used.
1671 ///
1672 /// # Examples
1673 /// ```no_run
1674 /// use matrix_sdk::Client;
1675 /// # use url::Url;
1676 /// # let homeserver = Url::parse("http://example.com").unwrap();
1677 /// # let limit = Some(10);
1678 /// # let since = Some("since token");
1679 /// # let server = Some("servername.com".try_into().unwrap());
1680 /// # async {
1681 /// let mut client = Client::new(homeserver).await.unwrap();
1682 ///
1683 /// client.public_rooms(limit, since, server).await;
1684 /// # };
1685 /// ```
1686 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1687 pub async fn public_rooms(
1688 &self,
1689 limit: Option<u32>,
1690 since: Option<&str>,
1691 server: Option<&ServerName>,
1692 ) -> HttpResult<get_public_rooms::v3::Response> {
1693 let limit = limit.map(UInt::from);
1694
1695 let request = assign!(get_public_rooms::v3::Request::new(), {
1696 limit,
1697 since: since.map(ToOwned::to_owned),
1698 server: server.map(ToOwned::to_owned),
1699 });
1700 self.send(request).await
1701 }
1702
1703 /// Create a room with the given parameters.
1704 ///
1705 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1706 /// created room.
1707 ///
1708 /// If you want to create a direct message with one specific user, you can
1709 /// use [`create_dm`][Self::create_dm], which is more convenient than
1710 /// assembling the [`create_room::v3::Request`] yourself.
1711 ///
1712 /// If the `is_direct` field of the request is set to `true` and at least
1713 /// one user is invited, the room will be automatically added to the direct
1714 /// rooms in the account data.
1715 ///
1716 /// # Examples
1717 ///
1718 /// ```no_run
1719 /// use matrix_sdk::{
1720 /// Client,
1721 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1722 /// };
1723 /// # use url::Url;
1724 /// #
1725 /// # async {
1726 /// # let homeserver = Url::parse("http://example.com").unwrap();
1727 /// let request = CreateRoomRequest::new();
1728 /// let client = Client::new(homeserver).await.unwrap();
1729 /// assert!(client.create_room(request).await.is_ok());
1730 /// # };
1731 /// ```
1732 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1733 let invite = request.invite.clone();
1734 let is_direct_room = request.is_direct;
1735 let response = self.send(request).await?;
1736
1737 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1738
1739 let joined_room = Room::new(self.clone(), base_room);
1740
1741 if is_direct_room
1742 && !invite.is_empty()
1743 && let Err(error) =
1744 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1745 {
1746 // FIXME: Retry in the background
1747 error!("Failed to mark room as DM: {error}");
1748 }
1749
1750 Ok(joined_room)
1751 }
1752
1753 /// Create a DM room.
1754 ///
1755 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1756 /// given user being invited, the room marked `is_direct` and both the
1757 /// creator and invitee getting the default maximum power level.
1758 ///
1759 /// If the `e2e-encryption` feature is enabled, the room will also be
1760 /// encrypted.
1761 ///
1762 /// # Arguments
1763 ///
1764 /// * `user_id` - The ID of the user to create a DM for.
1765 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1766 #[cfg(feature = "e2e-encryption")]
1767 let initial_state = vec![
1768 InitialStateEvent::with_empty_state_key(
1769 RoomEncryptionEventContent::with_recommended_defaults(),
1770 )
1771 .to_raw_any(),
1772 ];
1773
1774 #[cfg(not(feature = "e2e-encryption"))]
1775 let initial_state = vec![];
1776
1777 let request = assign!(create_room::v3::Request::new(), {
1778 invite: vec![user_id.to_owned()],
1779 is_direct: true,
1780 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1781 initial_state,
1782 });
1783
1784 self.create_room(request).await
1785 }
1786
1787 /// Search the homeserver's directory for public rooms with a filter.
1788 ///
1789 /// # Arguments
1790 ///
1791 /// * `room_search` - The easiest way to create this request is using the
1792 /// `get_public_rooms_filtered::Request` itself.
1793 ///
1794 /// # Examples
1795 ///
1796 /// ```no_run
1797 /// # use url::Url;
1798 /// # use matrix_sdk::Client;
1799 /// # async {
1800 /// # let homeserver = Url::parse("http://example.com")?;
1801 /// use matrix_sdk::ruma::{
1802 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1803 /// };
1804 /// # let mut client = Client::new(homeserver).await?;
1805 ///
1806 /// let mut filter = Filter::new();
1807 /// filter.generic_search_term = Some("rust".to_owned());
1808 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1809 /// request.filter = filter;
1810 ///
1811 /// let response = client.public_rooms_filtered(request).await?;
1812 ///
1813 /// for room in response.chunk {
1814 /// println!("Found room {room:?}");
1815 /// }
1816 /// # anyhow::Ok(()) };
1817 /// ```
pub async fn public_rooms_filtered(
    &self,
    request: get_public_rooms_filtered::v3::Request,
) -> HttpResult<get_public_rooms_filtered::v3::Response> {
    // Plain pass-through: the request is sent unchanged and the response (or
    // the HTTP error) is handed back to the caller without post-processing.
    self.send(request).await
}
1824
1825 /// Send an arbitrary request to the server, without updating client state.
1826 ///
1827 /// **Warning:** Because this method *does not* update the client state, it
1828 /// is important to make sure that you account for this yourself, and
1829 /// use wrapper methods where available. This method should *only* be
1830 /// used if a wrapper method for the endpoint you'd like to use is not
1831 /// available.
1832 ///
1833 /// # Arguments
1834 ///
1835 /// * `request` - A filled out and valid request for the endpoint to be hit
1836 ///
1837 /// * `timeout` - An optional request timeout setting, this overrides the
1838 /// default request setting if one was set.
1839 ///
1840 /// # Examples
1841 ///
1842 /// ```no_run
1843 /// # use matrix_sdk::{Client, config::SyncSettings};
1844 /// # use url::Url;
1845 /// # async {
1846 /// # let homeserver = Url::parse("http://localhost:8080")?;
1847 /// # let mut client = Client::new(homeserver).await?;
1848 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1849 ///
1850 /// // First construct the request you want to make
1851 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1852 /// // for all available Endpoints
1853 /// let user_id = user_id!("@example:localhost").to_owned();
1854 /// let request = profile::get_profile::v3::Request::new(user_id);
1855 ///
1856 /// // Start the request using Client::send()
1857 /// let response = client.send(request).await?;
1858 ///
1859 /// // Check the corresponding Response struct to find out what types are
1860 /// // returned
1861 /// # anyhow::Ok(()) };
1862 /// ```
1863 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1864 where
1865 Request: OutgoingRequest + Clone + Debug,
1866 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1867 {
1868 SendRequest {
1869 client: self.clone(),
1870 request,
1871 config: None,
1872 send_progress: Default::default(),
1873 }
1874 }
1875
1876 pub(crate) async fn send_inner<Request>(
1877 &self,
1878 request: Request,
1879 config: Option<RequestConfig>,
1880 send_progress: SharedObservable<TransmissionProgress>,
1881 ) -> HttpResult<Request::IncomingResponse>
1882 where
1883 Request: OutgoingRequest + Debug,
1884 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1885 {
1886 let homeserver = self.homeserver().to_string();
1887 let access_token = self.access_token();
1888
1889 self.inner
1890 .http_client
1891 .send(
1892 request,
1893 config,
1894 homeserver,
1895 access_token.as_deref(),
1896 &self.supported_versions().await?,
1897 send_progress,
1898 )
1899 .await
1900 }
1901
1902 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1903 _ = self
1904 .inner
1905 .auth_ctx
1906 .session_change_sender
1907 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1908 }
1909
1910 /// Fetches server versions from network; no caching.
1911 pub async fn fetch_server_versions(
1912 &self,
1913 request_config: Option<RequestConfig>,
1914 ) -> HttpResult<get_supported_versions::Response> {
1915 let server_versions = self
1916 .inner
1917 .http_client
1918 .send(
1919 get_supported_versions::Request::new(),
1920 request_config,
1921 self.homeserver().to_string(),
1922 None,
1923 &SupportedVersions {
1924 versions: [MatrixVersion::V1_0].into(),
1925 features: Default::default(),
1926 },
1927 Default::default(),
1928 )
1929 .await?;
1930
1931 Ok(server_versions)
1932 }
1933
1934 /// Fetches client well_known from network; no caching.
1935 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1936 let server_url_string = self
1937 .server()
1938 .unwrap_or(
1939 // Sometimes people configure their well-known directly on the homeserver so use
1940 // this as a fallback when the server name is unknown.
1941 &self.homeserver(),
1942 )
1943 .to_string();
1944
1945 let well_known = self
1946 .inner
1947 .http_client
1948 .send(
1949 discover_homeserver::Request::new(),
1950 Some(RequestConfig::short_retry()),
1951 server_url_string,
1952 None,
1953 &SupportedVersions {
1954 versions: [MatrixVersion::V1_0].into(),
1955 features: Default::default(),
1956 },
1957 Default::default(),
1958 )
1959 .await;
1960
1961 match well_known {
1962 Ok(well_known) => Some(well_known),
1963 Err(http_error) => {
1964 // It is perfectly valid to not have a well-known file.
1965 // Maybe we should check for a specific error code to be sure?
1966 warn!("Failed to fetch client well-known: {http_error}");
1967 None
1968 }
1969 }
1970 }
1971
1972 /// Load server info from storage, or fetch them from network and cache
1973 /// them.
1974 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1975 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1976 Ok(Some(stored)) => {
1977 if let Some(server_info) =
1978 stored.into_server_info().and_then(|info| info.maybe_decode())
1979 {
1980 return Ok(server_info);
1981 }
1982 }
1983 Ok(None) => {
1984 // fallthrough: cache is empty
1985 }
1986 Err(err) => {
1987 warn!("error when loading cached server info: {err}");
1988 // fallthrough to network.
1989 }
1990 }
1991
1992 let server_versions = self.fetch_server_versions(None).await?;
1993 let well_known = self.fetch_client_well_known().await;
1994 let server_info = ServerInfo::new(
1995 server_versions.versions.clone(),
1996 server_versions.unstable_features.clone(),
1997 well_known.map(Into::into),
1998 );
1999
2000 // Attempt to cache the result in storage.
2001 {
2002 if let Err(err) = self
2003 .state_store()
2004 .set_kv_data(
2005 StateStoreDataKey::ServerInfo,
2006 StateStoreDataValue::ServerInfo(server_info.clone()),
2007 )
2008 .await
2009 {
2010 warn!("error when caching server info: {err}");
2011 }
2012 }
2013
2014 Ok(server_info)
2015 }
2016
2017 async fn get_or_load_and_cache_server_info<
2018 Value,
2019 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2020 >(
2021 &self,
2022 map: MapFunction,
2023 ) -> HttpResult<Value> {
2024 let server_info = &self.inner.caches.server_info;
2025 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2026 return Ok(val);
2027 }
2028
2029 let mut guarded_server_info = server_info.write().await;
2030 if let CachedValue::Cached(val) = map(&guarded_server_info) {
2031 return Ok(val);
2032 }
2033
2034 let server_info = self.load_or_fetch_server_info().await?;
2035
2036 // Fill both unstable features and server versions at once.
2037 let mut supported = server_info.supported_versions();
2038 if supported.versions.is_empty() {
2039 supported.versions = [MatrixVersion::V1_0].into();
2040 }
2041
2042 guarded_server_info.supported_versions = CachedValue::Cached(supported);
2043 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2044
2045 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2046 // fetch an optional property), the function will always return some.
2047 Ok(map(&guarded_server_info).unwrap_cached_value())
2048 }
2049
2050 /// Get the Matrix versions and features supported by the homeserver by
2051 /// fetching them from the server or the cache.
2052 ///
2053 /// This is equivalent to calling both [`Client::server_versions()`] and
2054 /// [`Client::unstable_features()`]. To always fetch the result from the
2055 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2056 /// and then `.as_supported_versions()` on the response.
2057 ///
2058 /// # Examples
2059 ///
2060 /// ```no_run
2061 /// use ruma::api::{FeatureFlag, MatrixVersion};
2062 /// # use matrix_sdk::{Client, config::SyncSettings};
2063 /// # use url::Url;
2064 /// # async {
2065 /// # let homeserver = Url::parse("http://localhost:8080")?;
2066 /// # let mut client = Client::new(homeserver).await?;
2067 ///
2068 /// let supported = client.supported_versions().await?;
2069 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2070 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2071 ///
2072 /// let msc_x_feature = FeatureFlag::from("msc_x");
2073 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2074 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2075 /// # anyhow::Ok(()) };
2076 /// ```
2077 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2078 self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2079 .await
2080 }
2081
2082 /// Get the Matrix versions supported by the homeserver by fetching them
2083 /// from the server or the cache.
2084 ///
2085 /// # Examples
2086 ///
2087 /// ```no_run
2088 /// use ruma::api::MatrixVersion;
2089 /// # use matrix_sdk::{Client, config::SyncSettings};
2090 /// # use url::Url;
2091 /// # async {
2092 /// # let homeserver = Url::parse("http://localhost:8080")?;
2093 /// # let mut client = Client::new(homeserver).await?;
2094 ///
2095 /// let server_versions = client.server_versions().await?;
2096 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2097 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2098 /// # anyhow::Ok(()) };
2099 /// ```
2100 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2101 self.get_or_load_and_cache_server_info(|server_info| {
2102 server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2103 })
2104 .await
2105 }
2106
2107 /// Get the unstable features supported by the homeserver by fetching them
2108 /// from the server or the cache.
2109 ///
2110 /// # Examples
2111 ///
2112 /// ```no_run
2113 /// use matrix_sdk::ruma::api::FeatureFlag;
2114 /// # use matrix_sdk::{Client, config::SyncSettings};
2115 /// # use url::Url;
2116 /// # async {
2117 /// # let homeserver = Url::parse("http://localhost:8080")?;
2118 /// # let mut client = Client::new(homeserver).await?;
2119 ///
2120 /// let msc_x_feature = FeatureFlag::from("msc_x");
2121 /// let unstable_features = client.unstable_features().await?;
2122 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2123 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2124 /// # anyhow::Ok(()) };
2125 /// ```
2126 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2127 self.get_or_load_and_cache_server_info(|server_info| {
2128 server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2129 })
2130 .await
2131 }
2132
2133 /// Get information about the homeserver's advertised RTC foci by fetching
2134 /// the well-known file from the server or the cache.
2135 ///
2136 /// # Examples
2137 /// ```no_run
2138 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2139 /// # use url::Url;
2140 /// # async {
2141 /// # let homeserver = Url::parse("http://localhost:8080")?;
2142 /// # let mut client = Client::new(homeserver).await?;
2143 /// let rtc_foci = client.rtc_foci().await?;
2144 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2145 /// RtcFocusInfo::LiveKit(info) => Some(info),
2146 /// _ => None,
2147 /// });
2148 /// if let Some(info) = default_livekit_focus_info {
2149 /// println!("Default LiveKit service URL: {}", info.service_url);
2150 /// }
2151 /// # anyhow::Ok(()) };
2152 /// ```
2153 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2154 let well_known = self
2155 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2156 .await?;
2157
2158 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2159 }
2160
2161 /// Empty the server version and unstable features cache.
2162 ///
2163 /// Since the SDK caches server info (versions, unstable features,
2164 /// well-known etc), it's possible to have a stale entry in the cache. This
2165 /// functions makes it possible to force reset it.
2166 pub async fn reset_server_info(&self) -> Result<()> {
2167 // Empty the in-memory caches.
2168 let mut guard = self.inner.caches.server_info.write().await;
2169 guard.supported_versions = CachedValue::NotSet;
2170
2171 // Empty the store cache.
2172 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2173 }
2174
2175 /// Check whether MSC 4028 is enabled on the homeserver.
2176 ///
2177 /// # Examples
2178 ///
2179 /// ```no_run
2180 /// # use matrix_sdk::{Client, config::SyncSettings};
2181 /// # use url::Url;
2182 /// # async {
2183 /// # let homeserver = Url::parse("http://localhost:8080")?;
2184 /// # let mut client = Client::new(homeserver).await?;
2185 /// let msc4028_enabled =
2186 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2187 /// # anyhow::Ok(()) };
2188 /// ```
2189 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2190 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2191 }
2192
2193 /// Get information of all our own devices.
2194 ///
2195 /// # Examples
2196 ///
2197 /// ```no_run
2198 /// # use matrix_sdk::{Client, config::SyncSettings};
2199 /// # use url::Url;
2200 /// # async {
2201 /// # let homeserver = Url::parse("http://localhost:8080")?;
2202 /// # let mut client = Client::new(homeserver).await?;
2203 /// let response = client.devices().await?;
2204 ///
2205 /// for device in response.devices {
2206 /// println!(
2207 /// "Device: {} {}",
2208 /// device.device_id,
2209 /// device.display_name.as_deref().unwrap_or("")
2210 /// );
2211 /// }
2212 /// # anyhow::Ok(()) };
2213 /// ```
2214 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2215 let request = get_devices::v3::Request::new();
2216
2217 self.send(request).await
2218 }
2219
2220 /// Delete the given devices from the server.
2221 ///
2222 /// # Arguments
2223 ///
2224 /// * `devices` - The list of devices that should be deleted from the
2225 /// server.
2226 ///
2227 /// * `auth_data` - This request requires user interactive auth, the first
2228 /// request needs to set this to `None` and will always fail with an
2229 /// `UiaaResponse`. The response will contain information for the
2230 /// interactive auth and the same request needs to be made but this time
2231 /// with some `auth_data` provided.
2232 ///
2233 /// ```no_run
2234 /// # use matrix_sdk::{
2235 /// # ruma::{api::client::uiaa, device_id},
2236 /// # Client, Error, config::SyncSettings,
2237 /// # };
2238 /// # use serde_json::json;
2239 /// # use url::Url;
2240 /// # use std::collections::BTreeMap;
2241 /// # async {
2242 /// # let homeserver = Url::parse("http://localhost:8080")?;
2243 /// # let mut client = Client::new(homeserver).await?;
2244 /// let devices = &[device_id!("DEVICEID").to_owned()];
2245 ///
2246 /// if let Err(e) = client.delete_devices(devices, None).await {
2247 /// if let Some(info) = e.as_uiaa_response() {
2248 /// let mut password = uiaa::Password::new(
2249 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2250 /// "wordpass".to_owned(),
2251 /// );
2252 /// password.session = info.session.clone();
2253 ///
2254 /// client
2255 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2256 /// .await?;
2257 /// }
2258 /// }
/// # anyhow::Ok(()) };
/// ```
2260 pub async fn delete_devices(
2261 &self,
2262 devices: &[OwnedDeviceId],
2263 auth_data: Option<uiaa::AuthData>,
2264 ) -> HttpResult<delete_devices::v3::Response> {
2265 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2266 request.auth = auth_data;
2267
2268 self.send(request).await
2269 }
2270
2271 /// Change the display name of a device owned by the current user.
2272 ///
2273 /// Returns a `update_device::Response` which specifies the result
2274 /// of the operation.
2275 ///
2276 /// # Arguments
2277 ///
2278 /// * `device_id` - The ID of the device to change the display name of.
2279 /// * `display_name` - The new display name to set.
2280 pub async fn rename_device(
2281 &self,
2282 device_id: &DeviceId,
2283 display_name: &str,
2284 ) -> HttpResult<update_device::v3::Response> {
2285 let mut request = update_device::v3::Request::new(device_id.to_owned());
2286 request.display_name = Some(display_name.to_owned());
2287
2288 self.send(request).await
2289 }
2290
2291 /// Synchronize the client's state with the latest state on the server.
2292 ///
2293 /// ## Syncing Events
2294 ///
2295 /// Messages or any other type of event need to be periodically fetched from
2296 /// the server, this is achieved by sending a `/sync` request to the server.
2297 ///
2298 /// The first sync is sent out without a [`token`]. The response of the
2299 /// first sync will contain a [`next_batch`] field which should then be
2300 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2301 /// don't receive the same events multiple times.
2302 ///
2303 /// ## Long Polling
2304 ///
2305 /// A sync should in the usual case always be in flight. The
2306 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2307 /// long the server will wait for new events before it will respond.
2308 /// The server will respond immediately if some new events arrive before the
2309 /// timeout has expired. If no changes arrive and the timeout expires an
2310 /// empty sync response will be sent to the client.
2311 ///
2312 /// This method of sending a request that may not receive a response
2313 /// immediately is called long polling.
2314 ///
2315 /// ## Filtering Events
2316 ///
2317 /// The number or type of messages and events that the client should receive
2318 /// from the server can be altered using a [`Filter`].
2319 ///
2320 /// Filters can be non-trivial and, since they will be sent with every sync
2321 /// request, they may take up a bunch of unnecessary bandwidth.
2322 ///
2323 /// Luckily filters can be uploaded to the server and reused using an unique
2324 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2325 /// method.
2326 ///
2327 /// # Arguments
2328 ///
2329 /// * `sync_settings` - Settings for the sync call, this allows us to set
2330 /// various options to configure the sync:
2331 /// * [`filter`] - To configure which events we receive and which get
2332 /// [filtered] by the server
2333 /// * [`timeout`] - To configure our [long polling] setup.
2334 /// * [`token`] - To tell the server which events we already received
2335 /// and where we wish to continue syncing.
2336 /// * [`full_state`] - To tell the server that we wish to receive all
2337 /// state events, regardless of our configured [`token`].
2338 /// * [`set_presence`] - To tell the server to set the presence and to
2339 /// which state.
2340 ///
2341 /// # Examples
2342 ///
2343 /// ```no_run
2344 /// # use url::Url;
2345 /// # async {
2346 /// # let homeserver = Url::parse("http://localhost:8080")?;
2347 /// # let username = "";
2348 /// # let password = "";
2349 /// use matrix_sdk::{
2350 /// Client, config::SyncSettings,
2351 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2352 /// };
2353 ///
2354 /// let client = Client::new(homeserver).await?;
2355 /// client.matrix_auth().login_username(username, password).send().await?;
2356 ///
2357 /// // Sync once so we receive the client state and old messages.
2358 /// client.sync_once(SyncSettings::default()).await?;
2359 ///
2360 /// // Register our handler so we start responding once we receive a new
2361 /// // event.
2362 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2363 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2364 /// });
2365 ///
2366 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2367 /// // from our `sync_once()` call automatically.
2368 /// client.sync(SyncSettings::default()).await;
2369 /// # anyhow::Ok(()) };
2370 /// ```
2371 ///
2372 /// [`sync`]: #method.sync
2373 /// [`SyncSettings`]: crate::config::SyncSettings
2374 /// [`token`]: crate::config::SyncSettings#method.token
2375 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2376 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2377 /// [`set_presence`]: ruma::presence::PresenceState
2378 /// [`filter`]: crate::config::SyncSettings#method.filter
2379 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2380 /// [`next_batch`]: SyncResponse#structfield.next_batch
2381 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2382 /// [long polling]: #long-polling
2383 /// [filtered]: #filtering-events
2384 #[instrument(skip(self))]
2385 pub async fn sync_once(
2386 &self,
2387 sync_settings: crate::config::SyncSettings,
2388 ) -> Result<SyncResponse> {
2389 // The sync might not return for quite a while due to the timeout.
2390 // We'll see if there's anything crypto related to send out before we
2391 // sync, i.e. if we closed our client after a sync but before the
2392 // crypto requests were sent out.
2393 //
2394 // This will mostly be a no-op.
2395 #[cfg(feature = "e2e-encryption")]
2396 if let Err(e) = self.send_outgoing_requests().await {
2397 error!(error = ?e, "Error while sending outgoing E2EE requests");
2398 }
2399
2400 let token = match sync_settings.token {
2401 SyncToken::Specific(token) => Some(token),
2402 SyncToken::NoToken => None,
2403 SyncToken::ReusePrevious => self.sync_token().await,
2404 };
2405
2406 let request = assign!(sync_events::v3::Request::new(), {
2407 filter: sync_settings.filter.map(|f| *f),
2408 since: token,
2409 full_state: sync_settings.full_state,
2410 set_presence: sync_settings.set_presence,
2411 timeout: sync_settings.timeout,
2412 use_state_after: true,
2413 });
2414 let mut request_config = self.request_config();
2415 if let Some(timeout) = sync_settings.timeout {
2416 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2417 request_config.timeout = Some(base_timeout + timeout);
2418 }
2419
2420 let response = self.send(request).with_request_config(request_config).await?;
2421 let next_batch = response.next_batch.clone();
2422 let response = self.process_sync(response).await?;
2423
2424 #[cfg(feature = "e2e-encryption")]
2425 if let Err(e) = self.send_outgoing_requests().await {
2426 error!(error = ?e, "Error while sending outgoing E2EE requests");
2427 }
2428
2429 self.inner.sync_beat.notify(usize::MAX);
2430
2431 Ok(SyncResponse::new(next_batch, response))
2432 }
2433
2434 /// Repeatedly synchronize the client state with the server.
2435 ///
/// This method will only return on error. If cancellation is needed, the
/// method should be wrapped in a cancelable task, or the
/// [`Client::sync_with_callback`] method can be used instead (or
/// [`Client::sync_with_result_callback`] if you want to handle error
/// cases in the loop, too).
2441 ///
2442 /// This method will internally call [`Client::sync_once`] in a loop.
2443 ///
2444 /// This method can be used with the [`Client::add_event_handler`]
2445 /// method to react to individual events. If you instead wish to handle
2446 /// events in a bulk manner the [`Client::sync_with_callback`],
2447 /// [`Client::sync_with_result_callback`] and
2448 /// [`Client::sync_stream`] methods can be used instead. Those methods
2449 /// repeatedly return the whole sync response.
2450 ///
2451 /// # Arguments
2452 ///
2453 /// * `sync_settings` - Settings for the sync call. *Note* that those
2454 /// settings will be only used for the first sync call. See the argument
2455 /// docs for [`Client::sync_once`] for more info.
2456 ///
2457 /// # Return
2458 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2459 /// up to the user of the API to check the error and decide whether the sync
2460 /// should continue or not.
2461 ///
2462 /// # Examples
2463 ///
2464 /// ```no_run
2465 /// # use url::Url;
2466 /// # async {
2467 /// # let homeserver = Url::parse("http://localhost:8080")?;
2468 /// # let username = "";
2469 /// # let password = "";
2470 /// use matrix_sdk::{
2471 /// Client, config::SyncSettings,
2472 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2473 /// };
2474 ///
2475 /// let client = Client::new(homeserver).await?;
2476 /// client.matrix_auth().login_username(&username, &password).send().await?;
2477 ///
2478 /// // Register our handler so we start responding once we receive a new
2479 /// // event.
2480 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2481 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2482 /// });
2483 ///
2484 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2485 /// // automatically.
2486 /// client.sync(SyncSettings::default()).await?;
2487 /// # anyhow::Ok(()) };
2488 /// ```
2489 ///
2490 /// [argument docs]: #method.sync_once
2491 /// [`sync_with_callback`]: #method.sync_with_callback
2492 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2493 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2494 }
2495
2496 /// Repeatedly call sync to synchronize the client state with the server.
2497 ///
2498 /// # Arguments
2499 ///
2500 /// * `sync_settings` - Settings for the sync call. *Note* that those
2501 /// settings will be only used for the first sync call. See the argument
2502 /// docs for [`Client::sync_once`] for more info.
2503 ///
/// * `callback` - A callback that will be called every time a successful
///   response has been fetched from the server. The callback must return a
///   [`LoopCtrl`] which signals whether the method should keep syncing: if
///   the callback returns `LoopCtrl::Continue` the sync will continue, if
///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2509 ///
2510 /// # Return
2511 /// The sync runs until an error occurs or the
2512 /// callback indicates that the Loop should stop. If the callback asked for
2513 /// a regular stop, the result will be `Ok(())` otherwise the
2514 /// `Err(Error)` is returned.
2515 ///
2516 /// # Examples
2517 ///
2518 /// The following example demonstrates how to sync forever while sending all
2519 /// the interesting events through a mpsc channel to another thread e.g. a
2520 /// UI thread.
2521 ///
2522 /// ```no_run
2523 /// # use std::time::Duration;
2524 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2525 /// # use url::Url;
2526 /// # async {
2527 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2528 /// # let mut client = Client::new(homeserver).await.unwrap();
2529 ///
2530 /// use tokio::sync::mpsc::channel;
2531 ///
2532 /// let (tx, rx) = channel(100);
2533 ///
2534 /// let sync_channel = &tx;
2535 /// let sync_settings = SyncSettings::new()
2536 /// .timeout(Duration::from_secs(30));
2537 ///
2538 /// client
2539 /// .sync_with_callback(sync_settings, |response| async move {
2540 /// let channel = sync_channel;
2541 /// for (room_id, room) in response.rooms.joined {
2542 /// for event in room.timeline.events {
2543 /// channel.send(event).await.unwrap();
2544 /// }
2545 /// }
2546 ///
2547 /// LoopCtrl::Continue
2548 /// })
2549 /// .await;
2550 /// };
2551 /// ```
2552 #[instrument(skip_all)]
2553 pub async fn sync_with_callback<C>(
2554 &self,
2555 sync_settings: crate::config::SyncSettings,
2556 callback: impl Fn(SyncResponse) -> C,
2557 ) -> Result<(), Error>
2558 where
2559 C: Future<Output = LoopCtrl>,
2560 {
2561 self.sync_with_result_callback(sync_settings, |result| async {
2562 Ok(callback(result?).await)
2563 })
2564 .await
2565 }
2566
2567 /// Repeatedly call sync to synchronize the client state with the server.
2568 ///
2569 /// # Arguments
2570 ///
2571 /// * `sync_settings` - Settings for the sync call. *Note* that those
2572 /// settings will be only used for the first sync call. See the argument
2573 /// docs for [`Client::sync_once`] for more info.
2574 ///
2575 /// * `callback` - A callback that will be called every time after a
2576 /// response has been received, failure or not. The callback returns a
2577 /// `Result<LoopCtrl, Error>`, too. When returning
2578 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2579 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2580 /// function returns `Ok(())`. In case the callback can't handle the
2581 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2582 /// which results in the sync ending and the `Err(Error)` being returned.
2583 ///
2584 /// # Return
2585 /// The sync runs until an error occurs that the callback can't handle or
2586 /// the callback indicates that the Loop should stop. If the callback
2587 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2588 /// `Err(Error)` is returned.
2589 ///
/// _Note_: Lower-level configuration (e.g. for retries) is not changed by
/// this, and is applied first without sending the result to the
/// callback. Only after the retries have been exhausted is the `Result`
/// handed to the callback.
2594 ///
2595 /// # Examples
2596 ///
2597 /// The following example demonstrates how to sync forever while sending all
2598 /// the interesting events through a mpsc channel to another thread e.g. a
2599 /// UI thread.
2600 ///
2601 /// ```no_run
2602 /// # use std::time::Duration;
2603 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2604 /// # use url::Url;
2605 /// # async {
2606 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2607 /// # let mut client = Client::new(homeserver).await.unwrap();
2608 /// #
2609 /// use tokio::sync::mpsc::channel;
2610 ///
2611 /// let (tx, rx) = channel(100);
2612 ///
2613 /// let sync_channel = &tx;
2614 /// let sync_settings = SyncSettings::new()
2615 /// .timeout(Duration::from_secs(30));
2616 ///
2617 /// client
2618 /// .sync_with_result_callback(sync_settings, |response| async move {
2619 /// let channel = sync_channel;
2620 /// let sync_response = response?;
2621 /// for (room_id, room) in sync_response.rooms.joined {
2622 /// for event in room.timeline.events {
2623 /// channel.send(event).await.unwrap();
2624 /// }
2625 /// }
2626 ///
2627 /// Ok(LoopCtrl::Continue)
2628 /// })
2629 /// .await;
2630 /// };
2631 /// ```
2632 #[instrument(skip(self, callback))]
2633 pub async fn sync_with_result_callback<C>(
2634 &self,
2635 sync_settings: crate::config::SyncSettings,
2636 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2637 ) -> Result<(), Error>
2638 where
2639 C: Future<Output = Result<LoopCtrl, Error>>,
2640 {
2641 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2642
2643 while let Some(result) = sync_stream.next().await {
2644 trace!("Running callback");
2645 if callback(result).await? == LoopCtrl::Break {
2646 trace!("Callback told us to stop");
2647 break;
2648 }
2649 trace!("Done running callback");
2650 }
2651
2652 Ok(())
2653 }
2654
/// Repeatedly synchronize the client state with the server.
///
/// This method will internally call [`Client::sync_once`] in a loop and is
/// equivalent to the [`Client::sync`] method but the responses are provided
/// as an async stream.
///
/// The returned stream never terminates on its own: every sync result,
/// success or failure, is yielded and the loop then starts over.
///
/// # Arguments
///
/// * `sync_settings` - Settings for the sync call. *Note* that those
///   settings will be only used for the first sync call. See the argument
///   docs for [`Client::sync_once`] for more info.
///
/// # Examples
///
/// ```no_run
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let username = "";
/// # let password = "";
/// use futures_util::StreamExt;
/// use matrix_sdk::{Client, config::SyncSettings};
///
/// let client = Client::new(homeserver).await?;
/// client.matrix_auth().login_username(&username, &password).send().await?;
///
/// let mut sync_stream =
///     Box::pin(client.sync_stream(SyncSettings::default()).await);
///
/// while let Some(Ok(response)) = sync_stream.next().await {
///     for room in response.rooms.joined.values() {
///         for e in &room.timeline.events {
///             if let Ok(event) = e.raw().deserialize() {
///                 println!("Received event {:?}", event);
///             }
///         }
///     }
/// }
///
/// # anyhow::Ok(()) };
/// ```
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(skip(self))]
pub async fn sync_stream(
    &self,
    mut sync_settings: crate::config::SyncSettings,
) -> impl Stream<Item = Result<SyncResponse>> + '_ {
    let mut is_first_sync = true;
    // Holds the caller's timeout while it is withheld from the first sync.
    let mut timeout = None;
    let mut last_sync_time: Option<Instant> = None;

    // Captured here so that every iteration below runs inside the span that
    // was current when `sync_stream` was called.
    let parent_span = Span::current();

    async_stream::stream!({
        loop {
            trace!("Syncing");

            if sync_settings.ignore_timeout_on_first_sync {
                if is_first_sync {
                    // First iteration: move the timeout out of the settings
                    // so that this sync runs without one.
                    timeout = sync_settings.timeout.take();
                } else if sync_settings.timeout.is_none() && timeout.is_some() {
                    // Later iteration: put the stashed timeout back, unless
                    // the settings got a timeout again in the meantime.
                    sync_settings.timeout = timeout.take();
                }

                is_first_sync = false;
            }

            yield self
                .sync_loop_helper(&mut sync_settings)
                .instrument(parent_span.clone())
                .await;

            // NOTE(review): presumably throttles back-to-back syncs based on
            // `last_sync_time`; the helper isn't visible here.
            Client::delay_sync(&mut last_sync_time).await
        }
    })
}
2731
2732 /// Get the current, if any, sync token of the client.
2733 /// This will be None if the client didn't sync at least once.
2734 pub(crate) async fn sync_token(&self) -> Option<String> {
2735 self.inner.base_client.sync_token().await
2736 }
2737
2738 /// Gets information about the owner of a given access token.
2739 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2740 let request = whoami::v3::Request::new();
2741 self.send(request).await
2742 }
2743
2744 /// Subscribes a new receiver to client SessionChange broadcasts.
2745 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2746 let broadcast = &self.auth_ctx().session_change_sender;
2747 broadcast.subscribe()
2748 }
2749
2750 /// Sets the save/restore session callbacks.
2751 ///
2752 /// This is another mechanism to get synchronous updates to session tokens,
2753 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2754 pub fn set_session_callbacks(
2755 &self,
2756 reload_session_callback: Box<ReloadSessionCallback>,
2757 save_session_callback: Box<SaveSessionCallback>,
2758 ) -> Result<()> {
2759 self.inner
2760 .auth_ctx
2761 .reload_session_callback
2762 .set(reload_session_callback)
2763 .map_err(|_| Error::MultipleSessionCallbacks)?;
2764
2765 self.inner
2766 .auth_ctx
2767 .save_session_callback
2768 .set(save_session_callback)
2769 .map_err(|_| Error::MultipleSessionCallbacks)?;
2770
2771 Ok(())
2772 }
2773
2774 /// Get the notification settings of the current owner of the client.
2775 pub async fn notification_settings(&self) -> NotificationSettings {
2776 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2777 NotificationSettings::new(self.clone(), ruleset)
2778 }
2779
/// Create a new specialized `Client` that can process notifications.
///
/// The new client is built from a fresh [`ClientInner`] whose components
/// are cloned from `self`, except for the base client, which is obtained
/// through `clone_with_in_memory_state_store`.
///
/// See [`CrossProcessLock::new`] to learn more about
/// `cross_process_store_locks_holder_name`.
///
/// # Errors
///
/// Fails if `clone_with_in_memory_state_store` fails.
///
/// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
pub async fn notification_client(
    &self,
    cross_process_store_locks_holder_name: String,
) -> Result<Client> {
    // The arguments are positional: keep them in the order expected by
    // `ClientInner::new`, including the feature-gated ones.
    let client = Client {
        inner: ClientInner::new(
            self.inner.auth_ctx.clone(),
            self.server().cloned(),
            self.homeserver(),
            self.sliding_sync_version(),
            self.inner.http_client.clone(),
            // NOTE(review): the meaning of the `false` flag isn't visible
            // from here; confirm against `BaseClient`.
            self.inner
                .base_client
                .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
                .await?,
            // Snapshot of the server info cached so far.
            self.inner.caches.server_info.read().await.clone(),
            self.inner.respect_login_well_known,
            self.inner.event_cache.clone(),
            self.inner.send_queue_data.clone(),
            self.inner.latest_events.clone(),
            #[cfg(feature = "e2e-encryption")]
            self.inner.e2ee.encryption_settings,
            #[cfg(feature = "e2e-encryption")]
            self.inner.enable_share_history_on_invite,
            cross_process_store_locks_holder_name,
            #[cfg(feature = "experimental-search")]
            self.inner.search_index.clone(),
            self.inner.thread_subscription_catchup.clone(),
        )
        .await,
    };

    Ok(client)
}
2820
2821 /// The [`EventCache`] instance for this [`Client`].
2822 pub fn event_cache(&self) -> &EventCache {
2823 // SAFETY: always initialized in the `Client` ctor.
2824 self.inner.event_cache.get().unwrap()
2825 }
2826
2827 /// The [`LatestEvents`] instance for this [`Client`].
2828 pub async fn latest_events(&self) -> &LatestEvents {
2829 self.inner
2830 .latest_events
2831 .get_or_init(|| async {
2832 LatestEvents::new(
2833 WeakClient::from_client(self),
2834 self.event_cache().clone(),
2835 SendQueue::new(self.clone()),
2836 )
2837 })
2838 .await
2839 }
2840
2841 /// Waits until an at least partially synced room is received, and returns
2842 /// it.
2843 ///
2844 /// **Note: this function will loop endlessly until either it finds the room
2845 /// or an externally set timeout happens.**
2846 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2847 loop {
2848 if let Some(room) = self.get_room(room_id) {
2849 if room.is_state_partially_or_fully_synced() {
2850 debug!("Found just created room!");
2851 return room;
2852 }
2853 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2854 } else {
2855 debug!("Room wasn't found, waiting for sync beat to try again");
2856 }
2857 self.inner.sync_beat.listen().await;
2858 }
2859 }
2860
2861 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2862 /// join it.
2863 pub async fn knock(
2864 &self,
2865 room_id_or_alias: OwnedRoomOrAliasId,
2866 reason: Option<String>,
2867 server_names: Vec<OwnedServerName>,
2868 ) -> Result<Room> {
2869 let request =
2870 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2871 let response = self.send(request).await?;
2872 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2873 Ok(Room::new(self.clone(), base_room))
2874 }
2875
2876 /// Checks whether the provided `user_id` belongs to an ignored user.
2877 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2878 self.base_client().is_user_ignored(user_id).await
2879 }
2880
2881 /// Gets the `max_upload_size` value from the homeserver, getting either a
2882 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2883 /// missing.
2884 ///
2885 /// Check the spec for more info:
2886 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2887 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2888 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2889 if let Some(data) = max_upload_size_lock.get() {
2890 return Ok(data.to_owned());
2891 }
2892
2893 // Use the authenticated endpoint when the server supports it.
2894 let supported_versions = self.supported_versions().await?;
2895 let use_auth =
2896 authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2897
2898 let upload_size = if use_auth {
2899 self.send(authenticated_media::get_media_config::v1::Request::default())
2900 .await?
2901 .upload_size
2902 } else {
2903 #[allow(deprecated)]
2904 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2905 };
2906
2907 match max_upload_size_lock.set(upload_size) {
2908 Ok(_) => Ok(upload_size),
2909 Err(error) => {
2910 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2911 }
2912 }
2913 }
2914
/// Returns the settings applied when decrypting events.
///
/// They are read from the base client.
#[cfg(feature = "e2e-encryption")]
pub fn decryption_settings(&self) -> &DecryptionSettings {
    let base_client = self.base_client();
    &base_client.decryption_settings
}
2920
/// Returns the [`SearchIndex`] for this [`Client`].
///
/// Only available when the `experimental-search` feature is enabled.
#[cfg(feature = "experimental-search")]
pub fn search_index(&self) -> &SearchIndex {
    &self.inner.search_index
}
2926
2927 /// Whether the client is configured to take thread subscriptions (MSC4306
2928 /// and MSC4308) into account.
2929 ///
2930 /// This may cause filtering out of thread subscriptions, and loading the
2931 /// thread subscriptions via the sliding sync extension, when the room
2932 /// list service is being used.
2933 pub fn enabled_thread_subscriptions(&self) -> bool {
2934 match self.base_client().threading_support {
2935 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2936 ThreadingSupport::Disabled => false,
2937 }
2938 }
2939
2940 /// Fetch thread subscriptions changes between `from` and up to `to`.
2941 ///
2942 /// The `limit` optional parameter can be used to limit the number of
2943 /// entries in a response. It can also be overridden by the server, if
2944 /// it's deemed too large.
2945 pub async fn fetch_thread_subscriptions(
2946 &self,
2947 from: Option<String>,
2948 to: Option<String>,
2949 limit: Option<UInt>,
2950 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2951 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2952 from,
2953 to,
2954 limit,
2955 });
2956 Ok(self.send(request).await?)
2957 }
2958
2959 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2960 self.inner.thread_subscription_catchup.get().unwrap()
2961 }
2962}
2963
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper registering `user_ids` as tracked users in the crypto
    /// layer.
    ///
    /// # Panics
    ///
    /// Panics if there's no olm machine, or if updating the tracked users
    /// fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();

        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
2977
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
///
/// Holding a `WeakClient` doesn't keep the client alive; use
/// [`WeakClient::get`] to try and get a [`Client`] back.
#[derive(Clone, Debug)]
pub(crate) struct WeakClient {
    /// Non-owning pointer to the client's inner state. Upgrading it fails
    /// once all the strong references to the `ClientInner` are gone.
    client: Weak<ClientInner>,
}
2984
2985impl WeakClient {
2986 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2987 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2988 Self { client: Arc::downgrade(client) }
2989 }
2990
2991 /// Construct a [`WeakClient`] from a [`Client`].
2992 pub fn from_client(client: &Client) -> Self {
2993 Self::from_inner(&client.inner)
2994 }
2995
2996 /// Attempts to get a [`Client`] from this [`WeakClient`].
2997 pub fn get(&self) -> Option<Client> {
2998 self.client.upgrade().map(|inner| Client { inner })
2999 }
3000
3001 /// Gets the number of strong (`Arc`) pointers still pointing to this
3002 /// client.
3003 #[allow(dead_code)]
3004 pub fn strong_count(&self) -> usize {
3005 self.client.strong_count()
3006 }
3007}
3008
/// Information about the server kept by the client.
///
/// Each piece of information is wrapped in a [`CachedValue`], so that "not
/// cached yet" can be told apart from a cached value.
#[derive(Clone)]
struct ClientServerInfo {
    /// The Matrix versions and unstable features the server supports (known
    /// ones only).
    supported_versions: CachedValue<SupportedVersions>,

    /// The server's well-known file, if any.
    ///
    /// `Cached(None)` records that there is no well-known file, as opposed
    /// to `NotSet`, where nothing has been cached yet.
    well_known: CachedValue<Option<WellKnownResponse>>,
}
3018
/// A cache slot that is either filled or empty.
///
/// Unlike a bare `Option`, this keeps "the cached value is `None`" (the value
/// is known not to exist) distinct from "nothing has been cached so far".
#[derive(Clone)]
enum CachedValue<Value> {
    /// The slot holds a value.
    Cached(Value),
    /// The slot is still empty.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Takes the value out of the slot.
    ///
    /// # Panics
    ///
    /// Panics if the slot is `NotSet`.
    fn unwrap_cached_value(self) -> Value {
        let Self::Cached(value) = self else {
            panic!("Tried to unwrap a cached value that wasn't set");
        };

        value
    }

    /// Borrows the content of the slot, turning a `&CachedValue<Value>` into
    /// a `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        match *self {
            Self::Cached(ref value) => CachedValue::Cached(value),
            Self::NotSet => CachedValue::NotSet,
        }
    }

    /// Applies `f` to the value of a `Cached` slot, producing a
    /// `CachedValue<Other>`; a `NotSet` slot stays `NotSet` and `f` isn't
    /// called.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        if let Self::Cached(value) = self {
            CachedValue::Cached(f(value))
        } else {
            CachedValue::NotSet
        }
    }
}
3064
/// Information about the state of a room before we joined it.
///
/// The derived `Default` yields a value without an inviter.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
3071
3072// The http mocking library is not supported for wasm32
3073#[cfg(all(test, not(target_family = "wasm")))]
3074pub(crate) mod tests {
3075 use std::{sync::Arc, time::Duration};
3076
3077 use assert_matches::assert_matches;
3078 use assert_matches2::assert_let;
3079 use eyeball::SharedObservable;
3080 use futures_util::{FutureExt, StreamExt, pin_mut};
3081 use js_int::{UInt, uint};
3082 use matrix_sdk_base::{
3083 RoomState,
3084 store::{MemoryStore, StoreConfig},
3085 };
3086 use matrix_sdk_test::{
3087 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3088 event_factory::EventFactory,
3089 };
3090 #[cfg(target_family = "wasm")]
3091 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3092
3093 use ruma::{
3094 RoomId, ServerName, UserId,
3095 api::{
3096 FeatureFlag, MatrixVersion,
3097 client::{
3098 discovery::discover_homeserver::RtcFocusInfo,
3099 room::create_room::v3::Request as CreateRoomRequest,
3100 },
3101 },
3102 assign,
3103 events::{
3104 ignored_user_list::IgnoredUserListEventContent,
3105 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3106 },
3107 owned_room_id, owned_user_id, room_alias_id, room_id,
3108 };
3109 use serde_json::json;
3110 use stream_assert::{assert_next_matches, assert_pending};
3111 use tokio::{
3112 spawn,
3113 time::{sleep, timeout},
3114 };
3115 use url::Url;
3116
3117 use super::Client;
3118 use crate::{
3119 Error, TransmissionProgress,
3120 client::{WeakClient, futures::SendMediaUploadRequest},
3121 config::{RequestConfig, SyncSettings},
3122 futures::SendRequest,
3123 media::MediaError,
3124 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3125 };
3126
3127 #[async_test]
3128 async fn test_account_data() {
3129 let server = MatrixMockServer::new().await;
3130 let client = server.client_builder().build().await;
3131
3132 let f = EventFactory::new();
3133 server
3134 .mock_sync()
3135 .ok_and_run(&client, |builder| {
3136 builder.add_global_account_data(
3137 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3138 );
3139 })
3140 .await;
3141
3142 let content = client
3143 .account()
3144 .account_data::<IgnoredUserListEventContent>()
3145 .await
3146 .unwrap()
3147 .unwrap()
3148 .deserialize()
3149 .unwrap();
3150
3151 assert_eq!(content.ignored_users.len(), 1);
3152 }
3153
3154 #[async_test]
3155 async fn test_successful_discovery() {
3156 // Imagine this is `matrix.org`.
3157 let server = MatrixMockServer::new().await;
3158 let server_url = server.uri();
3159
3160 // Imagine this is `matrix-client.matrix.org`.
3161 let homeserver = MatrixMockServer::new().await;
3162 let homeserver_url = homeserver.uri();
3163
3164 // Imagine Alice has the user ID `@alice:matrix.org`.
3165 let domain = server_url.strip_prefix("http://").unwrap();
3166 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3167
3168 // The `.well-known` is on the server (e.g. `matrix.org`).
3169 server
3170 .mock_well_known()
3171 .ok_with_homeserver_url(&homeserver_url)
3172 .mock_once()
3173 .named("well-known")
3174 .mount()
3175 .await;
3176
3177 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3178 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3179
3180 let client = Client::builder()
3181 .insecure_server_name_no_tls(alice.server_name())
3182 .build()
3183 .await
3184 .unwrap();
3185
3186 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3187 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3188 client.server_versions().await.unwrap();
3189 }
3190
3191 #[async_test]
3192 async fn test_discovery_broken_server() {
3193 let server = MatrixMockServer::new().await;
3194 let server_url = server.uri();
3195 let domain = server_url.strip_prefix("http://").unwrap();
3196 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3197
3198 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3199
3200 assert!(
3201 Client::builder()
3202 .insecure_server_name_no_tls(alice.server_name())
3203 .build()
3204 .await
3205 .is_err(),
3206 "Creating a client from a user ID should fail when the .well-known request fails."
3207 );
3208 }
3209
3210 #[async_test]
3211 async fn test_room_creation() {
3212 let server = MatrixMockServer::new().await;
3213 let client = server.client_builder().build().await;
3214
3215 server
3216 .mock_sync()
3217 .ok_and_run(&client, |builder| {
3218 builder.add_joined_room(
3219 JoinedRoomBuilder::default()
3220 .add_state_event(StateTestEvent::Member)
3221 .add_state_event(StateTestEvent::PowerLevels),
3222 );
3223 })
3224 .await;
3225
3226 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3227 assert_eq!(room.state(), RoomState::Joined);
3228 }
3229
3230 #[async_test]
3231 async fn test_retry_limit_http_requests() {
3232 let server = MatrixMockServer::new().await;
3233 let client = server
3234 .client_builder()
3235 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3236 .build()
3237 .await;
3238
3239 assert!(client.request_config().retry_limit.unwrap() == 3);
3240
3241 server.mock_login().error500().expect(3).mount().await;
3242
3243 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3244 }
3245
3246 #[async_test]
3247 async fn test_retry_timeout_http_requests() {
3248 // Keep this timeout small so that the test doesn't take long
3249 let retry_timeout = Duration::from_secs(5);
3250 let server = MatrixMockServer::new().await;
3251 let client = server
3252 .client_builder()
3253 .on_builder(|builder| {
3254 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3255 })
3256 .build()
3257 .await;
3258
3259 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3260
3261 server.mock_login().error500().expect(2..).mount().await;
3262
3263 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3264 }
3265
3266 #[async_test]
3267 async fn test_short_retry_initial_http_requests() {
3268 let server = MatrixMockServer::new().await;
3269 let client = server
3270 .client_builder()
3271 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3272 .build()
3273 .await;
3274
3275 server.mock_login().error500().expect(3..).mount().await;
3276
3277 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3278 }
3279
3280 #[async_test]
3281 async fn test_no_retry_http_requests() {
3282 let server = MatrixMockServer::new().await;
3283 let client = server.client_builder().build().await;
3284
3285 server.mock_devices().error500().mock_once().mount().await;
3286
3287 client.devices().await.unwrap_err();
3288 }
3289
3290 #[async_test]
3291 async fn test_set_homeserver() {
3292 let client = MockClientBuilder::new(None).build().await;
3293 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3294
3295 let homeserver = Url::parse("http://example.com/").unwrap();
3296 client.set_homeserver(homeserver.clone());
3297 assert_eq!(client.homeserver(), homeserver);
3298 }
3299
3300 #[async_test]
3301 async fn test_search_user_request() {
3302 let server = MatrixMockServer::new().await;
3303 let client = server.client_builder().build().await;
3304
3305 server.mock_user_directory().ok().mock_once().mount().await;
3306
3307 let response = client.search_users("test", 50).await.unwrap();
3308 assert_eq!(response.results.len(), 1);
3309 let result = &response.results[0];
3310 assert_eq!(result.user_id.to_string(), "@test:example.me");
3311 assert_eq!(result.display_name.clone().unwrap(), "Test");
3312 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3313 assert!(!response.limited);
3314 }
3315
3316 #[async_test]
3317 async fn test_request_unstable_features() {
3318 let server = MatrixMockServer::new().await;
3319 let client = server.client_builder().no_server_versions().build().await;
3320
3321 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3322
3323 let unstable_features = client.unstable_features().await.unwrap();
3324 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3325 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3326 }
3327
3328 #[async_test]
3329 async fn test_can_homeserver_push_encrypted_event_to_device() {
3330 let server = MatrixMockServer::new().await;
3331 let client = server.client_builder().no_server_versions().build().await;
3332
3333 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3334
3335 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3336 assert!(msc4028_enabled);
3337 }
3338
/// Exercises the list of recently visited rooms: authentication requirement,
/// deduplication, most-recent-first ordering and the cap on its length.
#[async_test]
async fn test_recently_visited_rooms() {
    // Tracking recently visited rooms requires authentication
    let client = MockClientBuilder::new(None).unlogged().build().await;
    assert_matches!(
        client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
        Err(Error::AuthenticationRequired)
    );

    // From here on, use a logged-in client.
    let client = MockClientBuilder::new(None).build().await;
    let account = client.account();

    // We should start off with an empty list
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);

    // Tracking a valid room id should add it to the list
    account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
    assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

    // And reading the list again shouldn't change it
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
    assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

    // Tracking the same room again shouldn't change the list
    account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
    assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

    // Tracking a second room should add it to the front of the list
    account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
    assert_eq!(
        account.get_recently_visited_rooms().await.unwrap(),
        [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
    );

    // Tracking the first room yet again should move it to the front of the list
    account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
    assert_eq!(
        account.get_recently_visited_rooms().await.unwrap(),
        [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
    );

    // Tracking should be capped at 20: visit 20 more rooms, `!0:localhost` to
    // `!19:localhost`, on top of the 2 rooms tracked so far.
    for n in 0..20 {
        account
            .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
            .await
            .unwrap();
    }

    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);

    // And the initial rooms should've been pushed out
    let rooms = account.get_recently_visited_rooms().await.unwrap();
    assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
    assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));

    // And the last tracked room should be the first
    assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
}
3402
/// Checks that using the event cache doesn't leave a strong reference to the
/// client behind, i.e. that dropping the last `Client` actually frees its
/// inner state.
#[async_test]
async fn test_client_no_cycle_with_event_cache() {
    let client = MockClientBuilder::new(None).build().await;

    // Wait for the init tasks to die.
    sleep(Duration::from_secs(1)).await;

    // `client` must be the only strong reference at this point.
    let weak_client = WeakClient::from_client(&client);
    assert_eq!(weak_client.strong_count(), 1);

    {
        let room_id = room_id!("!room:example.org");

        // Have the client know the room.
        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::new(room_id))
            .build_sync_response();
        client.inner.base_client.receive_sync_response(response).await.unwrap();

        client.event_cache().subscribe().unwrap();

        // The room event cache and its drop handles only live until the end
        // of this scope.
        let (_room_event_cache, _drop_handles) =
            client.get_room(room_id).unwrap().event_cache().await.unwrap();
    }

    drop(client);

    // Give a bit of time for background tasks to die.
    sleep(Duration::from_secs(1)).await;

    // The weak client must be the last reference to the client now.
    assert_eq!(weak_client.strong_count(), 0);
    let client = weak_client.get();
    // The message arguments are only evaluated when the assertion fails,
    // i.e. when `client` is `Some`, so the `unwrap()` below can't panic.
    assert!(
        client.is_none(),
        "too many strong references to the client: {}",
        Arc::strong_count(&client.unwrap().inner)
    );
}
3442
/// Checks how often the `/versions` and `.well-known` endpoints are hit:
/// once per cache fill, and again only after the server info cache has been
/// reset. The `expect(..)` counts on the mocks are what enforces this.
#[async_test]
async fn test_server_info_caching() {
    let server = MatrixMockServer::new().await;
    let server_url = server.uri();
    let domain = server_url.strip_prefix("http://").unwrap();
    let server_name = <&ServerName>::try_from(domain).unwrap();
    let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];

    let well_known_mock = server
        .mock_well_known()
        .ok()
        .named("well known mock")
        .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
        .mount_as_scoped()
        .await;

    let versions_mock = server
        .mock_versions()
        .ok_with_unstable_features()
        .named("first versions mock")
        .expect(1)
        .mount_as_scoped()
        .await;

    // The same state store is handed to both clients below, so what the
    // first client stores is visible to the second one.
    let memory_store = Arc::new(MemoryStore::new());
    let client = Client::builder()
        .insecure_server_name_no_tls(server_name)
        .store_config(
            StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                .state_store(memory_store.clone()),
        )
        .build()
        .await
        .unwrap();

    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

    // These subsequent calls hit the in-memory cache.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(client);

    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // These calls to the new client hit the cache persisted in the state
    // store (the shared `MemoryStore` here, standing in for an on-disk store).
    assert!(
        client
            .unstable_features()
            .await
            .unwrap()
            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
    );
    let supported = client.supported_versions().await.unwrap();
    assert!(supported.versions.contains(&MatrixVersion::V1_0));
    assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

    // Then this call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // The scoped mocks are dropped here, before the second set of mocks is
    // mounted below.
    drop(versions_mock);
    drop(well_known_mock);

    // Now, reset the cache, and observe the endpoints being called again once.
    client.reset_server_info().await.unwrap();

    server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;

    server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

    // Hits network again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    // Hits in-memory cache again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
}
3529
/// Same as `test_server_info_caching`, but without any `.well-known` mock
/// mounted: the RTC foci are then expected to be empty, and only the
/// `/versions` endpoint is counted.
#[async_test]
async fn test_server_info_without_a_well_known() {
    let server = MatrixMockServer::new().await;
    let rtc_foci: Vec<RtcFocusInfo> = vec![];

    let versions_mock = server
        .mock_versions()
        .ok_with_unstable_features()
        .named("first versions mock")
        .expect(1)
        .mount_as_scoped()
        .await;

    // The same state store is handed to both clients below, so what the
    // first client stores is visible to the second one.
    let memory_store = Arc::new(MemoryStore::new());
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

    // These subsequent calls hit the in-memory cache.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(client);

    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // This call to the new client hits the cache persisted in the state
    // store (the shared `MemoryStore` here, standing in for an on-disk store).
    assert!(
        client
            .unstable_features()
            .await
            .unwrap()
            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
    );

    // Then this call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // The scoped mock is dropped here, before the second mock is mounted
    // below.
    drop(versions_mock);

    // Now, reset the cache, and observe the endpoints being called again once.
    client.reset_server_info().await.unwrap();

    server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

    // Hits network again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    // Hits in-memory cache again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
}
3601
3602 #[async_test]
3603 async fn test_no_network_doesnt_cause_infinite_retries() {
3604 // We want infinite retries for transient errors.
3605 let client = MockClientBuilder::new(None)
3606 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3607 .build()
3608 .await;
3609
3610 // We don't define a mock server on purpose here, so that the error is really a
3611 // network error.
3612 client.whoami().await.unwrap_err();
3613 }
3614
3615 #[async_test]
3616 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3617 let server = MatrixMockServer::new().await;
3618 let client = server.client_builder().build().await;
3619
3620 let room_id = room_id!("!room:example.org");
3621
3622 server
3623 .mock_sync()
3624 .ok_and_run(&client, |builder| {
3625 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3626 })
3627 .await;
3628
3629 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3630 assert_eq!(room.room_id(), room_id);
3631 }
3632
3633 #[async_test]
3634 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3635 let server = MatrixMockServer::new().await;
3636 let client = server.client_builder().build().await;
3637
3638 let room_id = room_id!("!room:example.org");
3639
3640 let client = Arc::new(client);
3641
3642 // Perform the /sync request with a delay so it starts after the
3643 // `await_room_remote_echo` call has happened
3644 spawn({
3645 let client = client.clone();
3646 async move {
3647 sleep(Duration::from_millis(100)).await;
3648
3649 server
3650 .mock_sync()
3651 .ok_and_run(&client, |builder| {
3652 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3653 })
3654 .await;
3655 }
3656 });
3657
3658 let room =
3659 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3660 assert_eq!(room.room_id(), room_id);
3661 }
3662
3663 #[async_test]
3664 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3665 let client = MockClientBuilder::new(None).build().await;
3666
3667 let room_id = room_id!("!room:example.org");
3668 // Room is not present so the client won't be able to find it. The call will
3669 // timeout.
3670 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3671 }
3672
3673 #[async_test]
3674 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3675 let server = MatrixMockServer::new().await;
3676 let client = server.client_builder().build().await;
3677
3678 server.mock_create_room().ok().mount().await;
3679
3680 // Create a room in the internal store
3681 let room = client
3682 .create_room(assign!(CreateRoomRequest::new(), {
3683 invite: vec![],
3684 is_direct: false,
3685 }))
3686 .await
3687 .unwrap();
3688
3689 // Room is locally present, but not synced, the call will timeout
3690 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3691 .await
3692 .unwrap_err();
3693 }
3694
3695 #[async_test]
3696 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3697 let server = MatrixMockServer::new().await;
3698 let client = server.client_builder().build().await;
3699
3700 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3701
3702 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3703 assert_matches!(ret, Ok(true));
3704 }
3705
3706 #[async_test]
3707 async fn test_is_room_alias_available_if_alias_is_resolved() {
3708 let server = MatrixMockServer::new().await;
3709 let client = server.client_builder().build().await;
3710
3711 server
3712 .mock_room_directory_resolve_alias()
3713 .ok("!some_room_id:matrix.org", Vec::new())
3714 .expect(1)
3715 .mount()
3716 .await;
3717
3718 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3719 assert_matches!(ret, Ok(false));
3720 }
3721
3722 #[async_test]
3723 async fn test_is_room_alias_available_if_error_found() {
3724 let server = MatrixMockServer::new().await;
3725 let client = server.client_builder().build().await;
3726
3727 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3728
3729 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3730 assert_matches!(ret, Err(_));
3731 }
3732
3733 #[async_test]
3734 async fn test_create_room_alias() {
3735 let server = MatrixMockServer::new().await;
3736 let client = server.client_builder().build().await;
3737
3738 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3739
3740 let ret = client
3741 .create_room_alias(
3742 room_alias_id!("#some_alias:matrix.org"),
3743 room_id!("!some_room:matrix.org"),
3744 )
3745 .await;
3746 assert_matches!(ret, Ok(()));
3747 }
3748
3749 #[async_test]
3750 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3751 let server = MatrixMockServer::new().await;
3752 let client = server.client_builder().build().await;
3753
3754 let room_id = room_id!("!a-room:matrix.org");
3755
3756 // Make sure the summary endpoint is called once
3757 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3758
3759 // We create a locally cached invited room
3760 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3761
3762 // And we get a preview, the server endpoint was reached
3763 let preview = client
3764 .get_room_preview(room_id.into(), Vec::new())
3765 .await
3766 .expect("Room preview should be retrieved");
3767
3768 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3769 }
3770
3771 #[async_test]
3772 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3773 let server = MatrixMockServer::new().await;
3774 let client = server.client_builder().build().await;
3775
3776 let room_id = room_id!("!a-room:matrix.org");
3777
3778 // Make sure the summary endpoint is called once
3779 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3780
3781 // We create a locally cached left room
3782 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3783
3784 // And we get a preview, the server endpoint was reached
3785 let preview = client
3786 .get_room_preview(room_id.into(), Vec::new())
3787 .await
3788 .expect("Room preview should be retrieved");
3789
3790 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3791 }
3792
3793 #[async_test]
3794 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3795 let server = MatrixMockServer::new().await;
3796 let client = server.client_builder().build().await;
3797
3798 let room_id = room_id!("!a-room:matrix.org");
3799
3800 // Make sure the summary endpoint is called once
3801 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3802
3803 // We create a locally cached knocked room
3804 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3805
3806 // And we get a preview, the server endpoint was reached
3807 let preview = client
3808 .get_room_preview(room_id.into(), Vec::new())
3809 .await
3810 .expect("Room preview should be retrieved");
3811
3812 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3813 }
3814
3815 #[async_test]
3816 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3817 let server = MatrixMockServer::new().await;
3818 let client = server.client_builder().build().await;
3819
3820 let room_id = room_id!("!a-room:matrix.org");
3821
3822 // Make sure the summary endpoint is not called
3823 server.mock_room_summary().ok(room_id).never().mount().await;
3824
3825 // We create a locally cached joined room
3826 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3827
3828 // And we get a preview, no server endpoint was reached
3829 let preview = client
3830 .get_room_preview(room_id.into(), Vec::new())
3831 .await
3832 .expect("Room preview should be retrieved");
3833
3834 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3835 }
3836
3837 #[async_test]
3838 async fn test_media_preview_config() {
3839 let server = MatrixMockServer::new().await;
3840 let client = server.client_builder().build().await;
3841
3842 server
3843 .mock_sync()
3844 .ok_and_run(&client, |builder| {
3845 builder.add_custom_global_account_data(json!({
3846 "content": {
3847 "media_previews": "private",
3848 "invite_avatars": "off"
3849 },
3850 "type": "m.media_preview_config"
3851 }));
3852 })
3853 .await;
3854
3855 let (initial_value, stream) =
3856 client.account().observe_media_preview_config().await.unwrap();
3857
3858 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3859 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3860 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3861 pin_mut!(stream);
3862 assert_pending!(stream);
3863
3864 server
3865 .mock_sync()
3866 .ok_and_run(&client, |builder| {
3867 builder.add_custom_global_account_data(json!({
3868 "content": {
3869 "media_previews": "off",
3870 "invite_avatars": "on"
3871 },
3872 "type": "m.media_preview_config"
3873 }));
3874 })
3875 .await;
3876
3877 assert_next_matches!(
3878 stream,
3879 MediaPreviewConfigEventContent {
3880 media_previews: Some(MediaPreviews::Off),
3881 invite_avatars: Some(InviteAvatars::On),
3882 ..
3883 }
3884 );
3885 assert_pending!(stream);
3886 }
3887
3888 #[async_test]
3889 async fn test_unstable_media_preview_config() {
3890 let server = MatrixMockServer::new().await;
3891 let client = server.client_builder().build().await;
3892
3893 server
3894 .mock_sync()
3895 .ok_and_run(&client, |builder| {
3896 builder.add_custom_global_account_data(json!({
3897 "content": {
3898 "media_previews": "private",
3899 "invite_avatars": "off"
3900 },
3901 "type": "io.element.msc4278.media_preview_config"
3902 }));
3903 })
3904 .await;
3905
3906 let (initial_value, stream) =
3907 client.account().observe_media_preview_config().await.unwrap();
3908
3909 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3910 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3911 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3912 pin_mut!(stream);
3913 assert_pending!(stream);
3914
3915 server
3916 .mock_sync()
3917 .ok_and_run(&client, |builder| {
3918 builder.add_custom_global_account_data(json!({
3919 "content": {
3920 "media_previews": "off",
3921 "invite_avatars": "on"
3922 },
3923 "type": "io.element.msc4278.media_preview_config"
3924 }));
3925 })
3926 .await;
3927
3928 assert_next_matches!(
3929 stream,
3930 MediaPreviewConfigEventContent {
3931 media_previews: Some(MediaPreviews::Off),
3932 invite_avatars: Some(InviteAvatars::On),
3933 ..
3934 }
3935 );
3936 assert_pending!(stream);
3937 }
3938
3939 #[async_test]
3940 async fn test_media_preview_config_not_found() {
3941 let server = MatrixMockServer::new().await;
3942 let client = server.client_builder().build().await;
3943
3944 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3945
3946 assert!(initial_value.is_none());
3947 }
3948
3949 #[async_test]
3950 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3951 // The default Matrix version we use is 1.11 or higher, so authenticated media
3952 // is supported.
3953 let server = MatrixMockServer::new().await;
3954 let client = server.client_builder().build().await;
3955
3956 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3957
3958 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3959 client.load_or_fetch_max_upload_size().await.unwrap();
3960
3961 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3962 }
3963
3964 #[async_test]
3965 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3966 // The server must advertise support for the stable feature for authenticated
3967 // media support, so we mock the `GET /versions` response.
3968 let server = MatrixMockServer::new().await;
3969 let client = server.client_builder().no_server_versions().build().await;
3970
3971 server
3972 .mock_versions()
3973 .ok_custom(
3974 &["v1.7", "v1.8", "v1.9", "v1.10"],
3975 &[("org.matrix.msc3916.stable", true)].into(),
3976 )
3977 .named("versions")
3978 .expect(1)
3979 .mount()
3980 .await;
3981
3982 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3983
3984 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3985 client.load_or_fetch_max_upload_size().await.unwrap();
3986
3987 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3988 }
3989
3990 #[async_test]
3991 async fn test_load_or_fetch_max_upload_size_no_auth() {
3992 // The server must not support Matrix 1.11 or higher for unauthenticated
3993 // media requests, so we mock the `GET /versions` response.
3994 let server = MatrixMockServer::new().await;
3995 let client = server.client_builder().no_server_versions().build().await;
3996
3997 server
3998 .mock_versions()
3999 .ok_custom(&["v1.1"], &Default::default())
4000 .named("versions")
4001 .expect(1)
4002 .mount()
4003 .await;
4004
4005 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4006
4007 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4008 client.load_or_fetch_max_upload_size().await.unwrap();
4009
4010 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4011 }
4012
4013 #[async_test]
4014 async fn test_uploading_a_too_large_media_file() {
4015 let server = MatrixMockServer::new().await;
4016 let client = server.client_builder().build().await;
4017
4018 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4019 client.load_or_fetch_max_upload_size().await.unwrap();
4020 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4021
4022 let data = vec![1, 2];
4023 let upload_request =
4024 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4025 let request = SendRequest {
4026 client: client.clone(),
4027 request: upload_request,
4028 config: None,
4029 send_progress: SharedObservable::new(TransmissionProgress::default()),
4030 };
4031 let media_request = SendMediaUploadRequest::new(request);
4032
4033 let error = media_request.await.err();
4034 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4035 assert_eq!(max, uint!(1));
4036 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4037 }
4038
4039 #[async_test]
4040 async fn test_dont_ignore_timeout_on_first_sync() {
4041 let server = MatrixMockServer::new().await;
4042 let client = server.client_builder().build().await;
4043
4044 server
4045 .mock_sync()
4046 .timeout(Some(Duration::from_secs(30)))
4047 .ok(|_| {})
4048 .mock_once()
4049 .named("sync_with_timeout")
4050 .mount()
4051 .await;
4052
4053 // Call the endpoint once to check the timeout.
4054 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4055
4056 timeout(Duration::from_secs(1), async {
4057 stream.next().await.unwrap().unwrap();
4058 })
4059 .await
4060 .unwrap();
4061 }
4062
4063 #[async_test]
4064 async fn test_ignore_timeout_on_first_sync() {
4065 let server = MatrixMockServer::new().await;
4066 let client = server.client_builder().build().await;
4067
4068 server
4069 .mock_sync()
4070 .timeout(None)
4071 .ok(|_| {})
4072 .mock_once()
4073 .named("sync_no_timeout")
4074 .mount()
4075 .await;
4076 server
4077 .mock_sync()
4078 .timeout(Some(Duration::from_secs(30)))
4079 .ok(|_| {})
4080 .mock_once()
4081 .named("sync_with_timeout")
4082 .mount()
4083 .await;
4084
4085 // Call each version of the endpoint once to check the timeouts.
4086 let mut stream = Box::pin(
4087 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4088 );
4089
4090 timeout(Duration::from_secs(1), async {
4091 stream.next().await.unwrap().unwrap();
4092 stream.next().await.unwrap().unwrap();
4093 })
4094 .await
4095 .unwrap();
4096 }
4097}