matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
36    event_cache::store::EventCacheStoreLock,
37    media::store::MediaStoreLock,
38    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39    sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47    api::{
48        FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
49        client::{
50            account::whoami,
51            alias::{create_alias, delete_alias, get_alias},
52            authenticated_media,
53            device::{delete_devices, get_devices, update_device},
54            directory::{get_public_rooms, get_public_rooms_filtered},
55            discovery::{
56                discover_homeserver::{self, RtcFocusInfo},
57                get_capabilities::{self, v3::Capabilities},
58                get_supported_versions,
59            },
60            error::ErrorKind,
61            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
62            knock::knock_room,
63            media,
64            membership::{join_room_by_id, join_room_by_id_or_alias},
65            room::create_room,
66            session::login::v3::DiscoveryInfo,
67            sync::sync_events,
68            threads::get_thread_subscriptions_changes,
69            uiaa,
70            user_directory::search_users,
71        },
72        error::FromHttpResponseError,
73        federation::discovery::get_server_version,
74    },
75    assign,
76    push::Ruleset,
77    time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, instrument, trace, warn};
82use url::Url;
83
84use self::futures::SendRequest;
85use crate::{
86    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
87    Room, SessionTokens, TransmissionProgress,
88    authentication::{
89        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
90        oauth::OAuth,
91    },
92    client::thread_subscriptions::ThreadSubscriptionCatchup,
93    config::{RequestConfig, SyncToken},
94    deduplicating_handler::DeduplicatingHandler,
95    error::HttpResult,
96    event_cache::EventCache,
97    event_handler::{
98        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
99        EventHandlerStore, ObservableEventHandler, SyncEvent,
100    },
101    http_client::HttpClient,
102    latest_events::LatestEvents,
103    media::MediaError,
104    notification_settings::NotificationSettings,
105    room::RoomMember,
106    room_preview::RoomPreview,
107    send_queue::{SendQueue, SendQueueData},
108    sliding_sync::Version as SlidingSyncVersion,
109    sync::{RoomUpdate, SyncResponse},
110};
111#[cfg(feature = "e2e-encryption")]
112use crate::{
113    cross_process_lock::CrossProcessLock,
114    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
115};
116
117mod builder;
118pub(crate) mod caches;
119pub(crate) mod futures;
120pub(crate) mod thread_subscriptions;
121
122pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
123#[cfg(feature = "experimental-search")]
124use crate::search_index::SearchIndex;
125
126#[cfg(not(target_family = "wasm"))]
127type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
128#[cfg(target_family = "wasm")]
129type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
130
131#[cfg(not(target_family = "wasm"))]
132type NotificationHandlerFn =
133    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
134#[cfg(target_family = "wasm")]
135type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
136
/// Tells a callback-driven loop whether to keep going or to stop.
///
/// Its main user is the [`sync_with_callback`] method: the value returned by
/// the provided callback decides whether the sync loop is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LoopCtrl {
    /// Keep the loop running.
    Continue,
    /// Exit the loop.
    Break,
}
150
/// The changes that can happen to a `Client`'s `Session`.
#[derive(Clone, Debug, PartialEq)]
pub enum SessionChange {
    /// The token of the session is not valid anymore.
    UnknownToken {
        /// `true` if the session was soft logged out.
        soft_logout: bool,
    },
    /// The tokens of the session have been refreshed.
    TokensRefreshed,
}
162
/// Server vendor information, as obtained from the federation API.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// Name of the server.
    pub server_name: String,
    /// Version of the server.
    pub version: String,
}
172
173/// An async/await enabled Matrix client.
174///
175/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
176#[derive(Clone)]
177pub struct Client {
178    pub(crate) inner: Arc<ClientInner>,
179}
180
181#[derive(Default)]
182pub(crate) struct ClientLocks {
183    /// Lock ensuring that only a single room may be marked as a DM at once.
184    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
185    /// explanation.
186    pub(crate) mark_as_dm_lock: Mutex<()>,
187
188    /// Lock ensuring that only a single secret store is getting opened at the
189    /// same time.
190    ///
191    /// This is important so we don't accidentally create multiple different new
192    /// default secret storage keys.
193    #[cfg(feature = "e2e-encryption")]
194    pub(crate) open_secret_store_lock: Mutex<()>,
195
196    /// Lock ensuring that we're only storing a single secret at a time.
197    ///
198    /// Take a look at the [`SecretStore::put_secret`] method for a more
199    /// detailed explanation.
200    ///
201    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
202    #[cfg(feature = "e2e-encryption")]
203    pub(crate) store_secret_lock: Mutex<()>,
204
205    /// Lock ensuring that only one method at a time might modify our backup.
206    #[cfg(feature = "e2e-encryption")]
207    pub(crate) backup_modify_lock: Mutex<()>,
208
209    /// Lock ensuring that we're going to attempt to upload backups for a single
210    /// requester.
211    #[cfg(feature = "e2e-encryption")]
212    pub(crate) backup_upload_lock: Mutex<()>,
213
214    /// Handler making sure we only have one group session sharing request in
215    /// flight per room.
216    #[cfg(feature = "e2e-encryption")]
217    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
218
219    /// Lock making sure we're only doing one key claim request at a time.
220    #[cfg(feature = "e2e-encryption")]
221    pub(crate) key_claim_lock: Mutex<()>,
222
223    /// Handler to ensure that only one members request is running at a time,
224    /// given a room.
225    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
226
227    /// Handler to ensure that only one encryption state request is running at a
228    /// time, given a room.
229    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
230
231    /// Deduplicating handler for sending read receipts. The string is an
232    /// internal implementation detail, see [`Self::send_single_receipt`].
233    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
234
235    #[cfg(feature = "e2e-encryption")]
236    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
237
238    /// Latest "generation" of data known by the crypto store.
239    ///
240    /// This is a counter that only increments, set in the database (and can
241    /// wrap). It's incremented whenever some process acquires a lock for the
242    /// first time. *This assumes the crypto store lock is being held, to
243    /// avoid data races on writing to this value in the store*.
244    ///
245    /// The current process will maintain this value in local memory and in the
246    /// DB over time. Observing a different value than the one read in
247    /// memory, when reading from the store indicates that somebody else has
248    /// written into the database under our feet.
249    ///
250    /// TODO: this should live in the `OlmMachine`, since it's information
251    /// related to the lock. As of today (2023-07-28), we blow up the entire
252    /// olm machine when there's a generation mismatch. So storing the
253    /// generation in the olm machine would make the client think there's
254    /// *always* a mismatch, and that's why we need to store the generation
255    /// outside the `OlmMachine`.
256    #[cfg(feature = "e2e-encryption")]
257    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
258}
259
260pub(crate) struct ClientInner {
261    /// All the data related to authentication and authorization.
262    pub(crate) auth_ctx: Arc<AuthCtx>,
263
264    /// The URL of the server.
265    ///
266    /// Not to be confused with the `Self::homeserver`. `server` is usually
267    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
268    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
269    /// homeserver (at the time of writing — 2024-08-28).
270    ///
271    /// This value is optional depending on how the `Client` has been built.
272    /// If it's been built from a homeserver URL directly, we don't know the
273    /// server. However, if the `Client` has been built from a server URL or
274    /// name, then the homeserver has been discovered, and we know both.
275    server: Option<Url>,
276
277    /// The URL of the homeserver to connect to.
278    ///
279    /// This is the URL for the client-server Matrix API.
280    homeserver: StdRwLock<Url>,
281
282    /// The sliding sync version.
283    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
284
285    /// The underlying HTTP client.
286    pub(crate) http_client: HttpClient,
287
288    /// User session data.
289    pub(super) base_client: BaseClient,
290
291    /// Collection of in-memory caches for the [`Client`].
292    pub(crate) caches: ClientCaches,
293
294    /// Collection of locks individual client methods might want to use, either
295    /// to ensure that only a single call to a method happens at once or to
296    /// deduplicate multiple calls to a method.
297    pub(crate) locks: ClientLocks,
298
299    /// The cross-process store locks holder name.
300    ///
301    /// The SDK provides cross-process store locks (see
302    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
303    /// `holder_name` is the value used for all cross-process store locks
304    /// used by this `Client`.
305    ///
306    /// If multiple `Client`s are running in different processes, this
307    /// value MUST be different for each `Client`.
308    cross_process_store_locks_holder_name: String,
309
310    /// A mapping of the times at which the current user sent typing notices,
311    /// keyed by room.
312    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
313
314    /// Event handlers. See `add_event_handler`.
315    pub(crate) event_handlers: EventHandlerStore,
316
317    /// Notification handlers. See `register_notification_handler`.
318    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
319
320    /// The sender-side of channels used to receive room updates.
321    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
322
323    /// The sender-side of a channel used to observe all the room updates of a
324    /// sync response.
325    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
326
327    /// Whether the client should update its homeserver URL with the discovery
328    /// information present in the login response.
329    respect_login_well_known: bool,
330
331    /// An event that can be listened on to wait for a successful sync. The
332    /// event will only be fired if a sync loop is running. Can be used for
333    /// synchronization, e.g. if we send out a request to create a room, we can
334    /// wait for the sync to get the data to fetch a room object from the state
335    /// store.
336    pub(crate) sync_beat: event_listener::Event,
337
338    /// A central cache for events, inactive first.
339    ///
340    /// It becomes active when [`EventCache::subscribe`] is called.
341    pub(crate) event_cache: OnceCell<EventCache>,
342
343    /// End-to-end encryption related state.
344    #[cfg(feature = "e2e-encryption")]
345    pub(crate) e2ee: EncryptionData,
346
347    /// The verification state of our own device.
348    #[cfg(feature = "e2e-encryption")]
349    pub(crate) verification_state: SharedObservable<VerificationState>,
350
351    /// Whether to enable the experimental support for sending and receiving
352    /// encrypted room history on invite, per [MSC4268].
353    ///
354    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
355    #[cfg(feature = "e2e-encryption")]
356    pub(crate) enable_share_history_on_invite: bool,
357
358    /// Data related to the [`SendQueue`].
359    ///
360    /// [`SendQueue`]: crate::send_queue::SendQueue
361    pub(crate) send_queue_data: Arc<SendQueueData>,
362
363    /// The `max_upload_size` value of the homeserver, it contains the max
364    /// request size you can send.
365    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
366
367    /// The entry point to get the [`LatestEvent`] of rooms and threads.
368    ///
369    /// [`LatestEvent`]: crate::latest_event::LatestEvent
370    latest_events: OnceCell<LatestEvents>,
371
372    /// Service handling the catching up of thread subscriptions in the
373    /// background.
374    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
375
376    #[cfg(feature = "experimental-search")]
377    /// Handler for [`RoomIndex`]'s of each room
378    search_index: SearchIndex,
379}
380
381impl ClientInner {
382    /// Create a new `ClientInner`.
383    ///
384    /// All the fields passed as parameters here are those that must be cloned
385    /// upon instantiation of a sub-client, e.g. a client specialized for
386    /// notifications.
387    #[allow(clippy::too_many_arguments)]
388    async fn new(
389        auth_ctx: Arc<AuthCtx>,
390        server: Option<Url>,
391        homeserver: Url,
392        sliding_sync_version: SlidingSyncVersion,
393        http_client: HttpClient,
394        base_client: BaseClient,
395        server_info: ClientServerInfo,
396        respect_login_well_known: bool,
397        event_cache: OnceCell<EventCache>,
398        send_queue: Arc<SendQueueData>,
399        latest_events: OnceCell<LatestEvents>,
400        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
401        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
402        cross_process_store_locks_holder_name: String,
403        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
404        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
405    ) -> Arc<Self> {
406        let caches = ClientCaches {
407            server_info: server_info.into(),
408            server_metadata: Mutex::new(TtlCache::new()),
409        };
410
411        let client = Self {
412            server,
413            homeserver: StdRwLock::new(homeserver),
414            auth_ctx,
415            sliding_sync_version: StdRwLock::new(sliding_sync_version),
416            http_client,
417            base_client,
418            caches,
419            locks: Default::default(),
420            cross_process_store_locks_holder_name,
421            typing_notice_times: Default::default(),
422            event_handlers: Default::default(),
423            notification_handlers: Default::default(),
424            room_update_channels: Default::default(),
425            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
426            // ballast for all observers to catch up.
427            room_updates_sender: broadcast::Sender::new(32),
428            respect_login_well_known,
429            sync_beat: event_listener::Event::new(),
430            event_cache,
431            send_queue_data: send_queue,
432            latest_events,
433            #[cfg(feature = "e2e-encryption")]
434            e2ee: EncryptionData::new(encryption_settings),
435            #[cfg(feature = "e2e-encryption")]
436            verification_state: SharedObservable::new(VerificationState::Unknown),
437            #[cfg(feature = "e2e-encryption")]
438            enable_share_history_on_invite,
439            server_max_upload_size: Mutex::new(OnceCell::new()),
440            #[cfg(feature = "experimental-search")]
441            search_index: search_index_handler,
442            thread_subscription_catchup,
443        };
444
445        #[allow(clippy::let_and_return)]
446        let client = Arc::new(client);
447
448        #[cfg(feature = "e2e-encryption")]
449        client.e2ee.initialize_tasks(&client);
450
451        let _ = client
452            .event_cache
453            .get_or_init(|| async {
454                EventCache::new(
455                    WeakClient::from_inner(&client),
456                    client.base_client.event_cache_store().clone(),
457                )
458            })
459            .await;
460
461        let _ = client
462            .thread_subscription_catchup
463            .get_or_init(|| async {
464                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
465            })
466            .await;
467
468        client
469    }
470}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475        write!(fmt, "Client")
476    }
477}
478
479impl Client {
480    /// Create a new [`Client`] that will use the given homeserver.
481    ///
482    /// # Arguments
483    ///
484    /// * `homeserver_url` - The homeserver that the client should connect to.
485    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486        Self::builder().homeserver_url(homeserver_url).build().await
487    }
488
489    /// Returns a subscriber that publishes an event every time the ignore user
490    /// list changes.
491    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492        self.inner.base_client.subscribe_to_ignore_user_list_changes()
493    }
494
495    /// Create a new [`ClientBuilder`].
496    pub fn builder() -> ClientBuilder {
497        ClientBuilder::new()
498    }
499
500    pub(crate) fn base_client(&self) -> &BaseClient {
501        &self.inner.base_client
502    }
503
504    /// The underlying HTTP client.
505    pub fn http_client(&self) -> &reqwest::Client {
506        &self.inner.http_client.inner
507    }
508
509    pub(crate) fn locks(&self) -> &ClientLocks {
510        &self.inner.locks
511    }
512
513    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
514        &self.inner.auth_ctx
515    }
516
517    /// The cross-process store locks holder name.
518    ///
519    /// The SDK provides cross-process store locks (see
520    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
521    /// `holder_name` is the value used for all cross-process store locks
522    /// used by this `Client`.
523    pub fn cross_process_store_locks_holder_name(&self) -> &str {
524        &self.inner.cross_process_store_locks_holder_name
525    }
526
527    /// Change the homeserver URL used by this client.
528    ///
529    /// # Arguments
530    ///
531    /// * `homeserver_url` - The new URL to use.
532    fn set_homeserver(&self, homeserver_url: Url) {
533        *self.inner.homeserver.write().unwrap() = homeserver_url;
534    }
535
536    /// Get the capabilities of the homeserver.
537    ///
538    /// This method should be used to check what features are supported by the
539    /// homeserver.
540    ///
541    /// # Examples
542    ///
543    /// ```no_run
544    /// # use matrix_sdk::Client;
545    /// # use url::Url;
546    /// # async {
547    /// # let homeserver = Url::parse("http://example.com")?;
548    /// let client = Client::new(homeserver).await?;
549    ///
550    /// let capabilities = client.get_capabilities().await?;
551    ///
552    /// if capabilities.change_password.enabled {
553    ///     // Change password
554    /// }
555    /// # anyhow::Ok(()) };
556    /// ```
557    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
558        let res = self.send(get_capabilities::v3::Request::new()).await?;
559        Ok(res.capabilities)
560    }
561
562    /// Get the server vendor information from the federation API.
563    ///
564    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
565    /// both the server's software name and version.
566    ///
567    /// # Examples
568    ///
569    /// ```no_run
570    /// # use matrix_sdk::Client;
571    /// # use url::Url;
572    /// # async {
573    /// # let homeserver = Url::parse("http://example.com")?;
574    /// let client = Client::new(homeserver).await?;
575    ///
576    /// let server_info = client.server_vendor_info(None).await?;
577    /// println!(
578    ///     "Server: {}, Version: {}",
579    ///     server_info.server_name, server_info.version
580    /// );
581    /// # anyhow::Ok(()) };
582    /// ```
583    pub async fn server_vendor_info(
584        &self,
585        request_config: Option<RequestConfig>,
586    ) -> HttpResult<ServerVendorInfo> {
587        let res = self
588            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
589            .await?;
590
591        // Extract server info, using defaults if fields are missing.
592        let server = res.server.unwrap_or_default();
593        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
594        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
595
596        Ok(ServerVendorInfo { server_name: server_name_str, version })
597    }
598
599    /// Get a copy of the default request config.
600    ///
601    /// The default request config is what's used when sending requests if no
602    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
603    /// function with such a parameter.
604    ///
605    /// If the default request config was not customized through
606    /// [`ClientBuilder`] when creating this `Client`, the returned value will
607    /// be equivalent to [`RequestConfig::default()`].
608    pub fn request_config(&self) -> RequestConfig {
609        self.inner.http_client.request_config
610    }
611
612    /// Check whether the client has been activated.
613    ///
614    /// A client is considered active when:
615    ///
616    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
617    ///    is logged in,
618    /// 2. Has loaded cached data from storage,
619    /// 3. If encryption is enabled, it also initialized or restored its
620    ///    `OlmMachine`.
621    pub fn is_active(&self) -> bool {
622        self.inner.base_client.is_active()
623    }
624
625    /// The server used by the client.
626    ///
627    /// See `Self::server` to learn more.
628    pub fn server(&self) -> Option<&Url> {
629        self.inner.server.as_ref()
630    }
631
632    /// The homeserver of the client.
633    pub fn homeserver(&self) -> Url {
634        self.inner.homeserver.read().unwrap().clone()
635    }
636
637    /// Get the sliding sync version.
638    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
639        self.inner.sliding_sync_version.read().unwrap().clone()
640    }
641
642    /// Override the sliding sync version.
643    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
644        let mut lock = self.inner.sliding_sync_version.write().unwrap();
645        *lock = version;
646    }
647
648    /// Get the Matrix user session meta information.
649    ///
650    /// If the client is currently logged in, this will return a
651    /// [`SessionMeta`] object which contains the user ID and device ID.
652    /// Otherwise it returns `None`.
653    pub fn session_meta(&self) -> Option<&SessionMeta> {
654        self.base_client().session_meta()
655    }
656
657    /// Returns a receiver that gets events for each room info update. To watch
658    /// for new events, use `receiver.resubscribe()`.
659    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
660        self.base_client().room_info_notable_update_receiver()
661    }
662
663    /// Performs a search for users.
664    /// The search is performed case-insensitively on user IDs and display names
665    ///
666    /// # Arguments
667    ///
668    /// * `search_term` - The search term for the search
669    /// * `limit` - The maximum number of results to return. Defaults to 10.
670    ///
671    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
672    pub async fn search_users(
673        &self,
674        search_term: &str,
675        limit: u64,
676    ) -> HttpResult<search_users::v3::Response> {
677        let mut request = search_users::v3::Request::new(search_term.to_owned());
678
679        if let Some(limit) = UInt::new(limit) {
680            request.limit = limit;
681        }
682
683        self.send(request).await
684    }
685
686    /// Get the user id of the current owner of the client.
687    pub fn user_id(&self) -> Option<&UserId> {
688        self.session_meta().map(|s| s.user_id.as_ref())
689    }
690
691    /// Get the device ID that identifies the current session.
692    pub fn device_id(&self) -> Option<&DeviceId> {
693        self.session_meta().map(|s| s.device_id.as_ref())
694    }
695
696    /// Get the current access token for this session.
697    ///
698    /// Will be `None` if the client has not been logged in.
699    pub fn access_token(&self) -> Option<String> {
700        self.auth_ctx().access_token()
701    }
702
703    /// Get the current tokens for this session.
704    ///
705    /// To be notified of changes in the session tokens, use
706    /// [`Client::subscribe_to_session_changes()`] or
707    /// [`Client::set_session_callbacks()`].
708    ///
709    /// Returns `None` if the client has not been logged in.
710    pub fn session_tokens(&self) -> Option<SessionTokens> {
711        self.auth_ctx().session_tokens()
712    }
713
714    /// Access the authentication API used to log in this client.
715    ///
716    /// Will be `None` if the client has not been logged in.
717    pub fn auth_api(&self) -> Option<AuthApi> {
718        match self.auth_ctx().auth_data.get()? {
719            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
720            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
721        }
722    }
723
724    /// Get the whole session info of this client.
725    ///
726    /// Will be `None` if the client has not been logged in.
727    ///
728    /// Can be used with [`Client::restore_session`] to restore a previously
729    /// logged-in session.
730    pub fn session(&self) -> Option<AuthSession> {
731        match self.auth_api()? {
732            AuthApi::Matrix(api) => api.session().map(Into::into),
733            AuthApi::OAuth(api) => api.full_session().map(Into::into),
734        }
735    }
736
737    /// Get a reference to the state store.
738    pub fn state_store(&self) -> &DynStateStore {
739        self.base_client().state_store()
740    }
741
742    /// Get a reference to the event cache store.
743    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
744        self.base_client().event_cache_store()
745    }
746
747    /// Get a reference to the media store.
748    pub fn media_store(&self) -> &MediaStoreLock {
749        self.base_client().media_store()
750    }
751
752    /// Access the native Matrix authentication API with this client.
753    pub fn matrix_auth(&self) -> MatrixAuth {
754        MatrixAuth::new(self.clone())
755    }
756
757    /// Get the account of the current owner of the client.
758    pub fn account(&self) -> Account {
759        Account::new(self.clone())
760    }
761
762    /// Get the encryption manager of the client.
763    #[cfg(feature = "e2e-encryption")]
764    pub fn encryption(&self) -> Encryption {
765        Encryption::new(self.clone())
766    }
767
768    /// Get the media manager of the client.
769    pub fn media(&self) -> Media {
770        Media::new(self.clone())
771    }
772
773    /// Get the pusher manager of the client.
774    pub fn pusher(&self) -> Pusher {
775        Pusher::new(self.clone())
776    }
777
778    /// Access the OAuth 2.0 API of the client.
779    pub fn oauth(&self) -> OAuth {
780        OAuth::new(self.clone())
781    }
782
783    /// Register a handler for a specific event type.
784    ///
785    /// The handler is a function or closure with one or more arguments. The
786    /// first argument is the event itself. All additional arguments are
787    /// "context" arguments: They have to implement [`EventHandlerContext`].
788    /// This trait is named that way because most of the types implementing it
789    /// give additional context about an event: The room it was in, its raw form
790    /// and other similar things. As two exceptions to this,
791    /// [`Client`] and [`EventHandlerHandle`] also implement the
792    /// `EventHandlerContext` trait so you don't have to clone your client
793    /// into the event handler manually and a handler can decide to remove
794    /// itself.
795    ///
796    /// Some context arguments are not universally applicable. A context
797    /// argument that isn't available for the given event type will result in
798    /// the event handler being skipped and an error being logged. The following
799    /// context argument types are only available for a subset of event types:
800    ///
801    /// * [`Room`] is only available for room-specific events, i.e. not for
802    ///   events like global account data events or presence events.
803    ///
804    /// You can provide custom context via
805    /// [`add_event_handler_context`](Client::add_event_handler_context) and
806    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
807    /// into the event handler.
808    ///
809    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
810    ///
811    /// # Examples
812    ///
813    /// ```no_run
814    /// use matrix_sdk::{
815    ///     deserialized_responses::EncryptionInfo,
816    ///     event_handler::Ctx,
817    ///     ruma::{
818    ///         events::{
819    ///             macros::EventContent,
820    ///             push_rules::PushRulesEvent,
821    ///             room::{
822    ///                 message::SyncRoomMessageEvent,
823    ///                 topic::SyncRoomTopicEvent,
824    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
825    ///             },
826    ///         },
827    ///         push::Action,
828    ///         Int, MilliSecondsSinceUnixEpoch,
829    ///     },
830    ///     Client, Room,
831    /// };
832    /// use serde::{Deserialize, Serialize};
833    ///
834    /// # async fn example(client: Client) {
835    /// client.add_event_handler(
836    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
837    ///         // Common usage: Room event plus room and client.
838    ///     },
839    /// );
840    /// client.add_event_handler(
841    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
842    ///         async move {
843    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
844    ///             // unencrypted events and events that were decrypted by the SDK.
845    ///         }
846    ///     },
847    /// );
848    /// client.add_event_handler(
849    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
850    ///         async move {
851    ///             // A `Vec<Action>` parameter allows you to know which push actions
852    ///             // are applicable for an event. For example, an event with
853    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
854    ///             // in the timeline.
855    ///         }
856    ///     },
857    /// );
858    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
859    ///     // You can omit any or all arguments after the first.
860    /// });
861    ///
862    /// // Registering a temporary event handler:
863    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
864    ///     /* Event handler */
865    /// });
866    /// client.remove_event_handler(handle);
867    ///
868    /// // Registering custom event handler context:
869    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
870    /// struct MyContext {
871    ///     number: usize,
872    /// }
873    /// client.add_event_handler_context(MyContext { number: 5 });
874    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
875    ///     // Use the context
876    /// });
877    ///
878    /// // This will handle membership events in joined rooms. Invites are special, see below.
879    /// client.add_event_handler(
880    ///     |ev: SyncRoomMemberEvent| async move {},
881    /// );
882    ///
883    /// // To handle state events in invited rooms (including invite membership events),
884    /// // `StrippedRoomMemberEvent` should be used.
885    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
886    /// client.add_event_handler(
887    ///     |ev: StrippedRoomMemberEvent| async move {},
888    /// );
889    ///
890    /// // Custom events work exactly the same way, you just need to declare
891    /// // the content struct and use the EventContent derive macro on it.
892    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
893    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
894    /// struct TokenEventContent {
895    ///     token: String,
896    ///     #[serde(rename = "exp")]
897    ///     expires_at: MilliSecondsSinceUnixEpoch,
898    /// }
899    ///
900    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
901    ///     todo!("Display the token");
902    /// });
903    ///
904    /// // Event handler closures can also capture local variables.
905    /// // Make sure they are cheap to clone though, because they will be cloned
906    /// // every time the closure is called.
907    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
908    ///
909    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
910    ///     println!("Calling the handler with identifier {data}");
911    /// });
912    /// # }
913    /// ```
914    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
915    where
916        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
917        H: EventHandler<Ev, Ctx>,
918    {
919        self.add_event_handler_impl(handler, None)
920    }
921
922    /// Register a handler for a specific room, and event type.
923    ///
924    /// This method works the same way as
925    /// [`add_event_handler`][Self::add_event_handler], except that the handler
926    /// will only be called for events in the room with the specified ID. See
927    /// that method for more details on event handler functions.
928    ///
929    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
930    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
931    /// your use case.
932    pub fn add_room_event_handler<Ev, Ctx, H>(
933        &self,
934        room_id: &RoomId,
935        handler: H,
936    ) -> EventHandlerHandle
937    where
938        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
939        H: EventHandler<Ev, Ctx>,
940    {
941        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
942    }
943
944    /// Observe a specific event type.
945    ///
946    /// `Ev` represents the kind of event that will be observed. `Ctx`
947    /// represents the context that will come with the event. It relies on the
948    /// same mechanism as [`Client::add_event_handler`]. The main difference is
949    /// that it returns an [`ObservableEventHandler`] and doesn't require a
950    /// user-defined closure. It is possible to subscribe to the
951    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
952    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
953    /// Ctx)`.
954    ///
955    /// Be careful that only the most recent value can be observed. Subscribers
956    /// are notified when a new value is sent, but there is no guarantee
957    /// that they will see all values.
958    ///
959    /// # Example
960    ///
961    /// Let's see a classical usage:
962    ///
963    /// ```
964    /// use futures_util::StreamExt as _;
965    /// use matrix_sdk::{
966    ///     Client, Room,
967    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
968    /// };
969    ///
970    /// # async fn example(client: Client) -> Option<()> {
971    /// let observer =
972    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
973    ///
974    /// let mut subscriber = observer.subscribe();
975    ///
976    /// let (event, (room, push_actions)) = subscriber.next().await?;
977    /// # Some(())
978    /// # }
979    /// ```
980    ///
981    /// Now let's see how to get several contexts that can be useful for you:
982    ///
983    /// ```
984    /// use matrix_sdk::{
985    ///     Client, Room,
986    ///     deserialized_responses::EncryptionInfo,
987    ///     ruma::{
988    ///         events::room::{
989    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
990    ///         },
991    ///         push::Action,
992    ///     },
993    /// };
994    ///
995    /// # async fn example(client: Client) {
996    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
997    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
998    ///
999    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1000    /// // to distinguish between unencrypted events and events that were decrypted
1001    /// // by the SDK.
1002    /// let _ = client
1003    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1004    ///     );
1005    ///
1006    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1007    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1008    /// // should be highlighted in the timeline.
1009    /// let _ =
1010    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1011    ///
1012    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1013    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1014    /// # }
1015    /// ```
1016    ///
1017    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1018    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1019    where
1020        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1021        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1022    {
1023        self.observe_room_events_impl(None)
1024    }
1025
1026    /// Observe a specific room, and event type.
1027    ///
1028    /// This method works the same way as [`Client::observe_events`], except
1029    /// that the observability will only be applied for events in the room with
1030    /// the specified ID. See that method for more details.
1031    ///
1032    /// Be careful that only the most recent value can be observed. Subscribers
1033    /// are notified when a new value is sent, but there is no guarantee
1034    /// that they will see all values.
1035    pub fn observe_room_events<Ev, Ctx>(
1036        &self,
1037        room_id: &RoomId,
1038    ) -> ObservableEventHandler<(Ev, Ctx)>
1039    where
1040        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1041        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1042    {
1043        self.observe_room_events_impl(Some(room_id.to_owned()))
1044    }
1045
1046    /// Shared implementation for `Client::observe_events` and
1047    /// `Client::observe_room_events`.
1048    fn observe_room_events_impl<Ev, Ctx>(
1049        &self,
1050        room_id: Option<OwnedRoomId>,
1051    ) -> ObservableEventHandler<(Ev, Ctx)>
1052    where
1053        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1054        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1055    {
1056        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1057        // new value.
1058        let shared_observable = SharedObservable::new(None);
1059
1060        ObservableEventHandler::new(
1061            shared_observable.clone(),
1062            self.event_handler_drop_guard(self.add_event_handler_impl(
1063                move |event: Ev, context: Ctx| {
1064                    shared_observable.set(Some((event, context)));
1065
1066                    ready(())
1067                },
1068                room_id,
1069            )),
1070        )
1071    }
1072
1073    /// Remove the event handler associated with the handle.
1074    ///
1075    /// Note that you **must not** call `remove_event_handler` from the
1076    /// non-async part of an event handler, that is:
1077    ///
1078    /// ```ignore
1079    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1080    ///     // ⚠ this will cause a deadlock ⚠
1081    ///     client.remove_event_handler(handle);
1082    ///
1083    ///     async move {
1084    ///         // removing the event handler here is fine
1085    ///         client.remove_event_handler(handle);
1086    ///     }
1087    /// })
1088    /// ```
1089    ///
1090    /// Note also that handlers that remove themselves will still execute with
1091    /// events received in the same sync cycle.
1092    ///
1093    /// # Arguments
1094    ///
1095    /// `handle` - The [`EventHandlerHandle`] that is returned when
1096    /// registering the event handler with [`Client::add_event_handler`].
1097    ///
1098    /// # Examples
1099    ///
1100    /// ```no_run
1101    /// # use url::Url;
1102    /// # use tokio::sync::mpsc;
1103    /// #
1104    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1105    /// #
1106    /// use matrix_sdk::{
1107    ///     Client, event_handler::EventHandlerHandle,
1108    ///     ruma::events::room::member::SyncRoomMemberEvent,
1109    /// };
1110    /// #
1111    /// # futures_executor::block_on(async {
1112    /// # let client = matrix_sdk::Client::builder()
1113    /// #     .homeserver_url(homeserver)
1114    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1115    /// #     .build()
1116    /// #     .await
1117    /// #     .unwrap();
1118    ///
1119    /// client.add_event_handler(
1120    ///     |ev: SyncRoomMemberEvent,
1121    ///      client: Client,
1122    ///      handle: EventHandlerHandle| async move {
1123    ///         // Common usage: Check arriving Event is the expected one
1124    ///         println!("Expected RoomMemberEvent received!");
1125    ///         client.remove_event_handler(handle);
1126    ///     },
1127    /// );
1128    /// # });
1129    /// ```
1130    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1131        self.inner.event_handlers.remove(handle);
1132    }
1133
1134    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1135    /// the given handle.
1136    ///
1137    /// When the returned value is dropped, the event handler will be removed.
1138    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1139        EventHandlerDropGuard::new(handle, self.clone())
1140    }
1141
1142    /// Add an arbitrary value for use as event handler context.
1143    ///
1144    /// The value can be obtained in an event handler by adding an argument of
1145    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1146    ///
1147    /// If a value of the same type has been added before, it will be
1148    /// overwritten.
1149    ///
1150    /// # Examples
1151    ///
1152    /// ```no_run
1153    /// use matrix_sdk::{
1154    ///     Room, event_handler::Ctx,
1155    ///     ruma::events::room::message::SyncRoomMessageEvent,
1156    /// };
1157    /// # #[derive(Clone)]
1158    /// # struct SomeType;
1159    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1160    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1161    /// # futures_executor::block_on(async {
1162    /// # let client = matrix_sdk::Client::builder()
1163    /// #     .homeserver_url(homeserver)
1164    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1165    /// #     .build()
1166    /// #     .await
1167    /// #     .unwrap();
1168    ///
1169    /// // Handle used to send messages to the UI part of the app
1170    /// let my_gui_handle: SomeType = obtain_gui_handle();
1171    ///
1172    /// client.add_event_handler_context(my_gui_handle.clone());
1173    /// client.add_event_handler(
1174    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1175    ///         async move {
1176    ///             // gui_handle.send(DisplayMessage { message: ev });
1177    ///         }
1178    ///     },
1179    /// );
1180    /// # });
1181    /// ```
1182    pub fn add_event_handler_context<T>(&self, ctx: T)
1183    where
1184        T: Clone + Send + Sync + 'static,
1185    {
1186        self.inner.event_handlers.add_context(ctx);
1187    }
1188
1189    /// Register a handler for a notification.
1190    ///
1191    /// Similar to [`Client::add_event_handler`], but only allows functions
1192    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1193    /// [`Client`] for now.
1194    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1195    where
1196        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1197        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1198    {
1199        self.inner.notification_handlers.write().await.push(Box::new(
1200            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1201        ));
1202
1203        self
1204    }
1205
1206    /// Subscribe to all updates for the room with the given ID.
1207    ///
1208    /// The returned receiver will receive a new message for each sync response
1209    /// that contains updates for that room.
1210    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1211        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1212            btree_map::Entry::Vacant(entry) => {
1213                let (tx, rx) = broadcast::channel(8);
1214                entry.insert(tx);
1215                rx
1216            }
1217            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1218        }
1219    }
1220
1221    /// Subscribe to all updates to all rooms, whenever any has been received in
1222    /// a sync response.
1223    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1224        self.inner.room_updates_sender.subscribe()
1225    }
1226
1227    pub(crate) async fn notification_handlers(
1228        &self,
1229    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1230        self.inner.notification_handlers.read().await
1231    }
1232
1233    /// Get all the rooms the client knows about.
1234    ///
1235    /// This will return the list of joined, invited, and left rooms.
1236    pub fn rooms(&self) -> Vec<Room> {
1237        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1238    }
1239
1240    /// Get all the rooms the client knows about, filtered by room state.
1241    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1242        self.base_client()
1243            .rooms_filtered(filter)
1244            .into_iter()
1245            .map(|room| Room::new(self.clone(), room))
1246            .collect()
1247    }
1248
1249    /// Get a stream of all the rooms, in addition to the existing rooms.
1250    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1251        let (rooms, stream) = self.base_client().rooms_stream();
1252
1253        let map_room = |room| Room::new(self.clone(), room);
1254
1255        (
1256            rooms.into_iter().map(map_room).collect(),
1257            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1258        )
1259    }
1260
1261    /// Returns the joined rooms this client knows about.
1262    pub fn joined_rooms(&self) -> Vec<Room> {
1263        self.rooms_filtered(RoomStateFilter::JOINED)
1264    }
1265
1266    /// Returns the invited rooms this client knows about.
1267    pub fn invited_rooms(&self) -> Vec<Room> {
1268        self.rooms_filtered(RoomStateFilter::INVITED)
1269    }
1270
1271    /// Returns the left rooms this client knows about.
1272    pub fn left_rooms(&self) -> Vec<Room> {
1273        self.rooms_filtered(RoomStateFilter::LEFT)
1274    }
1275
1276    /// Returns the joined space rooms this client knows about.
1277    pub fn joined_space_rooms(&self) -> Vec<Room> {
1278        self.base_client()
1279            .rooms_filtered(RoomStateFilter::JOINED)
1280            .into_iter()
1281            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1282            .collect()
1283    }
1284
1285    /// Get a room with the given room id.
1286    ///
1287    /// # Arguments
1288    ///
1289    /// `room_id` - The unique id of the room that should be fetched.
1290    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1291        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1292    }
1293
1294    /// Gets the preview of a room, whether the current user has joined it or
1295    /// not.
1296    pub async fn get_room_preview(
1297        &self,
1298        room_or_alias_id: &RoomOrAliasId,
1299        via: Vec<OwnedServerName>,
1300    ) -> Result<RoomPreview> {
1301        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1302            Ok(room_id) => room_id.to_owned(),
1303            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1304        };
1305
1306        if let Some(room) = self.get_room(&room_id) {
1307            // The cached data can only be trusted if the room state is joined or
1308            // banned: for invite and knock rooms, no updates will be received
1309            // for the rooms after the invite/knock action took place so we may
1310            // have very out to date data for important fields such as
1311            // `join_rule`. For left rooms, the homeserver should return the latest info.
1312            match room.state() {
1313                RoomState::Joined | RoomState::Banned => {
1314                    return Ok(RoomPreview::from_known_room(&room).await);
1315                }
1316                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1317            }
1318        }
1319
1320        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1321    }
1322
1323    /// Resolve a room alias to a room id and a list of servers which know
1324    /// about it.
1325    ///
1326    /// # Arguments
1327    ///
1328    /// `room_alias` - The room alias to be resolved.
1329    pub async fn resolve_room_alias(
1330        &self,
1331        room_alias: &RoomAliasId,
1332    ) -> HttpResult<get_alias::v3::Response> {
1333        let request = get_alias::v3::Request::new(room_alias.to_owned());
1334        self.send(request).await
1335    }
1336
1337    /// Checks if a room alias is not in use yet.
1338    ///
1339    /// Returns:
1340    /// - `Ok(true)` if the room alias is available.
1341    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1342    ///   status code).
1343    /// - An `Err` otherwise.
1344    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1345        match self.resolve_room_alias(alias).await {
1346            // The room alias was resolved, so it's already in use.
1347            Ok(_) => Ok(false),
1348            Err(error) => {
1349                match error.client_api_error_kind() {
1350                    // The room alias wasn't found, so it's available.
1351                    Some(ErrorKind::NotFound) => Ok(true),
1352                    _ => Err(error),
1353                }
1354            }
1355        }
1356    }
1357
1358    /// Adds a new room alias associated with a room to the room directory.
1359    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1360        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1361        self.send(request).await?;
1362        Ok(())
1363    }
1364
1365    /// Removes a room alias from the room directory.
1366    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1367        let request = delete_alias::v3::Request::new(alias.to_owned());
1368        self.send(request).await?;
1369        Ok(())
1370    }
1371
1372    /// Update the homeserver from the login response well-known if needed.
1373    ///
1374    /// # Arguments
1375    ///
1376    /// * `login_well_known` - The `well_known` field from a successful login
1377    ///   response.
1378    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1379        if self.inner.respect_login_well_known
1380            && let Some(well_known) = login_well_known
1381            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1382        {
1383            self.set_homeserver(homeserver);
1384        }
1385    }
1386
1387    /// Similar to [`Client::restore_session_with`], with
1388    /// [`RoomLoadSettings::default()`].
1389    ///
1390    /// # Panics
1391    ///
1392    /// Panics if a session was already restored or logged in.
1393    #[instrument(skip_all)]
1394    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1395        self.restore_session_with(session, RoomLoadSettings::default()).await
1396    }
1397
1398    /// Restore a session previously logged-in using one of the available
1399    /// authentication APIs. The number of rooms to restore is controlled by
1400    /// [`RoomLoadSettings`].
1401    ///
1402    /// See the documentation of the corresponding authentication API's
1403    /// `restore_session` method for more information.
1404    ///
1405    /// # Panics
1406    ///
1407    /// Panics if a session was already restored or logged in.
1408    #[instrument(skip_all)]
1409    pub async fn restore_session_with(
1410        &self,
1411        session: impl Into<AuthSession>,
1412        room_load_settings: RoomLoadSettings,
1413    ) -> Result<()> {
1414        let session = session.into();
1415        match session {
1416            AuthSession::Matrix(session) => {
1417                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1418            }
1419            AuthSession::OAuth(session) => {
1420                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1421            }
1422        }
1423    }
1424
1425    /// Refresh the access token using the authentication API used to log into
1426    /// this session.
1427    ///
1428    /// See the documentation of the authentication API's `refresh_access_token`
1429    /// method for more information.
1430    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1431        let Some(auth_api) = self.auth_api() else {
1432            return Err(RefreshTokenError::RefreshTokenRequired);
1433        };
1434
1435        match auth_api {
1436            AuthApi::Matrix(api) => {
1437                trace!("Token refresh: Using the homeserver.");
1438                Box::pin(api.refresh_access_token()).await?;
1439            }
1440            AuthApi::OAuth(api) => {
1441                trace!("Token refresh: Using OAuth 2.0.");
1442                Box::pin(api.refresh_access_token()).await?;
1443            }
1444        }
1445
1446        Ok(())
1447    }
1448
1449    /// Log out the current session using the proper authentication API.
1450    ///
1451    /// # Errors
1452    ///
1453    /// Returns an error if the session is not authenticated or if an error
1454    /// occurred while making the request to the server.
1455    pub async fn logout(&self) -> Result<(), Error> {
1456        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1457        match auth_api {
1458            AuthApi::Matrix(matrix_auth) => {
1459                matrix_auth.logout().await?;
1460                Ok(())
1461            }
1462            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1463        }
1464    }
1465
1466    /// Get or upload a sync filter.
1467    ///
1468    /// This method will either get a filter ID from the store or upload the
1469    /// filter definition to the homeserver and return the new filter ID.
1470    ///
1471    /// # Arguments
1472    ///
1473    /// * `filter_name` - The unique name of the filter, this name will be used
1474    /// locally to store and identify the filter ID returned by the server.
1475    ///
1476    /// * `definition` - The filter definition that should be uploaded to the
1477    /// server if no filter ID can be found in the store.
1478    ///
1479    /// # Examples
1480    ///
1481    /// ```no_run
1482    /// # use matrix_sdk::{
1483    /// #    Client, config::SyncSettings,
1484    /// #    ruma::api::client::{
1485    /// #        filter::{
1486    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1487    /// #        },
1488    /// #        sync::sync_events::v3::Filter,
1489    /// #    }
1490    /// # };
1491    /// # use url::Url;
1492    /// # async {
1493    /// # let homeserver = Url::parse("http://example.com").unwrap();
1494    /// # let client = Client::new(homeserver).await.unwrap();
1495    /// let mut filter = FilterDefinition::default();
1496    ///
1497    /// // Let's enable member lazy loading.
1498    /// filter.room.state.lazy_load_options =
1499    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1500    ///
1501    /// let filter_id = client
1502    ///     .get_or_upload_filter("sync", filter)
1503    ///     .await
1504    ///     .unwrap();
1505    ///
1506    /// let sync_settings = SyncSettings::new()
1507    ///     .filter(Filter::FilterId(filter_id));
1508    ///
1509    /// let response = client.sync_once(sync_settings).await.unwrap();
1510    /// # };
1511    #[instrument(skip(self, definition))]
1512    pub async fn get_or_upload_filter(
1513        &self,
1514        filter_name: &str,
1515        definition: FilterDefinition,
1516    ) -> Result<String> {
1517        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1518            debug!("Found filter locally");
1519            Ok(filter)
1520        } else {
1521            debug!("Didn't find filter locally");
1522            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1523            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1524            let response = self.send(request).await?;
1525
1526            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1527
1528            Ok(response.filter_id)
1529        }
1530    }
1531
1532    /// Prepare to join a room by ID, by getting the current details about it
1533    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1534        let room = self.get_room(room_id)?;
1535
1536        let inviter = match room.invite_details().await {
1537            Ok(details) => details.inviter,
1538            Err(Error::WrongRoomState(_)) => None,
1539            Err(e) => {
1540                warn!("Error fetching invite details for room: {e:?}");
1541                None
1542            }
1543        };
1544
1545        Some(PreJoinRoomInfo { inviter })
1546    }
1547
1548    /// Finish joining a room.
1549    ///
1550    /// If the room was an invite that should be marked as a DM, will include it
1551    /// in the DM event after creating the joined room.
1552    ///
1553    /// If encrypted history sharing is enabled, will check to see if we have a
1554    /// key bundle, and import it if so.
1555    ///
1556    /// # Arguments
1557    ///
1558    /// * `room_id` - The `RoomId` of the room that was joined.
1559    /// * `pre_join_room_info` - Information about the room before we joined.
1560    async fn finish_join_room(
1561        &self,
1562        room_id: &RoomId,
1563        pre_join_room_info: Option<PreJoinRoomInfo>,
1564    ) -> Result<Room> {
1565        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1566            room.state() == RoomState::Invited
1567                && room.is_direct().await.unwrap_or_else(|e| {
1568                    warn!(%room_id, "is_direct() failed: {e}");
1569                    false
1570                })
1571        } else {
1572            false
1573        };
1574
1575        let base_room = self
1576            .base_client()
1577            .room_joined(
1578                room_id,
1579                pre_join_room_info
1580                    .as_ref()
1581                    .and_then(|info| info.inviter.as_ref())
1582                    .map(|i| i.user_id().to_owned()),
1583            )
1584            .await?;
1585        let room = Room::new(self.clone(), base_room);
1586
1587        if mark_as_dm {
1588            room.set_is_direct(true).await?;
1589        }
1590
1591        #[cfg(feature = "e2e-encryption")]
1592        if self.inner.enable_share_history_on_invite
1593            && let Some(inviter) =
1594                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1595        {
1596            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1597                .await?;
1598        }
1599
1600        // Suppress "unused variable" and "unused field" lints
1601        #[cfg(not(feature = "e2e-encryption"))]
1602        let _ = pre_join_room_info.map(|i| i.inviter);
1603
1604        Ok(room)
1605    }
1606
1607    /// Join a room by `RoomId`.
1608    ///
1609    /// Returns the `Room` in the joined state.
1610    ///
1611    /// # Arguments
1612    ///
1613    /// * `room_id` - The `RoomId` of the room to be joined.
1614    #[instrument(skip(self))]
1615    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1616        // See who invited us to this room, if anyone. Note we have to do this before
1617        // making the `/join` request, otherwise we could race against the sync.
1618        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1619
1620        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1621        let response = self.send(request).await?;
1622        self.finish_join_room(&response.room_id, pre_join_info).await
1623    }
1624
1625    /// Join a room by `RoomOrAliasId`.
1626    ///
1627    /// Returns the `Room` in the joined state.
1628    ///
1629    /// # Arguments
1630    ///
1631    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1632    ///   alias looks like `#name:example.com`.
1633    /// * `server_names` - The server names to be used for resolving the alias,
1634    ///   if needs be.
1635    pub async fn join_room_by_id_or_alias(
1636        &self,
1637        alias: &RoomOrAliasId,
1638        server_names: &[OwnedServerName],
1639    ) -> Result<Room> {
1640        let pre_join_info = {
1641            match alias.try_into() {
1642                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1643                Err(_) => {
1644                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1645                    // responding to an invitation to the room, and therefore don't need to handle
1646                    // things that happen as a result of invites.
1647                    None
1648                }
1649            }
1650        };
1651        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1652            via: server_names.to_owned(),
1653        });
1654        let response = self.send(request).await?;
1655        self.finish_join_room(&response.room_id, pre_join_info).await
1656    }
1657
1658    /// Search the homeserver's directory of public rooms.
1659    ///
1660    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1661    /// a `get_public_rooms::Response`.
1662    ///
1663    /// # Arguments
1664    ///
1665    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1666    ///
1667    /// * `since` - Pagination token from a previous request.
1668    ///
1669    /// * `server` - The name of the server, if `None` the requested server is
1670    ///   used.
1671    ///
1672    /// # Examples
1673    /// ```no_run
1674    /// use matrix_sdk::Client;
1675    /// # use url::Url;
1676    /// # let homeserver = Url::parse("http://example.com").unwrap();
1677    /// # let limit = Some(10);
1678    /// # let since = Some("since token");
1679    /// # let server = Some("servername.com".try_into().unwrap());
1680    /// # async {
1681    /// let mut client = Client::new(homeserver).await.unwrap();
1682    ///
1683    /// client.public_rooms(limit, since, server).await;
1684    /// # };
1685    /// ```
1686    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1687    pub async fn public_rooms(
1688        &self,
1689        limit: Option<u32>,
1690        since: Option<&str>,
1691        server: Option<&ServerName>,
1692    ) -> HttpResult<get_public_rooms::v3::Response> {
1693        let limit = limit.map(UInt::from);
1694
1695        let request = assign!(get_public_rooms::v3::Request::new(), {
1696            limit,
1697            since: since.map(ToOwned::to_owned),
1698            server: server.map(ToOwned::to_owned),
1699        });
1700        self.send(request).await
1701    }
1702
1703    /// Create a room with the given parameters.
1704    ///
1705    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1706    /// created room.
1707    ///
1708    /// If you want to create a direct message with one specific user, you can
1709    /// use [`create_dm`][Self::create_dm], which is more convenient than
1710    /// assembling the [`create_room::v3::Request`] yourself.
1711    ///
1712    /// If the `is_direct` field of the request is set to `true` and at least
1713    /// one user is invited, the room will be automatically added to the direct
1714    /// rooms in the account data.
1715    ///
1716    /// # Examples
1717    ///
1718    /// ```no_run
1719    /// use matrix_sdk::{
1720    ///     Client,
1721    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1722    /// };
1723    /// # use url::Url;
1724    /// #
1725    /// # async {
1726    /// # let homeserver = Url::parse("http://example.com").unwrap();
1727    /// let request = CreateRoomRequest::new();
1728    /// let client = Client::new(homeserver).await.unwrap();
1729    /// assert!(client.create_room(request).await.is_ok());
1730    /// # };
1731    /// ```
1732    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1733        let invite = request.invite.clone();
1734        let is_direct_room = request.is_direct;
1735        let response = self.send(request).await?;
1736
1737        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1738
1739        let joined_room = Room::new(self.clone(), base_room);
1740
1741        if is_direct_room
1742            && !invite.is_empty()
1743            && let Err(error) =
1744                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1745        {
1746            // FIXME: Retry in the background
1747            error!("Failed to mark room as DM: {error}");
1748        }
1749
1750        Ok(joined_room)
1751    }
1752
1753    /// Create a DM room.
1754    ///
1755    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1756    /// given user being invited, the room marked `is_direct` and both the
1757    /// creator and invitee getting the default maximum power level.
1758    ///
1759    /// If the `e2e-encryption` feature is enabled, the room will also be
1760    /// encrypted.
1761    ///
1762    /// # Arguments
1763    ///
1764    /// * `user_id` - The ID of the user to create a DM for.
1765    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1766        #[cfg(feature = "e2e-encryption")]
1767        let initial_state = vec![
1768            InitialStateEvent::with_empty_state_key(
1769                RoomEncryptionEventContent::with_recommended_defaults(),
1770            )
1771            .to_raw_any(),
1772        ];
1773
1774        #[cfg(not(feature = "e2e-encryption"))]
1775        let initial_state = vec![];
1776
1777        let request = assign!(create_room::v3::Request::new(), {
1778            invite: vec![user_id.to_owned()],
1779            is_direct: true,
1780            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1781            initial_state,
1782        });
1783
1784        self.create_room(request).await
1785    }
1786
1787    /// Search the homeserver's directory for public rooms with a filter.
1788    ///
1789    /// # Arguments
1790    ///
1791    /// * `room_search` - The easiest way to create this request is using the
1792    ///   `get_public_rooms_filtered::Request` itself.
1793    ///
1794    /// # Examples
1795    ///
1796    /// ```no_run
1797    /// # use url::Url;
1798    /// # use matrix_sdk::Client;
1799    /// # async {
1800    /// # let homeserver = Url::parse("http://example.com")?;
1801    /// use matrix_sdk::ruma::{
1802    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1803    /// };
1804    /// # let mut client = Client::new(homeserver).await?;
1805    ///
1806    /// let mut filter = Filter::new();
1807    /// filter.generic_search_term = Some("rust".to_owned());
1808    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1809    /// request.filter = filter;
1810    ///
1811    /// let response = client.public_rooms_filtered(request).await?;
1812    ///
1813    /// for room in response.chunk {
1814    ///     println!("Found room {room:?}");
1815    /// }
1816    /// # anyhow::Ok(()) };
1817    /// ```
1818    pub async fn public_rooms_filtered(
1819        &self,
1820        request: get_public_rooms_filtered::v3::Request,
1821    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1822        self.send(request).await
1823    }
1824
1825    /// Send an arbitrary request to the server, without updating client state.
1826    ///
1827    /// **Warning:** Because this method *does not* update the client state, it
1828    /// is important to make sure that you account for this yourself, and
1829    /// use wrapper methods where available.  This method should *only* be
1830    /// used if a wrapper method for the endpoint you'd like to use is not
1831    /// available.
1832    ///
1833    /// # Arguments
1834    ///
1835    /// * `request` - A filled out and valid request for the endpoint to be hit
1836    ///
1837    /// * `timeout` - An optional request timeout setting, this overrides the
1838    ///   default request setting if one was set.
1839    ///
1840    /// # Examples
1841    ///
1842    /// ```no_run
1843    /// # use matrix_sdk::{Client, config::SyncSettings};
1844    /// # use url::Url;
1845    /// # async {
1846    /// # let homeserver = Url::parse("http://localhost:8080")?;
1847    /// # let mut client = Client::new(homeserver).await?;
1848    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1849    ///
1850    /// // First construct the request you want to make
1851    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1852    /// // for all available Endpoints
1853    /// let user_id = user_id!("@example:localhost").to_owned();
1854    /// let request = profile::get_profile::v3::Request::new(user_id);
1855    ///
1856    /// // Start the request using Client::send()
1857    /// let response = client.send(request).await?;
1858    ///
1859    /// // Check the corresponding Response struct to find out what types are
1860    /// // returned
1861    /// # anyhow::Ok(()) };
1862    /// ```
1863    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1864    where
1865        Request: OutgoingRequest + Clone + Debug,
1866        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1867    {
1868        SendRequest {
1869            client: self.clone(),
1870            request,
1871            config: None,
1872            send_progress: Default::default(),
1873        }
1874    }
1875
1876    pub(crate) async fn send_inner<Request>(
1877        &self,
1878        request: Request,
1879        config: Option<RequestConfig>,
1880        send_progress: SharedObservable<TransmissionProgress>,
1881    ) -> HttpResult<Request::IncomingResponse>
1882    where
1883        Request: OutgoingRequest + Debug,
1884        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1885    {
1886        let homeserver = self.homeserver().to_string();
1887        let access_token = self.access_token();
1888
1889        self.inner
1890            .http_client
1891            .send(
1892                request,
1893                config,
1894                homeserver,
1895                access_token.as_deref(),
1896                &self.supported_versions().await?,
1897                send_progress,
1898            )
1899            .await
1900    }
1901
1902    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1903        _ = self
1904            .inner
1905            .auth_ctx
1906            .session_change_sender
1907            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1908    }
1909
1910    /// Fetches server versions from network; no caching.
1911    pub async fn fetch_server_versions(
1912        &self,
1913        request_config: Option<RequestConfig>,
1914    ) -> HttpResult<get_supported_versions::Response> {
1915        let server_versions = self
1916            .inner
1917            .http_client
1918            .send(
1919                get_supported_versions::Request::new(),
1920                request_config,
1921                self.homeserver().to_string(),
1922                None,
1923                &SupportedVersions {
1924                    versions: [MatrixVersion::V1_0].into(),
1925                    features: Default::default(),
1926                },
1927                Default::default(),
1928            )
1929            .await?;
1930
1931        Ok(server_versions)
1932    }
1933
1934    /// Fetches client well_known from network; no caching.
1935    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1936        let server_url_string = self
1937            .server()
1938            .unwrap_or(
1939                // Sometimes people configure their well-known directly on the homeserver so use
1940                // this as a fallback when the server name is unknown.
1941                &self.homeserver(),
1942            )
1943            .to_string();
1944
1945        let well_known = self
1946            .inner
1947            .http_client
1948            .send(
1949                discover_homeserver::Request::new(),
1950                Some(RequestConfig::short_retry()),
1951                server_url_string,
1952                None,
1953                &SupportedVersions {
1954                    versions: [MatrixVersion::V1_0].into(),
1955                    features: Default::default(),
1956                },
1957                Default::default(),
1958            )
1959            .await;
1960
1961        match well_known {
1962            Ok(well_known) => Some(well_known),
1963            Err(http_error) => {
1964                // It is perfectly valid to not have a well-known file.
1965                // Maybe we should check for a specific error code to be sure?
1966                warn!("Failed to fetch client well-known: {http_error}");
1967                None
1968            }
1969        }
1970    }
1971
1972    /// Load server info from storage, or fetch them from network and cache
1973    /// them.
1974    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1975        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1976            Ok(Some(stored)) => {
1977                if let Some(server_info) =
1978                    stored.into_server_info().and_then(|info| info.maybe_decode())
1979                {
1980                    return Ok(server_info);
1981                }
1982            }
1983            Ok(None) => {
1984                // fallthrough: cache is empty
1985            }
1986            Err(err) => {
1987                warn!("error when loading cached server info: {err}");
1988                // fallthrough to network.
1989            }
1990        }
1991
1992        let server_versions = self.fetch_server_versions(None).await?;
1993        let well_known = self.fetch_client_well_known().await;
1994        let server_info = ServerInfo::new(
1995            server_versions.versions.clone(),
1996            server_versions.unstable_features.clone(),
1997            well_known.map(Into::into),
1998        );
1999
2000        // Attempt to cache the result in storage.
2001        {
2002            if let Err(err) = self
2003                .state_store()
2004                .set_kv_data(
2005                    StateStoreDataKey::ServerInfo,
2006                    StateStoreDataValue::ServerInfo(server_info.clone()),
2007                )
2008                .await
2009            {
2010                warn!("error when caching server info: {err}");
2011            }
2012        }
2013
2014        Ok(server_info)
2015    }
2016
2017    async fn get_or_load_and_cache_server_info<
2018        Value,
2019        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2020    >(
2021        &self,
2022        map: MapFunction,
2023    ) -> HttpResult<Value> {
2024        let server_info = &self.inner.caches.server_info;
2025        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2026            return Ok(val);
2027        }
2028
2029        let mut guarded_server_info = server_info.write().await;
2030        if let CachedValue::Cached(val) = map(&guarded_server_info) {
2031            return Ok(val);
2032        }
2033
2034        let server_info = self.load_or_fetch_server_info().await?;
2035
2036        // Fill both unstable features and server versions at once.
2037        let mut supported = server_info.supported_versions();
2038        if supported.versions.is_empty() {
2039            supported.versions = [MatrixVersion::V1_0].into();
2040        }
2041
2042        guarded_server_info.supported_versions = CachedValue::Cached(supported);
2043        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2044
2045        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2046        // fetch an optional property), the function will always return some.
2047        Ok(map(&guarded_server_info).unwrap_cached_value())
2048    }
2049
2050    /// Get the Matrix versions and features supported by the homeserver by
2051    /// fetching them from the server or the cache.
2052    ///
2053    /// This is equivalent to calling both [`Client::server_versions()`] and
2054    /// [`Client::unstable_features()`]. To always fetch the result from the
2055    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2056    /// and then `.as_supported_versions()` on the response.
2057    ///
2058    /// # Examples
2059    ///
2060    /// ```no_run
2061    /// use ruma::api::{FeatureFlag, MatrixVersion};
2062    /// # use matrix_sdk::{Client, config::SyncSettings};
2063    /// # use url::Url;
2064    /// # async {
2065    /// # let homeserver = Url::parse("http://localhost:8080")?;
2066    /// # let mut client = Client::new(homeserver).await?;
2067    ///
2068    /// let supported = client.supported_versions().await?;
2069    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2070    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2071    ///
2072    /// let msc_x_feature = FeatureFlag::from("msc_x");
2073    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2074    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2075    /// # anyhow::Ok(()) };
2076    /// ```
2077    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2078        self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2079            .await
2080    }
2081
2082    /// Get the Matrix versions supported by the homeserver by fetching them
2083    /// from the server or the cache.
2084    ///
2085    /// # Examples
2086    ///
2087    /// ```no_run
2088    /// use ruma::api::MatrixVersion;
2089    /// # use matrix_sdk::{Client, config::SyncSettings};
2090    /// # use url::Url;
2091    /// # async {
2092    /// # let homeserver = Url::parse("http://localhost:8080")?;
2093    /// # let mut client = Client::new(homeserver).await?;
2094    ///
2095    /// let server_versions = client.server_versions().await?;
2096    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2097    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2098    /// # anyhow::Ok(()) };
2099    /// ```
2100    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2101        self.get_or_load_and_cache_server_info(|server_info| {
2102            server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2103        })
2104        .await
2105    }
2106
2107    /// Get the unstable features supported by the homeserver by fetching them
2108    /// from the server or the cache.
2109    ///
2110    /// # Examples
2111    ///
2112    /// ```no_run
2113    /// use matrix_sdk::ruma::api::FeatureFlag;
2114    /// # use matrix_sdk::{Client, config::SyncSettings};
2115    /// # use url::Url;
2116    /// # async {
2117    /// # let homeserver = Url::parse("http://localhost:8080")?;
2118    /// # let mut client = Client::new(homeserver).await?;
2119    ///
2120    /// let msc_x_feature = FeatureFlag::from("msc_x");
2121    /// let unstable_features = client.unstable_features().await?;
2122    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2123    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2124    /// # anyhow::Ok(()) };
2125    /// ```
2126    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2127        self.get_or_load_and_cache_server_info(|server_info| {
2128            server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2129        })
2130        .await
2131    }
2132
2133    /// Get information about the homeserver's advertised RTC foci by fetching
2134    /// the well-known file from the server or the cache.
2135    ///
2136    /// # Examples
2137    /// ```no_run
2138    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2139    /// # use url::Url;
2140    /// # async {
2141    /// # let homeserver = Url::parse("http://localhost:8080")?;
2142    /// # let mut client = Client::new(homeserver).await?;
2143    /// let rtc_foci = client.rtc_foci().await?;
2144    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2145    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2146    ///     _ => None,
2147    /// });
2148    /// if let Some(info) = default_livekit_focus_info {
2149    ///     println!("Default LiveKit service URL: {}", info.service_url);
2150    /// }
2151    /// # anyhow::Ok(()) };
2152    /// ```
2153    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2154        let well_known = self
2155            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2156            .await?;
2157
2158        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2159    }
2160
2161    /// Empty the server version and unstable features cache.
2162    ///
2163    /// Since the SDK caches server info (versions, unstable features,
2164    /// well-known etc), it's possible to have a stale entry in the cache. This
2165    /// functions makes it possible to force reset it.
2166    pub async fn reset_server_info(&self) -> Result<()> {
2167        // Empty the in-memory caches.
2168        let mut guard = self.inner.caches.server_info.write().await;
2169        guard.supported_versions = CachedValue::NotSet;
2170
2171        // Empty the store cache.
2172        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2173    }
2174
2175    /// Check whether MSC 4028 is enabled on the homeserver.
2176    ///
2177    /// # Examples
2178    ///
2179    /// ```no_run
2180    /// # use matrix_sdk::{Client, config::SyncSettings};
2181    /// # use url::Url;
2182    /// # async {
2183    /// # let homeserver = Url::parse("http://localhost:8080")?;
2184    /// # let mut client = Client::new(homeserver).await?;
2185    /// let msc4028_enabled =
2186    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2187    /// # anyhow::Ok(()) };
2188    /// ```
2189    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2190        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2191    }
2192
2193    /// Get information of all our own devices.
2194    ///
2195    /// # Examples
2196    ///
2197    /// ```no_run
2198    /// # use matrix_sdk::{Client, config::SyncSettings};
2199    /// # use url::Url;
2200    /// # async {
2201    /// # let homeserver = Url::parse("http://localhost:8080")?;
2202    /// # let mut client = Client::new(homeserver).await?;
2203    /// let response = client.devices().await?;
2204    ///
2205    /// for device in response.devices {
2206    ///     println!(
2207    ///         "Device: {} {}",
2208    ///         device.device_id,
2209    ///         device.display_name.as_deref().unwrap_or("")
2210    ///     );
2211    /// }
2212    /// # anyhow::Ok(()) };
2213    /// ```
2214    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2215        let request = get_devices::v3::Request::new();
2216
2217        self.send(request).await
2218    }
2219
2220    /// Delete the given devices from the server.
2221    ///
2222    /// # Arguments
2223    ///
2224    /// * `devices` - The list of devices that should be deleted from the
2225    ///   server.
2226    ///
2227    /// * `auth_data` - This request requires user interactive auth, the first
2228    ///   request needs to set this to `None` and will always fail with an
2229    ///   `UiaaResponse`. The response will contain information for the
2230    ///   interactive auth and the same request needs to be made but this time
2231    ///   with some `auth_data` provided.
2232    ///
2233    /// ```no_run
2234    /// # use matrix_sdk::{
2235    /// #    ruma::{api::client::uiaa, device_id},
2236    /// #    Client, Error, config::SyncSettings,
2237    /// # };
2238    /// # use serde_json::json;
2239    /// # use url::Url;
2240    /// # use std::collections::BTreeMap;
2241    /// # async {
2242    /// # let homeserver = Url::parse("http://localhost:8080")?;
2243    /// # let mut client = Client::new(homeserver).await?;
2244    /// let devices = &[device_id!("DEVICEID").to_owned()];
2245    ///
2246    /// if let Err(e) = client.delete_devices(devices, None).await {
2247    ///     if let Some(info) = e.as_uiaa_response() {
2248    ///         let mut password = uiaa::Password::new(
2249    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2250    ///             "wordpass".to_owned(),
2251    ///         );
2252    ///         password.session = info.session.clone();
2253    ///
2254    ///         client
2255    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2256    ///             .await?;
2257    ///     }
2258    /// }
2259    /// # anyhow::Ok(()) };
2260    pub async fn delete_devices(
2261        &self,
2262        devices: &[OwnedDeviceId],
2263        auth_data: Option<uiaa::AuthData>,
2264    ) -> HttpResult<delete_devices::v3::Response> {
2265        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2266        request.auth = auth_data;
2267
2268        self.send(request).await
2269    }
2270
2271    /// Change the display name of a device owned by the current user.
2272    ///
2273    /// Returns a `update_device::Response` which specifies the result
2274    /// of the operation.
2275    ///
2276    /// # Arguments
2277    ///
2278    /// * `device_id` - The ID of the device to change the display name of.
2279    /// * `display_name` - The new display name to set.
2280    pub async fn rename_device(
2281        &self,
2282        device_id: &DeviceId,
2283        display_name: &str,
2284    ) -> HttpResult<update_device::v3::Response> {
2285        let mut request = update_device::v3::Request::new(device_id.to_owned());
2286        request.display_name = Some(display_name.to_owned());
2287
2288        self.send(request).await
2289    }
2290
2291    /// Synchronize the client's state with the latest state on the server.
2292    ///
2293    /// ## Syncing Events
2294    ///
2295    /// Messages or any other type of event need to be periodically fetched from
2296    /// the server, this is achieved by sending a `/sync` request to the server.
2297    ///
2298    /// The first sync is sent out without a [`token`]. The response of the
2299    /// first sync will contain a [`next_batch`] field which should then be
2300    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2301    /// don't receive the same events multiple times.
2302    ///
2303    /// ## Long Polling
2304    ///
2305    /// A sync should in the usual case always be in flight. The
2306    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2307    /// long the server will wait for new events before it will respond.
2308    /// The server will respond immediately if some new events arrive before the
2309    /// timeout has expired. If no changes arrive and the timeout expires an
2310    /// empty sync response will be sent to the client.
2311    ///
2312    /// This method of sending a request that may not receive a response
2313    /// immediately is called long polling.
2314    ///
2315    /// ## Filtering Events
2316    ///
2317    /// The number or type of messages and events that the client should receive
2318    /// from the server can be altered using a [`Filter`].
2319    ///
2320    /// Filters can be non-trivial and, since they will be sent with every sync
2321    /// request, they may take up a bunch of unnecessary bandwidth.
2322    ///
2323    /// Luckily filters can be uploaded to the server and reused using an unique
2324    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2325    /// method.
2326    ///
2327    /// # Arguments
2328    ///
2329    /// * `sync_settings` - Settings for the sync call, this allows us to set
2330    /// various options to configure the sync:
2331    ///     * [`filter`] - To configure which events we receive and which get
2332    ///       [filtered] by the server
2333    ///     * [`timeout`] - To configure our [long polling] setup.
2334    ///     * [`token`] - To tell the server which events we already received
2335    ///       and where we wish to continue syncing.
2336    ///     * [`full_state`] - To tell the server that we wish to receive all
2337    ///       state events, regardless of our configured [`token`].
2338    ///     * [`set_presence`] - To tell the server to set the presence and to
2339    ///       which state.
2340    ///
2341    /// # Examples
2342    ///
2343    /// ```no_run
2344    /// # use url::Url;
2345    /// # async {
2346    /// # let homeserver = Url::parse("http://localhost:8080")?;
2347    /// # let username = "";
2348    /// # let password = "";
2349    /// use matrix_sdk::{
2350    ///     Client, config::SyncSettings,
2351    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2352    /// };
2353    ///
2354    /// let client = Client::new(homeserver).await?;
2355    /// client.matrix_auth().login_username(username, password).send().await?;
2356    ///
2357    /// // Sync once so we receive the client state and old messages.
2358    /// client.sync_once(SyncSettings::default()).await?;
2359    ///
2360    /// // Register our handler so we start responding once we receive a new
2361    /// // event.
2362    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2363    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2364    /// });
2365    ///
2366    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2367    /// // from our `sync_once()` call automatically.
2368    /// client.sync(SyncSettings::default()).await;
2369    /// # anyhow::Ok(()) };
2370    /// ```
2371    ///
2372    /// [`sync`]: #method.sync
2373    /// [`SyncSettings`]: crate::config::SyncSettings
2374    /// [`token`]: crate::config::SyncSettings#method.token
2375    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2376    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2377    /// [`set_presence`]: ruma::presence::PresenceState
2378    /// [`filter`]: crate::config::SyncSettings#method.filter
2379    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2380    /// [`next_batch`]: SyncResponse#structfield.next_batch
2381    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2382    /// [long polling]: #long-polling
2383    /// [filtered]: #filtering-events
2384    #[instrument(skip(self))]
2385    pub async fn sync_once(
2386        &self,
2387        sync_settings: crate::config::SyncSettings,
2388    ) -> Result<SyncResponse> {
2389        // The sync might not return for quite a while due to the timeout.
2390        // We'll see if there's anything crypto related to send out before we
2391        // sync, i.e. if we closed our client after a sync but before the
2392        // crypto requests were sent out.
2393        //
2394        // This will mostly be a no-op.
2395        #[cfg(feature = "e2e-encryption")]
2396        if let Err(e) = self.send_outgoing_requests().await {
2397            error!(error = ?e, "Error while sending outgoing E2EE requests");
2398        }
2399
2400        let token = match sync_settings.token {
2401            SyncToken::Specific(token) => Some(token),
2402            SyncToken::NoToken => None,
2403            SyncToken::ReusePrevious => self.sync_token().await,
2404        };
2405
2406        let request = assign!(sync_events::v3::Request::new(), {
2407            filter: sync_settings.filter.map(|f| *f),
2408            since: token,
2409            full_state: sync_settings.full_state,
2410            set_presence: sync_settings.set_presence,
2411            timeout: sync_settings.timeout,
2412            use_state_after: true,
2413        });
2414        let mut request_config = self.request_config();
2415        if let Some(timeout) = sync_settings.timeout {
2416            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2417            request_config.timeout = Some(base_timeout + timeout);
2418        }
2419
2420        let response = self.send(request).with_request_config(request_config).await?;
2421        let next_batch = response.next_batch.clone();
2422        let response = self.process_sync(response).await?;
2423
2424        #[cfg(feature = "e2e-encryption")]
2425        if let Err(e) = self.send_outgoing_requests().await {
2426            error!(error = ?e, "Error while sending outgoing E2EE requests");
2427        }
2428
2429        self.inner.sync_beat.notify(usize::MAX);
2430
2431        Ok(SyncResponse::new(next_batch, response))
2432    }
2433
2434    /// Repeatedly synchronize the client state with the server.
2435    ///
2436    /// This method will only return on error, if cancellation is needed
2437    /// the method should be wrapped in a cancelable task or the
2438    /// [`Client::sync_with_callback`] method can be used or
2439    /// [`Client::sync_with_result_callback`] if you want to handle error
2440    /// cases in the loop, too.
2441    ///
2442    /// This method will internally call [`Client::sync_once`] in a loop.
2443    ///
2444    /// This method can be used with the [`Client::add_event_handler`]
2445    /// method to react to individual events. If you instead wish to handle
2446    /// events in a bulk manner the [`Client::sync_with_callback`],
2447    /// [`Client::sync_with_result_callback`] and
2448    /// [`Client::sync_stream`] methods can be used instead. Those methods
2449    /// repeatedly return the whole sync response.
2450    ///
2451    /// # Arguments
2452    ///
2453    /// * `sync_settings` - Settings for the sync call. *Note* that those
2454    ///   settings will be only used for the first sync call. See the argument
2455    ///   docs for [`Client::sync_once`] for more info.
2456    ///
2457    /// # Return
2458    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2459    /// up to the user of the API to check the error and decide whether the sync
2460    /// should continue or not.
2461    ///
2462    /// # Examples
2463    ///
2464    /// ```no_run
2465    /// # use url::Url;
2466    /// # async {
2467    /// # let homeserver = Url::parse("http://localhost:8080")?;
2468    /// # let username = "";
2469    /// # let password = "";
2470    /// use matrix_sdk::{
2471    ///     Client, config::SyncSettings,
2472    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2473    /// };
2474    ///
2475    /// let client = Client::new(homeserver).await?;
2476    /// client.matrix_auth().login_username(&username, &password).send().await?;
2477    ///
2478    /// // Register our handler so we start responding once we receive a new
2479    /// // event.
2480    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2481    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2482    /// });
2483    ///
2484    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2485    /// // automatically.
2486    /// client.sync(SyncSettings::default()).await?;
2487    /// # anyhow::Ok(()) };
2488    /// ```
2489    ///
2490    /// [argument docs]: #method.sync_once
2491    /// [`sync_with_callback`]: #method.sync_with_callback
2492    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2493        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2494    }
2495
2496    /// Repeatedly call sync to synchronize the client state with the server.
2497    ///
2498    /// # Arguments
2499    ///
2500    /// * `sync_settings` - Settings for the sync call. *Note* that those
2501    ///   settings will be only used for the first sync call. See the argument
2502    ///   docs for [`Client::sync_once`] for more info.
2503    ///
2504    /// * `callback` - A callback that will be called every time a successful
2505    ///   response has been fetched from the server. The callback must return a
2506    ///   boolean which signalizes if the method should stop syncing. If the
2507    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2508    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2509    ///
2510    /// # Return
2511    /// The sync runs until an error occurs or the
2512    /// callback indicates that the Loop should stop. If the callback asked for
2513    /// a regular stop, the result will be `Ok(())` otherwise the
2514    /// `Err(Error)` is returned.
2515    ///
2516    /// # Examples
2517    ///
2518    /// The following example demonstrates how to sync forever while sending all
2519    /// the interesting events through a mpsc channel to another thread e.g. a
2520    /// UI thread.
2521    ///
2522    /// ```no_run
2523    /// # use std::time::Duration;
2524    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2525    /// # use url::Url;
2526    /// # async {
2527    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2528    /// # let mut client = Client::new(homeserver).await.unwrap();
2529    ///
2530    /// use tokio::sync::mpsc::channel;
2531    ///
2532    /// let (tx, rx) = channel(100);
2533    ///
2534    /// let sync_channel = &tx;
2535    /// let sync_settings = SyncSettings::new()
2536    ///     .timeout(Duration::from_secs(30));
2537    ///
2538    /// client
2539    ///     .sync_with_callback(sync_settings, |response| async move {
2540    ///         let channel = sync_channel;
2541    ///         for (room_id, room) in response.rooms.joined {
2542    ///             for event in room.timeline.events {
2543    ///                 channel.send(event).await.unwrap();
2544    ///             }
2545    ///         }
2546    ///
2547    ///         LoopCtrl::Continue
2548    ///     })
2549    ///     .await;
2550    /// };
2551    /// ```
2552    #[instrument(skip_all)]
2553    pub async fn sync_with_callback<C>(
2554        &self,
2555        sync_settings: crate::config::SyncSettings,
2556        callback: impl Fn(SyncResponse) -> C,
2557    ) -> Result<(), Error>
2558    where
2559        C: Future<Output = LoopCtrl>,
2560    {
2561        self.sync_with_result_callback(sync_settings, |result| async {
2562            Ok(callback(result?).await)
2563        })
2564        .await
2565    }
2566
2567    /// Repeatedly call sync to synchronize the client state with the server.
2568    ///
2569    /// # Arguments
2570    ///
2571    /// * `sync_settings` - Settings for the sync call. *Note* that those
2572    ///   settings will be only used for the first sync call. See the argument
2573    ///   docs for [`Client::sync_once`] for more info.
2574    ///
2575    /// * `callback` - A callback that will be called every time after a
2576    ///   response has been received, failure or not. The callback returns a
2577    ///   `Result<LoopCtrl, Error>`, too. When returning
2578    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2579    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2580    ///   function returns `Ok(())`. In case the callback can't handle the
2581    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2582    ///   which results in the sync ending and the `Err(Error)` being returned.
2583    ///
2584    /// # Return
2585    /// The sync runs until an error occurs that the callback can't handle or
2586    /// the callback indicates that the Loop should stop. If the callback
2587    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2588    /// `Err(Error)` is returned.
2589    ///
2590    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2591    /// this, and are handled first without sending the result to the
2592    /// callback. Only after they have exceeded is the `Result` handed to
2593    /// the callback.
2594    ///
2595    /// # Examples
2596    ///
2597    /// The following example demonstrates how to sync forever while sending all
2598    /// the interesting events through a mpsc channel to another thread e.g. a
2599    /// UI thread.
2600    ///
2601    /// ```no_run
2602    /// # use std::time::Duration;
2603    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2604    /// # use url::Url;
2605    /// # async {
2606    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2607    /// # let mut client = Client::new(homeserver).await.unwrap();
2608    /// #
2609    /// use tokio::sync::mpsc::channel;
2610    ///
2611    /// let (tx, rx) = channel(100);
2612    ///
2613    /// let sync_channel = &tx;
2614    /// let sync_settings = SyncSettings::new()
2615    ///     .timeout(Duration::from_secs(30));
2616    ///
2617    /// client
2618    ///     .sync_with_result_callback(sync_settings, |response| async move {
2619    ///         let channel = sync_channel;
2620    ///         let sync_response = response?;
2621    ///         for (room_id, room) in sync_response.rooms.joined {
2622    ///              for event in room.timeline.events {
2623    ///                  channel.send(event).await.unwrap();
2624    ///               }
2625    ///         }
2626    ///
2627    ///         Ok(LoopCtrl::Continue)
2628    ///     })
2629    ///     .await;
2630    /// };
2631    /// ```
2632    #[instrument(skip(self, callback))]
2633    pub async fn sync_with_result_callback<C>(
2634        &self,
2635        sync_settings: crate::config::SyncSettings,
2636        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2637    ) -> Result<(), Error>
2638    where
2639        C: Future<Output = Result<LoopCtrl, Error>>,
2640    {
2641        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2642
2643        while let Some(result) = sync_stream.next().await {
2644            trace!("Running callback");
2645            if callback(result).await? == LoopCtrl::Break {
2646                trace!("Callback told us to stop");
2647                break;
2648            }
2649            trace!("Done running callback");
2650        }
2651
2652        Ok(())
2653    }
2654
    /// Repeatedly synchronize the client state with the server.
    ///
    /// This method will internally call [`Client::sync_once`] in a loop and is
    /// equivalent to the [`Client::sync`] method but the responses are provided
    /// as an async stream.
    ///
    /// The returned stream yields one item per sync attempt, successful or
    /// not, and does not end on its own.
    ///
    /// # Arguments
    ///
    /// * `sync_settings` - Settings for the sync call. *Note* that those
    ///   settings will be only used for the first sync call. See the argument
    ///   docs for [`Client::sync_once`] for more info.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use url::Url;
    /// # async {
    /// # let homeserver = Url::parse("http://localhost:8080")?;
    /// # let username = "";
    /// # let password = "";
    /// use futures_util::StreamExt;
    /// use matrix_sdk::{Client, config::SyncSettings};
    ///
    /// let client = Client::new(homeserver).await?;
    /// client.matrix_auth().login_username(&username, &password).send().await?;
    ///
    /// let mut sync_stream =
    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
    ///
    /// while let Some(Ok(response)) = sync_stream.next().await {
    ///     for room in response.rooms.joined.values() {
    ///         for e in &room.timeline.events {
    ///             if let Ok(event) = e.raw().deserialize() {
    ///                 println!("Received event {:?}", event);
    ///             }
    ///         }
    ///     }
    /// }
    ///
    /// # anyhow::Ok(()) };
    /// ```
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(skip(self))]
    pub async fn sync_stream(
        &self,
        mut sync_settings: crate::config::SyncSettings,
    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
        let mut is_first_sync = true;
        // Holding slot for the timeout taken out of the settings for the first
        // sync (only used with `ignore_timeout_on_first_sync`).
        let mut timeout = None;
        let mut last_sync_time: Option<Instant> = None;

        // Captured now so that every iteration of the stream can be
        // instrumented with the span that is current at this point.
        let parent_span = Span::current();

        async_stream::stream!({
            loop {
                trace!("Syncing");

                if sync_settings.ignore_timeout_on_first_sync {
                    if is_first_sync {
                        // Move the timeout out of the settings (leaving
                        // `None` behind) so the settings handed to the first
                        // `sync_loop_helper` call carry no timeout.
                        timeout = sync_settings.timeout.take();
                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
                        // Put the stashed timeout back for the later syncs.
                        sync_settings.timeout = timeout.take();
                    }

                    is_first_sync = false;
                }

                yield self
                    .sync_loop_helper(&mut sync_settings)
                    .instrument(parent_span.clone())
                    .await;

                // NOTE(review): `delay_sync` is defined outside this block;
                // presumably it throttles back-to-back syncs based on
                // `last_sync_time` — confirm against its definition.
                Client::delay_sync(&mut last_sync_time).await
            }
        })
    }
