matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::store::LockableCryptoStore;
32use matrix_sdk_base::{
33 event_cache::store::EventCacheStoreLock,
34 store::{DynStateStore, RoomLoadSettings, ServerCapabilities},
35 sync::{Notification, RoomUpdates},
36 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43 api::{
44 client::{
45 account::whoami,
46 alias::{create_alias, delete_alias, get_alias},
47 device::{delete_devices, get_devices, update_device},
48 directory::{get_public_rooms, get_public_rooms_filtered},
49 discovery::{
50 get_capabilities::{self, Capabilities},
51 get_supported_versions,
52 },
53 error::ErrorKind,
54 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
55 knock::knock_room,
56 membership::{join_room_by_id, join_room_by_id_or_alias},
57 room::create_room,
58 session::login::v3::DiscoveryInfo,
59 sync::sync_events,
60 uiaa,
61 user_directory::search_users,
62 },
63 error::FromHttpResponseError,
64 MatrixVersion, OutgoingRequest,
65 },
66 assign,
67 push::Ruleset,
68 time::Instant,
69 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
70 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
71};
72use serde::de::DeserializeOwned;
73use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
74use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
75use url::Url;
76
77use self::futures::SendRequest;
78use crate::{
79 authentication::{
80 matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
81 SaveSessionCallback,
82 },
83 config::RequestConfig,
84 deduplicating_handler::DeduplicatingHandler,
85 error::HttpResult,
86 event_cache::EventCache,
87 event_handler::{
88 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
89 EventHandlerStore, ObservableEventHandler, SyncEvent,
90 },
91 http_client::HttpClient,
92 media::MediaError,
93 notification_settings::NotificationSettings,
94 room_preview::RoomPreview,
95 send_queue::SendQueueData,
96 sliding_sync::Version as SlidingSyncVersion,
97 sync::{RoomUpdate, SyncResponse},
98 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
99 Room, SessionTokens, TransmissionProgress,
100};
101#[cfg(feature = "e2e-encryption")]
102use crate::{
103 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
104 store_locks::CrossProcessStoreLock,
105};
106
107mod builder;
108pub(crate) mod caches;
109pub(crate) mod futures;
110
111pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
112
/// Boxed, pinned future produced by a notification handler; it resolves to
/// `()`.
///
/// Outside of wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future produced by a notification handler; it resolves to
/// `()`.
///
/// On wasm targets no `Send` bound is required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
117
/// Boxed notification handler: a callable taking the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// Outside of wasm targets the handler must be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler: a callable taking the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// On wasm targets no `Send`/`Sync` bounds are required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
123
/// Enum controlling whether a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value returned
/// by the provided callback decides whether the sync loop keeps running or is
/// exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
137
/// Represents the changes that can occur to a `Client`'s `Session`.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
149
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc`, so a `Client` can be cloned freely;
/// every clone shares the same [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    /// The shared, reference-counted state of the client.
    pub(crate) inner: Arc<ClientInner>,
}
157
/// Collection of locks and request-deduplicating handlers that individual
/// client methods use to serialize or deduplicate their work.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see `send_single_receipt`.
    ///
    /// NOTE(review): `send_single_receipt` is not a method of this struct;
    /// presumably it lives on `Room` — confirm and turn this into a proper
    /// intra-doc link.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// The cross-process lock guarding the crypto store, kept in a
    /// set-at-most-once cell.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
237
/// The shared state behind a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`, so everything stored here is
/// shared between all clones of a given `Client`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API. It sits behind a
    /// lock because it can be replaced after construction.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// It sits behind a lock because it can be overridden after construction.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
}
345
346impl ClientInner {
347 /// Create a new `ClientInner`.
348 ///
349 /// All the fields passed as parameters here are those that must be cloned
350 /// upon instantiation of a sub-client, e.g. a client specialized for
351 /// notifications.
352 #[allow(clippy::too_many_arguments)]
353 async fn new(
354 auth_ctx: Arc<AuthCtx>,
355 server: Option<Url>,
356 homeserver: Url,
357 sliding_sync_version: SlidingSyncVersion,
358 http_client: HttpClient,
359 base_client: BaseClient,
360 server_capabilities: ClientServerCapabilities,
361 respect_login_well_known: bool,
362 event_cache: OnceCell<EventCache>,
363 send_queue: Arc<SendQueueData>,
364 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
365 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
366 cross_process_store_locks_holder_name: String,
367 ) -> Arc<Self> {
368 let caches = ClientCaches {
369 server_capabilities: server_capabilities.into(),
370 server_metadata: Mutex::new(TtlCache::new()),
371 };
372
373 let client = Self {
374 server,
375 homeserver: StdRwLock::new(homeserver),
376 auth_ctx,
377 sliding_sync_version: StdRwLock::new(sliding_sync_version),
378 http_client,
379 base_client,
380 caches,
381 locks: Default::default(),
382 cross_process_store_locks_holder_name,
383 typing_notice_times: Default::default(),
384 event_handlers: Default::default(),
385 notification_handlers: Default::default(),
386 room_update_channels: Default::default(),
387 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
388 // ballast for all observers to catch up.
389 room_updates_sender: broadcast::Sender::new(32),
390 respect_login_well_known,
391 sync_beat: event_listener::Event::new(),
392 event_cache,
393 send_queue_data: send_queue,
394 #[cfg(feature = "e2e-encryption")]
395 e2ee: EncryptionData::new(encryption_settings),
396 #[cfg(feature = "e2e-encryption")]
397 verification_state: SharedObservable::new(VerificationState::Unknown),
398 #[cfg(feature = "e2e-encryption")]
399 enable_share_history_on_invite,
400 server_max_upload_size: Mutex::new(OnceCell::new()),
401 };
402
403 #[allow(clippy::let_and_return)]
404 let client = Arc::new(client);
405
406 #[cfg(feature = "e2e-encryption")]
407 client.e2ee.initialize_room_key_tasks(&client);
408
409 let _ = client
410 .event_cache
411 .get_or_init(|| async {
412 EventCache::new(
413 WeakClient::from_inner(&client),
414 client.base_client.event_cache_store().clone(),
415 )
416 })
417 .await;
418
419 client
420 }
421}
422
423#[cfg(not(tarpaulin_include))]
424impl Debug for Client {
425 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
426 write!(fmt, "Client")
427 }
428}
429
430impl Client {
431 /// Create a new [`Client`] that will use the given homeserver.
432 ///
433 /// # Arguments
434 ///
435 /// * `homeserver_url` - The homeserver that the client should connect to.
436 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
437 Self::builder().homeserver_url(homeserver_url).build().await
438 }
439
440 /// Returns a subscriber that publishes an event every time the ignore user
441 /// list changes.
442 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
443 self.inner.base_client.subscribe_to_ignore_user_list_changes()
444 }
445
    /// Create a new [`ClientBuilder`].
    ///
    /// This is the entry point for configuring a [`Client`] before building
    /// it; [`Client::new`] goes through it as well.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
