matrix_sdk_ui/timeline/pagination.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_rx::StreamExt as _;
16use async_stream::stream;
17use futures_core::Stream;
18use futures_util::{pin_mut, StreamExt as _};
19use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
20use tracing::{instrument, warn};
21
22use super::Error;
23
24impl super::Timeline {
25 /// Add more events to the start of the timeline.
26 ///
27 /// Returns whether we hit the start of the timeline.
28 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
29 pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
30 if self.controller.is_live() {
31 match self.controller.live_lazy_paginate_backwards(num_events).await {
32 Some(needed_num_events) => {
33 num_events = needed_num_events.try_into().expect(
34 "failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
35 );
36 }
37 None => {
38 // We could adjust the skip count to a lower value, while passing the requested
39 // number of events. We *may* have reached the start of the timeline, but since
40 // we're fulfilling the caller's request, assume it's not the case and return
41 // false here. A subsequent call will go to the `Some()` arm of this match, and
42 // cause a call to the event cache's pagination.
43 return Ok(false);
44 }
45 }
46
47 Ok(self.live_paginate_backwards(num_events).await?)
48 } else if let Some(thread_root) = self.controller.thread_root() {
49 // Note: in the future (when the event cache implements persistent storage for
50 // threads), we might need to load the related events too here.
51 Ok(self.event_cache.paginate_thread_backwards(thread_root, num_events).await?)
52 } else {
53 Ok(self.controller.focused_paginate_backwards(num_events).await?)
54 }
55 }
56
57 /// Add more events to the end of the timeline.
58 ///
59 /// Returns whether we hit the end of the timeline.
60 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
61 pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
62 if self.controller.is_live() {
63 Ok(true)
64 } else {
65 Ok(self.controller.focused_paginate_forwards(num_events).await?)
66 }
67 }
68
69 /// Paginate backwards in live mode.
70 ///
71 /// This can only be called when the timeline is in live mode, not focused
72 /// on a specific event.
73 ///
74 /// Returns whether we hit the start of the timeline.
75 async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
76 loop {
77 match self.event_cache.pagination().run_backwards_once(batch_size).await {
78 Ok(outcome) => {
79 // As an exceptional contract, restart the back-pagination if we received an
80 // empty chunk.
81 if outcome.reached_start || !outcome.events.is_empty() {
82 if outcome.reached_start {
83 self.controller.insert_timeline_start_if_missing().await;
84 }
85 return Ok(outcome.reached_start);
86 }
87 }
88
89 Err(EventCacheError::AlreadyBackpaginating) => {
90 // Treat an already running pagination exceptionally, returning false so that
91 // the caller retries later.
92 warn!("Another pagination request is already happening, returning early");
93 return Ok(false);
94 }
95
96 // Propagate other errors as such.
97 Err(err) => return Err(err),
98 }
99 }
100 }
101
102 /// Subscribe to the back-pagination status of a live timeline.
103 ///
104 /// This will return `None` if the timeline is in the focused mode.
105 ///
106 /// Note: this may send multiple Paginating/Idle sequences during a single
107 /// call to [`Self::paginate_backwards()`].
108 pub async fn live_back_pagination_status(
109 &self,
110 ) -> Option<(RoomPaginationStatus, impl Stream<Item = RoomPaginationStatus>)> {
111 if !self.controller.is_live() {
112 return None;
113 }
114
115 let pagination = self.event_cache.pagination();
116
117 let mut status = pagination.status();
118
119 let current_value = self.controller.map_pagination_status(status.next_now()).await;
120
121 let controller = self.controller.clone();
122 let stream = Box::pin(stream! {
123 let status_stream = status.dedup();
124
125 pin_mut!(status_stream);
126
127 while let Some(state) = status_stream.next().await {
128 let state = controller.map_pagination_status(state).await;
129
130 match state {
131 RoomPaginationStatus::Idle { hit_timeline_start } => {
132 if hit_timeline_start {
133 controller.insert_timeline_start_if_missing().await;
134 }
135 }
136 RoomPaginationStatus::Paginating => {}
137 }
138
139 yield state;
140 }
141 });
142
143 Some((current_value, stream))
144 }
145}