2731
2732    /// Get the current, if any, sync token of the client.
2733    /// This will be None if the client didn't sync at least once.
2734    pub(crate) async fn sync_token(&self) -> Option<String> {
2735        self.inner.base_client.sync_token().await
2736    }
2737
2738    /// Gets information about the owner of a given access token.
2739    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2740        let request = whoami::v3::Request::new();
2741        self.send(request).await
2742    }
2743
2744    /// Subscribes a new receiver to client SessionChange broadcasts.
2745    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2746        let broadcast = &self.auth_ctx().session_change_sender;
2747        broadcast.subscribe()
2748    }
2749
2750    /// Sets the save/restore session callbacks.
2751    ///
2752    /// This is another mechanism to get synchronous updates to session tokens,
2753    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2754    pub fn set_session_callbacks(
2755        &self,
2756        reload_session_callback: Box<ReloadSessionCallback>,
2757        save_session_callback: Box<SaveSessionCallback>,
2758    ) -> Result<()> {
2759        self.inner
2760            .auth_ctx
2761            .reload_session_callback
2762            .set(reload_session_callback)
2763            .map_err(|_| Error::MultipleSessionCallbacks)?;
2764
2765        self.inner
2766            .auth_ctx
2767            .save_session_callback
2768            .set(save_session_callback)
2769            .map_err(|_| Error::MultipleSessionCallbacks)?;
2770
2771        Ok(())
2772    }
2773
2774    /// Get the notification settings of the current owner of the client.
2775    pub async fn notification_settings(&self) -> NotificationSettings {
2776        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2777        NotificationSettings::new(self.clone(), ruleset)
2778    }
2779
    /// Create a new specialized `Client` that can process notifications.
    ///
    /// The returned client is built around a new `ClientInner`. It is given
    /// clones of this client's authentication context, HTTP client, event
    /// cache, send queue data and latest-events handle, while its base client
    /// comes from `clone_with_in_memory_state_store`.
    ///
    /// See [`CrossProcessLock::new`] to learn more about
    /// `cross_process_store_locks_holder_name`.
    ///
    /// # Errors
    ///
    /// Returns an error if `clone_with_in_memory_state_store` fails.
    ///
    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
    pub async fn notification_client(
        &self,
        cross_process_store_locks_holder_name: String,
    ) -> Result<Client> {
        // `ClientInner::new` takes its arguments positionally; keep this list
        // (including the cfg-gated entries) in sync with its signature.
        let client = Client {
            inner: ClientInner::new(
                self.inner.auth_ctx.clone(),
                self.server().cloned(),
                self.homeserver(),
                self.sliding_sync_version(),
                self.inner.http_client.clone(),
                // NOTE(review): the meaning of the `false` flag isn't visible
                // from here — check `clone_with_in_memory_state_store`.
                self.inner
                    .base_client
                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
                    .await?,
                // Copy of the server info cached at the time of the call.
                self.inner.caches.server_info.read().await.clone(),
                self.inner.respect_login_well_known,
                self.inner.event_cache.clone(),
                self.inner.send_queue_data.clone(),
                self.inner.latest_events.clone(),
                #[cfg(feature = "e2e-encryption")]
                self.inner.e2ee.encryption_settings,
                #[cfg(feature = "e2e-encryption")]
                self.inner.enable_share_history_on_invite,
                cross_process_store_locks_holder_name,
                #[cfg(feature = "experimental-search")]
                self.inner.search_index.clone(),
                self.inner.thread_subscription_catchup.clone(),
            )
            .await,
        };

        Ok(client)
    }