450
451 pub(crate) fn base_client(&self) -> &BaseClient {
452 &self.inner.base_client
453 }
454
455 /// The underlying HTTP client.
456 pub fn http_client(&self) -> &reqwest::Client {
457 &self.inner.http_client.inner
458 }
459
460 pub(crate) fn locks(&self) -> &ClientLocks {
461 &self.inner.locks
462 }
463
464 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
465 &self.inner.auth_ctx
466 }
467
468 /// The cross-process store locks holder name.
469 ///
470 /// The SDK provides cross-process store locks (see
471 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
472 /// `holder_name` is the value used for all cross-process store locks
473 /// used by this `Client`.
474 pub fn cross_process_store_locks_holder_name(&self) -> &str {
475 &self.inner.cross_process_store_locks_holder_name
476 }
477
478 /// Change the homeserver URL used by this client.
479 ///
480 /// # Arguments
481 ///
482 /// * `homeserver_url` - The new URL to use.
483 fn set_homeserver(&self, homeserver_url: Url) {
484 *self.inner.homeserver.write().unwrap() = homeserver_url;
485 }
486
487 /// Get the capabilities of the homeserver.
488 ///
489 /// This method should be used to check what features are supported by the
490 /// homeserver.
491 ///
492 /// # Examples
493 ///
494 /// ```no_run
495 /// # use matrix_sdk::Client;
496 /// # use url::Url;
497 /// # async {
498 /// # let homeserver = Url::parse("http://example.com")?;
499 /// let client = Client::new(homeserver).await?;
500 ///
501 /// let capabilities = client.get_capabilities().await?;
502 ///
503 /// if capabilities.change_password.enabled {
504 /// // Change password
505 /// }
506 /// # anyhow::Ok(()) };
507 /// ```
508 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
509 let res = self.send(get_capabilities::v3::Request::new()).await?;
510 Ok(res.capabilities)
511 }
512
513 /// Get a copy of the default request config.
514 ///
515 /// The default request config is what's used when sending requests if no
516 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
517 /// function with such a parameter.
518 ///
519 /// If the default request config was not customized through
520 /// [`ClientBuilder`] when creating this `Client`, the returned value will
521 /// be equivalent to [`RequestConfig::default()`].
522 pub fn request_config(&self) -> RequestConfig {
523 self.inner.http_client.request_config
524 }
525
526 /// Check whether the client has been activated.
527 ///
528 /// A client is considered active when:
529 ///
530 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
531 /// is logged in,
532 /// 2. Has loaded cached data from storage,
533 /// 3. If encryption is enabled, it also initialized or restored its
534 /// `OlmMachine`.
535 pub fn is_active(&self) -> bool {
536 self.inner.base_client.is_active()
537 }
538
539 /// The server used by the client.
540 ///
541 /// See `Self::server` to learn more.
542 pub fn server(&self) -> Option<&Url> {
543 self.inner.server.as_ref()
544 }
545
546 /// The homeserver of the client.
547 pub fn homeserver(&self) -> Url {
548 self.inner.homeserver.read().unwrap().clone()
549 }
550
551 /// Get the sliding sync version.
552 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
553 self.inner.sliding_sync_version.read().unwrap().clone()
554 }
555
556 /// Override the sliding sync version.
557 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
558 let mut lock = self.inner.sliding_sync_version.write().unwrap();
559 *lock = version;
560 }
561
562 /// Get the Matrix user session meta information.
563 ///
564 /// If the client is currently logged in, this will return a
565 /// [`SessionMeta`] object which contains the user ID and device ID.
566 /// Otherwise it returns `None`.
567 pub fn session_meta(&self) -> Option<&SessionMeta> {
568 self.base_client().session_meta()
569 }
570
571 /// Returns a receiver that gets events for each room info update. To watch
572 /// for new events, use `receiver.resubscribe()`. Each event contains the
573 /// room and a boolean whether this event should trigger a room list update.
574 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
575 self.base_client().room_info_notable_update_receiver()
576 }
577
578 /// Performs a search for users.
579 /// The search is performed case-insensitively on user IDs and display names
580 ///
581 /// # Arguments
582 ///
583 /// * `search_term` - The search term for the search
584 /// * `limit` - The maximum number of results to return. Defaults to 10.
585 ///
586 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
587 pub async fn search_users(
588 &self,
589 search_term: &str,
590 limit: u64,
591 ) -> HttpResult<search_users::v3::Response> {
592 let mut request = search_users::v3::Request::new(search_term.to_owned());
593
594 if let Some(limit) = UInt::new(limit) {
595 request.limit = limit;
596 }
597
598 self.send(request).await
599 }
600
601 /// Get the user id of the current owner of the client.
602 pub fn user_id(&self) -> Option<&UserId> {
603 self.session_meta().map(|s| s.user_id.as_ref())
604 }
605
606 /// Get the device ID that identifies the current session.
607 pub fn device_id(&self) -> Option<&DeviceId> {
608 self.session_meta().map(|s| s.device_id.as_ref())
609 }
610
611 /// Get the current access token for this session.
612 ///
613 /// Will be `None` if the client has not been logged in.
614 pub fn access_token(&self) -> Option<String> {
615 self.auth_ctx().access_token()
616 }
617
618 /// Get the current tokens for this session.
619 ///
620 /// To be notified of changes in the session tokens, use
621 /// [`Client::subscribe_to_session_changes()`] or
622 /// [`Client::set_session_callbacks()`].
623 ///
624 /// Returns `None` if the client has not been logged in.
625 pub fn session_tokens(&self) -> Option<SessionTokens> {
626 self.auth_ctx().session_tokens()
627 }
628
629 /// Access the authentication API used to log in this client.
630 ///
631 /// Will be `None` if the client has not been logged in.
632 pub fn auth_api(&self) -> Option<AuthApi> {
633 match self.auth_ctx().auth_data.get()? {
634 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
635 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
636 }
637 }
638
639 /// Get the whole session info of this client.
640 ///
641 /// Will be `None` if the client has not been logged in.
642 ///
643 /// Can be used with [`Client::restore_session`] to restore a previously
644 /// logged-in session.
645 pub fn session(&self) -> Option<AuthSession> {
646 match self.auth_api()? {
647 AuthApi::Matrix(api) => api.session().map(Into::into),
648 AuthApi::OAuth(api) => api.full_session().map(Into::into),
649 }
650 }
651
652 /// Get a reference to the state store.
653 pub fn state_store(&self) -> &DynStateStore {
654 self.base_client().state_store()
655 }
656
657 /// Get a reference to the event cache store.
658 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
659 self.base_client().event_cache_store()
660 }
661
662 /// Access the native Matrix authentication API with this client.
663 pub fn matrix_auth(&self) -> MatrixAuth {
664 MatrixAuth::new(self.clone())
665 }
666
667 /// Get the account of the current owner of the client.
668 pub fn account(&self) -> Account {
669 Account::new(self.clone())
670 }
671
672 /// Get the encryption manager of the client.
673 #[cfg(feature = "e2e-encryption")]
674 pub fn encryption(&self) -> Encryption {
675 Encryption::new(self.clone())
676 }
677
678 /// Get the media manager of the client.
679 pub fn media(&self) -> Media {
680 Media::new(self.clone())
681 }
682
683 /// Get the pusher manager of the client.
684 pub fn pusher(&self) -> Pusher {
685 Pusher::new(self.clone())
686 }
687
688 /// Access the OAuth 2.0 API of the client.
689 pub fn oauth(&self) -> OAuth {
690 OAuth::new(self.clone())
691 }
692
693 /// Register a handler for a specific event type.
694 ///
695 /// The handler is a function or closure with one or more arguments. The
696 /// first argument is the event itself. All additional arguments are
697 /// "context" arguments: They have to implement [`EventHandlerContext`].
698 /// This trait is named that way because most of the types implementing it
699 /// give additional context about an event: The room it was in, its raw form
700 /// and other similar things. As two exceptions to this,
701 /// [`Client`] and [`EventHandlerHandle`] also implement the
702 /// `EventHandlerContext` trait so you don't have to clone your client
703 /// into the event handler manually and a handler can decide to remove
704 /// itself.
705 ///
706 /// Some context arguments are not universally applicable. A context
707 /// argument that isn't available for the given event type will result in
708 /// the event handler being skipped and an error being logged. The following
709 /// context argument types are only available for a subset of event types:
710 ///
711 /// * [`Room`] is only available for room-specific events, i.e. not for
712 /// events like global account data events or presence events.
713 ///
714 /// You can provide custom context via
715 /// [`add_event_handler_context`](Client::add_event_handler_context) and
716 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
717 /// into the event handler.
718 ///
719 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
720 ///
721 /// # Examples
722 ///
723 /// ```no_run
724 /// use matrix_sdk::{
725 /// deserialized_responses::EncryptionInfo,
726 /// event_handler::Ctx,
727 /// ruma::{
728 /// events::{
729 /// macros::EventContent,
730 /// push_rules::PushRulesEvent,
731 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
732 /// },
733 /// push::Action,
734 /// Int, MilliSecondsSinceUnixEpoch,
735 /// },
736 /// Client, Room,
737 /// };
738 /// use serde::{Deserialize, Serialize};
739 ///
740 /// # async fn example(client: Client) {
741 /// client.add_event_handler(
742 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
743 /// // Common usage: Room event plus room and client.
744 /// },
745 /// );
746 /// client.add_event_handler(
747 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
748 /// async move {
749 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
750 /// // unencrypted events and events that were decrypted by the SDK.
751 /// }
752 /// },
753 /// );
754 /// client.add_event_handler(
755 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
756 /// async move {
757 /// // A `Vec<Action>` parameter allows you to know which push actions
758 /// // are applicable for an event. For example, an event with
759 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
760 /// // in the timeline.
761 /// }
762 /// },
763 /// );
764 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
765 /// // You can omit any or all arguments after the first.
766 /// });
767 ///
768 /// // Registering a temporary event handler:
769 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
770 /// /* Event handler */
771 /// });
772 /// client.remove_event_handler(handle);
773 ///
774 /// // Registering custom event handler context:
775 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
776 /// struct MyContext {
777 /// number: usize,
778 /// }
779 /// client.add_event_handler_context(MyContext { number: 5 });
780 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
781 /// // Use the context
782 /// });
783 ///
784 /// // Custom events work exactly the same way, you just need to declare
785 /// // the content struct and use the EventContent derive macro on it.
786 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
787 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
788 /// struct TokenEventContent {
789 /// token: String,
790 /// #[serde(rename = "exp")]
791 /// expires_at: MilliSecondsSinceUnixEpoch,
792 /// }
793 ///
794 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
795 /// todo!("Display the token");
796 /// });
797 ///
798 /// // Event handler closures can also capture local variables.
799 /// // Make sure they are cheap to clone though, because they will be cloned
800 /// // every time the closure is called.
801 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
802 ///
803 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
804 /// println!("Calling the handler with identifier {data}");
805 /// });
806 /// # }
807 /// ```
808 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
809 where
810 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
811 H: EventHandler<Ev, Ctx>,
812 {
813 self.add_event_handler_impl(handler, None)
814 }
815
816 /// Register a handler for a specific room, and event type.
817 ///
818 /// This method works the same way as
819 /// [`add_event_handler`][Self::add_event_handler], except that the handler
820 /// will only be called for events in the room with the specified ID. See
821 /// that method for more details on event handler functions.
822 ///
823 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
824 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
825 /// your use case.
826 pub fn add_room_event_handler<Ev, Ctx, H>(
827 &self,
828 room_id: &RoomId,
829 handler: H,
830 ) -> EventHandlerHandle
831 where
832 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
833 H: EventHandler<Ev, Ctx>,
834 {
835 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
836 }
837
838 /// Observe a specific event type.
839 ///
840 /// `Ev` represents the kind of event that will be observed. `Ctx`
841 /// represents the context that will come with the event. It relies on the
842 /// same mechanism as [`Client::add_event_handler`]. The main difference is
843 /// that it returns an [`ObservableEventHandler`] and doesn't require a
844 /// user-defined closure. It is possible to subscribe to the
845 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
846 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
847 /// Ctx)`.
848 ///
849 /// Be careful that only the most recent value can be observed. Subscribers
850 /// are notified when a new value is sent, but there is no guarantee
851 /// that they will see all values.
852 ///
853 /// # Example
854 ///
855 /// Let's see a classical usage:
856 ///
857 /// ```
858 /// use futures_util::StreamExt as _;
859 /// use matrix_sdk::{
860 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
861 /// Client, Room,
862 /// };
863 ///
864 /// # async fn example(client: Client) -> Option<()> {
865 /// let observer =
866 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
867 ///
868 /// let mut subscriber = observer.subscribe();
869 ///
870 /// let (event, (room, push_actions)) = subscriber.next().await?;
871 /// # Some(())
872 /// # }
873 /// ```
874 ///
875 /// Now let's see how to get several contexts that can be useful for you:
876 ///
877 /// ```
878 /// use matrix_sdk::{
879 /// deserialized_responses::EncryptionInfo,
880 /// ruma::{
881 /// events::room::{
882 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
883 /// },
884 /// push::Action,
885 /// },
886 /// Client, Room,
887 /// };
888 ///
889 /// # async fn example(client: Client) {
890 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
891 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
892 ///
893 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
894 /// // to distinguish between unencrypted events and events that were decrypted
895 /// // by the SDK.
896 /// let _ = client
897 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
898 /// );
899 ///
900 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
901 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
902 /// // should be highlighted in the timeline.
903 /// let _ =
904 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
905 ///
906 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
907 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
908 /// # }
909 /// ```
910 ///
911 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
912 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
913 where
914 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
915 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
916 {
917 self.observe_room_events_impl(None)
918 }
919
920 /// Observe a specific room, and event type.
921 ///
922 /// This method works the same way as [`Client::observe_events`], except
923 /// that the observability will only be applied for events in the room with
924 /// the specified ID. See that method for more details.
925 ///
926 /// Be careful that only the most recent value can be observed. Subscribers
927 /// are notified when a new value is sent, but there is no guarantee
928 /// that they will see all values.
929 pub fn observe_room_events<Ev, Ctx>(
930 &self,
931 room_id: &RoomId,
932 ) -> ObservableEventHandler<(Ev, Ctx)>
933 where
934 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
935 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
936 {
937 self.observe_room_events_impl(Some(room_id.to_owned()))
938 }
939
940 /// Shared implementation for `Client::observe_events` and
941 /// `Client::observe_room_events`.
942 fn observe_room_events_impl<Ev, Ctx>(
943 &self,
944 room_id: Option<OwnedRoomId>,
945 ) -> ObservableEventHandler<(Ev, Ctx)>
946 where
947 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
948 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
949 {
950 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
951 // new value.
952 let shared_observable = SharedObservable::new(None);
953
954 ObservableEventHandler::new(
955 shared_observable.clone(),
956 self.event_handler_drop_guard(self.add_event_handler_impl(
957 move |event: Ev, context: Ctx| {
958 shared_observable.set(Some((event, context)));
959
960 ready(())
961 },
962 room_id,
963 )),
964 )
965 }
966
967 /// Remove the event handler associated with the handle.
968 ///
969 /// Note that you **must not** call `remove_event_handler` from the
970 /// non-async part of an event handler, that is:
971 ///
972 /// ```ignore
973 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
974 /// // ⚠ this will cause a deadlock ⚠
975 /// client.remove_event_handler(handle);
976 ///
977 /// async move {
978 /// // removing the event handler here is fine
979 /// client.remove_event_handler(handle);
980 /// }
981 /// })
982 /// ```
983 ///
984 /// Note also that handlers that remove themselves will still execute with
985 /// events received in the same sync cycle.
986 ///
987 /// # Arguments
988 ///
989 /// `handle` - The [`EventHandlerHandle`] that is returned when
990 /// registering the event handler with [`Client::add_event_handler`].
991 ///
992 /// # Examples
993 ///
994 /// ```no_run
995 /// # use url::Url;
996 /// # use tokio::sync::mpsc;
997 /// #
998 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
999 /// #
1000 /// use matrix_sdk::{
1001 /// event_handler::EventHandlerHandle,
1002 /// ruma::events::room::member::SyncRoomMemberEvent, Client,
1003 /// };
1004 /// #
1005 /// # futures_executor::block_on(async {
1006 /// # let client = matrix_sdk::Client::builder()
1007 /// # .homeserver_url(homeserver)
1008 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1009 /// # .build()
1010 /// # .await
1011 /// # .unwrap();
1012 ///
1013 /// client.add_event_handler(
1014 /// |ev: SyncRoomMemberEvent,
1015 /// client: Client,
1016 /// handle: EventHandlerHandle| async move {
1017 /// // Common usage: Check arriving Event is the expected one
1018 /// println!("Expected RoomMemberEvent received!");
1019 /// client.remove_event_handler(handle);
1020 /// },
1021 /// );
1022 /// # });
1023 /// ```
1024 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1025 self.inner.event_handlers.remove(handle);
1026 }
1027
1028 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1029 /// the given handle.
1030 ///
1031 /// When the returned value is dropped, the event handler will be removed.
1032 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1033 EventHandlerDropGuard::new(handle, self.clone())
1034 }
1035
1036 /// Add an arbitrary value for use as event handler context.
1037 ///
1038 /// The value can be obtained in an event handler by adding an argument of
1039 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1040 ///
1041 /// If a value of the same type has been added before, it will be
1042 /// overwritten.
1043 ///
1044 /// # Examples
1045 ///
1046 /// ```no_run
1047 /// use matrix_sdk::{
1048 /// event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1049 /// Room,
1050 /// };
1051 /// # #[derive(Clone)]
1052 /// # struct SomeType;
1053 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1054 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1055 /// # futures_executor::block_on(async {
1056 /// # let client = matrix_sdk::Client::builder()
1057 /// # .homeserver_url(homeserver)
1058 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1059 /// # .build()
1060 /// # .await
1061 /// # .unwrap();
1062 ///
1063 /// // Handle used to send messages to the UI part of the app
1064 /// let my_gui_handle: SomeType = obtain_gui_handle();
1065 ///
1066 /// client.add_event_handler_context(my_gui_handle.clone());
1067 /// client.add_event_handler(
1068 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1069 /// async move {
1070 /// // gui_handle.send(DisplayMessage { message: ev });
1071 /// }
1072 /// },
1073 /// );
1074 /// # });
1075 /// ```
1076 pub fn add_event_handler_context<T>(&self, ctx: T)
1077 where
1078 T: Clone + Send + Sync + 'static,
1079 {
1080 self.inner.event_handlers.add_context(ctx);
1081 }
1082
1083 /// Register a handler for a notification.
1084 ///
1085 /// Similar to [`Client::add_event_handler`], but only allows functions
1086 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1087 /// [`Client`] for now.
1088 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1089 where
1090 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1091 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1092 {
1093 self.inner.notification_handlers.write().await.push(Box::new(
1094 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1095 ));
1096
1097 self
1098 }
1099
1100 /// Subscribe to all updates for the room with the given ID.
1101 ///
1102 /// The returned receiver will receive a new message for each sync response
1103 /// that contains updates for that room.
1104 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1105 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1106 btree_map::Entry::Vacant(entry) => {
1107 let (tx, rx) = broadcast::channel(8);
1108 entry.insert(tx);
1109 rx
1110 }
1111 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1112 }
1113 }
1114
1115 /// Subscribe to all updates to all rooms, whenever any has been received in
1116 /// a sync response.
1117 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1118 self.inner.room_updates_sender.subscribe()
1119 }
1120
1121 pub(crate) async fn notification_handlers(
1122 &self,
1123 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1124 self.inner.notification_handlers.read().await
1125 }
1126
1127 /// Get all the rooms the client knows about.
1128 ///
1129 /// This will return the list of joined, invited, and left rooms.
1130 pub fn rooms(&self) -> Vec<Room> {
1131 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1132 }
1133
1134 /// Get all the rooms the client knows about, filtered by room state.
1135 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1136 self.base_client()
1137 .rooms_filtered(filter)
1138 .into_iter()
1139 .map(|room| Room::new(self.clone(), room))
1140 .collect()
1141 }
1142
1143 /// Get a stream of all the rooms, in addition to the existing rooms.
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
    // The base client provides the current rooms plus a stream of diffs.
    let (rooms, stream) = self.base_client().rooms_stream();

    // Wraps a base room into a `Room` holding a clone of this client. The same
    // closure is used for the initial rooms and for every diff in the stream.
    let map_room = |room| Room::new(self.clone(), room);

    (
        // Current rooms, converted eagerly.
        rooms.into_iter().map(map_room).collect(),
        // Each batch of diffs is converted as the stream yields it.
        stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
    )
}
1154
1155 /// Returns the joined rooms this client knows about.
1156 pub fn joined_rooms(&self) -> Vec<Room> {
1157 self.base_client()
1158 .rooms_filtered(RoomStateFilter::JOINED)
1159 .into_iter()
1160 .map(|room| Room::new(self.clone(), room))
1161 .collect()
1162 }
1163
1164 /// Returns the invited rooms this client knows about.
1165 pub fn invited_rooms(&self) -> Vec<Room> {
1166 self.base_client()
1167 .rooms_filtered(RoomStateFilter::INVITED)
1168 .into_iter()
1169 .map(|room| Room::new(self.clone(), room))
1170 .collect()
1171 }
1172
1173 /// Returns the left rooms this client knows about.
1174 pub fn left_rooms(&self) -> Vec<Room> {
1175 self.base_client()
1176 .rooms_filtered(RoomStateFilter::LEFT)
1177 .into_iter()
1178 .map(|room| Room::new(self.clone(), room))
1179 .collect()
1180 }
1181
1182 /// Get a room with the given room id.
1183 ///
1184 /// # Arguments
1185 ///
1186 /// `room_id` - The unique id of the room that should be fetched.
1187 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1188 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1189 }
1190
1191 /// Gets the preview of a room, whether the current user has joined it or
1192 /// not.
1193 pub async fn get_room_preview(
1194 &self,
1195 room_or_alias_id: &RoomOrAliasId,
1196 via: Vec<OwnedServerName>,
1197 ) -> Result<RoomPreview> {
1198 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1199 Ok(room_id) => room_id.to_owned(),
1200 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1201 };
1202
1203 if let Some(room) = self.get_room(&room_id) {
1204 // The cached data can only be trusted if the room state is joined or
1205 // banned: for invite and knock rooms, no updates will be received
1206 // for the rooms after the invite/knock action took place so we may
1207 // have very out to date data for important fields such as
1208 // `join_rule`. For left rooms, the homeserver should return the latest info.
1209 match room.state() {
1210 RoomState::Joined | RoomState::Banned => {
1211 return Ok(RoomPreview::from_known_room(&room).await);
1212 }
1213 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1214 }
1215 }
1216
1217 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1218 }
1219
1220 /// Resolve a room alias to a room id and a list of servers which know
1221 /// about it.
1222 ///
1223 /// # Arguments
1224 ///
1225 /// `room_alias` - The room alias to be resolved.
1226 pub async fn resolve_room_alias(
1227 &self,
1228 room_alias: &RoomAliasId,
1229 ) -> HttpResult<get_alias::v3::Response> {
1230 let request = get_alias::v3::Request::new(room_alias.to_owned());
1231 self.send(request).await
1232 }
1233
1234 /// Checks if a room alias is not in use yet.
1235 ///
1236 /// Returns:
/// - `Ok(true)` if the room alias is available (the resolve alias request
///   failed with a "not found" error).
/// - `Ok(false)` if it's not (the alias was resolved, so it's already in
///   use).
/// - An `Err` otherwise.
1241 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1242 match self.resolve_room_alias(alias).await {
1243 // The room alias was resolved, so it's already in use.
1244 Ok(_) => Ok(false),
1245 Err(error) => {
1246 match error.client_api_error_kind() {
1247 // The room alias wasn't found, so it's available.
1248 Some(ErrorKind::NotFound) => Ok(true),
1249 _ => Err(error),
1250 }
1251 }
1252 }
1253 }
1254
1255 /// Adds a new room alias associated with a room to the room directory.
1256 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1257 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1258 self.send(request).await?;
1259 Ok(())
1260 }
1261
1262 /// Removes a room alias from the room directory.
1263 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1264 let request = delete_alias::v3::Request::new(alias.to_owned());
1265 self.send(request).await?;
1266 Ok(())
1267 }
1268
1269 /// Update the homeserver from the login response well-known if needed.
1270 ///
1271 /// # Arguments
1272 ///
1273 /// * `login_well_known` - The `well_known` field from a successful login
1274 /// response.
1275 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1276 if self.inner.respect_login_well_known {
1277 if let Some(well_known) = login_well_known {
1278 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1279 self.set_homeserver(homeserver);
1280 }
1281 }
1282 }
1283 }
1284
1285 /// Similar to [`Client::restore_session_with`], with
1286 /// [`RoomLoadSettings::default()`].
1287 ///
1288 /// # Panics
1289 ///
1290 /// Panics if a session was already restored or logged in.
1291 #[instrument(skip_all)]
1292 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1293 self.restore_session_with(session, RoomLoadSettings::default()).await
1294 }
1295
1296 /// Restore a session previously logged-in using one of the available
1297 /// authentication APIs. The number of rooms to restore is controlled by
1298 /// [`RoomLoadSettings`].
1299 ///
1300 /// See the documentation of the corresponding authentication API's
1301 /// `restore_session` method for more information.
1302 ///
1303 /// # Panics
1304 ///
1305 /// Panics if a session was already restored or logged in.
1306 #[instrument(skip_all)]
1307 pub async fn restore_session_with(
1308 &self,
1309 session: impl Into<AuthSession>,
1310 room_load_settings: RoomLoadSettings,
1311 ) -> Result<()> {
1312 let session = session.into();
1313 match session {
1314 AuthSession::Matrix(session) => {
1315 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1316 }
1317 AuthSession::OAuth(session) => {
1318 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1319 }
1320 }
1321 }
1322
1323 /// Refresh the access token using the authentication API used to log into
1324 /// this session.
1325 ///
1326 /// See the documentation of the authentication API's `refresh_access_token`
1327 /// method for more information.
1328 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1329 let Some(auth_api) = self.auth_api() else {
1330 return Err(RefreshTokenError::RefreshTokenRequired);
1331 };
1332
1333 match auth_api {
1334 AuthApi::Matrix(api) => {
1335 trace!("Token refresh: Using the homeserver.");
1336 Box::pin(api.refresh_access_token()).await?;
1337 }
1338 AuthApi::OAuth(api) => {
1339 trace!("Token refresh: Using OAuth 2.0.");
1340 Box::pin(api.refresh_access_token()).await?;
1341 }
1342 }
1343
1344 Ok(())
1345 }
1346
1347 /// Log out the current session using the proper authentication API.
1348 ///
1349 /// # Errors
1350 ///
1351 /// Returns an error if the session is not authenticated or if an error
1352 /// occurred while making the request to the server.
1353 pub async fn logout(&self) -> Result<(), Error> {
1354 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1355 match auth_api {
1356 AuthApi::Matrix(matrix_auth) => {
1357 matrix_auth.logout().await?;
1358 Ok(())
1359 }
1360 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1361 }
1362 }
1363
1364 /// Get or upload a sync filter.
1365 ///
1366 /// This method will either get a filter ID from the store or upload the
1367 /// filter definition to the homeserver and return the new filter ID.
1368 ///
1369 /// # Arguments
1370 ///
1371 /// * `filter_name` - The unique name of the filter, this name will be used
1372 /// locally to store and identify the filter ID returned by the server.
1373 ///
1374 /// * `definition` - The filter definition that should be uploaded to the
1375 /// server if no filter ID can be found in the store.
1376 ///
1377 /// # Examples
1378 ///
1379 /// ```no_run
1380 /// # use matrix_sdk::{
1381 /// # Client, config::SyncSettings,
1382 /// # ruma::api::client::{
1383 /// # filter::{
1384 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1385 /// # },
1386 /// # sync::sync_events::v3::Filter,
1387 /// # }
1388 /// # };
1389 /// # use url::Url;
1390 /// # async {
1391 /// # let homeserver = Url::parse("http://example.com").unwrap();
1392 /// # let client = Client::new(homeserver).await.unwrap();
1393 /// let mut filter = FilterDefinition::default();
1394 ///
1395 /// // Let's enable member lazy loading.
1396 /// filter.room.state.lazy_load_options =
1397 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1398 ///
1399 /// let filter_id = client
1400 /// .get_or_upload_filter("sync", filter)
1401 /// .await
1402 /// .unwrap();
1403 ///
1404 /// let sync_settings = SyncSettings::new()
1405 /// .filter(Filter::FilterId(filter_id));
1406 ///
1407 /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1409 #[instrument(skip(self, definition))]
1410 pub async fn get_or_upload_filter(
1411 &self,
1412 filter_name: &str,
1413 definition: FilterDefinition,
1414 ) -> Result<String> {
1415 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1416 debug!("Found filter locally");
1417 Ok(filter)
1418 } else {
1419 debug!("Didn't find filter locally");
1420 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1421 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1422 let response = self.send(request).await?;
1423
1424 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1425
1426 Ok(response.filter_id)
1427 }
1428 }
1429
1430 /// Finish joining a room.
1431 ///
1432 /// If the room was an invite that should be marked as a DM, will include it
1433 /// in the DM event after creating the joined room.
1434 async fn finish_join_room(&self, room_id: &RoomId) -> Result<Room> {
1435 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1436 room.state() == RoomState::Invited
1437 && room.is_direct().await.unwrap_or_else(|e| {
1438 warn!(%room_id, "is_direct() failed: {e}");
1439 false
1440 })
1441 } else {
1442 false
1443 };
1444
1445 let base_room = self.base_client().room_joined(room_id).await?;
1446 let room = Room::new(self.clone(), base_room);
1447
1448 if mark_as_dm {
1449 room.set_is_direct(true).await?;
1450 }
1451
1452 Ok(room)
1453 }
1454
1455 /// Join a room by `RoomId`.
1456 ///
1457 /// Returns the `Room` in the joined state.
1458 ///
1459 /// # Arguments
1460 ///
1461 /// * `room_id` - The `RoomId` of the room to be joined.
1462 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1463 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1464 let response = self.send(request).await?;
1465 self.finish_join_room(&response.room_id).await
1466 }
1467
1468 /// Join a room by `RoomOrAliasId`.
1469 ///
1470 /// Returns the `Room` in the joined state.
1471 ///
1472 /// # Arguments
1473 ///
1474 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1475 /// alias looks like `#name:example.com`.
1476 /// * `server_names` - The server names to be used for resolving the alias,
1477 /// if needs be.
1478 pub async fn join_room_by_id_or_alias(
1479 &self,
1480 alias: &RoomOrAliasId,
1481 server_names: &[OwnedServerName],
1482 ) -> Result<Room> {
1483 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1484 via: server_names.to_owned(),
1485 });
1486 let response = self.send(request).await?;
1487 self.finish_join_room(&response.room_id).await
1488 }
1489
1490 /// Search the homeserver's directory of public rooms.
1491 ///
1492 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1493 /// a `get_public_rooms::Response`.
1494 ///
1495 /// # Arguments
1496 ///
1497 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1498 ///
1499 /// * `since` - Pagination token from a previous request.
1500 ///
1501 /// * `server` - The name of the server, if `None` the requested server is
1502 /// used.
1503 ///
1504 /// # Examples
1505 /// ```no_run
1506 /// use matrix_sdk::Client;
1507 /// # use url::Url;
1508 /// # let homeserver = Url::parse("http://example.com").unwrap();
1509 /// # let limit = Some(10);
1510 /// # let since = Some("since token");
1511 /// # let server = Some("servername.com".try_into().unwrap());
1512 /// # async {
1513 /// let mut client = Client::new(homeserver).await.unwrap();
1514 ///
1515 /// client.public_rooms(limit, since, server).await;
1516 /// # };
1517 /// ```
1518 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1519 pub async fn public_rooms(
1520 &self,
1521 limit: Option<u32>,
1522 since: Option<&str>,
1523 server: Option<&ServerName>,
1524 ) -> HttpResult<get_public_rooms::v3::Response> {
1525 let limit = limit.map(UInt::from);
1526
1527 let request = assign!(get_public_rooms::v3::Request::new(), {
1528 limit,
1529 since: since.map(ToOwned::to_owned),
1530 server: server.map(ToOwned::to_owned),
1531 });
1532 self.send(request).await
1533 }
1534
1535 /// Create a room with the given parameters.
1536 ///
1537 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1538 /// created room.
1539 ///
1540 /// If you want to create a direct message with one specific user, you can
1541 /// use [`create_dm`][Self::create_dm], which is more convenient than
1542 /// assembling the [`create_room::v3::Request`] yourself.
1543 ///
1544 /// If the `is_direct` field of the request is set to `true` and at least
1545 /// one user is invited, the room will be automatically added to the direct
1546 /// rooms in the account data.
1547 ///
1548 /// # Examples
1549 ///
1550 /// ```no_run
1551 /// use matrix_sdk::{
1552 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1553 /// Client,
1554 /// };
1555 /// # use url::Url;
1556 /// #
1557 /// # async {
1558 /// # let homeserver = Url::parse("http://example.com").unwrap();
1559 /// let request = CreateRoomRequest::new();
1560 /// let client = Client::new(homeserver).await.unwrap();
1561 /// assert!(client.create_room(request).await.is_ok());
1562 /// # };
1563 /// ```
1564 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1565 let invite = request.invite.clone();
1566 let is_direct_room = request.is_direct;
1567 let response = self.send(request).await?;
1568
1569 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1570
1571 let joined_room = Room::new(self.clone(), base_room);
1572
1573 if is_direct_room && !invite.is_empty() {
1574 if let Err(error) =
1575 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1576 {
1577 // FIXME: Retry in the background
1578 error!("Failed to mark room as DM: {error}");
1579 }
1580 }
1581
1582 Ok(joined_room)
1583 }
1584
1585 /// Create a DM room.
1586 ///
1587 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1588 /// given user being invited, the room marked `is_direct` and both the
1589 /// creator and invitee getting the default maximum power level.
1590 ///
1591 /// If the `e2e-encryption` feature is enabled, the room will also be
1592 /// encrypted.
1593 ///
1594 /// # Arguments
1595 ///
1596 /// * `user_id` - The ID of the user to create a DM for.
1597 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1598 #[cfg(feature = "e2e-encryption")]
1599 let initial_state =
1600 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1601 .to_raw_any()];
1602
1603 #[cfg(not(feature = "e2e-encryption"))]
1604 let initial_state = vec![];
1605
1606 let request = assign!(create_room::v3::Request::new(), {
1607 invite: vec![user_id.to_owned()],
1608 is_direct: true,
1609 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1610 initial_state,
1611 });
1612
1613 self.create_room(request).await
1614 }
1615
1616 /// Search the homeserver's directory for public rooms with a filter.
1617 ///
1618 /// # Arguments
1619 ///
1620 /// * `room_search` - The easiest way to create this request is using the
1621 /// `get_public_rooms_filtered::Request` itself.
1622 ///
1623 /// # Examples
1624 ///
1625 /// ```no_run
1626 /// # use url::Url;
1627 /// # use matrix_sdk::Client;
1628 /// # async {
1629 /// # let homeserver = Url::parse("http://example.com")?;
1630 /// use matrix_sdk::ruma::{
1631 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1632 /// };
1633 /// # let mut client = Client::new(homeserver).await?;
1634 ///
1635 /// let mut filter = Filter::new();
1636 /// filter.generic_search_term = Some("rust".to_owned());
1637 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1638 /// request.filter = filter;
1639 ///
1640 /// let response = client.public_rooms_filtered(request).await?;
1641 ///
1642 /// for room in response.chunk {
1643 /// println!("Found room {room:?}");
1644 /// }
1645 /// # anyhow::Ok(()) };
1646 /// ```
1647 pub async fn public_rooms_filtered(
1648 &self,
1649 request: get_public_rooms_filtered::v3::Request,
1650 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1651 self.send(request).await
1652 }
1653
1654 /// Send an arbitrary request to the server, without updating client state.
1655 ///
1656 /// **Warning:** Because this method *does not* update the client state, it
1657 /// is important to make sure that you account for this yourself, and
1658 /// use wrapper methods where available. This method should *only* be
1659 /// used if a wrapper method for the endpoint you'd like to use is not
1660 /// available.
1661 ///
1662 /// # Arguments
1663 ///
1664 /// * `request` - A filled out and valid request for the endpoint to be hit
1665 ///
1666 /// * `timeout` - An optional request timeout setting, this overrides the
1667 /// default request setting if one was set.
1668 ///
1669 /// # Examples
1670 ///
1671 /// ```no_run
1672 /// # use matrix_sdk::{Client, config::SyncSettings};
1673 /// # use url::Url;
1674 /// # async {
1675 /// # let homeserver = Url::parse("http://localhost:8080")?;
1676 /// # let mut client = Client::new(homeserver).await?;
1677 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1678 ///
1679 /// // First construct the request you want to make
1680 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1681 /// // for all available Endpoints
1682 /// let user_id = user_id!("@example:localhost").to_owned();
1683 /// let request = profile::get_profile::v3::Request::new(user_id);
1684 ///
1685 /// // Start the request using Client::send()
1686 /// let response = client.send(request).await?;
1687 ///
1688 /// // Check the corresponding Response struct to find out what types are
1689 /// // returned
1690 /// # anyhow::Ok(()) };
1691 /// ```
1692 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1693 where
1694 Request: OutgoingRequest + Clone + Debug,
1695 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1696 {
1697 SendRequest {
1698 client: self.clone(),
1699 request,
1700 config: None,
1701 send_progress: Default::default(),
1702 }
1703 }
1704
1705 pub(crate) async fn send_inner<Request>(
1706 &self,
1707 request: Request,
1708 config: Option<RequestConfig>,
1709 send_progress: SharedObservable<TransmissionProgress>,
1710 ) -> HttpResult<Request::IncomingResponse>
1711 where
1712 Request: OutgoingRequest + Debug,
1713 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1714 {
1715 let homeserver = self.homeserver().to_string();
1716 let access_token = self.access_token();
1717
1718 self.inner
1719 .http_client
1720 .send(
1721 request,
1722 config,
1723 homeserver,
1724 access_token.as_deref(),
1725 &self.server_versions().await?,
1726 send_progress,
1727 )
1728 .await
1729 }
1730
1731 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1732 _ = self
1733 .inner
1734 .auth_ctx
1735 .session_change_sender
1736 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1737 }
1738
1739 /// Fetches server capabilities from network; no caching.
1740 pub async fn fetch_server_capabilities(
1741 &self,
1742 request_config: Option<RequestConfig>,
1743 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1744 let resp = self
1745 .inner
1746 .http_client
1747 .send(
1748 get_supported_versions::Request::new(),
1749 request_config,
1750 self.homeserver().to_string(),
1751 None,
1752 &[MatrixVersion::V1_0],
1753 Default::default(),
1754 )
1755 .await?;
1756
1757 // Fill both unstable features and server versions at once.
1758 let mut versions = resp.known_versions().collect::<Vec<_>>();
1759 if versions.is_empty() {
1760 versions.push(MatrixVersion::V1_0);
1761 }
1762
1763 Ok((versions.into(), resp.unstable_features))
1764 }
1765
1766 /// Load server capabilities from storage, or fetch them from network and
1767 /// cache them.
1768 async fn load_or_fetch_server_capabilities(
1769 &self,
1770 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1771 match self.state_store().get_kv_data(StateStoreDataKey::ServerCapabilities).await {
1772 Ok(Some(stored)) => {
1773 if let Some((versions, unstable_features)) =
1774 stored.into_server_capabilities().and_then(|cap| cap.maybe_decode())
1775 {
1776 return Ok((versions.into(), unstable_features));
1777 }
1778 }
1779 Ok(None) => {
1780 // fallthrough: cache is empty
1781 }
1782 Err(err) => {
1783 warn!("error when loading cached server capabilities: {err}");
1784 // fallthrough to network.
1785 }
1786 }
1787
1788 let (versions, unstable_features) = self.fetch_server_capabilities(None).await?;
1789
1790 // Attempt to cache the result in storage.
1791 {
1792 let encoded = ServerCapabilities::new(&versions, unstable_features.clone());
1793 if let Err(err) = self
1794 .state_store()
1795 .set_kv_data(
1796 StateStoreDataKey::ServerCapabilities,
1797 StateStoreDataValue::ServerCapabilities(encoded),
1798 )
1799 .await
1800 {
1801 warn!("error when caching server capabilities: {err}");
1802 }
1803 }
1804
1805 Ok((versions, unstable_features))
1806 }
1807
1808 async fn get_or_load_and_cache_server_capabilities<
1809 T,
1810 F: Fn(&ClientServerCapabilities) -> Option<T>,
1811 >(
1812 &self,
1813 f: F,
1814 ) -> HttpResult<T> {
1815 let caps = &self.inner.caches.server_capabilities;
1816 if let Some(val) = f(&*caps.read().await) {
1817 return Ok(val);
1818 }
1819
1820 let mut guard = caps.write().await;
1821 if let Some(val) = f(&guard) {
1822 return Ok(val);
1823 }
1824
1825 let (versions, unstable_features) = self.load_or_fetch_server_capabilities().await?;
1826
1827 guard.server_versions = Some(versions);
1828 guard.unstable_features = Some(unstable_features);
1829
1830 // SAFETY: both fields were set above, so the function will always return some.
1831 Ok(f(&guard).unwrap())
1832 }
1833
1834 /// Get the Matrix versions supported by the homeserver by fetching them
1835 /// from the server or the cache.
1836 ///
1837 /// # Examples
1838 ///
1839 /// ```no_run
1840 /// use ruma::api::MatrixVersion;
1841 /// # use matrix_sdk::{Client, config::SyncSettings};
1842 /// # use url::Url;
1843 /// # async {
1844 /// # let homeserver = Url::parse("http://localhost:8080")?;
1845 /// # let mut client = Client::new(homeserver).await?;
1846 ///
1847 /// let server_versions = client.server_versions().await?;
1848 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1849 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1850 /// # anyhow::Ok(()) };
1851 /// ```
1852 pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1853 self.get_or_load_and_cache_server_capabilities(|caps| caps.server_versions.clone()).await
1854 }
1855
1856 /// Get the unstable features supported by the homeserver by fetching them
1857 /// from the server or the cache.
1858 ///
1859 /// # Examples
1860 ///
1861 /// ```no_run
1862 /// # use matrix_sdk::{Client, config::SyncSettings};
1863 /// # use url::Url;
1864 /// # async {
1865 /// # let homeserver = Url::parse("http://localhost:8080")?;
1866 /// # let mut client = Client::new(homeserver).await?;
1867 /// let unstable_features = client.unstable_features().await?;
1868 /// let supports_msc_x =
1869 /// unstable_features.get("msc_x").copied().unwrap_or(false);
1870 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1871 /// # anyhow::Ok(()) };
1872 /// ```
1873 pub async fn unstable_features(&self) -> HttpResult<BTreeMap<String, bool>> {
1874 self.get_or_load_and_cache_server_capabilities(|caps| caps.unstable_features.clone()).await
1875 }
1876
1877 /// Empty the server version and unstable features cache.
1878 ///
1879 /// Since the SDK caches server capabilities (versions and unstable
1880 /// features), it's possible to have a stale entry in the cache. This
1881 /// functions makes it possible to force reset it.
1882 pub async fn reset_server_capabilities(&self) -> Result<()> {
1883 // Empty the in-memory caches.
1884 let mut guard = self.inner.caches.server_capabilities.write().await;
1885 guard.server_versions = None;
1886 guard.unstable_features = None;
1887
1888 // Empty the store cache.
1889 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerCapabilities).await?)
1890 }
1891
1892 /// Check whether MSC 4028 is enabled on the homeserver.
1893 ///
1894 /// # Examples
1895 ///
1896 /// ```no_run
1897 /// # use matrix_sdk::{Client, config::SyncSettings};
1898 /// # use url::Url;
1899 /// # async {
1900 /// # let homeserver = Url::parse("http://localhost:8080")?;
1901 /// # let mut client = Client::new(homeserver).await?;
1902 /// let msc4028_enabled =
1903 /// client.can_homeserver_push_encrypted_event_to_device().await?;
1904 /// # anyhow::Ok(()) };
1905 /// ```
1906 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
1907 Ok(self.unstable_features().await?.get("org.matrix.msc4028").copied().unwrap_or(false))
1908 }
1909
1910 /// Get information of all our own devices.
1911 ///
1912 /// # Examples
1913 ///
1914 /// ```no_run
1915 /// # use matrix_sdk::{Client, config::SyncSettings};
1916 /// # use url::Url;
1917 /// # async {
1918 /// # let homeserver = Url::parse("http://localhost:8080")?;
1919 /// # let mut client = Client::new(homeserver).await?;
1920 /// let response = client.devices().await?;
1921 ///
1922 /// for device in response.devices {
1923 /// println!(
1924 /// "Device: {} {}",
1925 /// device.device_id,
1926 /// device.display_name.as_deref().unwrap_or("")
1927 /// );
1928 /// }
1929 /// # anyhow::Ok(()) };
1930 /// ```
1931 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
1932 let request = get_devices::v3::Request::new();
1933
1934 self.send(request).await
1935 }
1936
1937 /// Delete the given devices from the server.
1938 ///
1939 /// # Arguments
1940 ///
1941 /// * `devices` - The list of devices that should be deleted from the
1942 /// server.
1943 ///
1944 /// * `auth_data` - This request requires user interactive auth, the first
1945 /// request needs to set this to `None` and will always fail with an
1946 /// `UiaaResponse`. The response will contain information for the
1947 /// interactive auth and the same request needs to be made but this time
1948 /// with some `auth_data` provided.
1949 ///
1950 /// ```no_run
1951 /// # use matrix_sdk::{
1952 /// # ruma::{api::client::uiaa, device_id},
1953 /// # Client, Error, config::SyncSettings,
1954 /// # };
1955 /// # use serde_json::json;
1956 /// # use url::Url;
1957 /// # use std::collections::BTreeMap;
1958 /// # async {
1959 /// # let homeserver = Url::parse("http://localhost:8080")?;
1960 /// # let mut client = Client::new(homeserver).await?;
1961 /// let devices = &[device_id!("DEVICEID").to_owned()];
1962 ///
1963 /// if let Err(e) = client.delete_devices(devices, None).await {
1964 /// if let Some(info) = e.as_uiaa_response() {
1965 /// let mut password = uiaa::Password::new(
1966 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1967 /// "wordpass".to_owned(),
1968 /// );
1969 /// password.session = info.session.clone();
1970 ///
1971 /// client
1972 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
1973 /// .await?;
1974 /// }
1975 /// }
1976 /// # anyhow::Ok(()) };
1977 pub async fn delete_devices(
1978 &self,
1979 devices: &[OwnedDeviceId],
1980 auth_data: Option<uiaa::AuthData>,
1981 ) -> HttpResult<delete_devices::v3::Response> {
1982 let mut request = delete_devices::v3::Request::new(devices.to_owned());
1983 request.auth = auth_data;
1984
1985 self.send(request).await
1986 }
1987
1988 /// Change the display name of a device owned by the current user.
1989 ///
1990 /// Returns a `update_device::Response` which specifies the result
1991 /// of the operation.
1992 ///
1993 /// # Arguments
1994 ///
1995 /// * `device_id` - The ID of the device to change the display name of.
1996 /// * `display_name` - The new display name to set.
1997 pub async fn rename_device(
1998 &self,
1999 device_id: &DeviceId,
2000 display_name: &str,
2001 ) -> HttpResult<update_device::v3::Response> {
2002 let mut request = update_device::v3::Request::new(device_id.to_owned());
2003 request.display_name = Some(display_name.to_owned());
2004
2005 self.send(request).await
2006 }
2007
2008 /// Synchronize the client's state with the latest state on the server.
2009 ///
2010 /// ## Syncing Events
2011 ///
2012 /// Messages or any other type of event need to be periodically fetched from
2013 /// the server, this is achieved by sending a `/sync` request to the server.
2014 ///
2015 /// The first sync is sent out without a [`token`]. The response of the
2016 /// first sync will contain a [`next_batch`] field which should then be
2017 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2018 /// don't receive the same events multiple times.
2019 ///
2020 /// ## Long Polling
2021 ///
2022 /// A sync should in the usual case always be in flight. The
2023 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2024 /// long the server will wait for new events before it will respond.
2025 /// The server will respond immediately if some new events arrive before the
2026 /// timeout has expired. If no changes arrive and the timeout expires an
2027 /// empty sync response will be sent to the client.
2028 ///
2029 /// This method of sending a request that may not receive a response
2030 /// immediately is called long polling.
2031 ///
2032 /// ## Filtering Events
2033 ///
2034 /// The number or type of messages and events that the client should receive
2035 /// from the server can be altered using a [`Filter`].
2036 ///
2037 /// Filters can be non-trivial and, since they will be sent with every sync
2038 /// request, they may take up a bunch of unnecessary bandwidth.
2039 ///
2040 /// Luckily filters can be uploaded to the server and reused using an unique
2041 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2042 /// method.
2043 ///
2044 /// # Arguments
2045 ///
2046 /// * `sync_settings` - Settings for the sync call, this allows us to set
2047 /// various options to configure the sync:
2048 /// * [`filter`] - To configure which events we receive and which get
2049 /// [filtered] by the server
2050 /// * [`timeout`] - To configure our [long polling] setup.
2051 /// * [`token`] - To tell the server which events we already received
2052 /// and where we wish to continue syncing.
2053 /// * [`full_state`] - To tell the server that we wish to receive all
2054 /// state events, regardless of our configured [`token`].
2055 /// * [`set_presence`] - To tell the server to set the presence and to
2056 /// which state.
2057 ///
2058 /// # Examples
2059 ///
2060 /// ```no_run
2061 /// # use url::Url;
2062 /// # async {
2063 /// # let homeserver = Url::parse("http://localhost:8080")?;
2064 /// # let username = "";
2065 /// # let password = "";
2066 /// use matrix_sdk::{
2067 /// config::SyncSettings,
2068 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2069 /// };
2070 ///
2071 /// let client = Client::new(homeserver).await?;
2072 /// client.matrix_auth().login_username(username, password).send().await?;
2073 ///
2074 /// // Sync once so we receive the client state and old messages.
2075 /// client.sync_once(SyncSettings::default()).await?;
2076 ///
2077 /// // Register our handler so we start responding once we receive a new
2078 /// // event.
2079 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2080 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2081 /// });
2082 ///
2083 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2084 /// // from our `sync_once()` call automatically.
2085 /// client.sync(SyncSettings::default()).await;
2086 /// # anyhow::Ok(()) };
2087 /// ```
2088 ///
2089 /// [`sync`]: #method.sync
2090 /// [`SyncSettings`]: crate::config::SyncSettings
2091 /// [`token`]: crate::config::SyncSettings#method.token
2092 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2093 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2094 /// [`set_presence`]: ruma::presence::PresenceState
2095 /// [`filter`]: crate::config::SyncSettings#method.filter
2096 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2097 /// [`next_batch`]: SyncResponse#structfield.next_batch
2098 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2099 /// [long polling]: #long-polling
2100 /// [filtered]: #filtering-events
2101 #[instrument(skip(self))]
2102 pub async fn sync_once(
2103 &self,
2104 sync_settings: crate::config::SyncSettings,
2105 ) -> Result<SyncResponse> {
2106 // The sync might not return for quite a while due to the timeout.
2107 // We'll see if there's anything crypto related to send out before we
2108 // sync, i.e. if we closed our client after a sync but before the
2109 // crypto requests were sent out.
2110 //
2111 // This will mostly be a no-op.
2112 #[cfg(feature = "e2e-encryption")]
2113 if let Err(e) = self.send_outgoing_requests().await {
2114 error!(error = ?e, "Error while sending outgoing E2EE requests");
2115 }
2116
2117 let request = assign!(sync_events::v3::Request::new(), {
2118 filter: sync_settings.filter.map(|f| *f),
2119 since: sync_settings.token,
2120 full_state: sync_settings.full_state,
2121 set_presence: sync_settings.set_presence,
2122 timeout: sync_settings.timeout,
2123 });
2124 let mut request_config = self.request_config();
2125 if let Some(timeout) = sync_settings.timeout {
2126 request_config.timeout += timeout;
2127 }
2128
2129 let response = self.send(request).with_request_config(request_config).await?;
2130 let next_batch = response.next_batch.clone();
2131 let response = self.process_sync(response).await?;
2132
2133 #[cfg(feature = "e2e-encryption")]
2134 if let Err(e) = self.send_outgoing_requests().await {
2135 error!(error = ?e, "Error while sending outgoing E2EE requests");
2136 }
2137
2138 self.inner.sync_beat.notify(usize::MAX);
2139
2140 Ok(SyncResponse::new(next_batch, response))
2141 }
2142
2143 /// Repeatedly synchronize the client state with the server.
2144 ///
2145 /// This method will only return on error, if cancellation is needed
2146 /// the method should be wrapped in a cancelable task or the
2147 /// [`Client::sync_with_callback`] method can be used or
2148 /// [`Client::sync_with_result_callback`] if you want to handle error
2149 /// cases in the loop, too.
2150 ///
2151 /// This method will internally call [`Client::sync_once`] in a loop.
2152 ///
2153 /// This method can be used with the [`Client::add_event_handler`]
2154 /// method to react to individual events. If you instead wish to handle
2155 /// events in a bulk manner the [`Client::sync_with_callback`],
2156 /// [`Client::sync_with_result_callback`] and
2157 /// [`Client::sync_stream`] methods can be used instead. Those methods
2158 /// repeatedly return the whole sync response.
2159 ///
2160 /// # Arguments
2161 ///
2162 /// * `sync_settings` - Settings for the sync call. *Note* that those
2163 /// settings will be only used for the first sync call. See the argument
2164 /// docs for [`Client::sync_once`] for more info.
2165 ///
2166 /// # Return
2167 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2168 /// up to the user of the API to check the error and decide whether the sync
2169 /// should continue or not.
2170 ///
2171 /// # Examples
2172 ///
2173 /// ```no_run
2174 /// # use url::Url;
2175 /// # async {
2176 /// # let homeserver = Url::parse("http://localhost:8080")?;
2177 /// # let username = "";
2178 /// # let password = "";
2179 /// use matrix_sdk::{
2180 /// config::SyncSettings,
2181 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2182 /// };
2183 ///
2184 /// let client = Client::new(homeserver).await?;
2185 /// client.matrix_auth().login_username(&username, &password).send().await?;
2186 ///
2187 /// // Register our handler so we start responding once we receive a new
2188 /// // event.
2189 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2190 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2191 /// });
2192 ///
2193 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2194 /// // automatically.
2195 /// client.sync(SyncSettings::default()).await?;
2196 /// # anyhow::Ok(()) };
2197 /// ```
2198 ///
2199 /// [argument docs]: #method.sync_once
2200 /// [`sync_with_callback`]: #method.sync_with_callback
2201 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2202 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2203 }
2204
2205 /// Repeatedly call sync to synchronize the client state with the server.
2206 ///
2207 /// # Arguments
2208 ///
2209 /// * `sync_settings` - Settings for the sync call. *Note* that those
2210 /// settings will be only used for the first sync call. See the argument
2211 /// docs for [`Client::sync_once`] for more info.
2212 ///
2213 /// * `callback` - A callback that will be called every time a successful
2214 /// response has been fetched from the server. The callback must return a
2215 /// boolean which signalizes if the method should stop syncing. If the
2216 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2217 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2218 ///
2219 /// # Return
2220 /// The sync runs until an error occurs or the
2221 /// callback indicates that the Loop should stop. If the callback asked for
2222 /// a regular stop, the result will be `Ok(())` otherwise the
2223 /// `Err(Error)` is returned.
2224 ///
2225 /// # Examples
2226 ///
2227 /// The following example demonstrates how to sync forever while sending all
2228 /// the interesting events through a mpsc channel to another thread e.g. a
2229 /// UI thread.
2230 ///
2231 /// ```no_run
2232 /// # use std::time::Duration;
2233 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2234 /// # use url::Url;
2235 /// # async {
2236 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2237 /// # let mut client = Client::new(homeserver).await.unwrap();
2238 ///
2239 /// use tokio::sync::mpsc::channel;
2240 ///
2241 /// let (tx, rx) = channel(100);
2242 ///
2243 /// let sync_channel = &tx;
2244 /// let sync_settings = SyncSettings::new()
2245 /// .timeout(Duration::from_secs(30));
2246 ///
2247 /// client
2248 /// .sync_with_callback(sync_settings, |response| async move {
2249 /// let channel = sync_channel;
2250 /// for (room_id, room) in response.rooms.joined {
2251 /// for event in room.timeline.events {
2252 /// channel.send(event).await.unwrap();
2253 /// }
2254 /// }
2255 ///
2256 /// LoopCtrl::Continue
2257 /// })
2258 /// .await;
2259 /// };
2260 /// ```
2261 #[instrument(skip_all)]
2262 pub async fn sync_with_callback<C>(
2263 &self,
2264 sync_settings: crate::config::SyncSettings,
2265 callback: impl Fn(SyncResponse) -> C,
2266 ) -> Result<(), Error>
2267 where
2268 C: Future<Output = LoopCtrl>,
2269 {
2270 self.sync_with_result_callback(sync_settings, |result| async {
2271 Ok(callback(result?).await)
2272 })
2273 .await
2274 }
2275
2276 /// Repeatedly call sync to synchronize the client state with the server.
2277 ///
2278 /// # Arguments
2279 ///
2280 /// * `sync_settings` - Settings for the sync call. *Note* that those
2281 /// settings will be only used for the first sync call. See the argument
2282 /// docs for [`Client::sync_once`] for more info.
2283 ///
2284 /// * `callback` - A callback that will be called every time after a
2285 /// response has been received, failure or not. The callback returns a
2286 /// `Result<LoopCtrl, Error>`, too. When returning
2287 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2288 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2289 /// function returns `Ok(())`. In case the callback can't handle the
2290 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2291 /// which results in the sync ending and the `Err(Error)` being returned.
2292 ///
2293 /// # Return
2294 /// The sync runs until an error occurs that the callback can't handle or
2295 /// the callback indicates that the Loop should stop. If the callback
2296 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2297 /// `Err(Error)` is returned.
2298 ///
2299 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2300 /// this, and are handled first without sending the result to the
2301 /// callback. Only after they have exceeded is the `Result` handed to
2302 /// the callback.
2303 ///
2304 /// # Examples
2305 ///
2306 /// The following example demonstrates how to sync forever while sending all
2307 /// the interesting events through a mpsc channel to another thread e.g. a
2308 /// UI thread.
2309 ///
2310 /// ```no_run
2311 /// # use std::time::Duration;
2312 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2313 /// # use url::Url;
2314 /// # async {
2315 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2316 /// # let mut client = Client::new(homeserver).await.unwrap();
2317 /// #
2318 /// use tokio::sync::mpsc::channel;
2319 ///
2320 /// let (tx, rx) = channel(100);
2321 ///
2322 /// let sync_channel = &tx;
2323 /// let sync_settings = SyncSettings::new()
2324 /// .timeout(Duration::from_secs(30));
2325 ///
2326 /// client
2327 /// .sync_with_result_callback(sync_settings, |response| async move {
2328 /// let channel = sync_channel;
2329 /// let sync_response = response?;
2330 /// for (room_id, room) in sync_response.rooms.joined {
2331 /// for event in room.timeline.events {
2332 /// channel.send(event).await.unwrap();
2333 /// }
2334 /// }
2335 ///
2336 /// Ok(LoopCtrl::Continue)
2337 /// })
2338 /// .await;
2339 /// };
2340 /// ```
2341 #[instrument(skip(self, callback))]
2342 pub async fn sync_with_result_callback<C>(
2343 &self,
2344 mut sync_settings: crate::config::SyncSettings,
2345 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2346 ) -> Result<(), Error>
2347 where
2348 C: Future<Output = Result<LoopCtrl, Error>>,
2349 {
2350 let mut last_sync_time: Option<Instant> = None;
2351
2352 if sync_settings.token.is_none() {
2353 sync_settings.token = self.sync_token().await;
2354 }
2355
2356 loop {
2357 trace!("Syncing");
2358 let result = self.sync_loop_helper(&mut sync_settings).await;
2359
2360 trace!("Running callback");
2361 if callback(result).await? == LoopCtrl::Break {
2362 trace!("Callback told us to stop");
2363 break;
2364 }
2365 trace!("Done running callback");
2366
2367 Client::delay_sync(&mut last_sync_time).await
2368 }
2369
2370 Ok(())
2371 }
2372
2373 //// Repeatedly synchronize the client state with the server.
2374 ///
2375 /// This method will internally call [`Client::sync_once`] in a loop and is
2376 /// equivalent to the [`Client::sync`] method but the responses are provided
2377 /// as an async stream.
2378 ///
2379 /// # Arguments
2380 ///
2381 /// * `sync_settings` - Settings for the sync call. *Note* that those
2382 /// settings will be only used for the first sync call. See the argument
2383 /// docs for [`Client::sync_once`] for more info.
2384 ///
2385 /// # Examples
2386 ///
2387 /// ```no_run
2388 /// # use url::Url;
2389 /// # async {
2390 /// # let homeserver = Url::parse("http://localhost:8080")?;
2391 /// # let username = "";
2392 /// # let password = "";
2393 /// use futures_util::StreamExt;
2394 /// use matrix_sdk::{config::SyncSettings, Client};
2395 ///
2396 /// let client = Client::new(homeserver).await?;
2397 /// client.matrix_auth().login_username(&username, &password).send().await?;
2398 ///
2399 /// let mut sync_stream =
2400 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2401 ///
2402 /// while let Some(Ok(response)) = sync_stream.next().await {
2403 /// for room in response.rooms.joined.values() {
2404 /// for e in &room.timeline.events {
2405 /// if let Ok(event) = e.raw().deserialize() {
2406 /// println!("Received event {:?}", event);
2407 /// }
2408 /// }
2409 /// }
2410 /// }
2411 ///
2412 /// # anyhow::Ok(()) };
2413 /// ```
2414 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2415 #[instrument(skip(self))]
2416 pub async fn sync_stream(
2417 &self,
2418 mut sync_settings: crate::config::SyncSettings,
2419 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2420 let mut last_sync_time: Option<Instant> = None;
2421
2422 if sync_settings.token.is_none() {
2423 sync_settings.token = self.sync_token().await;
2424 }
2425
2426 let parent_span = Span::current();
2427
2428 async_stream::stream! {
2429 loop {
2430 yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2431
2432 Client::delay_sync(&mut last_sync_time).await
2433 }
2434 }
2435 }
2436
2437 /// Get the current, if any, sync token of the client.
2438 /// This will be None if the client didn't sync at least once.
2439 pub(crate) async fn sync_token(&self) -> Option<String> {
2440 self.inner.base_client.sync_token().await
2441 }
2442
2443 /// Gets information about the owner of a given access token.
2444 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2445 let request = whoami::v3::Request::new();
2446 self.send(request).await
2447 }
2448
2449 /// Subscribes a new receiver to client SessionChange broadcasts.
2450 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2451 let broadcast = &self.auth_ctx().session_change_sender;
2452 broadcast.subscribe()
2453 }
2454
2455 /// Sets the save/restore session callbacks.
2456 ///
2457 /// This is another mechanism to get synchronous updates to session tokens,
2458 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2459 pub fn set_session_callbacks(
2460 &self,
2461 reload_session_callback: Box<ReloadSessionCallback>,
2462 save_session_callback: Box<SaveSessionCallback>,
2463 ) -> Result<()> {
2464 self.inner
2465 .auth_ctx
2466 .reload_session_callback
2467 .set(reload_session_callback)
2468 .map_err(|_| Error::MultipleSessionCallbacks)?;
2469
2470 self.inner
2471 .auth_ctx
2472 .save_session_callback
2473 .set(save_session_callback)
2474 .map_err(|_| Error::MultipleSessionCallbacks)?;
2475
2476 Ok(())
2477 }
2478
2479 /// Get the notification settings of the current owner of the client.
2480 pub async fn notification_settings(&self) -> NotificationSettings {
2481 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2482 NotificationSettings::new(self.clone(), ruleset)
2483 }
2484
2485 /// Create a new specialized `Client` that can process notifications.
2486 ///
2487 /// See [`CrossProcessStoreLock::new`] to learn more about
2488 /// `cross_process_store_locks_holder_name`.
2489 ///
2490 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2491 pub async fn notification_client(
2492 &self,
2493 cross_process_store_locks_holder_name: String,
2494 ) -> Result<Client> {
2495 let client = Client {
2496 inner: ClientInner::new(
2497 self.inner.auth_ctx.clone(),
2498 self.server().cloned(),
2499 self.homeserver(),
2500 self.sliding_sync_version(),
2501 self.inner.http_client.clone(),
2502 self.inner
2503 .base_client
2504 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2505 .await?,
2506 self.inner.caches.server_capabilities.read().await.clone(),
2507 self.inner.respect_login_well_known,
2508 self.inner.event_cache.clone(),
2509 self.inner.send_queue_data.clone(),
2510 #[cfg(feature = "e2e-encryption")]
2511 self.inner.e2ee.encryption_settings,
2512 #[cfg(feature = "e2e-encryption")]
2513 self.inner.enable_share_history_on_invite,
2514 cross_process_store_locks_holder_name,
2515 )
2516 .await,
2517 };
2518
2519 Ok(client)
2520 }
2521
2522 /// The [`EventCache`] instance for this [`Client`].
2523 pub fn event_cache(&self) -> &EventCache {
2524 // SAFETY: always initialized in the `Client` ctor.
2525 self.inner.event_cache.get().unwrap()
2526 }
2527
2528 /// Waits until an at least partially synced room is received, and returns
2529 /// it.
2530 ///
2531 /// **Note: this function will loop endlessly until either it finds the room
2532 /// or an externally set timeout happens.**
2533 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2534 loop {
2535 if let Some(room) = self.get_room(room_id) {
2536 if room.is_state_partially_or_fully_synced() {
2537 debug!("Found just created room!");
2538 return room;
2539 }
2540 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2541 } else {
2542 debug!("Room wasn't found, waiting for sync beat to try again");
2543 }
2544 self.inner.sync_beat.listen().await;
2545 }
2546 }
2547
2548 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2549 /// join it.
2550 pub async fn knock(
2551 &self,
2552 room_id_or_alias: OwnedRoomOrAliasId,
2553 reason: Option<String>,
2554 server_names: Vec<OwnedServerName>,
2555 ) -> Result<Room> {
2556 let request =
2557 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2558 let response = self.send(request).await?;
2559 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2560 Ok(Room::new(self.clone(), base_room))
2561 }
2562
2563 /// Checks whether the provided `user_id` belongs to an ignored user.
2564 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2565 self.base_client().is_user_ignored(user_id).await
2566 }
2567
2568 /// Gets the `max_upload_size` value from the homeserver, getting either a
2569 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2570 /// missing.
2571 ///
2572 /// Check the spec for more info:
2573 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2574 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2575 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2576 if let Some(data) = max_upload_size_lock.get() {
2577 return Ok(data.to_owned());
2578 }
2579
2580 let response = self
2581 .send(ruma::api::client::authenticated_media::get_media_config::v1::Request::default())
2582 .await?;
2583
2584 match max_upload_size_lock.set(response.upload_size) {
2585 Ok(_) => Ok(response.upload_size),
2586 Err(error) => {
2587 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2588 }
2589 }
2590 }
2591}
2592
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
///
/// Holding a `WeakClient` does not keep the underlying [`ClientInner`]
/// alive; use [`WeakClient::get`] to try to obtain a strong [`Client`]
/// again.
#[derive(Clone)]
pub(crate) struct WeakClient {
    /// Non-owning pointer to the shared inner client state.
    client: Weak<ClientInner>,
}
2599
2600impl WeakClient {
2601 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2602 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2603 Self { client: Arc::downgrade(client) }
2604 }
2605
2606 /// Construct a [`WeakClient`] from a [`Client`].
2607 pub fn from_client(client: &Client) -> Self {
2608 Self::from_inner(&client.inner)
2609 }
2610
2611 /// Attempts to get a [`Client`] from this [`WeakClient`].
2612 pub fn get(&self) -> Option<Client> {
2613 self.client.upgrade().map(|inner| Client { inner })
2614 }
2615
2616 /// Gets the number of strong (`Arc`) pointers still pointing to this
2617 /// client.
2618 #[allow(dead_code)]
2619 pub fn strong_count(&self) -> usize {
2620 self.client.strong_count()
2621 }
2622}
2623
/// In-memory cache of the capabilities advertised by the homeserver.
///
/// Each field is `None` until it has been loaded, and is set back to `None`
/// by `Client::reset_server_capabilities`.
#[derive(Clone)]
struct ClientServerCapabilities {
    /// The Matrix versions the server supports (well-known ones only).
    server_versions: Option<Box<[MatrixVersion]>>,

