matrix_sdk_ui/timeline/
pagination.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{pin_mut, StreamExt as _};
19use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
20use tracing::{instrument, warn};
21
22use super::Error;
23
24impl super::Timeline {
25    /// Add more events to the start of the timeline.
26    ///
27    /// Returns whether we hit the start of the timeline.
28    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
29    pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
30        if self.controller.is_live() {
31            match self.controller.live_lazy_paginate_backwards(num_events).await {
32                Some(needed_num_events) => {
33                    num_events = needed_num_events.try_into().expect(
34                        "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
35                    );
36                }
37                None => {
38                    // We could adjust the skip count to a lower value, while passing the requested
39                    // number of events. We *may* have reached the start of the timeline, but since
40                    // we're fulfilling the caller's request, assume it's not the case and return
41                    // false here. A subsequent call will go to the `Some()` arm of this match, and
42                    // cause a call to the event cache's pagination.
43                    return Ok(false);
44                }
45            }
46
47            Ok(self.live_paginate_backwards(num_events).await?)
48        } else if let Some(thread_root) = self.controller.thread_root() {
49            // Note: in the future (when the event cache implements persistent storage for
50            // threads), we might need to load the related events too here.
51            Ok(self.event_cache.paginate_thread_backwards(thread_root, num_events).await?)
52        } else {
53            Ok(self.controller.focused_paginate_backwards(num_events).await?)
54        }
55    }
56
57    /// Add more events to the end of the timeline.
58    ///
59    /// Returns whether we hit the end of the timeline.
60    #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
61    pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
62        if self.controller.is_live() {
63            Ok(true)
64        } else {
65            Ok(self.controller.focused_paginate_forwards(num_events).await?)
66        }
67    }
68
69    /// Paginate backwards in live mode.
70    ///
71    /// This can only be called when the timeline is in live mode, not focused
72    /// on a specific event.
73    ///
74    /// Returns whether we hit the start of the timeline.
75    async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
76        loop {
77            match self.event_cache.pagination().run_backwards_once(batch_size).await {
78                Ok(outcome) => {
79                    // As an exceptional contract, restart the back-pagination if we received an
80                    // empty chunk.
81                    if outcome.reached_start || !outcome.events.is_empty() {
82                        if outcome.reached_start {
83                            self.controller.insert_timeline_start_if_missing().await;
84                        }
85                        return Ok(outcome.reached_start);
86                    }
87                }
88
89                Err(EventCacheError::AlreadyBackpaginating) => {
90                    // Treat an already running pagination exceptionally, returning false so that
91                    // the caller retries later.
92                    warn!("Another pagination request is already happening, returning early");
93                    return Ok(false);
94                }
95
96                // Propagate other errors as such.
97                Err(err) => return Err(err),
98            }
99        }
100    }
101
102    /// Subscribe to the back-pagination status of a live timeline.
103    ///
104    /// This will return `None` if the timeline is in the focused mode.
105    ///
106    /// Note: this may send multiple Paginating/Idle sequences during a single
107    /// call to [`Self::paginate_backwards()`].
108    pub async fn live_back_pagination_status(
109        &self,
110    ) -> Option<(RoomPaginationStatus, impl Stream<Item = RoomPaginationStatus>)> {
111        if !self.controller.is_live() {
112            return None;
113        }
114
115        let pagination = self.event_cache.pagination();
116
117        let mut status = pagination.status();
118
119        let current_value = self.controller.map_pagination_status(status.next_now()).await;
120
121        let controller = self.controller.clone();
122        let stream = Box::pin(stream! {
123            let status_stream = status.dedup();
124
125            pin_mut!(status_stream);
126
127            while let Some(state) = status_stream.next().await {
128                let state = controller.map_pagination_status(state).await;
129
130                match state {
131                    RoomPaginationStatus::Idle { hit_timeline_start } => {
132                        if hit_timeline_start {
133                            controller.insert_timeline_start_if_missing().await;
134                        }
135                    }
136                    RoomPaginationStatus::Paginating => {}
137                }
138
139                yield state;
140            }
141        });
142
143        Some((current_value, stream))
144    }
145}