2820
2821    /// The [`EventCache`] instance for this [`Client`].
2822    pub fn event_cache(&self) -> &EventCache {
2823        // SAFETY: always initialized in the `Client` ctor.
2824        self.inner.event_cache.get().unwrap()
2825    }
2826
2827    /// The [`LatestEvents`] instance for this [`Client`].
2828    pub async fn latest_events(&self) -> &LatestEvents {
2829        self.inner
2830            .latest_events
2831            .get_or_init(|| async {
2832                LatestEvents::new(
2833                    WeakClient::from_client(self),
2834                    self.event_cache().clone(),
2835                    SendQueue::new(self.clone()),
2836                )
2837            })
2838            .await
2839    }
2840
2841    /// Waits until an at least partially synced room is received, and returns
2842    /// it.
2843    ///
2844    /// **Note: this function will loop endlessly until either it finds the room
2845    /// or an externally set timeout happens.**
2846    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2847        loop {
2848            if let Some(room) = self.get_room(room_id) {
2849                if room.is_state_partially_or_fully_synced() {
2850                    debug!("Found just created room!");
2851                    return room;
2852                }
2853                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2854            } else {
2855                debug!("Room wasn't found, waiting for sync beat to try again");
2856            }
2857            self.inner.sync_beat.listen().await;
2858        }
2859    }
2860
2861    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2862    /// join it.
2863    pub async fn knock(
2864        &self,
2865        room_id_or_alias: OwnedRoomOrAliasId,
2866        reason: Option<String>,
2867        server_names: Vec<OwnedServerName>,
2868    ) -> Result<Room> {
2869        let request =
2870            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2871        let response = self.send(request).await?;
2872        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2873        Ok(Room::new(self.clone(), base_room))
2874    }
2875
2876    /// Checks whether the provided `user_id` belongs to an ignored user.
2877    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2878        self.base_client().is_user_ignored(user_id).await
2879    }
2880
2881    /// Gets the `max_upload_size` value from the homeserver, getting either a
2882    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2883    /// missing.
2884    ///
2885    /// Check the spec for more info:
2886    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2887    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2888        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2889        if let Some(data) = max_upload_size_lock.get() {
2890            return Ok(data.to_owned());
2891        }
2892
2893        // Use the authenticated endpoint when the server supports it.
2894        let supported_versions = self.supported_versions().await?;
2895        let use_auth =
2896            authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2897
2898        let upload_size = if use_auth {
2899            self.send(authenticated_media::get_media_config::v1::Request::default())
2900                .await?
2901                .upload_size
2902        } else {
2903            #[allow(deprecated)]
2904            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2905        };
2906
2907        match max_upload_size_lock.set(upload_size) {
2908            Ok(_) => Ok(upload_size),
2909            Err(error) => {
2910                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2911            }
2912        }
2913    }
2914
2915    /// The settings to use for decrypting events.
2916    #[cfg(feature = "e2e-encryption")]
2917    pub fn decryption_settings(&self) -> &DecryptionSettings {
2918        &self.base_client().decryption_settings
2919    }
2920
2921    /// Returns the [`SearchIndex`] for this [`Client`].
2922    #[cfg(feature = "experimental-search")]
2923    pub fn search_index(&self) -> &SearchIndex {
2924        &self.inner.search_index
2925    }
2926
2927    /// Whether the client is configured to take thread subscriptions (MSC4306
2928    /// and MSC4308) into account.
2929    ///
2930    /// This may cause filtering out of thread subscriptions, and loading the
2931    /// thread subscriptions via the sliding sync extension, when the room
2932    /// list service is being used.
2933    pub fn enabled_thread_subscriptions(&self) -> bool {
2934        match self.base_client().threading_support {
2935            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2936            ThreadingSupport::Disabled => false,
2937        }
2938    }
2939
2940    /// Fetch thread subscriptions changes between `from` and up to `to`.
2941    ///
2942    /// The `limit` optional parameter can be used to limit the number of
2943    /// entries in a response. It can also be overridden by the server, if
2944    /// it's deemed too large.
2945    pub async fn fetch_thread_subscriptions(
2946        &self,
2947        from: Option<String>,
2948        to: Option<String>,
2949        limit: Option<UInt>,
2950    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2951        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2952            from,
2953            to,
2954            limit,
2955        });
2956        Ok(self.send(request).await?)
2957    }
2958
2959    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2960        self.inner.thread_subscription_catchup.get().unwrap()
2961    }
2962}
2963
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test-only helper that registers `user_ids` as tracked users with the
    /// Olm machine.
    ///
    /// # Panics
    ///
    /// Panics if no Olm machine is available or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();
        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