    /// The unstable features and their on/off state on the server.
    unstable_features: Option<BTreeMap<String, bool>>,
}
2632
2633// The http mocking library is not supported for wasm32
2634#[cfg(all(test, not(target_family = "wasm")))]
2635pub(crate) mod tests {
2636 use std::{sync::Arc, time::Duration};
2637
2638 use assert_matches::assert_matches;
2639 use assert_matches2::assert_let;
2640 use eyeball::SharedObservable;
2641 use futures_util::{pin_mut, FutureExt};
2642 use js_int::{uint, UInt};
2643 use matrix_sdk_base::{
2644 store::{MemoryStore, StoreConfig},
2645 RoomState,
2646 };
2647 use matrix_sdk_test::{
2648 async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
2649 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
2650 };
2651 #[cfg(target_family = "wasm")]
2652 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2653
2654 use ruma::{
2655 api::{client::room::create_room::v3::Request as CreateRoomRequest, MatrixVersion},
2656 assign,
2657 events::{
2658 ignored_user_list::IgnoredUserListEventContent,
2659 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
2660 },
2661 owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2662 };
2663 use serde_json::json;
2664 use stream_assert::{assert_next_matches, assert_pending};
2665 use tokio::{
2666 spawn,
2667 time::{sleep, timeout},
2668 };
2669 use url::Url;
2670 use wiremock::{
2671 matchers::{body_json, header, method, path, query_param_is_missing},
2672 Mock, MockServer, ResponseTemplate,
2673 };
2674
2675 use super::Client;
2676 use crate::{
2677 client::{futures::SendMediaUploadRequest, WeakClient},
2678 config::{RequestConfig, SyncSettings},
2679 futures::SendRequest,
2680 media::MediaError,
2681 test_utils::{
2682 logged_in_client, mocks::MatrixMockServer, no_retry_test_client, set_client_session,
2683 test_client_builder, test_client_builder_with_server,
2684 },
2685 Error, TransmissionProgress,
2686 };
2687
2688 #[async_test]
2689 async fn test_account_data() {
2690 let server = MockServer::start().await;
2691 let client = logged_in_client(Some(server.uri())).await;
2692
2693 Mock::given(method("GET"))
2694 .and(path("/_matrix/client/r0/sync".to_owned()))
2695 .and(header("authorization", "Bearer 1234"))
2696 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::SYNC))
2697 .mount(&server)
2698 .await;
2699
2700 let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
2701 let _response = client.sync_once(sync_settings).await.unwrap();
2702
2703 let content = client
2704 .account()
2705 .account_data::<IgnoredUserListEventContent>()
2706 .await
2707 .unwrap()
2708 .unwrap()
2709 .deserialize()
2710 .unwrap();
2711
2712 assert_eq!(content.ignored_users.len(), 1);
2713 }
2714
2715 #[async_test]
2716 async fn test_successful_discovery() {
2717 // Imagine this is `matrix.org`.
2718 let server = MockServer::start().await;
2719
2720 // Imagine this is `matrix-client.matrix.org`.
2721 let homeserver = MockServer::start().await;
2722
2723 // Imagine Alice has the user ID `@alice:matrix.org`.
2724 let server_url = server.uri();
2725 let domain = server_url.strip_prefix("http://").unwrap();
2726 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2727
2728 // The `.well-known` is on the server (e.g. `matrix.org`).
2729 Mock::given(method("GET"))
2730 .and(path("/.well-known/matrix/client"))
2731 .respond_with(ResponseTemplate::new(200).set_body_raw(
2732 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", &homeserver.uri()),
2733 "application/json",
2734 ))
2735 .mount(&server)
2736 .await;
2737
2738 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2739 Mock::given(method("GET"))
2740 .and(path("/_matrix/client/versions"))
2741 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2742 .mount(&homeserver)
2743 .await;
2744
2745 let client = Client::builder()
2746 .insecure_server_name_no_tls(alice.server_name())
2747 .build()
2748 .await
2749 .unwrap();
2750
2751 assert_eq!(client.server().unwrap(), &Url::parse(&server.uri()).unwrap());
2752 assert_eq!(client.homeserver(), Url::parse(&homeserver.uri()).unwrap());
2753 }
2754
2755 #[async_test]
2756 async fn test_discovery_broken_server() {
2757 let server = MockServer::start().await;
2758 let server_url = server.uri();
2759 let domain = server_url.strip_prefix("http://").unwrap();
2760 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2761
2762 Mock::given(method("GET"))
2763 .and(path("/.well-known/matrix/client"))
2764 .respond_with(ResponseTemplate::new(404))
2765 .mount(&server)
2766 .await;
2767
2768 assert!(
2769 Client::builder()
2770 .insecure_server_name_no_tls(alice.server_name())
2771 .build()
2772 .await
2773 .is_err(),
2774 "Creating a client from a user ID should fail when the .well-known request fails."
2775 );
2776 }
2777
2778 #[async_test]
2779 async fn test_room_creation() {
2780 let server = MockServer::start().await;
2781 let client = logged_in_client(Some(server.uri())).await;
2782
2783 let response = SyncResponseBuilder::default()
2784 .add_joined_room(
2785 JoinedRoomBuilder::default()
2786 .add_state_event(StateTestEvent::Member)
2787 .add_state_event(StateTestEvent::PowerLevels),
2788 )
2789 .build_sync_response();
2790
2791 client.inner.base_client.receive_sync_response(response).await.unwrap();
2792
2793 assert_eq!(client.homeserver(), Url::parse(&server.uri()).unwrap());
2794
2795 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
2796 assert_eq!(room.state(), RoomState::Joined);
2797 }
2798
2799 #[async_test]
2800 async fn test_retry_limit_http_requests() {
2801 let server = MockServer::start().await;
2802 let client = test_client_builder(Some(server.uri()))
2803 .request_config(RequestConfig::new().retry_limit(3))
2804 .build()
2805 .await
2806 .unwrap();
2807
2808 assert!(client.request_config().retry_limit.unwrap() == 3);
2809
2810 Mock::given(method("POST"))
2811 .and(path("/_matrix/client/r0/login"))
2812 .respond_with(ResponseTemplate::new(501))
2813 .expect(3)
2814 .mount(&server)
2815 .await;
2816
2817 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2818 }
2819
2820 #[async_test]
2821 async fn test_retry_timeout_http_requests() {
2822 // Keep this timeout small so that the test doesn't take long
2823 let retry_timeout = Duration::from_secs(5);
2824 let server = MockServer::start().await;
2825 let client = test_client_builder(Some(server.uri()))
2826 .request_config(RequestConfig::new().max_retry_time(retry_timeout))
2827 .build()
2828 .await
2829 .unwrap();
2830
2831 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
2832
2833 Mock::given(method("POST"))
2834 .and(path("/_matrix/client/r0/login"))
2835 .respond_with(ResponseTemplate::new(501))
2836 .expect(2..)
2837 .mount(&server)
2838 .await;
2839
2840 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2841 }
2842
2843 #[async_test]
2844 async fn test_short_retry_initial_http_requests() {
2845 let server = MockServer::start().await;
2846 let client = test_client_builder(Some(server.uri())).build().await.unwrap();
2847
2848 Mock::given(method("POST"))
2849 .and(path("/_matrix/client/r0/login"))
2850 .respond_with(ResponseTemplate::new(501))
2851 .expect(3..)
2852 .mount(&server)
2853 .await;
2854
2855 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2856 }
2857
2858 #[async_test]
2859 async fn test_no_retry_http_requests() {
2860 let server = MockServer::start().await;
2861 let client = logged_in_client(Some(server.uri())).await;
2862
2863 Mock::given(method("GET"))
2864 .and(path("/_matrix/client/r0/devices"))
2865 .respond_with(ResponseTemplate::new(501))
2866 .expect(1)
2867 .mount(&server)
2868 .await;
2869
2870 client.devices().await.unwrap_err();
2871 }
2872
2873 #[async_test]
2874 async fn test_set_homeserver() {
2875 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2876 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
2877
2878 let homeserver = Url::parse("http://example.com/").unwrap();
2879 client.set_homeserver(homeserver.clone());
2880 assert_eq!(client.homeserver(), homeserver);
2881 }
2882
2883 #[async_test]
2884 async fn test_search_user_request() {
2885 let server = MockServer::start().await;
2886 let client = logged_in_client(Some(server.uri())).await;
2887
2888 Mock::given(method("POST"))
2889 .and(path("_matrix/client/r0/user_directory/search"))
2890 .and(body_json(&*test_json::search_users::SEARCH_USERS_REQUEST))
2891 .respond_with(
2892 ResponseTemplate::new(200)
2893 .set_body_json(&*test_json::search_users::SEARCH_USERS_RESPONSE),
2894 )
2895 .mount(&server)
2896 .await;
2897
2898 let response = client.search_users("test", 50).await.unwrap();
2899 assert_eq!(response.results.len(), 1);
2900 let result = &response.results[0];
2901 assert_eq!(result.user_id.to_string(), "@test:example.me");
2902 assert_eq!(result.display_name.clone().unwrap(), "Test");
2903 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
2904 assert!(!response.limited);
2905 }
2906
2907 #[async_test]
2908 async fn test_request_unstable_features() {
2909 let server = MockServer::start().await;
2910 let client = logged_in_client(Some(server.uri())).await;
2911
2912 Mock::given(method("GET"))
2913 .and(path("_matrix/client/versions"))
2914 .respond_with(
2915 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2916 )
2917 .mount(&server)
2918 .await;
2919
2920 let unstable_features = client.unstable_features().await.unwrap();
2921 assert_eq!(unstable_features.get("org.matrix.e2e_cross_signing"), Some(&true));
2922 assert_eq!(unstable_features.get("you.shall.pass"), None);
2923 }
2924
2925 #[async_test]
2926 async fn test_can_homeserver_push_encrypted_event_to_device() {
2927 let server = MockServer::start().await;
2928 let client = logged_in_client(Some(server.uri())).await;
2929
2930 Mock::given(method("GET"))
2931 .and(path("_matrix/client/versions"))
2932 .respond_with(
2933 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2934 )
2935 .mount(&server)
2936 .await;
2937
2938 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
2939 assert!(msc4028_enabled);
2940 }
2941
    /// Exercises the recently-visited-rooms tracking on the account API:
    /// tracking requires authentication, re-tracking a room does not duplicate
    /// it, the most recently tracked room comes first, and the list is capped
    /// at 20 entries.
    #[async_test]
    async fn test_recently_visited_rooms() {
        // Tracking recently visited rooms requires authentication
        let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
        assert_matches!(
            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
            Err(Error::AuthenticationRequired)
        );

        // From here on, use a logged-in client.
        let client = logged_in_client(None).await;
        let account = client.account();

        // We should start off with an empty list
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);

        // Tracking a valid room id should add it to the list
        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

        // And the existing list shouldn't be changed
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

        // Tracking the same room again shouldn't change the list
        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

        // Tracking a second room should add it to the front of the list
        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
        assert_eq!(
            account.get_recently_visited_rooms().await.unwrap(),
            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
        );

        // Tracking the first room yet again should move it to the front of the list
        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
        assert_eq!(
            account.get_recently_visited_rooms().await.unwrap(),
            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
        );

        // Tracking should be capped at 20
        // (20 more rooms are tracked on top of the 2 already in the list.)
        for n in 0..20 {
            account
                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
                .await
                .unwrap();
        }

        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);

        // And the initial rooms should've been pushed out
        let rooms = account.get_recently_visited_rooms().await.unwrap();
        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));

        // And the last tracked room should be the first
        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
    }
