matrix_sdk_ui/room_list_service/
state.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! States and actions for the `RoomList` state machine.
16
17use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
/// Name of the sliding sync list that the sync-mode actions of this module
/// look up and reconfigure.
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
/// The state of the [`super::RoomList`].
///
/// Equality is structural and total (`Eq`), so states can be compared
/// directly, including the state wrapped in [`State::Error`] and
/// [`State::Terminated`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// The initial state: a freshly created `StateMachine` starts here.
    Init,

    /// At this state, the first rooms have been synced.
    SettingUp,

    /// At this state, the system is recovering from `Error` or `Terminated`, or
    /// the time since the last state update was too long (see
    /// `StateMachine::state_lifespan` to learn more). It's similar to
    /// `SettingUp` but some lists may already exist, so the actions are
    /// slightly different.
    Recovering,

    /// At this state, all rooms are syncing.
    Running,

    /// At this state, the sync has been stopped because an error happened.
    Error {
        /// The state the machine was in when the error happened; it is used
        /// to decide which state to resume from.
        from: Box<State>,
    },

    /// At this state, the sync has been stopped because it was requested.
    Terminated {
        /// The state the machine was in when the termination was requested;
        /// it is used to decide which state to resume from.
        from: Box<State>,
    },
}
52
/// Default value for `StateMachine::state_lifespan`: 1800 seconds, i.e. 30
/// minutes.
const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
55
/// The state machine used to transition between the [`State`]s.
///
/// It stores the observable current state, the time of the last state
/// update, and the lifespan after which that state is considered too old.
#[derive(Debug)]
pub struct StateMachine {
    /// The current state of the `RoomListService`.
    state: SharedObservable<State>,

    /// Last time the state has been updated.
    ///
    /// When the state has not been updated for a long time, we want to enter
    /// the [`State::Recovering`] state. Why do we need to do that? Because in
    /// some cases, the user might have received many updates between two
    /// distant syncs. If the sliding sync list range was too large, like
    /// 0..=499, the next sync is likely to be heavy and potentially slow.
    /// In this case, it's preferable to jump back onto `Recovering`, which will
    /// reset the range, so that the next sync will be fast for the client.
    ///
    /// To be used in coordination with `Self::state_lifespan`.
    ///
    /// This mutex is only taken for short periods of time, so it's sync.
    last_state_update_time: Mutex<Instant>,

    /// The maximum time before considering the state as “too old”.
    ///
    /// To be used in coordination with `Self::last_state_update_time`.
    state_lifespan: Duration,
}
82
83impl StateMachine {
84    pub(super) fn new() -> Self {
85        StateMachine {
86            state: SharedObservable::new(State::Init),
87            last_state_update_time: Mutex::new(Instant::now()),
88            state_lifespan: DEFAULT_STATE_LIFESPAN,
89        }
90    }
91
92    /// Get the current state.
93    pub(super) fn get(&self) -> State {
94        self.state.get()
95    }
96
97    /// Set the new state.
98    ///
99    /// Setting a new state will update `Self::last_state_update`.
100    pub(super) fn set(&self, state: State) {
101        let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
102        *last_state_update_time = Instant::now();
103
104        self.state.set(state);
105    }
106
107    /// Subscribe to state updates.
108    pub fn subscribe(&self) -> Subscriber<State> {
109        self.state.subscribe()
110    }
111
112    /// Transition to the next state, and execute the necessary transition on
113    /// the sliding sync list.
114    pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
115        use State::*;
116
117        let next_state = match self.get() {
118            Init => SettingUp,
119
120            SettingUp | Recovering => {
121                set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
122                Running
123            }
124
125            Running => {
126                // We haven't changed the state for a while, we go back to `Recovering` to avoid
127                // requesting potentially large data. See `Self::last_state_update` to learn
128                // the details.
129                if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
130                    set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
131
132                    Recovering
133                } else {
134                    Running
135                }
136            }
137
138            Error { from: previous_state } | Terminated { from: previous_state } => {
139                match previous_state.as_ref() {
140                    // Unreachable state.
141                    Error { .. } | Terminated { .. } => {
142                        unreachable!(
143                            "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
144                        );
145                    }
146
147                    // If the previous state was `Running`, we enter the `Recovering` state.
148                    Running => {
149                        set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
150                        Recovering
151                    }
152
153                    // Jump back to the previous state that led to this termination.
154                    state => state.to_owned(),
155                }
156            }
157        };
158
159        Ok(next_state)
160    }
161}
162
163async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
164    sliding_sync
165        .on_list(ALL_ROOMS_LIST_NAME, |list| {
166            list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
167
168            ready(())
169        })
170        .await
171        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
172}
173
174async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
175    sliding_sync
176        .on_list(ALL_ROOMS_LIST_NAME, |list| {
177            list.set_sync_mode(
178                SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
179            );
180
181            ready(())
182        })
183        .await
184        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
185}
186
/// Default range for the selective sync-mode of the `ALL_ROOMS_LIST_NAME`
/// list. The range is inclusive: `0..=19`.
pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;