2977
2978/// A weak reference to the inner client, useful when trying to get a handle
2979/// on the owning client.
2980#[derive(Clone, Debug)]
2981pub(crate) struct WeakClient {
2982    client: Weak<ClientInner>,
2983}
2984
2985impl WeakClient {
2986    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2987    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2988        Self { client: Arc::downgrade(client) }
2989    }
2990
2991    /// Construct a [`WeakClient`] from a [`Client`].
2992    pub fn from_client(client: &Client) -> Self {
2993        Self::from_inner(&client.inner)
2994    }
2995
2996    /// Attempts to get a [`Client`] from this [`WeakClient`].
2997    pub fn get(&self) -> Option<Client> {
2998        self.client.upgrade().map(|inner| Client { inner })
2999    }
3000
3001    /// Gets the number of strong (`Arc`) pointers still pointing to this
3002    /// client.
3003    #[allow(dead_code)]
3004    pub fn strong_count(&self) -> usize {
3005        self.client.strong_count()
3006    }
3007}
3008
/// Server information kept by the client: the supported versions and the
/// well-known file, each of which may or may not have been cached yet.
#[derive(Clone)]
struct ClientServerInfo {
    /// The Matrix versions and unstable features the server supports (known
    /// ones only).
    supported_versions: CachedValue<SupportedVersions>,

