matrix_sdk_ui/room_list_service/room_list.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{future::ready, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{pin_mut, stream, Stream, StreamExt as _};
24use matrix_sdk::{
25 executor::{spawn, JoinHandle},
26 Client, SlidingSync, SlidingSyncList,
27};
28use matrix_sdk_base::RoomInfoNotableUpdate;
29use tokio::{
30 select,
31 sync::broadcast::{self, error::RecvError},
32};
33use tracing::{error, trace};
34
35use super::{
36 filters::BoxedFilterFn,
37 sorters::{new_sorter_lexicographic, new_sorter_name, new_sorter_recency},
38 Error, Room, State,
39};
40
/// A `RoomList` represents a list of rooms, from a
/// [`RoomListService`](super::RoomListService).
///
/// Dropping a `RoomList` aborts the task stored in `loading_state_task`.
#[derive(Debug)]
pub struct RoomList {
    /// The client, used to obtain the rooms stream and the receiver of room
    /// info notable updates.
    client: Client,
    /// The sliding sync list backing this room list; it is looked up by name
    /// when the `RoomList` is created.
    sliding_sync_list: SlidingSyncList,
    /// The observable loading state, handed out to callers through
    /// subscribers.
    loading_state: SharedObservable<RoomListLoadingState>,
    /// Handle of the spawned task that updates `loading_state`.
    loading_state_task: JoinHandle<()>,
}
50
51impl Drop for RoomList {
52 fn drop(&mut self) {
53 self.loading_state_task.abort();
54 }
55}
56
57impl RoomList {
58 pub(super) async fn new(
59 client: &Client,
60 sliding_sync: &Arc<SlidingSync>,
61 sliding_sync_list_name: &str,
62 room_list_service_state: Subscriber<State>,
63 ) -> Result<Self, Error> {
64 let sliding_sync_list = sliding_sync
65 .on_list(sliding_sync_list_name, |list| ready(list.clone()))
66 .await
67 .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
68
69 let loading_state =
70 SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
71 Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
72 maximum_number_of_rooms: Some(maximum_number_of_rooms),
73 },
74 None => RoomListLoadingState::NotLoaded,
75 });
76
77 Ok(Self {
78 client: client.clone(),
79 sliding_sync_list: sliding_sync_list.clone(),
80 loading_state: loading_state.clone(),
81 loading_state_task: spawn(async move {
82 pin_mut!(room_list_service_state);
83
84 // As soon as `RoomListService` changes its state, if it isn't
85 // `Terminated` nor `Error`, we know we have fetched something,
86 // so the room list is loaded.
87 while let Some(state) = room_list_service_state.next().await {
88 use State::*;
89
90 match state {
91 Terminated { .. } | Error { .. } | Init => (),
92 SettingUp | Recovering | Running => break,
93 }
94 }
95
96 // Let's jump from `NotLoaded` to `Loaded`.
97 let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
98
99 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
100
101 // Wait for updates on the maximum number of rooms to update again.
102 let mut maximum_number_of_rooms_stream =
103 sliding_sync_list.maximum_number_of_rooms_stream();
104
105 while let Some(maximum_number_of_rooms) =
106 maximum_number_of_rooms_stream.next().await
107 {
108 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
109 }
110 }),
111 })
112 }
113
114 /// Get a subscriber to the room list loading state.
115 ///
116 /// This method will send out the current loading state as the first update.
117 pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
118 self.loading_state.subscribe_reset()
119 }
120
121 /// Get a stream of rooms.
122 fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
123 self.client.rooms_stream()
124 }
125
126 /// Get a configurable stream of rooms.
127 ///
128 /// It's possible to provide a filter that will filter out room list
129 /// entries, and that it's also possible to “paginate” over the entries by
130 /// `page_size`. The rooms are also sorted.
131 ///
132 /// The returned stream will only start yielding diffs once a filter is set
133 /// through the returned [`RoomListDynamicEntriesController`]. For every
134 /// call to [`RoomListDynamicEntriesController::set_filter`], the stream
135 /// will yield a [`VectorDiff::Reset`] followed by any updates of the
136 /// room list under that filter (until the next reset).
137 pub fn entries_with_dynamic_adapters(
138 &self,
139 page_size: usize,
140 ) -> (impl Stream<Item = Vec<VectorDiff<Room>>> + '_, RoomListDynamicEntriesController) {
141 let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
142 let list = self.sliding_sync_list.clone();
143
144 let filter_fn_cell = AsyncCell::shared();
145
146 let limit = SharedObservable::<usize>::new(page_size);
147 let limit_stream = limit.subscribe();
148
149 let dynamic_entries_controller = RoomListDynamicEntriesController::new(
150 filter_fn_cell.clone(),
151 page_size,
152 limit,
153 list.maximum_number_of_rooms_stream(),
154 );
155
156 let stream = stream! {
157 loop {
158 let filter_fn = filter_fn_cell.take().await;
159
160 let (raw_values, raw_stream) = self.entries();
161
162 // Combine normal stream events with other updates from rooms
163 let merged_streams = merge_stream_and_receiver(raw_values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
164
165 let (values, stream) = (raw_values, merged_streams)
166 .filter(filter_fn)
167 .sort_by(new_sorter_lexicographic(vec![
168 Box::new(new_sorter_recency()),
169 Box::new(new_sorter_name())
170 ]))
171 .dynamic_head_with_initial_value(page_size, limit_stream.clone());
172
173 // Clearing the stream before chaining with the real stream.
174 yield stream::once(ready(vec![VectorDiff::Reset { values }]))
175 .chain(stream);
176 }
177 }
178 .switch();
179
180 (stream, dynamic_entries_controller)
181 }
182}
183
184/// This function remembers the current state of the unfiltered room list, so it
185/// knows where all rooms are. When the receiver is triggered, a Set operation
186/// for the room position is inserted to the stream.
187fn merge_stream_and_receiver(
188 mut raw_current_values: Vector<Room>,
189 raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
190 mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
191) -> impl Stream<Item = Vec<VectorDiff<Room>>> {
192 stream! {
193 pin_mut!(raw_stream);
194
195 loop {
196 select! {
197 // We want to give priority on updates from `raw_stream` as it will necessarily trigger a “refresh” of the rooms.
198 biased;
199
200 diffs = raw_stream.next() => {
201 if let Some(diffs) = diffs {
202 for diff in &diffs {
203 diff.clone().map(|room| {
204 trace!(room = %room.room_id(), "updated in response");
205 room
206 }).apply(&mut raw_current_values);
207 }
208
209 yield diffs;
210 } else {
211 // Restart immediately, don't keep on waiting for the receiver
212 break;
213 }
214 }
215
216 update = room_info_notable_update_receiver.recv() => {
217 match update {
218 Ok(update) => {
219 // Emit a `VectorDiff::Set` for the specific rooms.
220 if let Some(index) = raw_current_values.iter().position(|room| room.room_id() == update.room_id) {
221 let room = &raw_current_values[index];
222 let update = VectorDiff::Set { index, value: room.clone() };
223 yield vec![update];
224 }
225 }
226
227 Err(RecvError::Closed) => {
228 error!("Cannot receive room info notable updates because the sender has been closed");
229
230 break;
231 }
232
233 Err(RecvError::Lagged(n)) => {
234 error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
235 }
236 }
237 }
238 }
239 }
240 }
241}
242
/// The loading state of a [`RoomList`].
///
/// When a [`RoomList`] is displayed to the user, it can be in various states.
/// This enum tries to represent those states with a correct level of
/// abstraction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RoomListLoadingState {
    /// The [`RoomList`] has not been loaded yet, i.e. a sync might be running
    /// or not running at all; there is nothing to show in this `RoomList` yet.
    /// It's a good opportunity to show a placeholder to the user.
    ///
    /// From [`Self::NotLoaded`], it's only possible to move to
    /// [`Self::Loaded`].
    NotLoaded,

    /// The [`RoomList`] has been loaded, i.e. a sync has been run, or more
    /// syncs are running; there is probably something to show to the user.
    /// Either the user has 0 rooms, in which case it's a good opportunity to
    /// show a special screen for that, or the user has one or more rooms, and
    /// it's the classical room list.
    ///
    /// The number of rooms is represented by `maximum_number_of_rooms`.
    ///
    /// From [`Self::Loaded`], it's not possible to move back to
    /// [`Self::NotLoaded`].
    Loaded {
        /// The maximum number of rooms a [`RoomList`] contains.
        ///
        /// It does not mean that there are exactly this many rooms to display.
        /// Usually, the room entries are represented by [`Room`]. The room
        /// entry might have been synced or not synced yet, but we know for sure
        /// (from the server), that there will be this amount of rooms in the
        /// list at the end.
        ///
        /// Note that it's an `Option`, because it may be possible that the
        /// server failed to send us this value. It's up to you, dear reader,
        /// to know which default to adopt in case of `None`.
        maximum_number_of_rooms: Option<u32>,
    },
}
283
/// Controller for the [`RoomList`] dynamic entries.
///
/// To get one value of this type, use
/// [`RoomList::entries_with_dynamic_adapters`]
pub struct RoomListDynamicEntriesController {
    /// Cell in which the next filter is stored; it is shared with the stream
    /// returned by [`RoomList::entries_with_dynamic_adapters`], which takes
    /// the filter out of it.
    filter: Arc<AsyncCell<BoxedFilterFn>>,
    /// The number of entries in one page.
    page_size: usize,
    /// The current limit on the number of entries; it starts at `page_size`.
    limit: SharedObservable<usize>,
    /// Subscriber to the maximum number of rooms of the sliding sync list,
    /// used as an upper bound when growing `limit`.
    maximum_number_of_rooms: Subscriber<Option<u32>>,
}
294
295impl RoomListDynamicEntriesController {
296 fn new(
297 filter: Arc<AsyncCell<BoxedFilterFn>>,
298 page_size: usize,
299 limit_stream: SharedObservable<usize>,
300 maximum_number_of_rooms: Subscriber<Option<u32>>,
301 ) -> Self {
302 Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
303 }
304
305 /// Set the filter.
306 ///
307 /// If the associated stream has been dropped, returns `false` to indicate
308 /// the operation didn't have an effect.
309 pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
310 if Arc::strong_count(&self.filter) == 1 {
311 // there is no other reference to the boxed filter fn, setting it
312 // would be pointless (no new references can be created from self,
313 // either)
314 false
315 } else {
316 self.filter.set(filter);
317 true
318 }
319 }
320
321 /// Add one page, i.e. view `page_size` more entries in the room list if
322 /// any.
323 pub fn add_one_page(&self) {
324 let Some(max) = self.maximum_number_of_rooms.get() else {
325 return;
326 };
327
328 let max: usize = max.try_into().unwrap();
329 let limit = self.limit.get();
330
331 if limit < max {
332 // With this logic, it is possible that `limit` becomes greater than `max` if
333 // `max - limit < page_size`, and that's perfectly fine. It's OK to have a
334 // `limit` greater than `max`, but it's not OK to increase the limit
335 // indefinitely.
336 self.limit.set_if_not_eq(limit + self.page_size);
337 }
338 }
339
340 /// Reset the one page, i.e. forget all pages and move back to the first
341 /// page.
342 pub fn reset_to_one_page(&self) {
343 self.limit.set_if_not_eq(self.page_size);
344 }
345}