3005
    /// Checks that subscribing to the event cache and obtaining a room event
    /// cache does not keep the client alive: once the only `Client` handle is
    /// dropped, no strong reference to the inner client may remain.
    #[async_test]
    async fn test_client_no_cycle_with_event_cache() {
        let client = logged_in_client(None).await;

        // Wait for the init tasks to die.
        sleep(Duration::from_secs(1)).await;

        // Baseline: `client` is expected to be the only strong reference.
        let weak_client = WeakClient::from_client(&client);
        assert_eq!(weak_client.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Have the client know the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            // Both bindings are dropped at the end of this block.
            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give a bit of time for background tasks to die.
        sleep(Duration::from_secs(1)).await;

        // The weak client must be the last reference to the client now.
        assert_eq!(weak_client.strong_count(), 0);
        let client = weak_client.get();
        // The message arguments are only evaluated when the assertion fails,
        // i.e. when `client` is `Some`, so the `unwrap` below cannot panic.
        assert!(
            client.is_none(),
            "too many strong references to the client: {}",
            Arc::strong_count(&client.unwrap().inner)
        );
    }
3045
    /// Checks that server capabilities are cached: across two clients sharing
    /// the same state store, `/versions` is expected to be hit only once, and
    /// once more after the cache has been reset.
    #[async_test]
    async fn test_server_capabilities_caching() {
        let server = MockServer::start().await;
        let server_url = server.uri();
        let domain = server_url.strip_prefix("http://").unwrap();
        let server_name = <&ServerName>::try_from(domain).unwrap();

        // The `.well-known` points back at the same mock server; it is expected
        // to be requested twice (two clients are built below).
        Mock::given(method("GET"))
            .and(path("/.well-known/matrix/client"))
            .respond_with(ResponseTemplate::new(200).set_body_raw(
                test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
                "application/json",
            ))
            .named("well known mock")
            .expect(2)
            .mount(&server)
            .await;

        // Scoped mock: it stays mounted until the returned guard is dropped.
        let versions_mock = Mock::given(method("GET"))
            .and(path("/_matrix/client/versions"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
            .named("first versions mock")
            .expect(1)
            .mount_as_scoped(&server)
            .await;

        // The same store instance is handed to both clients built below.
        let memory_store = Arc::new(MemoryStore::new());
        let client = Client::builder()
            .insecure_server_name_no_tls(server_name)
            .store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
            .build()
            .await
            .unwrap();

        assert_eq!(client.server_versions().await.unwrap().len(), 1);

        // This second call hits the in-memory cache.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

        drop(client);

        let client = Client::builder()
            .insecure_server_name_no_tls(server_name)
            .store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
            .build()
            .await
            .unwrap();

        // This third call hits the on-disk cache.
        // (Here the "on-disk" store is the shared `memory_store`.)
        assert_eq!(
            client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
            Some(&true)
        );

        // Unmount the first versions mock, then check the remaining expectations.
        drop(versions_mock);
        server.verify().await;

        // Now, reset the cache, and observe the endpoint being called again once.
        client.reset_server_capabilities().await.unwrap();

        Mock::given(method("GET"))
            .and(path("/_matrix/client/versions"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
            .expect(1)
            .named("second versions mock")
            .mount(&server)
            .await;

        // Hits network again.
        assert_eq!(client.server_versions().await.unwrap().len(), 1);
        // Hits in-memory cache again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    }
3125
3126 #[async_test]
3127 async fn test_no_network_doesnt_cause_infinite_retries() {
3128 // Note: not `no_retry_test_client` or `logged_in_client` which uses the former,
3129 // since we want infinite retries for transient errors.
3130 let client =
3131 test_client_builder(None).request_config(RequestConfig::new()).build().await.unwrap();
3132 set_client_session(&client).await;
3133
3134 // We don't define a mock server on purpose here, so that the error is really a
3135 // network error.
3136 client.whoami().await.unwrap_err();
3137 }
3138
3139 #[async_test]
3140 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3141 let (client_builder, server) = test_client_builder_with_server().await;
3142 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3143 set_client_session(&client).await;
3144
3145 let builder = Mock::given(method("GET"))
3146 .and(path("/_matrix/client/r0/sync"))
3147 .and(header("authorization", "Bearer 1234"))
3148 .and(query_param_is_missing("since"));
3149
3150 let room_id = room_id!("!room:example.org");
3151 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3152 let mut sync_response_builder = SyncResponseBuilder::new();
3153 sync_response_builder.add_joined_room(joined_room_builder);
3154 let response_body = sync_response_builder.build_json_sync_response();
3155
3156 builder
3157 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3158 .mount(&server)
3159 .await;
3160
3161 client.sync_once(SyncSettings::default()).await.unwrap();
3162
3163 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3164 assert_eq!(room.room_id(), room_id);
3165 }
3166
3167 #[async_test]
3168 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3169 let (client_builder, server) = test_client_builder_with_server().await;
3170 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3171 set_client_session(&client).await;
3172
3173 let builder = Mock::given(method("GET"))
3174 .and(path("/_matrix/client/r0/sync"))
3175 .and(header("authorization", "Bearer 1234"))
3176 .and(query_param_is_missing("since"));
3177
3178 let room_id = room_id!("!room:example.org");
3179 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3180 let mut sync_response_builder = SyncResponseBuilder::new();
3181 sync_response_builder.add_joined_room(joined_room_builder);
3182 let response_body = sync_response_builder.build_json_sync_response();
3183
3184 builder
3185 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3186 .mount(&server)
3187 .await;
3188
3189 let client = Arc::new(client);
3190
3191 // Perform the /sync request with a delay so it starts after the
3192 // `await_room_remote_echo` call has happened
3193 spawn({
3194 let client = client.clone();
3195 async move {
3196 sleep(Duration::from_millis(100)).await;
3197 client.sync_once(SyncSettings::default()).await.unwrap();
3198 }
3199 });
3200
3201 let room =
3202 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3203 assert_eq!(room.room_id(), room_id);
3204 }
3205
3206 #[async_test]
3207 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3208 let (client_builder, _) = test_client_builder_with_server().await;
3209 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3210 set_client_session(&client).await;
3211
3212 let room_id = room_id!("!room:example.org");
3213 // Room is not present so the client won't be able to find it. The call will
3214 // timeout.
3215 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3216 }
3217
3218 #[async_test]
3219 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3220 let (client_builder, server) = test_client_builder_with_server().await;
3221 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3222 set_client_session(&client).await;
3223
3224 Mock::given(method("POST"))
3225 .and(path("_matrix/client/r0/createRoom"))
3226 .and(header("authorization", "Bearer 1234"))
3227 .respond_with(
3228 ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
3229 )
3230 .mount(&server)
3231 .await;
3232
3233 // Create a room in the internal store
3234 let room = client
3235 .create_room(assign!(CreateRoomRequest::new(), {
3236 invite: vec![],
3237 is_direct: false,
3238 }))
3239 .await
3240 .unwrap();
3241
3242 // Room is locally present, but not synced, the call will timeout
3243 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3244 .await
3245 .unwrap_err();
3246 }
3247
3248 #[async_test]
3249 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3250 let server = MatrixMockServer::new().await;
3251 let client = server.client_builder().build().await;
3252
3253 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3254
3255 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3256 assert_matches!(ret, Ok(true));
3257 }
3258
3259 #[async_test]
3260 async fn test_is_room_alias_available_if_alias_is_resolved() {
3261 let server = MatrixMockServer::new().await;
3262 let client = server.client_builder().build().await;
3263
3264 server
3265 .mock_room_directory_resolve_alias()
3266 .ok("!some_room_id:matrix.org", Vec::new())
3267 .expect(1)
3268 .mount()
3269 .await;
3270
3271 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3272 assert_matches!(ret, Ok(false));
3273 }
3274
3275 #[async_test]
3276 async fn test_is_room_alias_available_if_error_found() {
3277 let server = MatrixMockServer::new().await;
3278 let client = server.client_builder().build().await;
3279
3280 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3281
3282 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3283 assert_matches!(ret, Err(_));
3284 }
3285
3286 #[async_test]
3287 async fn test_create_room_alias() {
3288 let server = MatrixMockServer::new().await;
3289 let client = server.client_builder().build().await;
3290
3291 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3292
3293 let ret = client
3294 .create_room_alias(
3295 room_alias_id!("#some_alias:matrix.org"),
3296 room_id!("!some_room:matrix.org"),
3297 )
3298 .await;
3299 assert_matches!(ret, Ok(()));
3300 }
3301
3302 #[async_test]
3303 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3304 let server = MatrixMockServer::new().await;
3305 let client = server.client_builder().build().await;
3306
3307 let room_id = room_id!("!a-room:matrix.org");
3308
3309 // Make sure the summary endpoint is called once
3310 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3311
3312 // We create a locally cached invited room
3313 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3314
3315 // And we get a preview, the server endpoint was reached
3316 let preview = client
3317 .get_room_preview(room_id.into(), Vec::new())
3318 .await
3319 .expect("Room preview should be retrieved");
3320
3321 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3322 }
3323
3324 #[async_test]
3325 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3326 let server = MatrixMockServer::new().await;
3327 let client = server.client_builder().build().await;
3328
3329 let room_id = room_id!("!a-room:matrix.org");
3330
3331 // Make sure the summary endpoint is called once
3332 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3333
3334 // We create a locally cached left room
3335 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3336
3337 // And we get a preview, the server endpoint was reached
3338 let preview = client
3339 .get_room_preview(room_id.into(), Vec::new())
3340 .await
3341 .expect("Room preview should be retrieved");
3342
3343 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3344 }
3345
3346 #[async_test]
3347 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3348 let server = MatrixMockServer::new().await;
3349 let client = server.client_builder().build().await;
3350
3351 let room_id = room_id!("!a-room:matrix.org");
3352
3353 // Make sure the summary endpoint is called once
3354 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3355
3356 // We create a locally cached knocked room
3357 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3358
3359 // And we get a preview, the server endpoint was reached
3360 let preview = client
3361 .get_room_preview(room_id.into(), Vec::new())
3362 .await
3363 .expect("Room preview should be retrieved");
3364
3365 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3366 }
3367
3368 #[async_test]
3369 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3370 let server = MatrixMockServer::new().await;
3371 let client = server.client_builder().build().await;
3372
3373 let room_id = room_id!("!a-room:matrix.org");
3374
3375 // Make sure the summary endpoint is not called
3376 server.mock_room_summary().ok(room_id).never().mount().await;
3377
3378 // We create a locally cached joined room
3379 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3380
3381 // And we get a preview, no server endpoint was reached
3382 let preview = client
3383 .get_room_preview(room_id.into(), Vec::new())
3384 .await
3385 .expect("Room preview should be retrieved");
3386
3387 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3388 }
3389
3390 #[async_test]
3391 async fn test_media_preview_config() {
3392 let server = MatrixMockServer::new().await;
3393 let client = server.client_builder().build().await;
3394
3395 server
3396 .mock_sync()
3397 .ok_and_run(&client, |builder| {
3398 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3399 "content": {
3400 "media_previews": "private",
3401 "invite_avatars": "off"
3402 },
3403 "type": "m.media_preview_config"
3404 })));
3405 })
3406 .await;
3407
3408 let (initial_value, stream) =
3409 client.account().observe_media_preview_config().await.unwrap();
3410
3411 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3412 assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3413 assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3414 pin_mut!(stream);
3415 assert_pending!(stream);
3416
3417 server
3418 .mock_sync()
3419 .ok_and_run(&client, |builder| {
3420 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3421 "content": {
3422 "media_previews": "off",
3423 "invite_avatars": "on"
3424 },
3425 "type": "m.media_preview_config"
3426 })));
3427 })
3428 .await;
3429
3430 assert_next_matches!(
3431 stream,
3432 MediaPreviewConfigEventContent {
3433 media_previews: MediaPreviews::Off,
3434 invite_avatars: InviteAvatars::On,
3435 ..
3436 }
3437 );
3438 assert_pending!(stream);
3439 }
3440
3441 #[async_test]
3442 async fn test_unstable_media_preview_config() {
3443 let server = MatrixMockServer::new().await;
3444 let client = server.client_builder().build().await;
3445
3446 server
3447 .mock_sync()
3448 .ok_and_run(&client, |builder| {
3449 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3450 "content": {
3451 "media_previews": "private",
3452 "invite_avatars": "off"
3453 },
3454 "type": "io.element.msc4278.media_preview_config"
3455 })));
3456 })
3457 .await;
3458
3459 let (initial_value, stream) =
3460 client.account().observe_media_preview_config().await.unwrap();
3461
3462 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3463 assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3464 assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3465 pin_mut!(stream);
3466 assert_pending!(stream);
3467
3468 server
3469 .mock_sync()
3470 .ok_and_run(&client, |builder| {
3471 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3472 "content": {
3473 "media_previews": "off",
3474 "invite_avatars": "on"
3475 },
3476 "type": "io.element.msc4278.media_preview_config"
3477 })));
3478 })
3479 .await;
3480
3481 assert_next_matches!(
3482 stream,
3483 MediaPreviewConfigEventContent {
3484 media_previews: MediaPreviews::Off,
3485 invite_avatars: InviteAvatars::On,
3486 ..
3487 }
3488 );
3489 assert_pending!(stream);
3490 }
3491
3492 #[async_test]
3493 async fn test_media_preview_config_not_found() {
3494 let server = MatrixMockServer::new().await;
3495 let client = server.client_builder().build().await;
3496
3497 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3498
3499 assert!(initial_value.is_none());
3500 }
3501
3502 #[async_test]
3503 async fn test_load_or_fetch_max_upload_size() {
3504 let server = MatrixMockServer::new().await;
3505 let client = server.client_builder().build().await;
3506
3507 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3508
3509 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3510 client.load_or_fetch_max_upload_size().await.unwrap();
3511
3512 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3513 }
3514
3515 #[async_test]
3516 async fn test_uploading_a_too_large_media_file() {
3517 let server = MatrixMockServer::new().await;
3518 let client = server.client_builder().build().await;
3519
3520 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3521 client.load_or_fetch_max_upload_size().await.unwrap();
3522 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3523
3524 let data = vec![1, 2];
3525 let upload_request =
3526 ruma::api::client::media::create_content::v3::Request::new(data.clone());
3527 let request = SendRequest {
3528 client: client.clone(),
3529 request: upload_request,
3530 config: None,
3531 send_progress: SharedObservable::new(TransmissionProgress::default()),
3532 };
3533 let media_request = SendMediaUploadRequest::new(request);
3534
3535 let error = media_request.await.err();
3536 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3537 assert_eq!(max, uint!(1));
3538 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3539 }
3540}