    /// The server's well-known file, if any.
    ///
    /// `Cached(None)` means the lookup was cached and there is no well-known
    /// file, as opposed to `NotSet`, where nothing has been cached yet.
    well_known: CachedValue<Option<WellKnownResponse>>,
}
3018
/// A value that may or may not have been cached yet.
///
/// Using a dedicated type instead of `Option` avoids confusion between a
/// value that is set to `None` (because it doesn't exist) and a value that
/// simply has not been cached yet.
#[derive(Clone)]
enum CachedValue<Value> {
    /// A value has been cached.
    Cached(Value),
    /// Nothing has been cached yet.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Consumes the `CachedValue` and returns the value it holds.
    ///
    /// # Panics
    ///
    /// Panics if nothing has been cached, i.e. on `NotSet`.
    fn unwrap_cached_value(self) -> Value {
        let Self::Cached(value) = self else {
            panic!("Tried to unwrap a cached value that wasn't set");
        };

        value
    }

    /// Borrows the content: turns a `&CachedValue<Value>` into a
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        if let Self::Cached(value) = self { CachedValue::Cached(value) } else { CachedValue::NotSet }
    }

    /// Transforms the cached value with `f`, turning a `CachedValue<Value>`
    /// into a `CachedValue<Other>`.
    ///
    /// `NotSet` stays `NotSet` and `f` is not called.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        if let Self::Cached(value) = self {
            return CachedValue::Cached(f(value));
        }

        CachedValue::NotSet
    }
}
3064
/// Information about the state of a room before we joined it.
///
/// At the moment this only records who invited us, if anyone.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any. `None` by default.
    pub inviter: Option<RoomMember>,
}
3071
3072// The http mocking library is not supported for wasm32
3073#[cfg(all(test, not(target_family = "wasm")))]
3074pub(crate) mod tests {
3075    use std::{sync::Arc, time::Duration};
3076
3077    use assert_matches::assert_matches;
3078    use assert_matches2::assert_let;
3079    use eyeball::SharedObservable;
3080    use futures_util::{FutureExt, StreamExt, pin_mut};
3081    use js_int::{UInt, uint};
3082    use matrix_sdk_base::{
3083        RoomState,
3084        store::{MemoryStore, StoreConfig},
3085    };
3086    use matrix_sdk_test::{
3087        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3088        event_factory::EventFactory,
3089    };
3090    #[cfg(target_family = "wasm")]
3091    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3092
3093    use ruma::{
3094        RoomId, ServerName, UserId,
3095        api::{
3096            FeatureFlag, MatrixVersion,
3097            client::{
3098                discovery::discover_homeserver::RtcFocusInfo,
3099                room::create_room::v3::Request as CreateRoomRequest,
3100            },
3101        },
3102        assign,
3103        events::{
3104            ignored_user_list::IgnoredUserListEventContent,
3105            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3106        },
3107        owned_room_id, owned_user_id, room_alias_id, room_id,
3108    };
3109    use serde_json::json;
3110    use stream_assert::{assert_next_matches, assert_pending};
3111    use tokio::{
3112        spawn,
3113        time::{sleep, timeout},
3114    };
3115    use url::Url;
3116
3117    use super::Client;
3118    use crate::{
3119        Error, TransmissionProgress,
3120        client::{WeakClient, futures::SendMediaUploadRequest},
3121        config::{RequestConfig, SyncSettings},
3122        futures::SendRequest,
3123        media::MediaError,
3124        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3125    };
3126
3127    #[async_test]
3128    async fn test_account_data() {
3129        let server = MatrixMockServer::new().await;
3130        let client = server.client_builder().build().await;
3131
3132        let f = EventFactory::new();
3133        server
3134            .mock_sync()
3135            .ok_and_run(&client, |builder| {
3136                builder.add_global_account_data(
3137                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3138                );
3139            })
3140            .await;
3141
3142        let content = client
3143            .account()
3144            .account_data::<IgnoredUserListEventContent>()
3145            .await
3146            .unwrap()
3147            .unwrap()
3148            .deserialize()
3149            .unwrap();
3150
3151        assert_eq!(content.ignored_users.len(), 1);
3152    }
3153
3154    #[async_test]
3155    async fn test_successful_discovery() {
3156        // Imagine this is `matrix.org`.
3157        let server = MatrixMockServer::new().await;
3158        let server_url = server.uri();
3159
3160        // Imagine this is `matrix-client.matrix.org`.
3161        let homeserver = MatrixMockServer::new().await;
3162        let homeserver_url = homeserver.uri();
3163
3164        // Imagine Alice has the user ID `@alice:matrix.org`.
3165        let domain = server_url.strip_prefix("http://").unwrap();
3166        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3167
3168        // The `.well-known` is on the server (e.g. `matrix.org`).
3169        server
3170            .mock_well_known()
3171            .ok_with_homeserver_url(&homeserver_url)
3172            .mock_once()
3173            .named("well-known")
3174            .mount()
3175            .await;
3176
3177        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3178        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3179
3180        let client = Client::builder()
3181            .insecure_server_name_no_tls(alice.server_name())
3182            .build()
3183            .await
3184            .unwrap();
3185
3186        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3187        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3188        client.server_versions().await.unwrap();
3189    }
3190
3191    #[async_test]
3192    async fn test_discovery_broken_server() {
3193        let server = MatrixMockServer::new().await;
3194        let server_url = server.uri();
3195        let domain = server_url.strip_prefix("http://").unwrap();
3196        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3197
3198        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3199
3200        assert!(
3201            Client::builder()
3202                .insecure_server_name_no_tls(alice.server_name())
3203                .build()
3204                .await
3205                .is_err(),
3206            "Creating a client from a user ID should fail when the .well-known request fails."
3207        );
3208    }
3209
3210    #[async_test]
3211    async fn test_room_creation() {
3212        let server = MatrixMockServer::new().await;
3213        let client = server.client_builder().build().await;
3214
3215        server
3216            .mock_sync()
3217            .ok_and_run(&client, |builder| {
3218                builder.add_joined_room(
3219                    JoinedRoomBuilder::default()
3220                        .add_state_event(StateTestEvent::Member)
3221                        .add_state_event(StateTestEvent::PowerLevels),
3222                );
3223            })
3224            .await;
3225
3226        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3227        assert_eq!(room.state(), RoomState::Joined);
3228    }
3229
3230    #[async_test]
3231    async fn test_retry_limit_http_requests() {
3232        let server = MatrixMockServer::new().await;
3233        let client = server
3234            .client_builder()
3235            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3236            .build()
3237            .await;
3238
3239        assert!(client.request_config().retry_limit.unwrap() == 3);
3240
3241        server.mock_login().error500().expect(3).mount().await;
3242
3243        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3244    }
3245
3246    #[async_test]
3247    async fn test_retry_timeout_http_requests() {
3248        // Keep this timeout small so that the test doesn't take long
3249        let retry_timeout = Duration::from_secs(5);
3250        let server = MatrixMockServer::new().await;
3251        let client = server
3252            .client_builder()
3253            .on_builder(|builder| {
3254                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3255            })
3256            .build()
3257            .await;
3258
3259        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3260
3261        server.mock_login().error500().expect(2..).mount().await;
3262
3263        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3264    }
3265
3266    #[async_test]
3267    async fn test_short_retry_initial_http_requests() {
3268        let server = MatrixMockServer::new().await;
3269        let client = server
3270            .client_builder()
3271            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3272            .build()
3273            .await;
3274
3275        server.mock_login().error500().expect(3..).mount().await;
3276
3277        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3278    }
3279
3280    #[async_test]
3281    async fn test_no_retry_http_requests() {
3282        let server = MatrixMockServer::new().await;
3283        let client = server.client_builder().build().await;
3284
3285        server.mock_devices().error500().mock_once().mount().await;
3286
3287        client.devices().await.unwrap_err();
3288    }
3289
3290    #[async_test]
3291    async fn test_set_homeserver() {
3292        let client = MockClientBuilder::new(None).build().await;
3293        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3294
3295        let homeserver = Url::parse("http://example.com/").unwrap();
3296        client.set_homeserver(homeserver.clone());
3297        assert_eq!(client.homeserver(), homeserver);
3298    }
3299
3300    #[async_test]
3301    async fn test_search_user_request() {
3302        let server = MatrixMockServer::new().await;
3303        let client = server.client_builder().build().await;
3304
3305        server.mock_user_directory().ok().mock_once().mount().await;
3306
3307        let response = client.search_users("test", 50).await.unwrap();
3308        assert_eq!(response.results.len(), 1);
3309        let result = &response.results[0];
3310        assert_eq!(result.user_id.to_string(), "@test:example.me");
3311        assert_eq!(result.display_name.clone().unwrap(), "Test");
3312        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3313        assert!(!response.limited);
3314    }
3315
3316    #[async_test]
3317    async fn test_request_unstable_features() {
3318        let server = MatrixMockServer::new().await;
3319        let client = server.client_builder().no_server_versions().build().await;
3320
3321        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3322
3323        let unstable_features = client.unstable_features().await.unwrap();
3324        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3325        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3326    }
3327
3328    #[async_test]
3329    async fn test_can_homeserver_push_encrypted_event_to_device() {
3330        let server = MatrixMockServer::new().await;
3331        let client = server.client_builder().no_server_versions().build().await;
3332
3333        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3334
3335        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3336        assert!(msc4028_enabled);
3337    }
3338
3339    #[async_test]
3340    async fn test_recently_visited_rooms() {
3341        // Tracking recently visited rooms requires authentication
3342        let client = MockClientBuilder::new(None).unlogged().build().await;
3343        assert_matches!(
3344            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3345            Err(Error::AuthenticationRequired)
3346        );
3347
3348        let client = MockClientBuilder::new(None).build().await;
3349        let account = client.account();
3350
3351        // We should start off with an empty list
3352        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3353
3354        // Tracking a valid room id should add it to the list
3355        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3356        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3357        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3358
3359        // And the existing list shouldn't be changed
3360        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3361        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3362
3363        // Tracking the same room again shouldn't change the list
3364        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3365        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3366        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3367
3368        // Tracking a second room should add it to the front of the list
3369        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3370        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3371        assert_eq!(
3372            account.get_recently_visited_rooms().await.unwrap(),
3373            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3374        );
3375
3376        // Tracking the first room yet again should move it to the front of the list
3377        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3378        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3379        assert_eq!(
3380            account.get_recently_visited_rooms().await.unwrap(),
3381            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3382        );
3383
3384        // Tracking should be capped at 20
3385        for n in 0..20 {
3386            account
3387                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3388                .await
3389                .unwrap();
3390        }
3391
3392        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3393
3394        // And the initial rooms should've been pushed out
3395        let rooms = account.get_recently_visited_rooms().await.unwrap();
3396        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3397        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3398
3399        // And the last tracked room should be the first
3400        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3401    }
3402
    // Checks that subscribing to the event cache and obtaining a room event
    // cache doesn't keep the client alive after the last `Client` handle has
    // been dropped, i.e. that no strong reference cycle is created.
    #[async_test]
    async fn test_client_no_cycle_with_event_cache() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the init tasks to die.
        sleep(Duration::from_secs(1)).await;

        // At this point `client` is expected to be the only strong reference.
        let weak_client = WeakClient::from_client(&client);
        assert_eq!(weak_client.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Have the client know the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            // Both values are dropped at the end of this block, before the
            // client itself is dropped below.
            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give a bit of time for background tasks to die.
        sleep(Duration::from_secs(1)).await;

        // The weak client must be the last reference to the client now.
        assert_eq!(weak_client.strong_count(), 0);
        let client = weak_client.get();
        // The message arguments of `assert!` are only evaluated when the
        // assertion fails, i.e. when `client` is `Some`, so the `unwrap` in the
        // message cannot panic on its own.
        assert!(
            client.is_none(),
            "too many strong references to the client: {}",
            Arc::strong_count(&client.unwrap().inner)
        );
    }
3442
    // Checks that server info (supported versions, unstable features, RTC foci)
    // is fetched once, then served from caches, including by a second client
    // sharing the same state store, until `reset_server_info` is called.
    #[async_test]
    async fn test_server_info_caching() {
        let server = MatrixMockServer::new().await;
        let server_url = server.uri();
        let domain = server_url.strip_prefix("http://").unwrap();
        let server_name = <&ServerName>::try_from(domain).unwrap();
        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];

        // Scoped mocks: the guards are kept so they can be dropped explicitly
        // before the second round of mocks is mounted.
        let well_known_mock = server
            .mock_well_known()
            .ok()
            .named("well known mock")
            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
            .mount_as_scoped()
            .await;

        let versions_mock = server
            .mock_versions()
            .ok_with_unstable_features()
            .named("first versions mock")
            .expect(1)
            .mount_as_scoped()
            .await;

        // The same store instance is handed to both clients created in this
        // test.
        let memory_store = Arc::new(MemoryStore::new());
        let client = Client::builder()
            .insecure_server_name_no_tls(server_name)
            .store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
            .build()
            .await
            .unwrap();

        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

        // These subsequent calls hit the in-memory cache.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        drop(client);

        let client = server
            .client_builder()
            .no_server_versions()
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                        .state_store(memory_store.clone()),
                )
            })
            .build()
            .await;

        // These calls to the new client are expected to be served from the state
        // store shared with the first client rather than from the network: the
        // versions mock above allows a single request only.
        assert!(
            client
                .unstable_features()
                .await
                .unwrap()
                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
        );
        let supported = client.supported_versions().await.unwrap();
        assert!(supported.versions.contains(&MatrixVersion::V1_0));
        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

        // Then this call hits the in-memory cache.
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        // Dropping the guards retires the first round of mocks.
        drop(versions_mock);
        drop(well_known_mock);

        // Now, reset the cache, and observe the endpoints being called again once.
        client.reset_server_info().await.unwrap();

        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;

        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

        // Hits network again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        // Hits in-memory cache again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    }
