matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{btree_map, BTreeMap},
19    fmt::{self, Debug},
20    future::{ready, Future},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::store::LockableCryptoStore;
32use matrix_sdk_base::{
33    event_cache::store::EventCacheStoreLock,
34    store::{DynStateStore, RoomLoadSettings, ServerCapabilities},
35    sync::{Notification, RoomUpdates},
36    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43    api::{
44        client::{
45            account::whoami,
46            alias::{create_alias, delete_alias, get_alias},
47            device::{delete_devices, get_devices, update_device},
48            directory::{get_public_rooms, get_public_rooms_filtered},
49            discovery::{
50                get_capabilities::{self, Capabilities},
51                get_supported_versions,
52            },
53            error::ErrorKind,
54            filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
55            knock::knock_room,
56            membership::{join_room_by_id, join_room_by_id_or_alias},
57            room::create_room,
58            session::login::v3::DiscoveryInfo,
59            sync::sync_events,
60            uiaa,
61            user_directory::search_users,
62        },
63        error::FromHttpResponseError,
64        MatrixVersion, OutgoingRequest,
65    },
66    assign,
67    push::Ruleset,
68    time::Instant,
69    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
70    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
71};
72use serde::de::DeserializeOwned;
73use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
74use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
75use url::Url;
76
77use self::futures::SendRequest;
78use crate::{
79    authentication::{
80        matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
81        SaveSessionCallback,
82    },
83    config::RequestConfig,
84    deduplicating_handler::DeduplicatingHandler,
85    error::HttpResult,
86    event_cache::EventCache,
87    event_handler::{
88        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
89        EventHandlerStore, ObservableEventHandler, SyncEvent,
90    },
91    http_client::HttpClient,
92    media::MediaError,
93    notification_settings::NotificationSettings,
94    room_preview::RoomPreview,
95    send_queue::SendQueueData,
96    sliding_sync::Version as SlidingSyncVersion,
97    sync::{RoomUpdate, SyncResponse},
98    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
99    Room, SessionTokens, TransmissionProgress,
100};
101#[cfg(feature = "e2e-encryption")]
102use crate::{
103    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
104    store_locks::CrossProcessStoreLock,
105};
106
107mod builder;
108pub(crate) mod caches;
109pub(crate) mod futures;
110
111pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
112
113#[cfg(not(target_family = "wasm"))]
114type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
115#[cfg(target_family = "wasm")]
116type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
117
118#[cfg(not(target_family = "wasm"))]
119type NotificationHandlerFn =
120    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
121#[cfg(target_family = "wasm")]
122type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
123
124/// Enum controlling if a loop running callbacks should continue or abort.
125///
126/// This is mainly used in the [`sync_with_callback`] method, the return value
127/// of the provided callback controls if the sync loop should be exited.
128///
129/// [`sync_with_callback`]: #method.sync_with_callback
130#[derive(Debug, Clone, Copy, PartialEq, Eq)]
131pub enum LoopCtrl {
132    /// Continue running the loop.
133    Continue,
134    /// Break out of the loop.
135    Break,
136}
137
138/// Represents changes that can occur to a `Client`s `Session`.
139#[derive(Debug, Clone, PartialEq)]
140pub enum SessionChange {
141    /// The session's token is no longer valid.
142    UnknownToken {
143        /// Whether or not the session was soft logged out
144        soft_logout: bool,
145    },
146    /// The session's tokens have been refreshed.
147    TokensRefreshed,
148}
149
150/// An async/await enabled Matrix client.
151///
152/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
153#[derive(Clone)]
154pub struct Client {
155    pub(crate) inner: Arc<ClientInner>,
156}
157
158#[derive(Default)]
159pub(crate) struct ClientLocks {
160    /// Lock ensuring that only a single room may be marked as a DM at once.
161    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
162    /// explanation.
163    pub(crate) mark_as_dm_lock: Mutex<()>,
164
165    /// Lock ensuring that only a single secret store is getting opened at the
166    /// same time.
167    ///
168    /// This is important so we don't accidentally create multiple different new
169    /// default secret storage keys.
170    #[cfg(feature = "e2e-encryption")]
171    pub(crate) open_secret_store_lock: Mutex<()>,
172
173    /// Lock ensuring that we're only storing a single secret at a time.
174    ///
175    /// Take a look at the [`SecretStore::put_secret`] method for a more
176    /// detailed explanation.
177    ///
178    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
179    #[cfg(feature = "e2e-encryption")]
180    pub(crate) store_secret_lock: Mutex<()>,
181
182    /// Lock ensuring that only one method at a time might modify our backup.
183    #[cfg(feature = "e2e-encryption")]
184    pub(crate) backup_modify_lock: Mutex<()>,
185
186    /// Lock ensuring that we're going to attempt to upload backups for a single
187    /// requester.
188    #[cfg(feature = "e2e-encryption")]
189    pub(crate) backup_upload_lock: Mutex<()>,
190
191    /// Handler making sure we only have one group session sharing request in
192    /// flight per room.
193    #[cfg(feature = "e2e-encryption")]
194    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
195
196    /// Lock making sure we're only doing one key claim request at a time.
197    #[cfg(feature = "e2e-encryption")]
198    pub(crate) key_claim_lock: Mutex<()>,
199
200    /// Handler to ensure that only one members request is running at a time,
201    /// given a room.
202    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
203
204    /// Handler to ensure that only one encryption state request is running at a
205    /// time, given a room.
206    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
207
208    /// Deduplicating handler for sending read receipts. The string is an
209    /// internal implementation detail, see [`Self::send_single_receipt`].
210    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
211
212    #[cfg(feature = "e2e-encryption")]
213    pub(crate) cross_process_crypto_store_lock:
214        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,
215
216    /// Latest "generation" of data known by the crypto store.
217    ///
218    /// This is a counter that only increments, set in the database (and can
219    /// wrap). It's incremented whenever some process acquires a lock for the
220    /// first time. *This assumes the crypto store lock is being held, to
221    /// avoid data races on writing to this value in the store*.
222    ///
223    /// The current process will maintain this value in local memory and in the
224    /// DB over time. Observing a different value than the one read in
225    /// memory, when reading from the store indicates that somebody else has
226    /// written into the database under our feet.
227    ///
228    /// TODO: this should live in the `OlmMachine`, since it's information
229    /// related to the lock. As of today (2023-07-28), we blow up the entire
230    /// olm machine when there's a generation mismatch. So storing the
231    /// generation in the olm machine would make the client think there's
232    /// *always* a mismatch, and that's why we need to store the generation
233    /// outside the `OlmMachine`.
234    #[cfg(feature = "e2e-encryption")]
235    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
236}
237
238pub(crate) struct ClientInner {
239    /// All the data related to authentication and authorization.
240    pub(crate) auth_ctx: Arc<AuthCtx>,
241
242    /// The URL of the server.
243    ///
244    /// Not to be confused with the `Self::homeserver`. `server` is usually
245    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
246    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
247    /// homeserver (at the time of writing — 2024-08-28).
248    ///
249    /// This value is optional depending on how the `Client` has been built.
250    /// If it's been built from a homeserver URL directly, we don't know the
251    /// server. However, if the `Client` has been built from a server URL or
252    /// name, then the homeserver has been discovered, and we know both.
253    server: Option<Url>,
254
255    /// The URL of the homeserver to connect to.
256    ///
257    /// This is the URL for the client-server Matrix API.
258    homeserver: StdRwLock<Url>,
259
260    /// The sliding sync version.
261    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
262
263    /// The underlying HTTP client.
264    pub(crate) http_client: HttpClient,
265
266    /// User session data.
267    pub(super) base_client: BaseClient,
268
269    /// Collection of in-memory caches for the [`Client`].
270    pub(crate) caches: ClientCaches,
271
272    /// Collection of locks individual client methods might want to use, either
273    /// to ensure that only a single call to a method happens at once or to
274    /// deduplicate multiple calls to a method.
275    pub(crate) locks: ClientLocks,
276
277    /// The cross-process store locks holder name.
278    ///
279    /// The SDK provides cross-process store locks (see
280    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
281    /// `holder_name` is the value used for all cross-process store locks
282    /// used by this `Client`.
283    ///
284    /// If multiple `Client`s are running in different processes, this
285    /// value MUST be different for each `Client`.
286    cross_process_store_locks_holder_name: String,
287
288    /// A mapping of the times at which the current user sent typing notices,
289    /// keyed by room.
290    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
291
292    /// Event handlers. See `add_event_handler`.
293    pub(crate) event_handlers: EventHandlerStore,
294
295    /// Notification handlers. See `register_notification_handler`.
296    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
297
298    /// The sender-side of channels used to receive room updates.
299    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
300
301    /// The sender-side of a channel used to observe all the room updates of a
302    /// sync response.
303    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
304
305    /// Whether the client should update its homeserver URL with the discovery
306    /// information present in the login response.
307    respect_login_well_known: bool,
308
309    /// An event that can be listened on to wait for a successful sync. The
310    /// event will only be fired if a sync loop is running. Can be used for
311    /// synchronization, e.g. if we send out a request to create a room, we can
312    /// wait for the sync to get the data to fetch a room object from the state
313    /// store.
314    pub(crate) sync_beat: event_listener::Event,
315
316    /// A central cache for events, inactive first.
317    ///
318    /// It becomes active when [`EventCache::subscribe`] is called.
319    pub(crate) event_cache: OnceCell<EventCache>,
320
321    /// End-to-end encryption related state.
322    #[cfg(feature = "e2e-encryption")]
323    pub(crate) e2ee: EncryptionData,
324
325    /// The verification state of our own device.
326    #[cfg(feature = "e2e-encryption")]
327    pub(crate) verification_state: SharedObservable<VerificationState>,
328
329    /// Whether to enable the experimental support for sending and receiving
330    /// encrypted room history on invite, per [MSC4268].
331    ///
332    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
333    #[cfg(feature = "e2e-encryption")]
334    pub(crate) enable_share_history_on_invite: bool,
335
336    /// Data related to the [`SendQueue`].
337    ///
338    /// [`SendQueue`]: crate::send_queue::SendQueue
339    pub(crate) send_queue_data: Arc<SendQueueData>,
340
341    /// The `max_upload_size` value of the homeserver, it contains the max
342    /// request size you can send.
343    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
344}
345
346impl ClientInner {
347    /// Create a new `ClientInner`.
348    ///
349    /// All the fields passed as parameters here are those that must be cloned
350    /// upon instantiation of a sub-client, e.g. a client specialized for
351    /// notifications.
352    #[allow(clippy::too_many_arguments)]
353    async fn new(
354        auth_ctx: Arc<AuthCtx>,
355        server: Option<Url>,
356        homeserver: Url,
357        sliding_sync_version: SlidingSyncVersion,
358        http_client: HttpClient,
359        base_client: BaseClient,
360        server_capabilities: ClientServerCapabilities,
361        respect_login_well_known: bool,
362        event_cache: OnceCell<EventCache>,
363        send_queue: Arc<SendQueueData>,
364        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
365        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
366        cross_process_store_locks_holder_name: String,
367    ) -> Arc<Self> {
368        let caches = ClientCaches {
369            server_capabilities: server_capabilities.into(),
370            server_metadata: Mutex::new(TtlCache::new()),
371        };
372
373        let client = Self {
374            server,
375            homeserver: StdRwLock::new(homeserver),
376            auth_ctx,
377            sliding_sync_version: StdRwLock::new(sliding_sync_version),
378            http_client,
379            base_client,
380            caches,
381            locks: Default::default(),
382            cross_process_store_locks_holder_name,
383            typing_notice_times: Default::default(),
384            event_handlers: Default::default(),
385            notification_handlers: Default::default(),
386            room_update_channels: Default::default(),
387            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
388            // ballast for all observers to catch up.
389            room_updates_sender: broadcast::Sender::new(32),
390            respect_login_well_known,
391            sync_beat: event_listener::Event::new(),
392            event_cache,
393            send_queue_data: send_queue,
394            #[cfg(feature = "e2e-encryption")]
395            e2ee: EncryptionData::new(encryption_settings),
396            #[cfg(feature = "e2e-encryption")]
397            verification_state: SharedObservable::new(VerificationState::Unknown),
398            #[cfg(feature = "e2e-encryption")]
399            enable_share_history_on_invite,
400            server_max_upload_size: Mutex::new(OnceCell::new()),
401        };
402
403        #[allow(clippy::let_and_return)]
404        let client = Arc::new(client);
405
406        #[cfg(feature = "e2e-encryption")]
407        client.e2ee.initialize_room_key_tasks(&client);
408
409        let _ = client
410            .event_cache
411            .get_or_init(|| async {
412                EventCache::new(
413                    WeakClient::from_inner(&client),
414                    client.base_client.event_cache_store().clone(),
415                )
416            })
417            .await;
418
419        client
420    }
421}
422
423#[cfg(not(tarpaulin_include))]
424impl Debug for Client {
425    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
426        write!(fmt, "Client")
427    }
428}
429
430impl Client {
431    /// Create a new [`Client`] that will use the given homeserver.
432    ///
433    /// # Arguments
434    ///
435    /// * `homeserver_url` - The homeserver that the client should connect to.
436    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
437        Self::builder().homeserver_url(homeserver_url).build().await
438    }
439
440    /// Returns a subscriber that publishes an event every time the ignore user
441    /// list changes.
442    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
443        self.inner.base_client.subscribe_to_ignore_user_list_changes()
444    }
445
446    /// Create a new [`ClientBuilder`].
447    pub fn builder() -> ClientBuilder {
448        ClientBuilder::new()
449    }
450
451    pub(crate) fn base_client(&self) -> &BaseClient {
452        &self.inner.base_client
453    }
454
455    /// The underlying HTTP client.
456    pub fn http_client(&self) -> &reqwest::Client {
457        &self.inner.http_client.inner
458    }
459
460    pub(crate) fn locks(&self) -> &ClientLocks {
461        &self.inner.locks
462    }
463
464    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
465        &self.inner.auth_ctx
466    }
467
468    /// The cross-process store locks holder name.
469    ///
470    /// The SDK provides cross-process store locks (see
471    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
472    /// `holder_name` is the value used for all cross-process store locks
473    /// used by this `Client`.
474    pub fn cross_process_store_locks_holder_name(&self) -> &str {
475        &self.inner.cross_process_store_locks_holder_name
476    }
477
478    /// Change the homeserver URL used by this client.
479    ///
480    /// # Arguments
481    ///
482    /// * `homeserver_url` - The new URL to use.
483    fn set_homeserver(&self, homeserver_url: Url) {
484        *self.inner.homeserver.write().unwrap() = homeserver_url;
485    }
486
487    /// Get the capabilities of the homeserver.
488    ///
489    /// This method should be used to check what features are supported by the
490    /// homeserver.
491    ///
492    /// # Examples
493    ///
494    /// ```no_run
495    /// # use matrix_sdk::Client;
496    /// # use url::Url;
497    /// # async {
498    /// # let homeserver = Url::parse("http://example.com")?;
499    /// let client = Client::new(homeserver).await?;
500    ///
501    /// let capabilities = client.get_capabilities().await?;
502    ///
503    /// if capabilities.change_password.enabled {
504    ///     // Change password
505    /// }
506    /// # anyhow::Ok(()) };
507    /// ```
508    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
509        let res = self.send(get_capabilities::v3::Request::new()).await?;
510        Ok(res.capabilities)
511    }
512
513    /// Get a copy of the default request config.
514    ///
515    /// The default request config is what's used when sending requests if no
516    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
517    /// function with such a parameter.
518    ///
519    /// If the default request config was not customized through
520    /// [`ClientBuilder`] when creating this `Client`, the returned value will
521    /// be equivalent to [`RequestConfig::default()`].
522    pub fn request_config(&self) -> RequestConfig {
523        self.inner.http_client.request_config
524    }
525
526    /// Check whether the client has been activated.
527    ///
528    /// A client is considered active when:
529    ///
530    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
531    ///    is logged in,
532    /// 2. Has loaded cached data from storage,
533    /// 3. If encryption is enabled, it also initialized or restored its
534    ///    `OlmMachine`.
535    pub fn is_active(&self) -> bool {
536        self.inner.base_client.is_active()
537    }
538
539    /// The server used by the client.
540    ///
541    /// See `Self::server` to learn more.
542    pub fn server(&self) -> Option<&Url> {
543        self.inner.server.as_ref()
544    }
545
546    /// The homeserver of the client.
547    pub fn homeserver(&self) -> Url {
548        self.inner.homeserver.read().unwrap().clone()
549    }
550
551    /// Get the sliding sync version.
552    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
553        self.inner.sliding_sync_version.read().unwrap().clone()
554    }
555
556    /// Override the sliding sync version.
557    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
558        let mut lock = self.inner.sliding_sync_version.write().unwrap();
559        *lock = version;
560    }
561
562    /// Get the Matrix user session meta information.
563    ///
564    /// If the client is currently logged in, this will return a
565    /// [`SessionMeta`] object which contains the user ID and device ID.
566    /// Otherwise it returns `None`.
567    pub fn session_meta(&self) -> Option<&SessionMeta> {
568        self.base_client().session_meta()
569    }
570
571    /// Returns a receiver that gets events for each room info update. To watch
572    /// for new events, use `receiver.resubscribe()`. Each event contains the
573    /// room and a boolean whether this event should trigger a room list update.
574    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
575        self.base_client().room_info_notable_update_receiver()
576    }
577
578    /// Performs a search for users.
579    /// The search is performed case-insensitively on user IDs and display names
580    ///
581    /// # Arguments
582    ///
583    /// * `search_term` - The search term for the search
584    /// * `limit` - The maximum number of results to return. Defaults to 10.
585    ///
586    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
587    pub async fn search_users(
588        &self,
589        search_term: &str,
590        limit: u64,
591    ) -> HttpResult<search_users::v3::Response> {
592        let mut request = search_users::v3::Request::new(search_term.to_owned());
593
594        if let Some(limit) = UInt::new(limit) {
595            request.limit = limit;
596        }
597
598        self.send(request).await
599    }
600
601    /// Get the user id of the current owner of the client.
602    pub fn user_id(&self) -> Option<&UserId> {
603        self.session_meta().map(|s| s.user_id.as_ref())
604    }
605
606    /// Get the device ID that identifies the current session.
607    pub fn device_id(&self) -> Option<&DeviceId> {
608        self.session_meta().map(|s| s.device_id.as_ref())
609    }
610
611    /// Get the current access token for this session.
612    ///
613    /// Will be `None` if the client has not been logged in.
614    pub fn access_token(&self) -> Option<String> {
615        self.auth_ctx().access_token()
616    }
617
618    /// Get the current tokens for this session.
619    ///
620    /// To be notified of changes in the session tokens, use
621    /// [`Client::subscribe_to_session_changes()`] or
622    /// [`Client::set_session_callbacks()`].
623    ///
624    /// Returns `None` if the client has not been logged in.
625    pub fn session_tokens(&self) -> Option<SessionTokens> {
626        self.auth_ctx().session_tokens()
627    }
628
629    /// Access the authentication API used to log in this client.
630    ///
631    /// Will be `None` if the client has not been logged in.
632    pub fn auth_api(&self) -> Option<AuthApi> {
633        match self.auth_ctx().auth_data.get()? {
634            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
635            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
636        }
637    }
638
639    /// Get the whole session info of this client.
640    ///
641    /// Will be `None` if the client has not been logged in.
642    ///
643    /// Can be used with [`Client::restore_session`] to restore a previously
644    /// logged-in session.
645    pub fn session(&self) -> Option<AuthSession> {
646        match self.auth_api()? {
647            AuthApi::Matrix(api) => api.session().map(Into::into),
648            AuthApi::OAuth(api) => api.full_session().map(Into::into),
649        }
650    }
651
652    /// Get a reference to the state store.
653    pub fn state_store(&self) -> &DynStateStore {
654        self.base_client().state_store()
655    }
656
657    /// Get a reference to the event cache store.
658    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
659        self.base_client().event_cache_store()
660    }
661
662    /// Access the native Matrix authentication API with this client.
663    pub fn matrix_auth(&self) -> MatrixAuth {
664        MatrixAuth::new(self.clone())
665    }
666
667    /// Get the account of the current owner of the client.
668    pub fn account(&self) -> Account {
669        Account::new(self.clone())
670    }
671
672    /// Get the encryption manager of the client.
673    #[cfg(feature = "e2e-encryption")]
674    pub fn encryption(&self) -> Encryption {
675        Encryption::new(self.clone())
676    }
677
678    /// Get the media manager of the client.
679    pub fn media(&self) -> Media {
680        Media::new(self.clone())
681    }
682
683    /// Get the pusher manager of the client.
684    pub fn pusher(&self) -> Pusher {
685        Pusher::new(self.clone())
686    }
687
688    /// Access the OAuth 2.0 API of the client.
689    pub fn oauth(&self) -> OAuth {
690        OAuth::new(self.clone())
691    }
692
693    /// Register a handler for a specific event type.
694    ///
695    /// The handler is a function or closure with one or more arguments. The
696    /// first argument is the event itself. All additional arguments are
697    /// "context" arguments: They have to implement [`EventHandlerContext`].
698    /// This trait is named that way because most of the types implementing it
699    /// give additional context about an event: The room it was in, its raw form
700    /// and other similar things. As two exceptions to this,
701    /// [`Client`] and [`EventHandlerHandle`] also implement the
702    /// `EventHandlerContext` trait so you don't have to clone your client
703    /// into the event handler manually and a handler can decide to remove
704    /// itself.
705    ///
706    /// Some context arguments are not universally applicable. A context
707    /// argument that isn't available for the given event type will result in
708    /// the event handler being skipped and an error being logged. The following
709    /// context argument types are only available for a subset of event types:
710    ///
711    /// * [`Room`] is only available for room-specific events, i.e. not for
712    ///   events like global account data events or presence events.
713    ///
714    /// You can provide custom context via
715    /// [`add_event_handler_context`](Client::add_event_handler_context) and
716    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
717    /// into the event handler.
718    ///
719    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
720    ///
721    /// # Examples
722    ///
723    /// ```no_run
724    /// use matrix_sdk::{
725    ///     deserialized_responses::EncryptionInfo,
726    ///     event_handler::Ctx,
727    ///     ruma::{
728    ///         events::{
729    ///             macros::EventContent,
730    ///             push_rules::PushRulesEvent,
731    ///             room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
732    ///         },
733    ///         push::Action,
734    ///         Int, MilliSecondsSinceUnixEpoch,
735    ///     },
736    ///     Client, Room,
737    /// };
738    /// use serde::{Deserialize, Serialize};
739    ///
740    /// # async fn example(client: Client) {
741    /// client.add_event_handler(
742    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
743    ///         // Common usage: Room event plus room and client.
744    ///     },
745    /// );
746    /// client.add_event_handler(
747    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
748    ///         async move {
749    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
750    ///             // unencrypted events and events that were decrypted by the SDK.
751    ///         }
752    ///     },
753    /// );
754    /// client.add_event_handler(
755    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
756    ///         async move {
757    ///             // A `Vec<Action>` parameter allows you to know which push actions
758    ///             // are applicable for an event. For example, an event with
759    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
760    ///             // in the timeline.
761    ///         }
762    ///     },
763    /// );
764    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
765    ///     // You can omit any or all arguments after the first.
766    /// });
767    ///
768    /// // Registering a temporary event handler:
769    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
770    ///     /* Event handler */
771    /// });
772    /// client.remove_event_handler(handle);
773    ///
774    /// // Registering custom event handler context:
775    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
776    /// struct MyContext {
777    ///     number: usize,
778    /// }
779    /// client.add_event_handler_context(MyContext { number: 5 });
780    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
781    ///     // Use the context
782    /// });
783    ///
784    /// // Custom events work exactly the same way, you just need to declare
785    /// // the content struct and use the EventContent derive macro on it.
786    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
787    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
788    /// struct TokenEventContent {
789    ///     token: String,
790    ///     #[serde(rename = "exp")]
791    ///     expires_at: MilliSecondsSinceUnixEpoch,
792    /// }
793    ///
794    /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
795    ///     todo!("Display the token");
796    /// });
797    ///
798    /// // Event handler closures can also capture local variables.
799    /// // Make sure they are cheap to clone though, because they will be cloned
800    /// // every time the closure is called.
801    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
802    ///
803    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
804    ///     println!("Calling the handler with identifier {data}");
805    /// });
806    /// # }
807    /// ```
808    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
809    where
810        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
811        H: EventHandler<Ev, Ctx>,
812    {
813        self.add_event_handler_impl(handler, None)
814    }
815
816    /// Register a handler for a specific room, and event type.
817    ///
818    /// This method works the same way as
819    /// [`add_event_handler`][Self::add_event_handler], except that the handler
820    /// will only be called for events in the room with the specified ID. See
821    /// that method for more details on event handler functions.
822    ///
823    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
824    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
825    /// your use case.
826    pub fn add_room_event_handler<Ev, Ctx, H>(
827        &self,
828        room_id: &RoomId,
829        handler: H,
830    ) -> EventHandlerHandle
831    where
832        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
833        H: EventHandler<Ev, Ctx>,
834    {
835        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
836    }
837
838    /// Observe a specific event type.
839    ///
840    /// `Ev` represents the kind of event that will be observed. `Ctx`
841    /// represents the context that will come with the event. It relies on the
842    /// same mechanism as [`Client::add_event_handler`]. The main difference is
843    /// that it returns an [`ObservableEventHandler`] and doesn't require a
844    /// user-defined closure. It is possible to subscribe to the
845    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
846    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
847    /// Ctx)`.
848    ///
849    /// Be careful that only the most recent value can be observed. Subscribers
850    /// are notified when a new value is sent, but there is no guarantee
851    /// that they will see all values.
852    ///
853    /// # Example
854    ///
855    /// Let's see a classical usage:
856    ///
857    /// ```
858    /// use futures_util::StreamExt as _;
859    /// use matrix_sdk::{
860    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
861    ///     Client, Room,
862    /// };
863    ///
864    /// # async fn example(client: Client) -> Option<()> {
865    /// let observer =
866    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
867    ///
868    /// let mut subscriber = observer.subscribe();
869    ///
870    /// let (event, (room, push_actions)) = subscriber.next().await?;
871    /// # Some(())
872    /// # }
873    /// ```
874    ///
875    /// Now let's see how to get several contexts that can be useful for you:
876    ///
877    /// ```
878    /// use matrix_sdk::{
879    ///     deserialized_responses::EncryptionInfo,
880    ///     ruma::{
881    ///         events::room::{
882    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
883    ///         },
884    ///         push::Action,
885    ///     },
886    ///     Client, Room,
887    /// };
888    ///
889    /// # async fn example(client: Client) {
890    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
891    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
892    ///
893    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
894    /// // to distinguish between unencrypted events and events that were decrypted
895    /// // by the SDK.
896    /// let _ = client
897    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
898    ///     );
899    ///
900    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
901    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
902    /// // should be highlighted in the timeline.
903    /// let _ =
904    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
905    ///
906    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
907    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
908    /// # }
909    /// ```
910    ///
911    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
912    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
913    where
914        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
915        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
916    {
917        self.observe_room_events_impl(None)
918    }
919
920    /// Observe a specific room, and event type.
921    ///
922    /// This method works the same way as [`Client::observe_events`], except
923    /// that the observability will only be applied for events in the room with
924    /// the specified ID. See that method for more details.
925    ///
926    /// Be careful that only the most recent value can be observed. Subscribers
927    /// are notified when a new value is sent, but there is no guarantee
928    /// that they will see all values.
929    pub fn observe_room_events<Ev, Ctx>(
930        &self,
931        room_id: &RoomId,
932    ) -> ObservableEventHandler<(Ev, Ctx)>
933    where
934        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
935        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
936    {
937        self.observe_room_events_impl(Some(room_id.to_owned()))
938    }
939
940    /// Shared implementation for `Client::observe_events` and
941    /// `Client::observe_room_events`.
942    fn observe_room_events_impl<Ev, Ctx>(
943        &self,
944        room_id: Option<OwnedRoomId>,
945    ) -> ObservableEventHandler<(Ev, Ctx)>
946    where
947        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
948        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
949    {
950        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
951        // new value.
952        let shared_observable = SharedObservable::new(None);
953
954        ObservableEventHandler::new(
955            shared_observable.clone(),
956            self.event_handler_drop_guard(self.add_event_handler_impl(
957                move |event: Ev, context: Ctx| {
958                    shared_observable.set(Some((event, context)));
959
960                    ready(())
961                },
962                room_id,
963            )),
964        )
965    }
966
967    /// Remove the event handler associated with the handle.
968    ///
969    /// Note that you **must not** call `remove_event_handler` from the
970    /// non-async part of an event handler, that is:
971    ///
972    /// ```ignore
973    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
974    ///     // ⚠ this will cause a deadlock ⚠
975    ///     client.remove_event_handler(handle);
976    ///
977    ///     async move {
978    ///         // removing the event handler here is fine
979    ///         client.remove_event_handler(handle);
980    ///     }
981    /// })
982    /// ```
983    ///
984    /// Note also that handlers that remove themselves will still execute with
985    /// events received in the same sync cycle.
986    ///
987    /// # Arguments
988    ///
989    /// `handle` - The [`EventHandlerHandle`] that is returned when
990    /// registering the event handler with [`Client::add_event_handler`].
991    ///
992    /// # Examples
993    ///
994    /// ```no_run
995    /// # use url::Url;
996    /// # use tokio::sync::mpsc;
997    /// #
998    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
999    /// #
1000    /// use matrix_sdk::{
1001    ///     event_handler::EventHandlerHandle,
1002    ///     ruma::events::room::member::SyncRoomMemberEvent, Client,
1003    /// };
1004    /// #
1005    /// # futures_executor::block_on(async {
1006    /// # let client = matrix_sdk::Client::builder()
1007    /// #     .homeserver_url(homeserver)
1008    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1009    /// #     .build()
1010    /// #     .await
1011    /// #     .unwrap();
1012    ///
1013    /// client.add_event_handler(
1014    ///     |ev: SyncRoomMemberEvent,
1015    ///      client: Client,
1016    ///      handle: EventHandlerHandle| async move {
1017    ///         // Common usage: Check arriving Event is the expected one
1018    ///         println!("Expected RoomMemberEvent received!");
1019    ///         client.remove_event_handler(handle);
1020    ///     },
1021    /// );
1022    /// # });
1023    /// ```
1024    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1025        self.inner.event_handlers.remove(handle);
1026    }
1027
1028    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1029    /// the given handle.
1030    ///
1031    /// When the returned value is dropped, the event handler will be removed.
1032    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1033        EventHandlerDropGuard::new(handle, self.clone())
1034    }
1035
1036    /// Add an arbitrary value for use as event handler context.
1037    ///
1038    /// The value can be obtained in an event handler by adding an argument of
1039    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1040    ///
1041    /// If a value of the same type has been added before, it will be
1042    /// overwritten.
1043    ///
1044    /// # Examples
1045    ///
1046    /// ```no_run
1047    /// use matrix_sdk::{
1048    ///     event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1049    ///     Room,
1050    /// };
1051    /// # #[derive(Clone)]
1052    /// # struct SomeType;
1053    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1054    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1055    /// # futures_executor::block_on(async {
1056    /// # let client = matrix_sdk::Client::builder()
1057    /// #     .homeserver_url(homeserver)
1058    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1059    /// #     .build()
1060    /// #     .await
1061    /// #     .unwrap();
1062    ///
1063    /// // Handle used to send messages to the UI part of the app
1064    /// let my_gui_handle: SomeType = obtain_gui_handle();
1065    ///
1066    /// client.add_event_handler_context(my_gui_handle.clone());
1067    /// client.add_event_handler(
1068    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1069    ///         async move {
1070    ///             // gui_handle.send(DisplayMessage { message: ev });
1071    ///         }
1072    ///     },
1073    /// );
1074    /// # });
1075    /// ```
1076    pub fn add_event_handler_context<T>(&self, ctx: T)
1077    where
1078        T: Clone + Send + Sync + 'static,
1079    {
1080        self.inner.event_handlers.add_context(ctx);
1081    }
1082
1083    /// Register a handler for a notification.
1084    ///
1085    /// Similar to [`Client::add_event_handler`], but only allows functions
1086    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1087    /// [`Client`] for now.
1088    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1089    where
1090        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1091        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1092    {
1093        self.inner.notification_handlers.write().await.push(Box::new(
1094            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1095        ));
1096
1097        self
1098    }
1099
1100    /// Subscribe to all updates for the room with the given ID.
1101    ///
1102    /// The returned receiver will receive a new message for each sync response
1103    /// that contains updates for that room.
1104    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1105        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1106            btree_map::Entry::Vacant(entry) => {
1107                let (tx, rx) = broadcast::channel(8);
1108                entry.insert(tx);
1109                rx
1110            }
1111            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1112        }
1113    }
1114
1115    /// Subscribe to all updates to all rooms, whenever any has been received in
1116    /// a sync response.
1117    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1118        self.inner.room_updates_sender.subscribe()
1119    }
1120
1121    pub(crate) async fn notification_handlers(
1122        &self,
1123    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1124        self.inner.notification_handlers.read().await
1125    }
1126
1127    /// Get all the rooms the client knows about.
1128    ///
1129    /// This will return the list of joined, invited, and left rooms.
1130    pub fn rooms(&self) -> Vec<Room> {
1131        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1132    }
1133
1134    /// Get all the rooms the client knows about, filtered by room state.
1135    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1136        self.base_client()
1137            .rooms_filtered(filter)
1138            .into_iter()
1139            .map(|room| Room::new(self.clone(), room))
1140            .collect()
1141    }
1142
1143    /// Get a stream of all the rooms, in addition to the existing rooms.
1144    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1145        let (rooms, stream) = self.base_client().rooms_stream();
1146
1147        let map_room = |room| Room::new(self.clone(), room);
1148
1149        (
1150            rooms.into_iter().map(map_room).collect(),
1151            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1152        )
1153    }
1154
1155    /// Returns the joined rooms this client knows about.
1156    pub fn joined_rooms(&self) -> Vec<Room> {
1157        self.base_client()
1158            .rooms_filtered(RoomStateFilter::JOINED)
1159            .into_iter()
1160            .map(|room| Room::new(self.clone(), room))
1161            .collect()
1162    }
1163
1164    /// Returns the invited rooms this client knows about.
1165    pub fn invited_rooms(&self) -> Vec<Room> {
1166        self.base_client()
1167            .rooms_filtered(RoomStateFilter::INVITED)
1168            .into_iter()
1169            .map(|room| Room::new(self.clone(), room))
1170            .collect()
1171    }
1172
1173    /// Returns the left rooms this client knows about.
1174    pub fn left_rooms(&self) -> Vec<Room> {
1175        self.base_client()
1176            .rooms_filtered(RoomStateFilter::LEFT)
1177            .into_iter()
1178            .map(|room| Room::new(self.clone(), room))
1179            .collect()
1180    }
1181
1182    /// Get a room with the given room id.
1183    ///
1184    /// # Arguments
1185    ///
1186    /// `room_id` - The unique id of the room that should be fetched.
1187    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1188        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1189    }
1190
1191    /// Gets the preview of a room, whether the current user has joined it or
1192    /// not.
1193    pub async fn get_room_preview(
1194        &self,
1195        room_or_alias_id: &RoomOrAliasId,
1196        via: Vec<OwnedServerName>,
1197    ) -> Result<RoomPreview> {
1198        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1199            Ok(room_id) => room_id.to_owned(),
1200            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1201        };
1202
1203        if let Some(room) = self.get_room(&room_id) {
1204            // The cached data can only be trusted if the room state is joined or
1205            // banned: for invite and knock rooms, no updates will be received
1206            // for the rooms after the invite/knock action took place so we may
1207            // have very out to date data for important fields such as
1208            // `join_rule`. For left rooms, the homeserver should return the latest info.
1209            match room.state() {
1210                RoomState::Joined | RoomState::Banned => {
1211                    return Ok(RoomPreview::from_known_room(&room).await);
1212                }
1213                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1214            }
1215        }
1216
1217        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1218    }
1219
1220    /// Resolve a room alias to a room id and a list of servers which know
1221    /// about it.
1222    ///
1223    /// # Arguments
1224    ///
1225    /// `room_alias` - The room alias to be resolved.
1226    pub async fn resolve_room_alias(
1227        &self,
1228        room_alias: &RoomAliasId,
1229    ) -> HttpResult<get_alias::v3::Response> {
1230        let request = get_alias::v3::Request::new(room_alias.to_owned());
1231        self.send(request).await
1232    }
1233
1234    /// Checks if a room alias is not in use yet.
1235    ///
1236    /// Returns:
1237    /// - `Ok(true)` if the room alias is available.
1238    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1239    ///   status code).
1240    /// - An `Err` otherwise.
1241    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1242        match self.resolve_room_alias(alias).await {
1243            // The room alias was resolved, so it's already in use.
1244            Ok(_) => Ok(false),
1245            Err(error) => {
1246                match error.client_api_error_kind() {
1247                    // The room alias wasn't found, so it's available.
1248                    Some(ErrorKind::NotFound) => Ok(true),
1249                    _ => Err(error),
1250                }
1251            }
1252        }
1253    }
1254
1255    /// Adds a new room alias associated with a room to the room directory.
1256    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1257        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1258        self.send(request).await?;
1259        Ok(())
1260    }
1261
1262    /// Removes a room alias from the room directory.
1263    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1264        let request = delete_alias::v3::Request::new(alias.to_owned());
1265        self.send(request).await?;
1266        Ok(())
1267    }
1268
1269    /// Update the homeserver from the login response well-known if needed.
1270    ///
1271    /// # Arguments
1272    ///
1273    /// * `login_well_known` - The `well_known` field from a successful login
1274    ///   response.
1275    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1276        if self.inner.respect_login_well_known {
1277            if let Some(well_known) = login_well_known {
1278                if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1279                    self.set_homeserver(homeserver);
1280                }
1281            }
1282        }
1283    }
1284
1285    /// Similar to [`Client::restore_session_with`], with
1286    /// [`RoomLoadSettings::default()`].
1287    ///
1288    /// # Panics
1289    ///
1290    /// Panics if a session was already restored or logged in.
1291    #[instrument(skip_all)]
1292    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1293        self.restore_session_with(session, RoomLoadSettings::default()).await
1294    }
1295
1296    /// Restore a session previously logged-in using one of the available
1297    /// authentication APIs. The number of rooms to restore is controlled by
1298    /// [`RoomLoadSettings`].
1299    ///
1300    /// See the documentation of the corresponding authentication API's
1301    /// `restore_session` method for more information.
1302    ///
1303    /// # Panics
1304    ///
1305    /// Panics if a session was already restored or logged in.
1306    #[instrument(skip_all)]
1307    pub async fn restore_session_with(
1308        &self,
1309        session: impl Into<AuthSession>,
1310        room_load_settings: RoomLoadSettings,
1311    ) -> Result<()> {
1312        let session = session.into();
1313        match session {
1314            AuthSession::Matrix(session) => {
1315                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1316            }
1317            AuthSession::OAuth(session) => {
1318                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1319            }
1320        }
1321    }
1322
1323    /// Refresh the access token using the authentication API used to log into
1324    /// this session.
1325    ///
1326    /// See the documentation of the authentication API's `refresh_access_token`
1327    /// method for more information.
1328    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1329        let Some(auth_api) = self.auth_api() else {
1330            return Err(RefreshTokenError::RefreshTokenRequired);
1331        };
1332
1333        match auth_api {
1334            AuthApi::Matrix(api) => {
1335                trace!("Token refresh: Using the homeserver.");
1336                Box::pin(api.refresh_access_token()).await?;
1337            }
1338            AuthApi::OAuth(api) => {
1339                trace!("Token refresh: Using OAuth 2.0.");
1340                Box::pin(api.refresh_access_token()).await?;
1341            }
1342        }
1343
1344        Ok(())
1345    }
1346
1347    /// Log out the current session using the proper authentication API.
1348    ///
1349    /// # Errors
1350    ///
1351    /// Returns an error if the session is not authenticated or if an error
1352    /// occurred while making the request to the server.
1353    pub async fn logout(&self) -> Result<(), Error> {
1354        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1355        match auth_api {
1356            AuthApi::Matrix(matrix_auth) => {
1357                matrix_auth.logout().await?;
1358                Ok(())
1359            }
1360            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1361        }
1362    }
1363
1364    /// Get or upload a sync filter.
1365    ///
1366    /// This method will either get a filter ID from the store or upload the
1367    /// filter definition to the homeserver and return the new filter ID.
1368    ///
1369    /// # Arguments
1370    ///
1371    /// * `filter_name` - The unique name of the filter, this name will be used
1372    /// locally to store and identify the filter ID returned by the server.
1373    ///
1374    /// * `definition` - The filter definition that should be uploaded to the
1375    /// server if no filter ID can be found in the store.
1376    ///
1377    /// # Examples
1378    ///
1379    /// ```no_run
1380    /// # use matrix_sdk::{
1381    /// #    Client, config::SyncSettings,
1382    /// #    ruma::api::client::{
1383    /// #        filter::{
1384    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1385    /// #        },
1386    /// #        sync::sync_events::v3::Filter,
1387    /// #    }
1388    /// # };
1389    /// # use url::Url;
1390    /// # async {
1391    /// # let homeserver = Url::parse("http://example.com").unwrap();
1392    /// # let client = Client::new(homeserver).await.unwrap();
1393    /// let mut filter = FilterDefinition::default();
1394    ///
1395    /// // Let's enable member lazy loading.
1396    /// filter.room.state.lazy_load_options =
1397    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1398    ///
1399    /// let filter_id = client
1400    ///     .get_or_upload_filter("sync", filter)
1401    ///     .await
1402    ///     .unwrap();
1403    ///
1404    /// let sync_settings = SyncSettings::new()
1405    ///     .filter(Filter::FilterId(filter_id));
1406    ///
1407    /// let response = client.sync_once(sync_settings).await.unwrap();
1408    /// # };
1409    #[instrument(skip(self, definition))]
1410    pub async fn get_or_upload_filter(
1411        &self,
1412        filter_name: &str,
1413        definition: FilterDefinition,
1414    ) -> Result<String> {
1415        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1416            debug!("Found filter locally");
1417            Ok(filter)
1418        } else {
1419            debug!("Didn't find filter locally");
1420            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1421            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1422            let response = self.send(request).await?;
1423
1424            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1425
1426            Ok(response.filter_id)
1427        }
1428    }
1429
1430    /// Finish joining a room.
1431    ///
1432    /// If the room was an invite that should be marked as a DM, will include it
1433    /// in the DM event after creating the joined room.
1434    async fn finish_join_room(&self, room_id: &RoomId) -> Result<Room> {
1435        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1436            room.state() == RoomState::Invited
1437                && room.is_direct().await.unwrap_or_else(|e| {
1438                    warn!(%room_id, "is_direct() failed: {e}");
1439                    false
1440                })
1441        } else {
1442            false
1443        };
1444
1445        let base_room = self.base_client().room_joined(room_id).await?;
1446        let room = Room::new(self.clone(), base_room);
1447
1448        if mark_as_dm {
1449            room.set_is_direct(true).await?;
1450        }
1451
1452        Ok(room)
1453    }
1454
1455    /// Join a room by `RoomId`.
1456    ///
1457    /// Returns the `Room` in the joined state.
1458    ///
1459    /// # Arguments
1460    ///
1461    /// * `room_id` - The `RoomId` of the room to be joined.
1462    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1463        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1464        let response = self.send(request).await?;
1465        self.finish_join_room(&response.room_id).await
1466    }
1467
1468    /// Join a room by `RoomOrAliasId`.
1469    ///
1470    /// Returns the `Room` in the joined state.
1471    ///
1472    /// # Arguments
1473    ///
1474    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1475    ///   alias looks like `#name:example.com`.
1476    /// * `server_names` - The server names to be used for resolving the alias,
1477    ///   if needs be.
1478    pub async fn join_room_by_id_or_alias(
1479        &self,
1480        alias: &RoomOrAliasId,
1481        server_names: &[OwnedServerName],
1482    ) -> Result<Room> {
1483        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1484            via: server_names.to_owned(),
1485        });
1486        let response = self.send(request).await?;
1487        self.finish_join_room(&response.room_id).await
1488    }
1489
1490    /// Search the homeserver's directory of public rooms.
1491    ///
1492    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1493    /// a `get_public_rooms::Response`.
1494    ///
1495    /// # Arguments
1496    ///
1497    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1498    ///
1499    /// * `since` - Pagination token from a previous request.
1500    ///
1501    /// * `server` - The name of the server, if `None` the requested server is
1502    ///   used.
1503    ///
1504    /// # Examples
1505    /// ```no_run
1506    /// use matrix_sdk::Client;
1507    /// # use url::Url;
1508    /// # let homeserver = Url::parse("http://example.com").unwrap();
1509    /// # let limit = Some(10);
1510    /// # let since = Some("since token");
1511    /// # let server = Some("servername.com".try_into().unwrap());
1512    /// # async {
1513    /// let mut client = Client::new(homeserver).await.unwrap();
1514    ///
1515    /// client.public_rooms(limit, since, server).await;
1516    /// # };
1517    /// ```
1518    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1519    pub async fn public_rooms(
1520        &self,
1521        limit: Option<u32>,
1522        since: Option<&str>,
1523        server: Option<&ServerName>,
1524    ) -> HttpResult<get_public_rooms::v3::Response> {
1525        let limit = limit.map(UInt::from);
1526
1527        let request = assign!(get_public_rooms::v3::Request::new(), {
1528            limit,
1529            since: since.map(ToOwned::to_owned),
1530            server: server.map(ToOwned::to_owned),
1531        });
1532        self.send(request).await
1533    }
1534
1535    /// Create a room with the given parameters.
1536    ///
1537    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1538    /// created room.
1539    ///
1540    /// If you want to create a direct message with one specific user, you can
1541    /// use [`create_dm`][Self::create_dm], which is more convenient than
1542    /// assembling the [`create_room::v3::Request`] yourself.
1543    ///
1544    /// If the `is_direct` field of the request is set to `true` and at least
1545    /// one user is invited, the room will be automatically added to the direct
1546    /// rooms in the account data.
1547    ///
1548    /// # Examples
1549    ///
1550    /// ```no_run
1551    /// use matrix_sdk::{
1552    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1553    ///     Client,
1554    /// };
1555    /// # use url::Url;
1556    /// #
1557    /// # async {
1558    /// # let homeserver = Url::parse("http://example.com").unwrap();
1559    /// let request = CreateRoomRequest::new();
1560    /// let client = Client::new(homeserver).await.unwrap();
1561    /// assert!(client.create_room(request).await.is_ok());
1562    /// # };
1563    /// ```
1564    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1565        let invite = request.invite.clone();
1566        let is_direct_room = request.is_direct;
1567        let response = self.send(request).await?;
1568
1569        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1570
1571        let joined_room = Room::new(self.clone(), base_room);
1572
1573        if is_direct_room && !invite.is_empty() {
1574            if let Err(error) =
1575                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1576            {
1577                // FIXME: Retry in the background
1578                error!("Failed to mark room as DM: {error}");
1579            }
1580        }
1581
1582        Ok(joined_room)
1583    }
1584
1585    /// Create a DM room.
1586    ///
1587    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1588    /// given user being invited, the room marked `is_direct` and both the
1589    /// creator and invitee getting the default maximum power level.
1590    ///
1591    /// If the `e2e-encryption` feature is enabled, the room will also be
1592    /// encrypted.
1593    ///
1594    /// # Arguments
1595    ///
1596    /// * `user_id` - The ID of the user to create a DM for.
1597    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1598        #[cfg(feature = "e2e-encryption")]
1599        let initial_state =
1600            vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1601                .to_raw_any()];
1602
1603        #[cfg(not(feature = "e2e-encryption"))]
1604        let initial_state = vec![];
1605
1606        let request = assign!(create_room::v3::Request::new(), {
1607            invite: vec![user_id.to_owned()],
1608            is_direct: true,
1609            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1610            initial_state,
1611        });
1612
1613        self.create_room(request).await
1614    }
1615
1616    /// Search the homeserver's directory for public rooms with a filter.
1617    ///
1618    /// # Arguments
1619    ///
1620    /// * `room_search` - The easiest way to create this request is using the
1621    ///   `get_public_rooms_filtered::Request` itself.
1622    ///
1623    /// # Examples
1624    ///
1625    /// ```no_run
1626    /// # use url::Url;
1627    /// # use matrix_sdk::Client;
1628    /// # async {
1629    /// # let homeserver = Url::parse("http://example.com")?;
1630    /// use matrix_sdk::ruma::{
1631    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1632    /// };
1633    /// # let mut client = Client::new(homeserver).await?;
1634    ///
1635    /// let mut filter = Filter::new();
1636    /// filter.generic_search_term = Some("rust".to_owned());
1637    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1638    /// request.filter = filter;
1639    ///
1640    /// let response = client.public_rooms_filtered(request).await?;
1641    ///
1642    /// for room in response.chunk {
1643    ///     println!("Found room {room:?}");
1644    /// }
1645    /// # anyhow::Ok(()) };
1646    /// ```
1647    pub async fn public_rooms_filtered(
1648        &self,
1649        request: get_public_rooms_filtered::v3::Request,
1650    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1651        self.send(request).await
1652    }
1653
1654    /// Send an arbitrary request to the server, without updating client state.
1655    ///
1656    /// **Warning:** Because this method *does not* update the client state, it
1657    /// is important to make sure that you account for this yourself, and
1658    /// use wrapper methods where available.  This method should *only* be
1659    /// used if a wrapper method for the endpoint you'd like to use is not
1660    /// available.
1661    ///
1662    /// # Arguments
1663    ///
1664    /// * `request` - A filled out and valid request for the endpoint to be hit
1665    ///
1666    /// * `timeout` - An optional request timeout setting, this overrides the
1667    ///   default request setting if one was set.
1668    ///
1669    /// # Examples
1670    ///
1671    /// ```no_run
1672    /// # use matrix_sdk::{Client, config::SyncSettings};
1673    /// # use url::Url;
1674    /// # async {
1675    /// # let homeserver = Url::parse("http://localhost:8080")?;
1676    /// # let mut client = Client::new(homeserver).await?;
1677    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1678    ///
1679    /// // First construct the request you want to make
1680    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1681    /// // for all available Endpoints
1682    /// let user_id = user_id!("@example:localhost").to_owned();
1683    /// let request = profile::get_profile::v3::Request::new(user_id);
1684    ///
1685    /// // Start the request using Client::send()
1686    /// let response = client.send(request).await?;
1687    ///
1688    /// // Check the corresponding Response struct to find out what types are
1689    /// // returned
1690    /// # anyhow::Ok(()) };
1691    /// ```
1692    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1693    where
1694        Request: OutgoingRequest + Clone + Debug,
1695        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1696    {
1697        SendRequest {
1698            client: self.clone(),
1699            request,
1700            config: None,
1701            send_progress: Default::default(),
1702        }
1703    }
1704
1705    pub(crate) async fn send_inner<Request>(
1706        &self,
1707        request: Request,
1708        config: Option<RequestConfig>,
1709        send_progress: SharedObservable<TransmissionProgress>,
1710    ) -> HttpResult<Request::IncomingResponse>
1711    where
1712        Request: OutgoingRequest + Debug,
1713        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1714    {
1715        let homeserver = self.homeserver().to_string();
1716        let access_token = self.access_token();
1717
1718        self.inner
1719            .http_client
1720            .send(
1721                request,
1722                config,
1723                homeserver,
1724                access_token.as_deref(),
1725                &self.server_versions().await?,
1726                send_progress,
1727            )
1728            .await
1729    }
1730
1731    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1732        _ = self
1733            .inner
1734            .auth_ctx
1735            .session_change_sender
1736            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1737    }
1738
1739    /// Fetches server capabilities from network; no caching.
1740    pub async fn fetch_server_capabilities(
1741        &self,
1742        request_config: Option<RequestConfig>,
1743    ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1744        let resp = self
1745            .inner
1746            .http_client
1747            .send(
1748                get_supported_versions::Request::new(),
1749                request_config,
1750                self.homeserver().to_string(),
1751                None,
1752                &[MatrixVersion::V1_0],
1753                Default::default(),
1754            )
1755            .await?;
1756
1757        // Fill both unstable features and server versions at once.
1758        let mut versions = resp.known_versions().collect::<Vec<_>>();
1759        if versions.is_empty() {
1760            versions.push(MatrixVersion::V1_0);
1761        }
1762
1763        Ok((versions.into(), resp.unstable_features))
1764    }
1765
1766    /// Load server capabilities from storage, or fetch them from network and
1767    /// cache them.
1768    async fn load_or_fetch_server_capabilities(
1769        &self,
1770    ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1771        match self.state_store().get_kv_data(StateStoreDataKey::ServerCapabilities).await {
1772            Ok(Some(stored)) => {
1773                if let Some((versions, unstable_features)) =
1774                    stored.into_server_capabilities().and_then(|cap| cap.maybe_decode())
1775                {
1776                    return Ok((versions.into(), unstable_features));
1777                }
1778            }
1779            Ok(None) => {
1780                // fallthrough: cache is empty
1781            }
1782            Err(err) => {
1783                warn!("error when loading cached server capabilities: {err}");
1784                // fallthrough to network.
1785            }
1786        }
1787
1788        let (versions, unstable_features) = self.fetch_server_capabilities(None).await?;
1789
1790        // Attempt to cache the result in storage.
1791        {
1792            let encoded = ServerCapabilities::new(&versions, unstable_features.clone());
1793            if let Err(err) = self
1794                .state_store()
1795                .set_kv_data(
1796                    StateStoreDataKey::ServerCapabilities,
1797                    StateStoreDataValue::ServerCapabilities(encoded),
1798                )
1799                .await
1800            {
1801                warn!("error when caching server capabilities: {err}");
1802            }
1803        }
1804
1805        Ok((versions, unstable_features))
1806    }
1807
1808    async fn get_or_load_and_cache_server_capabilities<
1809        T,
1810        F: Fn(&ClientServerCapabilities) -> Option<T>,
1811    >(
1812        &self,
1813        f: F,
1814    ) -> HttpResult<T> {
1815        let caps = &self.inner.caches.server_capabilities;
1816        if let Some(val) = f(&*caps.read().await) {
1817            return Ok(val);
1818        }
1819
1820        let mut guard = caps.write().await;
1821        if let Some(val) = f(&guard) {
1822            return Ok(val);
1823        }
1824
1825        let (versions, unstable_features) = self.load_or_fetch_server_capabilities().await?;
1826
1827        guard.server_versions = Some(versions);
1828        guard.unstable_features = Some(unstable_features);
1829
1830        // SAFETY: both fields were set above, so the function will always return some.
1831        Ok(f(&guard).unwrap())
1832    }
1833
1834    /// Get the Matrix versions supported by the homeserver by fetching them
1835    /// from the server or the cache.
1836    ///
1837    /// # Examples
1838    ///
1839    /// ```no_run
1840    /// use ruma::api::MatrixVersion;
1841    /// # use matrix_sdk::{Client, config::SyncSettings};
1842    /// # use url::Url;
1843    /// # async {
1844    /// # let homeserver = Url::parse("http://localhost:8080")?;
1845    /// # let mut client = Client::new(homeserver).await?;
1846    ///
1847    /// let server_versions = client.server_versions().await?;
1848    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1849    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1850    /// # anyhow::Ok(()) };
1851    /// ```
1852    pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1853        self.get_or_load_and_cache_server_capabilities(|caps| caps.server_versions.clone()).await
1854    }
1855
1856    /// Get the unstable features supported by the homeserver by fetching them
1857    /// from the server or the cache.
1858    ///
1859    /// # Examples
1860    ///
1861    /// ```no_run
1862    /// # use matrix_sdk::{Client, config::SyncSettings};
1863    /// # use url::Url;
1864    /// # async {
1865    /// # let homeserver = Url::parse("http://localhost:8080")?;
1866    /// # let mut client = Client::new(homeserver).await?;
1867    /// let unstable_features = client.unstable_features().await?;
1868    /// let supports_msc_x =
1869    ///     unstable_features.get("msc_x").copied().unwrap_or(false);
1870    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1871    /// # anyhow::Ok(()) };
1872    /// ```
1873    pub async fn unstable_features(&self) -> HttpResult<BTreeMap<String, bool>> {
1874        self.get_or_load_and_cache_server_capabilities(|caps| caps.unstable_features.clone()).await
1875    }
1876
1877    /// Empty the server version and unstable features cache.
1878    ///
1879    /// Since the SDK caches server capabilities (versions and unstable
1880    /// features), it's possible to have a stale entry in the cache. This
1881    /// functions makes it possible to force reset it.
1882    pub async fn reset_server_capabilities(&self) -> Result<()> {
1883        // Empty the in-memory caches.
1884        let mut guard = self.inner.caches.server_capabilities.write().await;
1885        guard.server_versions = None;
1886        guard.unstable_features = None;
1887
1888        // Empty the store cache.
1889        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerCapabilities).await?)
1890    }
1891
1892    /// Check whether MSC 4028 is enabled on the homeserver.
1893    ///
1894    /// # Examples
1895    ///
1896    /// ```no_run
1897    /// # use matrix_sdk::{Client, config::SyncSettings};
1898    /// # use url::Url;
1899    /// # async {
1900    /// # let homeserver = Url::parse("http://localhost:8080")?;
1901    /// # let mut client = Client::new(homeserver).await?;
1902    /// let msc4028_enabled =
1903    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
1904    /// # anyhow::Ok(()) };
1905    /// ```
1906    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
1907        Ok(self.unstable_features().await?.get("org.matrix.msc4028").copied().unwrap_or(false))
1908    }
1909
1910    /// Get information of all our own devices.
1911    ///
1912    /// # Examples
1913    ///
1914    /// ```no_run
1915    /// # use matrix_sdk::{Client, config::SyncSettings};
1916    /// # use url::Url;
1917    /// # async {
1918    /// # let homeserver = Url::parse("http://localhost:8080")?;
1919    /// # let mut client = Client::new(homeserver).await?;
1920    /// let response = client.devices().await?;
1921    ///
1922    /// for device in response.devices {
1923    ///     println!(
1924    ///         "Device: {} {}",
1925    ///         device.device_id,
1926    ///         device.display_name.as_deref().unwrap_or("")
1927    ///     );
1928    /// }
1929    /// # anyhow::Ok(()) };
1930    /// ```
1931    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
1932        let request = get_devices::v3::Request::new();
1933
1934        self.send(request).await
1935    }
1936
1937    /// Delete the given devices from the server.
1938    ///
1939    /// # Arguments
1940    ///
1941    /// * `devices` - The list of devices that should be deleted from the
1942    ///   server.
1943    ///
1944    /// * `auth_data` - This request requires user interactive auth, the first
1945    ///   request needs to set this to `None` and will always fail with an
1946    ///   `UiaaResponse`. The response will contain information for the
1947    ///   interactive auth and the same request needs to be made but this time
1948    ///   with some `auth_data` provided.
1949    ///
1950    /// ```no_run
1951    /// # use matrix_sdk::{
1952    /// #    ruma::{api::client::uiaa, device_id},
1953    /// #    Client, Error, config::SyncSettings,
1954    /// # };
1955    /// # use serde_json::json;
1956    /// # use url::Url;
1957    /// # use std::collections::BTreeMap;
1958    /// # async {
1959    /// # let homeserver = Url::parse("http://localhost:8080")?;
1960    /// # let mut client = Client::new(homeserver).await?;
1961    /// let devices = &[device_id!("DEVICEID").to_owned()];
1962    ///
1963    /// if let Err(e) = client.delete_devices(devices, None).await {
1964    ///     if let Some(info) = e.as_uiaa_response() {
1965    ///         let mut password = uiaa::Password::new(
1966    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1967    ///             "wordpass".to_owned(),
1968    ///         );
1969    ///         password.session = info.session.clone();
1970    ///
1971    ///         client
1972    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
1973    ///             .await?;
1974    ///     }
1975    /// }
1976    /// # anyhow::Ok(()) };
1977    pub async fn delete_devices(
1978        &self,
1979        devices: &[OwnedDeviceId],
1980        auth_data: Option<uiaa::AuthData>,
1981    ) -> HttpResult<delete_devices::v3::Response> {
1982        let mut request = delete_devices::v3::Request::new(devices.to_owned());
1983        request.auth = auth_data;
1984
1985        self.send(request).await
1986    }
1987
1988    /// Change the display name of a device owned by the current user.
1989    ///
1990    /// Returns a `update_device::Response` which specifies the result
1991    /// of the operation.
1992    ///
1993    /// # Arguments
1994    ///
1995    /// * `device_id` - The ID of the device to change the display name of.
1996    /// * `display_name` - The new display name to set.
1997    pub async fn rename_device(
1998        &self,
1999        device_id: &DeviceId,
2000        display_name: &str,
2001    ) -> HttpResult<update_device::v3::Response> {
2002        let mut request = update_device::v3::Request::new(device_id.to_owned());
2003        request.display_name = Some(display_name.to_owned());
2004
2005        self.send(request).await
2006    }
2007
2008    /// Synchronize the client's state with the latest state on the server.
2009    ///
2010    /// ## Syncing Events
2011    ///
2012    /// Messages or any other type of event need to be periodically fetched from
2013    /// the server, this is achieved by sending a `/sync` request to the server.
2014    ///
2015    /// The first sync is sent out without a [`token`]. The response of the
2016    /// first sync will contain a [`next_batch`] field which should then be
2017    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2018    /// don't receive the same events multiple times.
2019    ///
2020    /// ## Long Polling
2021    ///
2022    /// A sync should in the usual case always be in flight. The
2023    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2024    /// long the server will wait for new events before it will respond.
2025    /// The server will respond immediately if some new events arrive before the
2026    /// timeout has expired. If no changes arrive and the timeout expires an
2027    /// empty sync response will be sent to the client.
2028    ///
2029    /// This method of sending a request that may not receive a response
2030    /// immediately is called long polling.
2031    ///
2032    /// ## Filtering Events
2033    ///
2034    /// The number or type of messages and events that the client should receive
2035    /// from the server can be altered using a [`Filter`].
2036    ///
2037    /// Filters can be non-trivial and, since they will be sent with every sync
2038    /// request, they may take up a bunch of unnecessary bandwidth.
2039    ///
2040    /// Luckily filters can be uploaded to the server and reused using an unique
2041    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2042    /// method.
2043    ///
2044    /// # Arguments
2045    ///
2046    /// * `sync_settings` - Settings for the sync call, this allows us to set
2047    /// various options to configure the sync:
2048    ///     * [`filter`] - To configure which events we receive and which get
2049    ///       [filtered] by the server
2050    ///     * [`timeout`] - To configure our [long polling] setup.
2051    ///     * [`token`] - To tell the server which events we already received
2052    ///       and where we wish to continue syncing.
2053    ///     * [`full_state`] - To tell the server that we wish to receive all
2054    ///       state events, regardless of our configured [`token`].
2055    ///     * [`set_presence`] - To tell the server to set the presence and to
2056    ///       which state.
2057    ///
2058    /// # Examples
2059    ///
2060    /// ```no_run
2061    /// # use url::Url;
2062    /// # async {
2063    /// # let homeserver = Url::parse("http://localhost:8080")?;
2064    /// # let username = "";
2065    /// # let password = "";
2066    /// use matrix_sdk::{
2067    ///     config::SyncSettings,
2068    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2069    /// };
2070    ///
2071    /// let client = Client::new(homeserver).await?;
2072    /// client.matrix_auth().login_username(username, password).send().await?;
2073    ///
2074    /// // Sync once so we receive the client state and old messages.
2075    /// client.sync_once(SyncSettings::default()).await?;
2076    ///
2077    /// // Register our handler so we start responding once we receive a new
2078    /// // event.
2079    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2080    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2081    /// });
2082    ///
2083    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2084    /// // from our `sync_once()` call automatically.
2085    /// client.sync(SyncSettings::default()).await;
2086    /// # anyhow::Ok(()) };
2087    /// ```
2088    ///
2089    /// [`sync`]: #method.sync
2090    /// [`SyncSettings`]: crate::config::SyncSettings
2091    /// [`token`]: crate::config::SyncSettings#method.token
2092    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2093    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2094    /// [`set_presence`]: ruma::presence::PresenceState
2095    /// [`filter`]: crate::config::SyncSettings#method.filter
2096    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2097    /// [`next_batch`]: SyncResponse#structfield.next_batch
2098    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2099    /// [long polling]: #long-polling
2100    /// [filtered]: #filtering-events
2101    #[instrument(skip(self))]
2102    pub async fn sync_once(
2103        &self,
2104        sync_settings: crate::config::SyncSettings,
2105    ) -> Result<SyncResponse> {
2106        // The sync might not return for quite a while due to the timeout.
2107        // We'll see if there's anything crypto related to send out before we
2108        // sync, i.e. if we closed our client after a sync but before the
2109        // crypto requests were sent out.
2110        //
2111        // This will mostly be a no-op.
2112        #[cfg(feature = "e2e-encryption")]
2113        if let Err(e) = self.send_outgoing_requests().await {
2114            error!(error = ?e, "Error while sending outgoing E2EE requests");
2115        }
2116
2117        let request = assign!(sync_events::v3::Request::new(), {
2118            filter: sync_settings.filter.map(|f| *f),
2119            since: sync_settings.token,
2120            full_state: sync_settings.full_state,
2121            set_presence: sync_settings.set_presence,
2122            timeout: sync_settings.timeout,
2123        });
2124        let mut request_config = self.request_config();
2125        if let Some(timeout) = sync_settings.timeout {
2126            request_config.timeout += timeout;
2127        }
2128
2129        let response = self.send(request).with_request_config(request_config).await?;
2130        let next_batch = response.next_batch.clone();
2131        let response = self.process_sync(response).await?;
2132
2133        #[cfg(feature = "e2e-encryption")]
2134        if let Err(e) = self.send_outgoing_requests().await {
2135            error!(error = ?e, "Error while sending outgoing E2EE requests");
2136        }
2137
2138        self.inner.sync_beat.notify(usize::MAX);
2139
2140        Ok(SyncResponse::new(next_batch, response))
2141    }
2142
2143    /// Repeatedly synchronize the client state with the server.
2144    ///
2145    /// This method will only return on error, if cancellation is needed
2146    /// the method should be wrapped in a cancelable task or the
2147    /// [`Client::sync_with_callback`] method can be used or
2148    /// [`Client::sync_with_result_callback`] if you want to handle error
2149    /// cases in the loop, too.
2150    ///
2151    /// This method will internally call [`Client::sync_once`] in a loop.
2152    ///
2153    /// This method can be used with the [`Client::add_event_handler`]
2154    /// method to react to individual events. If you instead wish to handle
2155    /// events in a bulk manner the [`Client::sync_with_callback`],
2156    /// [`Client::sync_with_result_callback`] and
2157    /// [`Client::sync_stream`] methods can be used instead. Those methods
2158    /// repeatedly return the whole sync response.
2159    ///
2160    /// # Arguments
2161    ///
2162    /// * `sync_settings` - Settings for the sync call. *Note* that those
2163    ///   settings will be only used for the first sync call. See the argument
2164    ///   docs for [`Client::sync_once`] for more info.
2165    ///
2166    /// # Return
2167    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2168    /// up to the user of the API to check the error and decide whether the sync
2169    /// should continue or not.
2170    ///
2171    /// # Examples
2172    ///
2173    /// ```no_run
2174    /// # use url::Url;
2175    /// # async {
2176    /// # let homeserver = Url::parse("http://localhost:8080")?;
2177    /// # let username = "";
2178    /// # let password = "";
2179    /// use matrix_sdk::{
2180    ///     config::SyncSettings,
2181    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2182    /// };
2183    ///
2184    /// let client = Client::new(homeserver).await?;
2185    /// client.matrix_auth().login_username(&username, &password).send().await?;
2186    ///
2187    /// // Register our handler so we start responding once we receive a new
2188    /// // event.
2189    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2190    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2191    /// });
2192    ///
2193    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2194    /// // automatically.
2195    /// client.sync(SyncSettings::default()).await?;
2196    /// # anyhow::Ok(()) };
2197    /// ```
2198    ///
2199    /// [argument docs]: #method.sync_once
2200    /// [`sync_with_callback`]: #method.sync_with_callback
2201    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2202        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2203    }
2204
2205    /// Repeatedly call sync to synchronize the client state with the server.
2206    ///
2207    /// # Arguments
2208    ///
2209    /// * `sync_settings` - Settings for the sync call. *Note* that those
2210    ///   settings will be only used for the first sync call. See the argument
2211    ///   docs for [`Client::sync_once`] for more info.
2212    ///
2213    /// * `callback` - A callback that will be called every time a successful
2214    ///   response has been fetched from the server. The callback must return a
2215    ///   boolean which signalizes if the method should stop syncing. If the
2216    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2217    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2218    ///
2219    /// # Return
2220    /// The sync runs until an error occurs or the
2221    /// callback indicates that the Loop should stop. If the callback asked for
2222    /// a regular stop, the result will be `Ok(())` otherwise the
2223    /// `Err(Error)` is returned.
2224    ///
2225    /// # Examples
2226    ///
2227    /// The following example demonstrates how to sync forever while sending all
2228    /// the interesting events through a mpsc channel to another thread e.g. a
2229    /// UI thread.
2230    ///
2231    /// ```no_run
2232    /// # use std::time::Duration;
2233    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2234    /// # use url::Url;
2235    /// # async {
2236    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2237    /// # let mut client = Client::new(homeserver).await.unwrap();
2238    ///
2239    /// use tokio::sync::mpsc::channel;
2240    ///
2241    /// let (tx, rx) = channel(100);
2242    ///
2243    /// let sync_channel = &tx;
2244    /// let sync_settings = SyncSettings::new()
2245    ///     .timeout(Duration::from_secs(30));
2246    ///
2247    /// client
2248    ///     .sync_with_callback(sync_settings, |response| async move {
2249    ///         let channel = sync_channel;
2250    ///         for (room_id, room) in response.rooms.joined {
2251    ///             for event in room.timeline.events {
2252    ///                 channel.send(event).await.unwrap();
2253    ///             }
2254    ///         }
2255    ///
2256    ///         LoopCtrl::Continue
2257    ///     })
2258    ///     .await;
2259    /// };
2260    /// ```
2261    #[instrument(skip_all)]
2262    pub async fn sync_with_callback<C>(
2263        &self,
2264        sync_settings: crate::config::SyncSettings,
2265        callback: impl Fn(SyncResponse) -> C,
2266    ) -> Result<(), Error>
2267    where
2268        C: Future<Output = LoopCtrl>,
2269    {
2270        self.sync_with_result_callback(sync_settings, |result| async {
2271            Ok(callback(result?).await)
2272        })
2273        .await
2274    }
2275
2276    /// Repeatedly call sync to synchronize the client state with the server.
2277    ///
2278    /// # Arguments
2279    ///
2280    /// * `sync_settings` - Settings for the sync call. *Note* that those
2281    ///   settings will be only used for the first sync call. See the argument
2282    ///   docs for [`Client::sync_once`] for more info.
2283    ///
2284    /// * `callback` - A callback that will be called every time after a
2285    ///   response has been received, failure or not. The callback returns a
2286    ///   `Result<LoopCtrl, Error>`, too. When returning
2287    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2288    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2289    ///   function returns `Ok(())`. In case the callback can't handle the
2290    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2291    ///   which results in the sync ending and the `Err(Error)` being returned.
2292    ///
2293    /// # Return
2294    /// The sync runs until an error occurs that the callback can't handle or
2295    /// the callback indicates that the Loop should stop. If the callback
2296    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2297    /// `Err(Error)` is returned.
2298    ///
2299    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2300    /// this, and are handled first without sending the result to the
2301    /// callback. Only after they have exceeded is the `Result` handed to
2302    /// the callback.
2303    ///
2304    /// # Examples
2305    ///
2306    /// The following example demonstrates how to sync forever while sending all
2307    /// the interesting events through a mpsc channel to another thread e.g. a
2308    /// UI thread.
2309    ///
2310    /// ```no_run
2311    /// # use std::time::Duration;
2312    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2313    /// # use url::Url;
2314    /// # async {
2315    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2316    /// # let mut client = Client::new(homeserver).await.unwrap();
2317    /// #
2318    /// use tokio::sync::mpsc::channel;
2319    ///
2320    /// let (tx, rx) = channel(100);
2321    ///
2322    /// let sync_channel = &tx;
2323    /// let sync_settings = SyncSettings::new()
2324    ///     .timeout(Duration::from_secs(30));
2325    ///
2326    /// client
2327    ///     .sync_with_result_callback(sync_settings, |response| async move {
2328    ///         let channel = sync_channel;
2329    ///         let sync_response = response?;
2330    ///         for (room_id, room) in sync_response.rooms.joined {
2331    ///              for event in room.timeline.events {
2332    ///                  channel.send(event).await.unwrap();
2333    ///               }
2334    ///         }
2335    ///
2336    ///         Ok(LoopCtrl::Continue)
2337    ///     })
2338    ///     .await;
2339    /// };
2340    /// ```
2341    #[instrument(skip(self, callback))]
2342    pub async fn sync_with_result_callback<C>(
2343        &self,
2344        mut sync_settings: crate::config::SyncSettings,
2345        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2346    ) -> Result<(), Error>
2347    where
2348        C: Future<Output = Result<LoopCtrl, Error>>,
2349    {
2350        let mut last_sync_time: Option<Instant> = None;
2351
2352        if sync_settings.token.is_none() {
2353            sync_settings.token = self.sync_token().await;
2354        }
2355
2356        loop {
2357            trace!("Syncing");
2358            let result = self.sync_loop_helper(&mut sync_settings).await;
2359
2360            trace!("Running callback");
2361            if callback(result).await? == LoopCtrl::Break {
2362                trace!("Callback told us to stop");
2363                break;
2364            }
2365            trace!("Done running callback");
2366
2367            Client::delay_sync(&mut last_sync_time).await
2368        }
2369
2370        Ok(())
2371    }
2372
2373    //// Repeatedly synchronize the client state with the server.
2374    ///
2375    /// This method will internally call [`Client::sync_once`] in a loop and is
2376    /// equivalent to the [`Client::sync`] method but the responses are provided
2377    /// as an async stream.
2378    ///
2379    /// # Arguments
2380    ///
2381    /// * `sync_settings` - Settings for the sync call. *Note* that those
2382    ///   settings will be only used for the first sync call. See the argument
2383    ///   docs for [`Client::sync_once`] for more info.
2384    ///
2385    /// # Examples
2386    ///
2387    /// ```no_run
2388    /// # use url::Url;
2389    /// # async {
2390    /// # let homeserver = Url::parse("http://localhost:8080")?;
2391    /// # let username = "";
2392    /// # let password = "";
2393    /// use futures_util::StreamExt;
2394    /// use matrix_sdk::{config::SyncSettings, Client};
2395    ///
2396    /// let client = Client::new(homeserver).await?;
2397    /// client.matrix_auth().login_username(&username, &password).send().await?;
2398    ///
2399    /// let mut sync_stream =
2400    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2401    ///
2402    /// while let Some(Ok(response)) = sync_stream.next().await {
2403    ///     for room in response.rooms.joined.values() {
2404    ///         for e in &room.timeline.events {
2405    ///             if let Ok(event) = e.raw().deserialize() {
2406    ///                 println!("Received event {:?}", event);
2407    ///             }
2408    ///         }
2409    ///     }
2410    /// }
2411    ///
2412    /// # anyhow::Ok(()) };
2413    /// ```
2414    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2415    #[instrument(skip(self))]
2416    pub async fn sync_stream(
2417        &self,
2418        mut sync_settings: crate::config::SyncSettings,
2419    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2420        let mut last_sync_time: Option<Instant> = None;
2421
2422        if sync_settings.token.is_none() {
2423            sync_settings.token = self.sync_token().await;
2424        }
2425
2426        let parent_span = Span::current();
2427
2428        async_stream::stream! {
2429            loop {
2430                yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2431
2432                Client::delay_sync(&mut last_sync_time).await
2433            }
2434        }
2435    }
2436
2437    /// Get the current, if any, sync token of the client.
2438    /// This will be None if the client didn't sync at least once.
2439    pub(crate) async fn sync_token(&self) -> Option<String> {
2440        self.inner.base_client.sync_token().await
2441    }
2442
2443    /// Gets information about the owner of a given access token.
2444    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2445        let request = whoami::v3::Request::new();
2446        self.send(request).await
2447    }
2448
2449    /// Subscribes a new receiver to client SessionChange broadcasts.
2450    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2451        let broadcast = &self.auth_ctx().session_change_sender;
2452        broadcast.subscribe()
2453    }
2454
2455    /// Sets the save/restore session callbacks.
2456    ///
2457    /// This is another mechanism to get synchronous updates to session tokens,
2458    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2459    pub fn set_session_callbacks(
2460        &self,
2461        reload_session_callback: Box<ReloadSessionCallback>,
2462        save_session_callback: Box<SaveSessionCallback>,
2463    ) -> Result<()> {
2464        self.inner
2465            .auth_ctx
2466            .reload_session_callback
2467            .set(reload_session_callback)
2468            .map_err(|_| Error::MultipleSessionCallbacks)?;
2469
2470        self.inner
2471            .auth_ctx
2472            .save_session_callback
2473            .set(save_session_callback)
2474            .map_err(|_| Error::MultipleSessionCallbacks)?;
2475
2476        Ok(())
2477    }
2478
2479    /// Get the notification settings of the current owner of the client.
2480    pub async fn notification_settings(&self) -> NotificationSettings {
2481        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2482        NotificationSettings::new(self.clone(), ruleset)
2483    }
2484
2485    /// Create a new specialized `Client` that can process notifications.
2486    ///
2487    /// See [`CrossProcessStoreLock::new`] to learn more about
2488    /// `cross_process_store_locks_holder_name`.
2489    ///
2490    /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2491    pub async fn notification_client(
2492        &self,
2493        cross_process_store_locks_holder_name: String,
2494    ) -> Result<Client> {
2495        let client = Client {
2496            inner: ClientInner::new(
2497                self.inner.auth_ctx.clone(),
2498                self.server().cloned(),
2499                self.homeserver(),
2500                self.sliding_sync_version(),
2501                self.inner.http_client.clone(),
2502                self.inner
2503                    .base_client
2504                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2505                    .await?,
2506                self.inner.caches.server_capabilities.read().await.clone(),
2507                self.inner.respect_login_well_known,
2508                self.inner.event_cache.clone(),
2509                self.inner.send_queue_data.clone(),
2510                #[cfg(feature = "e2e-encryption")]
2511                self.inner.e2ee.encryption_settings,
2512                #[cfg(feature = "e2e-encryption")]
2513                self.inner.enable_share_history_on_invite,
2514                cross_process_store_locks_holder_name,
2515            )
2516            .await,
2517        };
2518
2519        Ok(client)
2520    }
2521
2522    /// The [`EventCache`] instance for this [`Client`].
2523    pub fn event_cache(&self) -> &EventCache {
2524        // SAFETY: always initialized in the `Client` ctor.
2525        self.inner.event_cache.get().unwrap()
2526    }
2527
2528    /// Waits until an at least partially synced room is received, and returns
2529    /// it.
2530    ///
2531    /// **Note: this function will loop endlessly until either it finds the room
2532    /// or an externally set timeout happens.**
2533    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2534        loop {
2535            if let Some(room) = self.get_room(room_id) {
2536                if room.is_state_partially_or_fully_synced() {
2537                    debug!("Found just created room!");
2538                    return room;
2539                }
2540                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2541            } else {
2542                debug!("Room wasn't found, waiting for sync beat to try again");
2543            }
2544            self.inner.sync_beat.listen().await;
2545        }
2546    }
2547
2548    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2549    /// join it.
2550    pub async fn knock(
2551        &self,
2552        room_id_or_alias: OwnedRoomOrAliasId,
2553        reason: Option<String>,
2554        server_names: Vec<OwnedServerName>,
2555    ) -> Result<Room> {
2556        let request =
2557            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2558        let response = self.send(request).await?;
2559        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2560        Ok(Room::new(self.clone(), base_room))
2561    }
2562
2563    /// Checks whether the provided `user_id` belongs to an ignored user.
2564    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2565        self.base_client().is_user_ignored(user_id).await
2566    }
2567
2568    /// Gets the `max_upload_size` value from the homeserver, getting either a
2569    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2570    /// missing.
2571    ///
2572    /// Check the spec for more info:
2573    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2574    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2575        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2576        if let Some(data) = max_upload_size_lock.get() {
2577            return Ok(data.to_owned());
2578        }
2579
2580        let response = self
2581            .send(ruma::api::client::authenticated_media::get_media_config::v1::Request::default())
2582            .await?;
2583
2584        match max_upload_size_lock.set(response.upload_size) {
2585            Ok(_) => Ok(response.upload_size),
2586            Err(error) => {
2587                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2588            }
2589        }
2590    }
2591}
2592
2593/// A weak reference to the inner client, useful when trying to get a handle
2594/// on the owning client.
2595#[derive(Clone)]
2596pub(crate) struct WeakClient {
2597    client: Weak<ClientInner>,
2598}
2599
2600impl WeakClient {
2601    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2602    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2603        Self { client: Arc::downgrade(client) }
2604    }
2605
2606    /// Construct a [`WeakClient`] from a [`Client`].
2607    pub fn from_client(client: &Client) -> Self {
2608        Self::from_inner(&client.inner)
2609    }
2610
2611    /// Attempts to get a [`Client`] from this [`WeakClient`].
2612    pub fn get(&self) -> Option<Client> {
2613        self.client.upgrade().map(|inner| Client { inner })
2614    }
2615
2616    /// Gets the number of strong (`Arc`) pointers still pointing to this
2617    /// client.
2618    #[allow(dead_code)]
2619    pub fn strong_count(&self) -> usize {
2620        self.client.strong_count()
2621    }
2622}
2623
2624#[derive(Clone)]
2625struct ClientServerCapabilities {
2626    /// The Matrix versions the server supports (well-known ones only).
2627    server_versions: Option<Box<[MatrixVersion]>>,
2628
2629    /// The unstable features and their on/off state on the server.
2630    unstable_features: Option<BTreeMap<String, bool>>,
2631}
2632
2633// The http mocking library is not supported for wasm32
2634#[cfg(all(test, not(target_family = "wasm")))]
2635pub(crate) mod tests {
2636    use std::{sync::Arc, time::Duration};
2637
2638    use assert_matches::assert_matches;
2639    use assert_matches2::assert_let;
2640    use eyeball::SharedObservable;
2641    use futures_util::{pin_mut, FutureExt};
2642    use js_int::{uint, UInt};
2643    use matrix_sdk_base::{
2644        store::{MemoryStore, StoreConfig},
2645        RoomState,
2646    };
2647    use matrix_sdk_test::{
2648        async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
2649        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
2650    };
2651    #[cfg(target_family = "wasm")]
2652    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2653
2654    use ruma::{
2655        api::{client::room::create_room::v3::Request as CreateRoomRequest, MatrixVersion},
2656        assign,
2657        events::{
2658            ignored_user_list::IgnoredUserListEventContent,
2659            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
2660        },
2661        owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2662    };
2663    use serde_json::json;
2664    use stream_assert::{assert_next_matches, assert_pending};
2665    use tokio::{
2666        spawn,
2667        time::{sleep, timeout},
2668    };
2669    use url::Url;
2670    use wiremock::{
2671        matchers::{body_json, header, method, path, query_param_is_missing},
2672        Mock, MockServer, ResponseTemplate,
2673    };
2674
2675    use super::Client;
2676    use crate::{
2677        client::{futures::SendMediaUploadRequest, WeakClient},
2678        config::{RequestConfig, SyncSettings},
2679        futures::SendRequest,
2680        media::MediaError,
2681        test_utils::{
2682            logged_in_client, mocks::MatrixMockServer, no_retry_test_client, set_client_session,
2683            test_client_builder, test_client_builder_with_server,
2684        },
2685        Error, TransmissionProgress,
2686    };
2687
2688    #[async_test]
2689    async fn test_account_data() {
2690        let server = MockServer::start().await;
2691        let client = logged_in_client(Some(server.uri())).await;
2692
2693        Mock::given(method("GET"))
2694            .and(path("/_matrix/client/r0/sync".to_owned()))
2695            .and(header("authorization", "Bearer 1234"))
2696            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::SYNC))
2697            .mount(&server)
2698            .await;
2699
2700        let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
2701        let _response = client.sync_once(sync_settings).await.unwrap();
2702
2703        let content = client
2704            .account()
2705            .account_data::<IgnoredUserListEventContent>()
2706            .await
2707            .unwrap()
2708            .unwrap()
2709            .deserialize()
2710            .unwrap();
2711
2712        assert_eq!(content.ignored_users.len(), 1);
2713    }
2714
2715    #[async_test]
2716    async fn test_successful_discovery() {
2717        // Imagine this is `matrix.org`.
2718        let server = MockServer::start().await;
2719
2720        // Imagine this is `matrix-client.matrix.org`.
2721        let homeserver = MockServer::start().await;
2722
2723        // Imagine Alice has the user ID `@alice:matrix.org`.
2724        let server_url = server.uri();
2725        let domain = server_url.strip_prefix("http://").unwrap();
2726        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2727
2728        // The `.well-known` is on the server (e.g. `matrix.org`).
2729        Mock::given(method("GET"))
2730            .and(path("/.well-known/matrix/client"))
2731            .respond_with(ResponseTemplate::new(200).set_body_raw(
2732                test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", &homeserver.uri()),
2733                "application/json",
2734            ))
2735            .mount(&server)
2736            .await;
2737
2738        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2739        Mock::given(method("GET"))
2740            .and(path("/_matrix/client/versions"))
2741            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2742            .mount(&homeserver)
2743            .await;
2744
2745        let client = Client::builder()
2746            .insecure_server_name_no_tls(alice.server_name())
2747            .build()
2748            .await
2749            .unwrap();
2750
2751        assert_eq!(client.server().unwrap(), &Url::parse(&server.uri()).unwrap());
2752        assert_eq!(client.homeserver(), Url::parse(&homeserver.uri()).unwrap());
2753    }
2754
2755    #[async_test]
2756    async fn test_discovery_broken_server() {
2757        let server = MockServer::start().await;
2758        let server_url = server.uri();
2759        let domain = server_url.strip_prefix("http://").unwrap();
2760        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2761
2762        Mock::given(method("GET"))
2763            .and(path("/.well-known/matrix/client"))
2764            .respond_with(ResponseTemplate::new(404))
2765            .mount(&server)
2766            .await;
2767
2768        assert!(
2769            Client::builder()
2770                .insecure_server_name_no_tls(alice.server_name())
2771                .build()
2772                .await
2773                .is_err(),
2774            "Creating a client from a user ID should fail when the .well-known request fails."
2775        );
2776    }
2777
2778    #[async_test]
2779    async fn test_room_creation() {
2780        let server = MockServer::start().await;
2781        let client = logged_in_client(Some(server.uri())).await;
2782
2783        let response = SyncResponseBuilder::default()
2784            .add_joined_room(
2785                JoinedRoomBuilder::default()
2786                    .add_state_event(StateTestEvent::Member)
2787                    .add_state_event(StateTestEvent::PowerLevels),
2788            )
2789            .build_sync_response();
2790
2791        client.inner.base_client.receive_sync_response(response).await.unwrap();
2792
2793        assert_eq!(client.homeserver(), Url::parse(&server.uri()).unwrap());
2794
2795        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
2796        assert_eq!(room.state(), RoomState::Joined);
2797    }
2798
2799    #[async_test]
2800    async fn test_retry_limit_http_requests() {
2801        let server = MockServer::start().await;
2802        let client = test_client_builder(Some(server.uri()))
2803            .request_config(RequestConfig::new().retry_limit(3))
2804            .build()
2805            .await
2806            .unwrap();
2807
2808        assert!(client.request_config().retry_limit.unwrap() == 3);
2809
2810        Mock::given(method("POST"))
2811            .and(path("/_matrix/client/r0/login"))
2812            .respond_with(ResponseTemplate::new(501))
2813            .expect(3)
2814            .mount(&server)
2815            .await;
2816
2817        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2818    }
2819
2820    #[async_test]
2821    async fn test_retry_timeout_http_requests() {
2822        // Keep this timeout small so that the test doesn't take long
2823        let retry_timeout = Duration::from_secs(5);
2824        let server = MockServer::start().await;
2825        let client = test_client_builder(Some(server.uri()))
2826            .request_config(RequestConfig::new().max_retry_time(retry_timeout))
2827            .build()
2828            .await
2829            .unwrap();
2830
2831        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
2832
2833        Mock::given(method("POST"))
2834            .and(path("/_matrix/client/r0/login"))
2835            .respond_with(ResponseTemplate::new(501))
2836            .expect(2..)
2837            .mount(&server)
2838            .await;
2839
2840        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2841    }
2842
2843    #[async_test]
2844    async fn test_short_retry_initial_http_requests() {
2845        let server = MockServer::start().await;
2846        let client = test_client_builder(Some(server.uri())).build().await.unwrap();
2847
2848        Mock::given(method("POST"))
2849            .and(path("/_matrix/client/r0/login"))
2850            .respond_with(ResponseTemplate::new(501))
2851            .expect(3..)
2852            .mount(&server)
2853            .await;
2854
2855        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2856    }
2857
2858    #[async_test]
2859    async fn test_no_retry_http_requests() {
2860        let server = MockServer::start().await;
2861        let client = logged_in_client(Some(server.uri())).await;
2862
2863        Mock::given(method("GET"))
2864            .and(path("/_matrix/client/r0/devices"))
2865            .respond_with(ResponseTemplate::new(501))
2866            .expect(1)
2867            .mount(&server)
2868            .await;
2869
2870        client.devices().await.unwrap_err();
2871    }
2872
2873    #[async_test]
2874    async fn test_set_homeserver() {
2875        let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2876        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
2877
2878        let homeserver = Url::parse("http://example.com/").unwrap();
2879        client.set_homeserver(homeserver.clone());
2880        assert_eq!(client.homeserver(), homeserver);
2881    }
2882
2883    #[async_test]
2884    async fn test_search_user_request() {
2885        let server = MockServer::start().await;
2886        let client = logged_in_client(Some(server.uri())).await;
2887
2888        Mock::given(method("POST"))
2889            .and(path("_matrix/client/r0/user_directory/search"))
2890            .and(body_json(&*test_json::search_users::SEARCH_USERS_REQUEST))
2891            .respond_with(
2892                ResponseTemplate::new(200)
2893                    .set_body_json(&*test_json::search_users::SEARCH_USERS_RESPONSE),
2894            )
2895            .mount(&server)
2896            .await;
2897
2898        let response = client.search_users("test", 50).await.unwrap();
2899        assert_eq!(response.results.len(), 1);
2900        let result = &response.results[0];
2901        assert_eq!(result.user_id.to_string(), "@test:example.me");
2902        assert_eq!(result.display_name.clone().unwrap(), "Test");
2903        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
2904        assert!(!response.limited);
2905    }
2906
2907    #[async_test]
2908    async fn test_request_unstable_features() {
2909        let server = MockServer::start().await;
2910        let client = logged_in_client(Some(server.uri())).await;
2911
2912        Mock::given(method("GET"))
2913            .and(path("_matrix/client/versions"))
2914            .respond_with(
2915                ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2916            )
2917            .mount(&server)
2918            .await;
2919
2920        let unstable_features = client.unstable_features().await.unwrap();
2921        assert_eq!(unstable_features.get("org.matrix.e2e_cross_signing"), Some(&true));
2922        assert_eq!(unstable_features.get("you.shall.pass"), None);
2923    }
2924
2925    #[async_test]
2926    async fn test_can_homeserver_push_encrypted_event_to_device() {
2927        let server = MockServer::start().await;
2928        let client = logged_in_client(Some(server.uri())).await;
2929
2930        Mock::given(method("GET"))
2931            .and(path("_matrix/client/versions"))
2932            .respond_with(
2933                ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2934            )
2935            .mount(&server)
2936            .await;
2937
2938        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
2939        assert!(msc4028_enabled);
2940    }
2941
2942    #[async_test]
2943    async fn test_recently_visited_rooms() {
2944        // Tracking recently visited rooms requires authentication
2945        let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2946        assert_matches!(
2947            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
2948            Err(Error::AuthenticationRequired)
2949        );
2950
2951        let client = logged_in_client(None).await;
2952        let account = client.account();
2953
2954        // We should start off with an empty list
2955        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
2956
2957        // Tracking a valid room id should add it to the list
2958        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2959        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2960        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2961
2962        // And the existing list shouldn't be changed
2963        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2964        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2965
2966        // Tracking the same room again shouldn't change the list
2967        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2968        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2969        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2970
2971        // Tracking a second room should add it to the front of the list
2972        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
2973        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2974        assert_eq!(
2975            account.get_recently_visited_rooms().await.unwrap(),
2976            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
2977        );
2978
2979        // Tracking the first room yet again should move it to the front of the list
2980        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2981        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2982        assert_eq!(
2983            account.get_recently_visited_rooms().await.unwrap(),
2984            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
2985        );
2986
2987        // Tracking should be capped at 20
2988        for n in 0..20 {
2989            account
2990                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
2991                .await
2992                .unwrap();
2993        }
2994
2995        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
2996
2997        // And the initial rooms should've been pushed out
2998        let rooms = account.get_recently_visited_rooms().await.unwrap();
2999        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3000        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3001
3002        // And the last tracked room should be the first
3003        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3004    }
3005
3006    #[async_test]
3007    async fn test_client_no_cycle_with_event_cache() {
3008        let client = logged_in_client(None).await;
3009
3010        // Wait for the init tasks to die.
3011        sleep(Duration::from_secs(1)).await;
3012
3013        let weak_client = WeakClient::from_client(&client);
3014        assert_eq!(weak_client.strong_count(), 1);
3015
3016        {
3017            let room_id = room_id!("!room:example.org");
3018
3019            // Have the client know the room.
3020            let response = SyncResponseBuilder::default()
3021                .add_joined_room(JoinedRoomBuilder::new(room_id))
3022                .build_sync_response();
3023            client.inner.base_client.receive_sync_response(response).await.unwrap();
3024
3025            client.event_cache().subscribe().unwrap();
3026
3027            let (_room_event_cache, _drop_handles) =
3028                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3029        }
3030
3031        drop(client);
3032
3033        // Give a bit of time for background tasks to die.
3034        sleep(Duration::from_secs(1)).await;
3035
3036        // The weak client must be the last reference to the client now.
3037        assert_eq!(weak_client.strong_count(), 0);
3038        let client = weak_client.get();
3039        assert!(
3040            client.is_none(),
3041            "too many strong references to the client: {}",
3042            Arc::strong_count(&client.unwrap().inner)
3043        );
3044    }
3045
3046    #[async_test]
3047    async fn test_server_capabilities_caching() {
3048        let server = MockServer::start().await;
3049        let server_url = server.uri();
3050        let domain = server_url.strip_prefix("http://").unwrap();
3051        let server_name = <&ServerName>::try_from(domain).unwrap();
3052
3053        Mock::given(method("GET"))
3054            .and(path("/.well-known/matrix/client"))
3055            .respond_with(ResponseTemplate::new(200).set_body_raw(
3056                test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
3057                "application/json",
3058            ))
3059            .named("well known mock")
3060            .expect(2)
3061            .mount(&server)
3062            .await;
3063
3064        let versions_mock = Mock::given(method("GET"))
3065            .and(path("/_matrix/client/versions"))
3066            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3067            .named("first versions mock")
3068            .expect(1)
3069            .mount_as_scoped(&server)
3070            .await;
3071
3072        let memory_store = Arc::new(MemoryStore::new());
3073        let client = Client::builder()
3074            .insecure_server_name_no_tls(server_name)
3075            .store_config(
3076                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3077                    .state_store(memory_store.clone()),
3078            )
3079            .build()
3080            .await
3081            .unwrap();
3082
3083        assert_eq!(client.server_versions().await.unwrap().len(), 1);
3084
3085        // This second call hits the in-memory cache.
3086        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3087
3088        drop(client);
3089
3090        let client = Client::builder()
3091            .insecure_server_name_no_tls(server_name)
3092            .store_config(
3093                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3094                    .state_store(memory_store.clone()),
3095            )
3096            .build()
3097            .await
3098            .unwrap();
3099
3100        // This third call hits the on-disk cache.
3101        assert_eq!(
3102            client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
3103            Some(&true)
3104        );
3105
3106        drop(versions_mock);
3107        server.verify().await;
3108
3109        // Now, reset the cache, and observe the endpoint being called again once.
3110        client.reset_server_capabilities().await.unwrap();
3111
3112        Mock::given(method("GET"))
3113            .and(path("/_matrix/client/versions"))
3114            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3115            .expect(1)
3116            .named("second versions mock")
3117            .mount(&server)
3118            .await;
3119
3120        // Hits network again.
3121        assert_eq!(client.server_versions().await.unwrap().len(), 1);
3122        // Hits in-memory cache again.
3123        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3124    }
3125
3126    #[async_test]
3127    async fn test_no_network_doesnt_cause_infinite_retries() {
3128        // Note: not `no_retry_test_client` or `logged_in_client` which uses the former,
3129        // since we want infinite retries for transient errors.
3130        let client =
3131            test_client_builder(None).request_config(RequestConfig::new()).build().await.unwrap();
3132        set_client_session(&client).await;
3133
3134        // We don't define a mock server on purpose here, so that the error is really a
3135        // network error.
3136        client.whoami().await.unwrap_err();
3137    }
3138
3139    #[async_test]
3140    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3141        let (client_builder, server) = test_client_builder_with_server().await;
3142        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3143        set_client_session(&client).await;
3144
3145        let builder = Mock::given(method("GET"))
3146            .and(path("/_matrix/client/r0/sync"))
3147            .and(header("authorization", "Bearer 1234"))
3148            .and(query_param_is_missing("since"));
3149
3150        let room_id = room_id!("!room:example.org");
3151        let joined_room_builder = JoinedRoomBuilder::new(room_id);
3152        let mut sync_response_builder = SyncResponseBuilder::new();
3153        sync_response_builder.add_joined_room(joined_room_builder);
3154        let response_body = sync_response_builder.build_json_sync_response();
3155
3156        builder
3157            .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3158            .mount(&server)
3159            .await;
3160
3161        client.sync_once(SyncSettings::default()).await.unwrap();
3162
3163        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3164        assert_eq!(room.room_id(), room_id);
3165    }
3166
3167    #[async_test]
3168    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3169        let (client_builder, server) = test_client_builder_with_server().await;
3170        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3171        set_client_session(&client).await;
3172
3173        let builder = Mock::given(method("GET"))
3174            .and(path("/_matrix/client/r0/sync"))
3175            .and(header("authorization", "Bearer 1234"))
3176            .and(query_param_is_missing("since"));
3177
3178        let room_id = room_id!("!room:example.org");
3179        let joined_room_builder = JoinedRoomBuilder::new(room_id);
3180        let mut sync_response_builder = SyncResponseBuilder::new();
3181        sync_response_builder.add_joined_room(joined_room_builder);
3182        let response_body = sync_response_builder.build_json_sync_response();
3183
3184        builder
3185            .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3186            .mount(&server)
3187            .await;
3188
3189        let client = Arc::new(client);
3190
3191        // Perform the /sync request with a delay so it starts after the
3192        // `await_room_remote_echo` call has happened
3193        spawn({
3194            let client = client.clone();
3195            async move {
3196                sleep(Duration::from_millis(100)).await;
3197                client.sync_once(SyncSettings::default()).await.unwrap();
3198            }
3199        });
3200
3201        let room =
3202            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3203        assert_eq!(room.room_id(), room_id);
3204    }
3205
3206    #[async_test]
3207    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3208        let (client_builder, _) = test_client_builder_with_server().await;
3209        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3210        set_client_session(&client).await;
3211
3212        let room_id = room_id!("!room:example.org");
3213        // Room is not present so the client won't be able to find it. The call will
3214        // timeout.
3215        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3216    }
3217
3218    #[async_test]
3219    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3220        let (client_builder, server) = test_client_builder_with_server().await;
3221        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3222        set_client_session(&client).await;
3223
3224        Mock::given(method("POST"))
3225            .and(path("_matrix/client/r0/createRoom"))
3226            .and(header("authorization", "Bearer 1234"))
3227            .respond_with(
3228                ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
3229            )
3230            .mount(&server)
3231            .await;
3232
3233        // Create a room in the internal store
3234        let room = client
3235            .create_room(assign!(CreateRoomRequest::new(), {
3236                invite: vec![],
3237                is_direct: false,
3238            }))
3239            .await
3240            .unwrap();
3241
3242        // Room is locally present, but not synced, the call will timeout
3243        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3244            .await
3245            .unwrap_err();
3246    }
3247
3248    #[async_test]
3249    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3250        let server = MatrixMockServer::new().await;
3251        let client = server.client_builder().build().await;
3252
3253        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3254
3255        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3256        assert_matches!(ret, Ok(true));
3257    }
3258
3259    #[async_test]
3260    async fn test_is_room_alias_available_if_alias_is_resolved() {
3261        let server = MatrixMockServer::new().await;
3262        let client = server.client_builder().build().await;
3263
3264        server
3265            .mock_room_directory_resolve_alias()
3266            .ok("!some_room_id:matrix.org", Vec::new())
3267            .expect(1)
3268            .mount()
3269            .await;
3270
3271        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3272        assert_matches!(ret, Ok(false));
3273    }
3274
3275    #[async_test]
3276    async fn test_is_room_alias_available_if_error_found() {
3277        let server = MatrixMockServer::new().await;
3278        let client = server.client_builder().build().await;
3279
3280        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3281
3282        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3283        assert_matches!(ret, Err(_));
3284    }
3285
3286    #[async_test]
3287    async fn test_create_room_alias() {
3288        let server = MatrixMockServer::new().await;
3289        let client = server.client_builder().build().await;
3290
3291        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3292
3293        let ret = client
3294            .create_room_alias(
3295                room_alias_id!("#some_alias:matrix.org"),
3296                room_id!("!some_room:matrix.org"),
3297            )
3298            .await;
3299        assert_matches!(ret, Ok(()));
3300    }
3301
3302    #[async_test]
3303    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3304        let server = MatrixMockServer::new().await;
3305        let client = server.client_builder().build().await;
3306
3307        let room_id = room_id!("!a-room:matrix.org");
3308
3309        // Make sure the summary endpoint is called once
3310        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3311
3312        // We create a locally cached invited room
3313        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3314
3315        // And we get a preview, the server endpoint was reached
3316        let preview = client
3317            .get_room_preview(room_id.into(), Vec::new())
3318            .await
3319            .expect("Room preview should be retrieved");
3320
3321        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3322    }
3323
3324    #[async_test]
3325    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3326        let server = MatrixMockServer::new().await;
3327        let client = server.client_builder().build().await;
3328
3329        let room_id = room_id!("!a-room:matrix.org");
3330
3331        // Make sure the summary endpoint is called once
3332        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3333
3334        // We create a locally cached left room
3335        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3336
3337        // And we get a preview, the server endpoint was reached
3338        let preview = client
3339            .get_room_preview(room_id.into(), Vec::new())
3340            .await
3341            .expect("Room preview should be retrieved");
3342
3343        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3344    }
3345
3346    #[async_test]
3347    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3348        let server = MatrixMockServer::new().await;
3349        let client = server.client_builder().build().await;
3350
3351        let room_id = room_id!("!a-room:matrix.org");
3352
3353        // Make sure the summary endpoint is called once
3354        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3355
3356        // We create a locally cached knocked room
3357        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3358
3359        // And we get a preview, the server endpoint was reached
3360        let preview = client
3361            .get_room_preview(room_id.into(), Vec::new())
3362            .await
3363            .expect("Room preview should be retrieved");
3364
3365        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3366    }
3367
3368    #[async_test]
3369    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3370        let server = MatrixMockServer::new().await;
3371        let client = server.client_builder().build().await;
3372
3373        let room_id = room_id!("!a-room:matrix.org");
3374
3375        // Make sure the summary endpoint is not called
3376        server.mock_room_summary().ok(room_id).never().mount().await;
3377
3378        // We create a locally cached joined room
3379        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3380
3381        // And we get a preview, no server endpoint was reached
3382        let preview = client
3383            .get_room_preview(room_id.into(), Vec::new())
3384            .await
3385            .expect("Room preview should be retrieved");
3386
3387        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3388    }
3389
3390    #[async_test]
3391    async fn test_media_preview_config() {
3392        let server = MatrixMockServer::new().await;
3393        let client = server.client_builder().build().await;
3394
3395        server
3396            .mock_sync()
3397            .ok_and_run(&client, |builder| {
3398                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3399                "content": {
3400                    "media_previews": "private",
3401                    "invite_avatars": "off"
3402                },
3403                "type": "m.media_preview_config"
3404                  })));
3405            })
3406            .await;
3407
3408        let (initial_value, stream) =
3409            client.account().observe_media_preview_config().await.unwrap();
3410
3411        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3412        assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3413        assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3414        pin_mut!(stream);
3415        assert_pending!(stream);
3416
3417        server
3418            .mock_sync()
3419            .ok_and_run(&client, |builder| {
3420                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3421                "content": {
3422                    "media_previews": "off",
3423                    "invite_avatars": "on"
3424                },
3425                "type": "m.media_preview_config"
3426                  })));
3427            })
3428            .await;
3429
3430        assert_next_matches!(
3431            stream,
3432            MediaPreviewConfigEventContent {
3433                media_previews: MediaPreviews::Off,
3434                invite_avatars: InviteAvatars::On,
3435                ..
3436            }
3437        );
3438        assert_pending!(stream);
3439    }
3440
3441    #[async_test]
3442    async fn test_unstable_media_preview_config() {
3443        let server = MatrixMockServer::new().await;
3444        let client = server.client_builder().build().await;
3445
3446        server
3447            .mock_sync()
3448            .ok_and_run(&client, |builder| {
3449                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3450                "content": {
3451                    "media_previews": "private",
3452                    "invite_avatars": "off"
3453                },
3454                "type": "io.element.msc4278.media_preview_config"
3455                  })));
3456            })
3457            .await;
3458
3459        let (initial_value, stream) =
3460            client.account().observe_media_preview_config().await.unwrap();
3461
3462        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3463        assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3464        assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3465        pin_mut!(stream);
3466        assert_pending!(stream);
3467
3468        server
3469            .mock_sync()
3470            .ok_and_run(&client, |builder| {
3471                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3472                "content": {
3473                    "media_previews": "off",
3474                    "invite_avatars": "on"
3475                },
3476                "type": "io.element.msc4278.media_preview_config"
3477                  })));
3478            })
3479            .await;
3480
3481        assert_next_matches!(
3482            stream,
3483            MediaPreviewConfigEventContent {
3484                media_previews: MediaPreviews::Off,
3485                invite_avatars: InviteAvatars::On,
3486                ..
3487            }
3488        );
3489        assert_pending!(stream);
3490    }
3491
3492    #[async_test]
3493    async fn test_media_preview_config_not_found() {
3494        let server = MatrixMockServer::new().await;
3495        let client = server.client_builder().build().await;
3496
3497        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3498
3499        assert!(initial_value.is_none());
3500    }
3501
3502    #[async_test]
3503    async fn test_load_or_fetch_max_upload_size() {
3504        let server = MatrixMockServer::new().await;
3505        let client = server.client_builder().build().await;
3506
3507        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3508
3509        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3510        client.load_or_fetch_max_upload_size().await.unwrap();
3511
3512        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3513    }
3514
3515    #[async_test]
3516    async fn test_uploading_a_too_large_media_file() {
3517        let server = MatrixMockServer::new().await;
3518        let client = server.client_builder().build().await;
3519
3520        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3521        client.load_or_fetch_max_upload_size().await.unwrap();
3522        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3523
3524        let data = vec![1, 2];
3525        let upload_request =
3526            ruma::api::client::media::create_content::v3::Request::new(data.clone());
3527        let request = SendRequest {
3528            client: client.clone(),
3529            request: upload_request,
3530            config: None,
3531            send_progress: SharedObservable::new(TransmissionProgress::default()),
3532        };
3533        let media_request = SendMediaUploadRequest::new(request);
3534
3535        let error = media_request.await.err();
3536        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3537        assert_eq!(max, uint!(1));
3538        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3539    }
3540}