/// Default `batch_size` for the growing sync-mode of the `ALL_ROOMS_LIST_NAME`
/// list.
pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
194
195#[cfg(test)]
196mod tests {
197    use matrix_sdk_test::async_test;
198    use tokio::time::sleep;
199
200    use super::{super::tests::new_room_list, *};
201
202    #[async_test]
203    async fn test_states() -> Result<(), Error> {
204        let room_list = new_room_list().await?;
205        let sliding_sync = room_list.sliding_sync();
206
207        let state_machine = StateMachine::new();
208
209        // Hypothetical error.
210        {
211            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
212
213            // Back to the previous state.
214            state_machine.set(state_machine.next(sliding_sync).await?);
215            assert_eq!(state_machine.get(), State::Init);
216        }
217
218        // Hypothetical termination.
219        {
220            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
221
222            // Back to the previous state.
223            state_machine.set(state_machine.next(sliding_sync).await?);
224            assert_eq!(state_machine.get(), State::Init);
225        }
226
227        // Next state.
228        state_machine.set(state_machine.next(sliding_sync).await?);
229        assert_eq!(state_machine.get(), State::SettingUp);
230
231        // Hypothetical error.
232        {
233            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
234
235            // Back to the previous state.
236            state_machine.set(state_machine.next(sliding_sync).await?);
237            assert_eq!(state_machine.get(), State::SettingUp);
238        }
239
240        // Hypothetical termination.
241        {
242            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
243
244            // Back to the previous state.
245            state_machine.set(state_machine.next(sliding_sync).await?);
246            assert_eq!(state_machine.get(), State::SettingUp);
247        }
248
249        // Next state.
250        state_machine.set(state_machine.next(sliding_sync).await?);
251        assert_eq!(state_machine.get(), State::Running);
252
253        // Hypothetical error.
254        {
255            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
256
257            // Jump to the **recovering** state!
258            state_machine.set(state_machine.next(sliding_sync).await?);
259            assert_eq!(state_machine.get(), State::Recovering);
260
261            // Now, back to the previous state.
262            state_machine.set(state_machine.next(sliding_sync).await?);
263            assert_eq!(state_machine.get(), State::Running);
264        }
265
266        // Hypothetical termination.
267        {
268            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
269
270            // Jump to the **recovering** state!
271            state_machine.set(state_machine.next(sliding_sync).await?);
272            assert_eq!(state_machine.get(), State::Recovering);
273
274            // Now, back to the previous state.
275            state_machine.set(state_machine.next(sliding_sync).await?);
276            assert_eq!(state_machine.get(), State::Running);
277        }
278
279        // Hypothetical error when recovering.
280        {
281            state_machine.set(State::Error { from: Box::new(State::Recovering) });
282
283            // Back to the previous state.
284            state_machine.set(state_machine.next(sliding_sync).await?);
285            assert_eq!(state_machine.get(), State::Recovering);
286        }
287
288        // Hypothetical termination when recovering.
289        {
290            state_machine.set(State::Terminated { from: Box::new(State::Recovering) });
291
292            // Back to the previous state.
293            state_machine.set(state_machine.next(sliding_sync).await?);
294            assert_eq!(state_machine.get(), State::Recovering);
295        }
296
297        Ok(())
298    }
299
300    #[async_test]
301    async fn test_recover_state_after_delay() -> Result<(), Error> {
302        let room_list = new_room_list().await?;
303        let sliding_sync = room_list.sliding_sync();
304
305        let mut state_machine = StateMachine::new();
306        state_machine.state_lifespan = Duration::from_millis(50);
307
308        {
309            state_machine.set(state_machine.next(sliding_sync).await?);
310            assert_eq!(state_machine.get(), State::SettingUp);
311
312            state_machine.set(state_machine.next(sliding_sync).await?);
313            assert_eq!(state_machine.get(), State::Running);
314
315            state_machine.set(state_machine.next(sliding_sync).await?);
316            assert_eq!(state_machine.get(), State::Running);
317
318            state_machine.set(state_machine.next(sliding_sync).await?);
319            assert_eq!(state_machine.get(), State::Running);
320        }
321
322        // Time passes.
323        sleep(Duration::from_millis(100)).await;
324
325        {
326            // Time has elapsed, time to recover.
327            state_machine.set(state_machine.next(sliding_sync).await?);
328            assert_eq!(state_machine.get(), State::Recovering);
329
330            state_machine.set(state_machine.next(sliding_sync).await?);
331            assert_eq!(state_machine.get(), State::Running);
332
333            state_machine.set(state_machine.next(sliding_sync).await?);
334            assert_eq!(state_machine.get(), State::Running);
335
336            state_machine.set(state_machine.next(sliding_sync).await?);
337            assert_eq!(state_machine.get(), State::Running);
338        }
339
340        // Time passes, again. Just to test everything is going well.
341        sleep(Duration::from_millis(100)).await;
342
343        {
344            // Time has elapsed, time to recover.
345            state_machine.set(state_machine.next(sliding_sync).await?);
346            assert_eq!(state_machine.get(), State::Recovering);
347
348            state_machine.set(state_machine.next(sliding_sync).await?);
349            assert_eq!(state_machine.get(), State::Running);
350
351            state_machine.set(state_machine.next(sliding_sync).await?);
352            assert_eq!(state_machine.get(), State::Running);
353
354            state_machine.set(state_machine.next(sliding_sync).await?);
355            assert_eq!(state_machine.get(), State::Running);
356        }
357
358        Ok(())
359    }
360
361    #[async_test]
362    async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
363    {
364        let room_list = new_room_list().await?;
365        let sliding_sync = room_list.sliding_sync();
366
367        // List is present, in Selective mode.
368        assert_eq!(
369            sliding_sync
370                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
371                    list.sync_mode(),
372                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
373                )))
374                .await,
375            Some(true)
376        );
377
378        // Run the action!
379        set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();
380
381        // List is still present, in Growing mode.
382        assert_eq!(
383            sliding_sync
384                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
385                    list.sync_mode(),
386                    SlidingSyncMode::Growing {
387                        batch_size, ..
388                    } if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
389                )))
390                .await,
391            Some(true)
392        );
393
394        // Run the other action!
395        set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();
396
397        // List is still present, in Selective mode.
398        assert_eq!(
399            sliding_sync
400                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
401                    list.sync_mode(),
402                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
403                )))
404                .await,
405            Some(true)
406        );
407
408        Ok(())
409    }
410}