3529
    // Same caching scenario as above, but without any `.well-known` mock
    // mounted: the RTC foci are expected to be an empty list throughout.
    #[async_test]
    async fn test_server_info_without_a_well_known() {
        let server = MatrixMockServer::new().await;
        let rtc_foci: Vec<RtcFocusInfo> = vec![];

        // Scoped mock: the guard is kept so it can be dropped explicitly before
        // the second versions mock is mounted.
        let versions_mock = server
            .mock_versions()
            .ok_with_unstable_features()
            .named("first versions mock")
            .expect(1)
            .mount_as_scoped()
            .await;

        // The same store instance is handed to both clients created in this
        // test.
        let memory_store = Arc::new(MemoryStore::new());
        let client = server
            .client_builder()
            .no_server_versions()
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                        .state_store(memory_store.clone()),
                )
            })
            .build()
            .await;

        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

        // These subsequent calls hit the in-memory cache.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        drop(client);

        let client = server
            .client_builder()
            .no_server_versions()
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                        .state_store(memory_store.clone()),
                )
            })
            .build()
            .await;

        // This call to the new client is expected to be served from the state
        // store shared with the first client rather than from the network: the
        // versions mock above allows a single request only.
        assert!(
            client
                .unstable_features()
                .await
                .unwrap()
                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
        );

        // Then this call hits the in-memory cache.
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        // Dropping the guard retires the first versions mock.
        drop(versions_mock);

        // Now, reset the cache, and observe the endpoints being called again once.
        client.reset_server_info().await.unwrap();

        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

        // Hits network again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        // Hits in-memory cache again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    }
