matrix_sdk_ui/room_list_service/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! `RoomListService` API.
16//!
17//! The `RoomListService` is a UI API dedicated to present a list of Matrix
18//! rooms to the user. The syncing is handled by [`SlidingSync`]. The idea is to
19//! expose a simple API to handle most of the client app use cases, like:
20//! Showing and updating a list of rooms, filtering a list of rooms, handling
21//! particular updates of a range of rooms (the ones the client app is showing
22//! to the view, i.e. the rooms present in the viewport) etc.
23//!
24//! As such, the `RoomListService` works as an opinionated state machine. The
25//! states are defined by [`State`]. Actions are attached to the each state
26//! transition.
27//!
28//! The API is purposely small. Sliding Sync is versatile. `RoomListService` is
29//! _one_ specific usage of Sliding Sync.
30//!
31//! # Basic principle
32//!
33//! `RoomListService` works with 1 Sliding Sync List:
34//!
35//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the only
36//!   list. Its goal is to load all the user' rooms. It starts with a
37//!   [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
38//!   set of rooms) to load the first rooms quickly, and then updates to a
39//!   [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
40//!   background”: it will sync the existing rooms and will fetch new rooms, by
41//!   a certain batch size.
42//!
43//! This behavior has proven to be empirically satisfying to provide a fast and
44//! fluid user experience for a Matrix client.
45//!
46//! [`RoomListService::all_rooms`] provides a way to get a [`RoomList`] for all
47//! the rooms. From that, calling [`RoomList::entries_with_dynamic_adapters`]
48//! provides a way to get a stream of rooms. This stream is sorted, can be
49//! filtered, and the filter can be changed over time.
50//!
51//! [`RoomListService::state`] provides a way to get a stream of the state
52//! machine's state, which can be pretty helpful for the client app.
53
54pub mod filters;
55mod room_list;
56pub mod sorters;
57mod state;
58
59use std::{sync::Arc, time::Duration};
60
61use async_stream::stream;
62use eyeball::Subscriber;
63use futures_util::{pin_mut, Stream, StreamExt};
64use matrix_sdk::{
65    event_cache::EventCacheError, timeout::timeout, Client, Error as SlidingSyncError, Room,
66    SlidingSync, SlidingSyncList, SlidingSyncMode,
67};
68pub use room_list::*;
69use ruma::{
70    api::client::sync::sync_events::v5 as http, assign, directory::RoomTypeFilter,
71    events::StateEventType, OwnedRoomId, RoomId, UInt,
72};
73pub use state::*;
74use thiserror::Error;
75use tracing::{debug, error};
76
77/// The default `required_state` constant value for sliding sync lists and
78/// sliding sync room subscriptions.
79const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
80    (StateEventType::RoomName, ""),
81    (StateEventType::RoomEncryption, ""),
82    (StateEventType::RoomMember, "$LAZY"),
83    (StateEventType::RoomMember, "$ME"),
84    (StateEventType::RoomTopic, ""),
85    // Temporary workaround for https://github.com/matrix-org/matrix-rust-sdk/issues/5285
86    (StateEventType::RoomAvatar, ""),
87    (StateEventType::RoomCanonicalAlias, ""),
88    (StateEventType::RoomPowerLevels, ""),
89    (StateEventType::CallMember, "*"),
90    (StateEventType::RoomJoinRules, ""),
91    (StateEventType::RoomTombstone, ""),
92    // Those two events are required to properly compute room previews.
93    // `StateEventType::RoomCreate` is also necessary to compute the room
94    // version, and thus handling the tombstoned room correctly.
95    (StateEventType::RoomCreate, ""),
96    (StateEventType::RoomHistoryVisibility, ""),
97    // Required to correctly calculate the room display name.
98    (StateEventType::MemberHints, ""),
99];
100
101/// The default `required_state` constant value for sliding sync room
102/// subscriptions that must be added to `DEFAULT_REQUIRED_STATE`.
103const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
104    &[(StateEventType::RoomPinnedEvents, "")];
105
106/// The default `timeline_limit` value when used with room subscriptions.
107const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
108
109/// The [`RoomListService`] type. See the module's documentation to learn more.
110#[derive(Debug)]
111pub struct RoomListService {
112    /// Client that has created this [`RoomListService`].
113    client: Client,
114
115    /// The Sliding Sync instance.
116    sliding_sync: Arc<SlidingSync>,
117
118    /// The current state of the `RoomListService`.
119    ///
120    /// `RoomListService` is a simple state-machine.
121    state_machine: StateMachine,
122}
123
124impl RoomListService {
125    /// Create a new `RoomList`.
126    ///
127    /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
128    /// already pre-configured.
129    ///
130    /// This won't start an encryption sync, and it's the user's responsibility
131    /// to create one in this case using
132    /// [`EncryptionSyncService`][crate::encryption_sync_service::EncryptionSyncService].
133    pub async fn new(client: Client) -> Result<Self, Error> {
134        Self::new_with_share_pos(client, true).await
135    }
136
137    /// Like [`RoomListService::new`] but with a flag to turn the
138    /// [`SlidingSyncBuilder::share_pos`] on and off.
139    ///
140    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
141    pub async fn new_with_share_pos(client: Client, share_pos: bool) -> Result<Self, Error> {
142        let mut builder = client
143            .sliding_sync("room-list")
144            .map_err(Error::SlidingSync)?
145            .with_account_data_extension(
146                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
147            )
148            .with_receipt_extension(assign!(http::request::Receipts::default(), {
149                enabled: Some(true),
150                rooms: Some(vec![http::request::ExtensionRoomConfig::AllSubscribed])
151            }))
152            .with_typing_extension(assign!(http::request::Typing::default(), {
153                enabled: Some(true),
154            }));
155
156        if share_pos {
157            // We don't deal with encryption device messages here so this is safe
158            builder = builder.share_pos();
159        }
160
161        let sliding_sync = builder
162            .add_cached_list(
163                SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
164                    .sync_mode(
165                        SlidingSyncMode::new_selective()
166                            .add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
167                    )
168                    .timeline_limit(1)
169                    .required_state(
170                        DEFAULT_REQUIRED_STATE
171                            .iter()
172                            .map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
173                            .collect(),
174                    )
175                    .filters(Some(assign!(http::request::ListFilters::default(), {
176                        // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
177                        // If unset, both invited and joined rooms are returned. If false, no invited rooms are
178                        // returned. If true, only invited rooms are returned.
179                        is_invite: None,
180                        not_room_types: vec![RoomTypeFilter::Space],
181                    }))),
182            )
183            .await
184            .map_err(Error::SlidingSync)?
185            .build()
186            .await
187            .map(Arc::new)
188            .map_err(Error::SlidingSync)?;
189
190        // Eagerly subscribe the event cache to sync responses.
191        client.event_cache().subscribe()?;
192
193        let state_machine = StateMachine::new();
194
195        // If the sliding sync has successfully restored a sync position, skip the
196        // waiting for the initial sync, and set the state to `SettingUp`; this
197        // way, the first sync will move us to the steady state, and update the
198        // sliding sync list to use the growing sync mode.
199        if sliding_sync.has_pos().await {
200            state_machine.set(State::SettingUp);
201        }
202
203        Ok(Self { client, sliding_sync, state_machine })
204    }
205
206    /// Start to sync the room list.
207    ///
208    /// It's the main method of this entire API. Calling `sync` allows to
209    /// receive updates on the room list: new rooms, rooms updates etc. Those
210    /// updates can be read with [`RoomList::entries`] for example. This method
211    /// returns a [`Stream`] where produced items only hold an empty value
212    /// in case of a sync success, otherwise an error.
213    ///
214    /// The `RoomListService`' state machine is run by this method.
215    ///
216    /// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
217    /// calling [`Self::sync`] again will resume from the previous state of
218    /// the state machine.
219    ///
220    /// This should be used only for testing. In practice, most users should be
221    /// using the [`SyncService`] instead.
222    #[doc(hidden)]
223    pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
224        stream! {
225            let sync = self.sliding_sync.sync();
226            pin_mut!(sync);
227
228            // This is a state machine implementation.
229            // Things happen in this order:
230            //
231            // 1. The next state is calculated,
232            // 2. The actions associated to the next state are run,
233            // 3. A sync is done,
234            // 4. The next state is stored.
235            loop {
236                debug!("Run a sync iteration");
237
238                // Calculate the next state, and run the associated actions.
239                let next_state = self.state_machine.next(&self.sliding_sync).await?;
240
241                // Do the sync.
242                match sync.next().await {
243                    // Got a successful result while syncing.
244                    Some(Ok(_update_summary)) => {
245                        debug!(state = ?next_state, "New state");
246
247                        // Update the state.
248                        self.state_machine.set(next_state);
249
250                        yield Ok(());
251                    }
252
253                    // Got an error while syncing.
254                    Some(Err(error)) => {
255                        debug!(expected_state = ?next_state, "New state is an error");
256
257                        let next_state = State::Error { from: Box::new(next_state) };
258                        self.state_machine.set(next_state);
259
260                        yield Err(Error::SlidingSync(error));
261
262                        break;
263                    }
264
265                    // Sync loop has terminated.
266                    None => {
267                        debug!(expected_state = ?next_state, "New state is a termination");
268
269                        let next_state = State::Terminated { from: Box::new(next_state) };
270                        self.state_machine.set(next_state);
271
272                        break;
273                    }
274                }
275            }
276        }
277    }
278
279    /// Force to stop the sync of the `RoomListService` started by
280    /// [`Self::sync`].
281    ///
282    /// It's of utter importance to call this method rather than stop polling
283    /// the `Stream` returned by [`Self::sync`] because it will force the
284    /// cancellation and exit the sync loop, i.e. it will cancel any
285    /// in-flight HTTP requests, cancel any pending futures etc. and put the
286    /// service into a termination state.
287    ///
288    /// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
289    /// until it returns `None`, because of [`Self::stop_sync`], so that it
290    /// ensures the states are correctly placed.
291    ///
292    /// Stopping the sync of the room list via this method will put the
293    /// state-machine into the [`State::Terminated`] state.
294    ///
295    /// This should be used only for testing. In practice, most users should be
296    /// using the [`SyncService`] instead.
297    #[doc(hidden)]
298    pub fn stop_sync(&self) -> Result<(), Error> {
299        self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
300    }
301
302    /// Force the sliding sync session to expire.
303    ///
304    /// This is used by [`SyncService`][crate::SyncService].
305    ///
306    /// **Warning**: This method **must not** be called while the sync loop is
307    /// running!
308    pub(crate) async fn expire_sync_session(&self) {
309        self.sliding_sync.expire_session().await;
310
311        // Usually, when the session expires, it leads the state to be `Error`,
312        // thus some actions (like refreshing the lists) are executed. However,
313        // if the sync loop has been stopped manually, the state is `Terminated`, and
314        // when the session is forced to expire, the state remains `Terminated`, thus
315        // the actions aren't executed as expected. Consequently, let's update the
316        // state.
317        if let State::Terminated { from } = self.state_machine.get() {
318            self.state_machine.set(State::Error { from });
319        }
320    }
321
322    /// Get a [`Stream`] of [`SyncIndicator`].
323    ///
324    /// Read the documentation of [`SyncIndicator`] to learn more about it.
325    pub fn sync_indicator(
326        &self,
327        delay_before_showing: Duration,
328        delay_before_hiding: Duration,
329    ) -> impl Stream<Item = SyncIndicator> {
330        let mut state = self.state();
331
332        stream! {
333            // Ensure the `SyncIndicator` is always hidden to start with.
334            yield SyncIndicator::Hide;
335
336            // Let's not wait for an update to happen. The `SyncIndicator` must be
337            // computed as fast as possible.
338            let mut current_state = state.next_now();
339
340            loop {
341                let (sync_indicator, yield_delay) = match current_state {
342                    State::Init | State::SettingUp | State::Error { .. } => {
343                        (SyncIndicator::Show, delay_before_showing)
344                    }
345
346                    State::Recovering | State::Running | State::Terminated { .. } => {
347                        (SyncIndicator::Hide, delay_before_hiding)
348                    }
349                };
350
351                // `state.next().await` has a maximum of `yield_delay` time to execute…
352                let next_state = match timeout(state.next(), yield_delay).await {
353                    // A new state has been received before `yield_delay` time. The new
354                    // `sync_indicator` value won't be yielded.
355                    Ok(next_state) => next_state,
356
357                    // No new state has been received before `yield_delay` time. The
358                    // `sync_indicator` value can be yielded.
359                    Err(_) => {
360                        yield sync_indicator;
361
362                        // Now that `sync_indicator` has been yielded, let's wait on
363                        // the next state again.
364                        state.next().await
365                    }
366                };
367
368                if let Some(next_state) = next_state {
369                    // Update the `current_state`.
370                    current_state = next_state;
371                } else {
372                    // Something is broken with the state. Let's stop this stream too.
373                    break;
374                }
375            }
376        }
377    }
378
379    /// Get the [`Client`] that has been used to create [`Self`].
380    pub fn client(&self) -> &Client {
381        &self.client
382    }
383
384    /// Get a subscriber to the state.
385    pub fn state(&self) -> Subscriber<State> {
386        self.state_machine.subscribe()
387    }
388
389    async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
390        RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
391    }
392
393    /// Get a [`RoomList`] for all rooms.
394    pub async fn all_rooms(&self) -> Result<RoomList, Error> {
395        self.list_for(ALL_ROOMS_LIST_NAME).await
396    }
397
398    /// Get a [`Room`] if it exists.
399    pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
400        self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))
401    }
402
403    /// Subscribe to rooms.
404    ///
405    /// It means that all events from these rooms will be received every time,
406    /// no matter how the `RoomList` is configured.
407    ///
408    /// [`LatestEvents::listen_to_room`][listen_to_room] will be called for each
409    /// room in `room_ids`, so that the [`LatestEventValue`] will automatically
410    /// be calculated and updated for these rooms, for free.
411    ///
412    /// [listen_to_room]: matrix_sdk::latest_events::LatestEvents::listen_to_room
413    /// [`LatestEventValue`]: matrix_sdk::latest_events::LatestEventValue
414    pub async fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
415        // Calculate the settings for the room subscriptions.
416        let settings = assign!(http::request::RoomSubscription::default(), {
417            required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
418                (state_event.clone(), (*value).to_owned())
419            })
420            .chain(
421                DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
422                    (state_event.clone(), (*value).to_owned())
423                })
424            )
425            .collect(),
426            timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
427        });
428
429        // Decide whether the in-flight request (if any) should be cancelled if needed.
430        let cancel_in_flight_request = match self.state_machine.get() {
431            State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
432                false
433            }
434            State::SettingUp | State::Running => true,
435        };
436
437        // Before subscribing, let's listen these rooms to calculate their latest
438        // events.
439        let latest_events = self.client.latest_events().await;
440
441        for room_id in room_ids {
442            if let Err(error) = latest_events.listen_to_room(room_id).await {
443                // Let's not fail the room subscription. Instead, emit a log because it's very
444                // unlikely to happen.
445                error!(?error, ?room_id, "Failed to listen to the latest event for this room");
446            }
447        }
448
449        // Subscribe to the rooms.
450        self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings), cancel_in_flight_request)
451    }
452
453    #[cfg(test)]
454    pub fn sliding_sync(&self) -> &SlidingSync {
455        &self.sliding_sync
456    }
457}
458
459/// [`RoomList`]'s errors.
460#[derive(Debug, Error)]
461pub enum Error {
462    /// Error from [`matrix_sdk::SlidingSync`].
463    #[error(transparent)]
464    SlidingSync(SlidingSyncError),
465
466    /// An operation has been requested on an unknown list.
467    #[error("Unknown list `{0}`")]
468    UnknownList(String),
469
470    /// The requested room doesn't exist.
471    #[error("Room `{0}` not found")]
472    RoomNotFound(OwnedRoomId),
473
474    #[error(transparent)]
475    EventCache(#[from] EventCacheError),
476}
477
478/// An hint whether a _sync spinner/loader/toaster_ should be prompted to the
479/// user, indicating that the [`RoomListService`] is syncing.
480///
481/// This is entirely arbitrary and optinionated. Of course, once
482/// [`RoomListService::sync`] has been called, it's going to be constantly
483/// syncing, until [`RoomListService::stop_sync`] is called, or until an error
484/// happened. But in some cases, it's better for the user experience to prompt
485/// to the user that a sync is happening. It's usually the first sync, or the
486/// recovering sync. However, the sync indicator must be prompted if the
487/// aforementioned sync is “slow”, otherwise the indicator is likely to “blink”
488/// pretty fast, which can be very confusing. It's also common to indicate to
489/// the user that a syncing is happening in case of a network error, that
490/// something is catching up etc.
491#[derive(Debug, Eq, PartialEq)]
492pub enum SyncIndicator {
493    /// Show the sync indicator.
494    Show,
495
496    /// Hide the sync indicator.
497    Hide,
498}
499
500#[cfg(test)]
501mod tests {
502    use std::future::ready;
503
504    use futures_util::{pin_mut, StreamExt};
505    use matrix_sdk::{
506        config::RequestConfig, test_utils::client::mock_matrix_session, Client, SlidingSyncMode,
507    };
508    use matrix_sdk_test::async_test;
509    use ruma::api::MatrixVersion;
510    use serde_json::json;
511    use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
512
513    use super::{Error, RoomListService, State, ALL_ROOMS_LIST_NAME};
514
515    async fn new_client() -> (Client, MockServer) {
516        let session = mock_matrix_session();
517
518        let server = MockServer::start().await;
519        let client = Client::builder()
520            .homeserver_url(server.uri())
521            .server_versions([MatrixVersion::V1_0])
522            .request_config(RequestConfig::new().disable_retry())
523            .build()
524            .await
525            .unwrap();
526        client.restore_session(session).await.unwrap();
527
528        (client, server)
529    }
530
531    pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
532        let (client, _) = new_client().await;
533
534        RoomListService::new(client).await
535    }
536
537    struct SlidingSyncMatcher;
538
539    impl Match for SlidingSyncMatcher {
540        fn matches(&self, request: &Request) -> bool {
541            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
542                && request.method == Method::POST
543        }
544    }
545
546    #[async_test]
547    async fn test_all_rooms_are_declared() -> Result<(), Error> {
548        let room_list = new_room_list().await?;
549        let sliding_sync = room_list.sliding_sync();
550
551        // List is present, in Selective mode.
552        assert_eq!(
553            sliding_sync
554                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
555                    list.sync_mode(),
556                    SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
557                )))
558                .await,
559            Some(true)
560        );
561
562        Ok(())
563    }
564
565    #[async_test]
566    async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
567        let (client, server) = new_client().await;
568
569        let room_list = RoomListService::new(client).await?;
570
571        let sync = room_list.sync();
572        pin_mut!(sync);
573
574        // Run a first sync.
575        {
576            let _mock_guard = Mock::given(SlidingSyncMatcher)
577                .respond_with(move |_request: &Request| {
578                    ResponseTemplate::new(200).set_body_json(json!({
579                        "pos": "0",
580                        "lists": {
581                            ALL_ROOMS_LIST_NAME: {
582                                "count": 0,
583                                "ops": [],
584                            },
585                        },
586                        "rooms": {},
587                    }))
588                })
589                .mount_as_scoped(&server)
590                .await;
591
592            let _ = sync.next().await;
593        }
594
595        assert_eq!(room_list.state().get(), State::SettingUp);
596
597        // Stop the sync.
598        room_list.stop_sync()?;
599
600        // Do another sync.
601        let _ = sync.next().await;
602
603        // State is `Terminated`, as expected!
604        assert_eq!(
605            room_list.state_machine.get(),
606            State::Terminated { from: Box::new(State::Running) }
607        );
608
609        // Now, let's make the sliding sync session to expire.
610        room_list.expire_sync_session().await;
611
612        // State is `Error`, as a regular session expiration would generate!
613        assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
614
615        Ok(())
616    }
617}