3601
3602    #[async_test]
3603    async fn test_no_network_doesnt_cause_infinite_retries() {
3604        // We want infinite retries for transient errors.
3605        let client = MockClientBuilder::new(None)
3606            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3607            .build()
3608            .await;
3609
3610        // We don't define a mock server on purpose here, so that the error is really a
3611        // network error.
3612        client.whoami().await.unwrap_err();
3613    }
3614
3615    #[async_test]
3616    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3617        let server = MatrixMockServer::new().await;
3618        let client = server.client_builder().build().await;
3619
3620        let room_id = room_id!("!room:example.org");
3621
3622        server
3623            .mock_sync()
3624            .ok_and_run(&client, |builder| {
3625                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3626            })
3627            .await;
3628
3629        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3630        assert_eq!(room.room_id(), room_id);
3631    }
3632
3633    #[async_test]
3634    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3635        let server = MatrixMockServer::new().await;
3636        let client = server.client_builder().build().await;
3637
3638        let room_id = room_id!("!room:example.org");
3639
3640        let client = Arc::new(client);
3641
3642        // Perform the /sync request with a delay so it starts after the
3643        // `await_room_remote_echo` call has happened
3644        spawn({
3645            let client = client.clone();
3646            async move {
3647                sleep(Duration::from_millis(100)).await;
3648
3649                server
3650                    .mock_sync()
3651                    .ok_and_run(&client, |builder| {
3652                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3653                    })
3654                    .await;
3655            }
3656        });
3657
3658        let room =
3659            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3660        assert_eq!(room.room_id(), room_id);
3661    }
3662
3663    #[async_test]
3664    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3665        let client = MockClientBuilder::new(None).build().await;
3666
3667        let room_id = room_id!("!room:example.org");
3668        // Room is not present so the client won't be able to find it. The call will
3669        // timeout.
3670        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3671    }
3672
3673    #[async_test]
3674    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3675        let server = MatrixMockServer::new().await;
3676        let client = server.client_builder().build().await;
3677
3678        server.mock_create_room().ok().mount().await;
3679
3680        // Create a room in the internal store
3681        let room = client
3682            .create_room(assign!(CreateRoomRequest::new(), {
3683                invite: vec![],
3684                is_direct: false,
3685            }))
3686            .await
3687            .unwrap();
3688
3689        // Room is locally present, but not synced, the call will timeout
3690        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3691            .await
3692            .unwrap_err();
3693    }
3694
3695    #[async_test]
3696    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3697        let server = MatrixMockServer::new().await;
3698        let client = server.client_builder().build().await;
3699
3700        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3701
3702        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3703        assert_matches!(ret, Ok(true));
3704    }
3705
3706    #[async_test]
3707    async fn test_is_room_alias_available_if_alias_is_resolved() {
3708        let server = MatrixMockServer::new().await;
3709        let client = server.client_builder().build().await;
3710
3711        server
3712            .mock_room_directory_resolve_alias()
3713            .ok("!some_room_id:matrix.org", Vec::new())
3714            .expect(1)
3715            .mount()
3716            .await;
3717
3718        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3719        assert_matches!(ret, Ok(false));
3720    }
3721
3722    #[async_test]
3723    async fn test_is_room_alias_available_if_error_found() {
3724        let server = MatrixMockServer::new().await;
3725        let client = server.client_builder().build().await;
3726
3727        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3728
3729        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3730        assert_matches!(ret, Err(_));
3731    }
3732
3733    #[async_test]
3734    async fn test_create_room_alias() {
3735        let server = MatrixMockServer::new().await;
3736        let client = server.client_builder().build().await;
3737
3738        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3739
3740        let ret = client
3741            .create_room_alias(
3742                room_alias_id!("#some_alias:matrix.org"),
3743                room_id!("!some_room:matrix.org"),
3744            )
3745            .await;
3746        assert_matches!(ret, Ok(()));
3747    }
3748
3749    #[async_test]
3750    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3751        let server = MatrixMockServer::new().await;
3752        let client = server.client_builder().build().await;
3753
3754        let room_id = room_id!("!a-room:matrix.org");
3755
3756        // Make sure the summary endpoint is called once
3757        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3758
3759        // We create a locally cached invited room
3760        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3761
3762        // And we get a preview, the server endpoint was reached
3763        let preview = client
3764            .get_room_preview(room_id.into(), Vec::new())
3765            .await
3766            .expect("Room preview should be retrieved");
3767
3768        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3769    }
3770
3771    #[async_test]
3772    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3773        let server = MatrixMockServer::new().await;
3774        let client = server.client_builder().build().await;
3775
3776        let room_id = room_id!("!a-room:matrix.org");
3777
3778        // Make sure the summary endpoint is called once
3779        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3780
3781        // We create a locally cached left room
3782        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3783
3784        // And we get a preview, the server endpoint was reached
3785        let preview = client
3786            .get_room_preview(room_id.into(), Vec::new())
3787            .await
3788            .expect("Room preview should be retrieved");
3789
3790        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3791    }
3792
3793    #[async_test]
3794    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3795        let server = MatrixMockServer::new().await;
3796        let client = server.client_builder().build().await;
3797
3798        let room_id = room_id!("!a-room:matrix.org");
3799
3800        // Make sure the summary endpoint is called once
3801        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3802
3803        // We create a locally cached knocked room
3804        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3805
3806        // And we get a preview, the server endpoint was reached
3807        let preview = client
3808            .get_room_preview(room_id.into(), Vec::new())
3809            .await
3810            .expect("Room preview should be retrieved");
3811
3812        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3813    }
3814
3815    #[async_test]
3816    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3817        let server = MatrixMockServer::new().await;
3818        let client = server.client_builder().build().await;
3819
3820        let room_id = room_id!("!a-room:matrix.org");
3821
3822        // Make sure the summary endpoint is not called
3823        server.mock_room_summary().ok(room_id).never().mount().await;
3824
3825        // We create a locally cached joined room
3826        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3827
3828        // And we get a preview, no server endpoint was reached
3829        let preview = client
3830            .get_room_preview(room_id.into(), Vec::new())
3831            .await
3832            .expect("Room preview should be retrieved");
3833
3834        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3835    }
3836
3837    #[async_test]
3838    async fn test_media_preview_config() {
3839        let server = MatrixMockServer::new().await;
3840        let client = server.client_builder().build().await;
3841
3842        server
3843            .mock_sync()
3844            .ok_and_run(&client, |builder| {
3845                builder.add_custom_global_account_data(json!({
3846                    "content": {
3847                        "media_previews": "private",
3848                        "invite_avatars": "off"
3849                    },
3850                    "type": "m.media_preview_config"
3851                }));
3852            })
3853            .await;
3854
3855        let (initial_value, stream) =
3856            client.account().observe_media_preview_config().await.unwrap();
3857
3858        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3859        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3860        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3861        pin_mut!(stream);
3862        assert_pending!(stream);
3863
3864        server
3865            .mock_sync()
3866            .ok_and_run(&client, |builder| {
3867                builder.add_custom_global_account_data(json!({
3868                    "content": {
3869                        "media_previews": "off",
3870                        "invite_avatars": "on"
3871                    },
3872                    "type": "m.media_preview_config"
3873                }));
3874            })
3875            .await;
3876
3877        assert_next_matches!(
3878            stream,
3879            MediaPreviewConfigEventContent {
3880                media_previews: Some(MediaPreviews::Off),
3881                invite_avatars: Some(InviteAvatars::On),
3882                ..
3883            }
3884        );
3885        assert_pending!(stream);
3886    }
3887
3888    #[async_test]
3889    async fn test_unstable_media_preview_config() {
3890        let server = MatrixMockServer::new().await;
3891        let client = server.client_builder().build().await;
3892
3893        server
3894            .mock_sync()
3895            .ok_and_run(&client, |builder| {
3896                builder.add_custom_global_account_data(json!({
3897                    "content": {
3898                        "media_previews": "private",
3899                        "invite_avatars": "off"
3900                    },
3901                    "type": "io.element.msc4278.media_preview_config"
3902                }));
3903            })
3904            .await;
3905
3906        let (initial_value, stream) =
3907            client.account().observe_media_preview_config().await.unwrap();
3908
3909        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3910        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3911        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3912        pin_mut!(stream);
3913        assert_pending!(stream);
3914
3915        server
3916            .mock_sync()
3917            .ok_and_run(&client, |builder| {
3918                builder.add_custom_global_account_data(json!({
3919                    "content": {
3920                        "media_previews": "off",
3921                        "invite_avatars": "on"
3922                    },
3923                    "type": "io.element.msc4278.media_preview_config"
3924                }));
3925            })
3926            .await;
3927
3928        assert_next_matches!(
3929            stream,
3930            MediaPreviewConfigEventContent {
3931                media_previews: Some(MediaPreviews::Off),
3932                invite_avatars: Some(InviteAvatars::On),
3933                ..
3934            }
3935        );
3936        assert_pending!(stream);
3937    }
3938
3939    #[async_test]
3940    async fn test_media_preview_config_not_found() {
3941        let server = MatrixMockServer::new().await;
3942        let client = server.client_builder().build().await;
3943
3944        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3945
3946        assert!(initial_value.is_none());
3947    }
3948
3949    #[async_test]
3950    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3951        // The default Matrix version we use is 1.11 or higher, so authenticated media
3952        // is supported.
3953        let server = MatrixMockServer::new().await;
3954        let client = server.client_builder().build().await;
3955
3956        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3957
3958        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3959        client.load_or_fetch_max_upload_size().await.unwrap();
3960
3961        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3962    }
3963
3964    #[async_test]
3965    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3966        // The server must advertise support for the stable feature for authenticated
3967        // media support, so we mock the `GET /versions` response.
3968        let server = MatrixMockServer::new().await;
3969        let client = server.client_builder().no_server_versions().build().await;
3970
3971        server
3972            .mock_versions()
3973            .ok_custom(
3974                &["v1.7", "v1.8", "v1.9", "v1.10"],
3975                &[("org.matrix.msc3916.stable", true)].into(),
3976            )
3977            .named("versions")
3978            .expect(1)
3979            .mount()
3980            .await;
3981
3982        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3983
3984        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3985        client.load_or_fetch_max_upload_size().await.unwrap();
3986
3987        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3988    }
3989
3990    #[async_test]
3991    async fn test_load_or_fetch_max_upload_size_no_auth() {
3992        // The server must not support Matrix 1.11 or higher for unauthenticated
3993        // media requests, so we mock the `GET /versions` response.
3994        let server = MatrixMockServer::new().await;
3995        let client = server.client_builder().no_server_versions().build().await;
3996
3997        server
3998            .mock_versions()
3999            .ok_custom(&["v1.1"], &Default::default())
4000            .named("versions")
4001            .expect(1)
4002            .mount()
4003            .await;
4004
4005        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4006
4007        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4008        client.load_or_fetch_max_upload_size().await.unwrap();
4009
4010        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4011    }
4012
4013    #[async_test]
4014    async fn test_uploading_a_too_large_media_file() {
4015        let server = MatrixMockServer::new().await;
4016        let client = server.client_builder().build().await;
4017
4018        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4019        client.load_or_fetch_max_upload_size().await.unwrap();
4020        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4021
4022        let data = vec![1, 2];
4023        let upload_request =
4024            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4025        let request = SendRequest {
4026            client: client.clone(),
4027            request: upload_request,
4028            config: None,
4029            send_progress: SharedObservable::new(TransmissionProgress::default()),
4030        };
4031        let media_request = SendMediaUploadRequest::new(request);
4032
4033        let error = media_request.await.err();
4034        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4035        assert_eq!(max, uint!(1));
4036        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4037    }
4038
4039    #[async_test]
4040    async fn test_dont_ignore_timeout_on_first_sync() {
4041        let server = MatrixMockServer::new().await;
4042        let client = server.client_builder().build().await;
4043
4044        server
4045            .mock_sync()
4046            .timeout(Some(Duration::from_secs(30)))
4047            .ok(|_| {})
4048            .mock_once()
4049            .named("sync_with_timeout")
4050            .mount()
4051            .await;
4052
4053        // Call the endpoint once to check the timeout.
4054        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4055
4056        timeout(Duration::from_secs(1), async {
4057            stream.next().await.unwrap().unwrap();
4058        })
4059        .await
4060        .unwrap();
4061    }
4062
4063    #[async_test]
4064    async fn test_ignore_timeout_on_first_sync() {
4065        let server = MatrixMockServer::new().await;
4066        let client = server.client_builder().build().await;
4067
4068        server
4069            .mock_sync()
4070            .timeout(None)
4071            .ok(|_| {})
4072            .mock_once()
4073            .named("sync_no_timeout")
4074            .mount()
4075            .await;
4076        server
4077            .mock_sync()
4078            .timeout(Some(Duration::from_secs(30)))
4079            .ok(|_| {})
4080            .mock_once()
4081            .named("sync_with_timeout")
4082            .mount()
4083            .await;
4084
4085        // Call each version of the endpoint once to check the timeouts.
4086        let mut stream = Box::pin(
4087            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4088        );
4089
4090        timeout(Duration::from_secs(1), async {
4091            stream.next().await.unwrap().unwrap();
4092            stream.next().await.unwrap().unwrap();
4093        })
4094        .await
4095        .unwrap();
4096    }
4097}