matrix_sdk_ui/timeline/
builder.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use matrix_sdk::{executor::spawn, Room};
18use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
19use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
20use tracing::{info_span, Instrument, Span};
21
22use super::{
23    controller::{TimelineController, TimelineSettings},
24    DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
25};
26use crate::{
27    timeline::{
28        controller::spawn_crypto_tasks,
29        tasks::{
30            pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task,
31            thread_updates_task,
32        },
33    },
34    unable_to_decrypt_hook::UtdHookManager,
35};
36
/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
///
/// Create one with [`TimelineBuilder::new`], customize it with the chainable
/// methods, then call [`TimelineBuilder::build`] to obtain the [`Timeline`].
#[must_use]
#[derive(Debug)]
pub struct TimelineBuilder {
    /// The room the timeline is built for.
    room: Room,
    /// The settings handed over to the timeline controller when building.
    settings: TimelineSettings,
    /// The initial focus of the timeline.
    focus: TimelineFocus,

    /// An optional hook to call whenever we run into an unable-to-decrypt or a
    /// late-decryption event.
    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,

    /// An optional prefix for internal IDs.
    internal_id_prefix: Option<String>,
}
53
54impl TimelineBuilder {
55    pub fn new(room: &Room) -> Self {
56        Self {
57            room: room.clone(),
58            settings: TimelineSettings::default(),
59            unable_to_decrypt_hook: None,
60            focus: TimelineFocus::Live { hide_threaded_events: false },
61            internal_id_prefix: None,
62        }
63    }
64
65    /// Sets up the initial focus for this timeline.
66    ///
67    /// This can be changed later on while the timeline is alive.
68    pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
69        self.focus = focus;
70        self
71    }
72
73    /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
74    /// we're building.
75    ///
76    /// If it was previously set before, will overwrite the previous one.
77    pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
78        self.unable_to_decrypt_hook = Some(hook);
79        self
80    }
81
82    /// Sets the internal id prefix for this timeline.
83    ///
84    /// The prefix will be prepended to any internal ID using when generating
85    /// timeline IDs for this timeline.
86    pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
87        self.internal_id_prefix = Some(prefix);
88        self
89    }
90
91    /// Chose when to insert the date separators, either in between each day
92    /// or each month.
93    pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
94        self.settings.date_divider_mode = mode;
95        self
96    }
97
98    /// Enable tracking of the fully-read marker and the read receipts on the
99    /// timeline.
100    pub fn track_read_marker_and_receipts(mut self) -> Self {
101        self.settings.track_read_receipts = true;
102        self
103    }
104
105    /// Use the given filter to choose whether to add events to the timeline.
106    ///
107    /// # Arguments
108    ///
109    /// * `filter` - A function that takes a deserialized event, and should
110    ///   return `true` if the event should be added to the `Timeline`.
111    ///
112    /// If this is not overridden, the timeline uses the default filter that
113    /// only allows events that are materialized into a `Timeline` item. For
114    /// instance, reactions and edits don't get their own timeline item (as
115    /// they affect another existing one), so they're "filtered out" to
116    /// reflect that.
117    ///
118    /// You can use the default event filter with
119    /// [`crate::timeline::default_event_filter`] so as to chain it with
120    /// your own event filter, if you want to avoid situations where a read
121    /// receipt would be attached to an event that doesn't get its own
122    /// timeline item.
123    ///
124    /// Note that currently:
125    ///
126    /// - Not all event types have a representation as a `TimelineItem` so these
127    ///   are not added no matter what the filter returns.
128    /// - It is not possible to filter out `m.room.encrypted` events (otherwise
129    ///   they couldn't be decrypted when the appropriate room key arrives).
130    pub fn event_filter<F>(mut self, filter: F) -> Self
131    where
132        F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool
133            + SendOutsideWasm
134            + SyncOutsideWasm
135            + 'static,
136    {
137        self.settings.event_filter = Arc::new(filter);
138        self
139    }
140
141    /// Whether to add events that failed to deserialize to the timeline.
142    ///
143    /// Defaults to `true`.
144    pub fn add_failed_to_parse(mut self, add: bool) -> Self {
145        self.settings.add_failed_to_parse = add;
146        self
147    }
148
149    /// Create a [`Timeline`] with the options set on this builder.
150    #[tracing::instrument(
151        skip(self),
152        fields(
153            room_id = ?self.room.room_id(),
154            track_read_receipts = self.settings.track_read_receipts,
155        )
156    )]
157    pub async fn build(self) -> Result<Timeline, Error> {
158        let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
159
160        // Subscribe the event cache to sync responses, in case we hadn't done it yet.
161        room.client().event_cache().subscribe()?;
162
163        let (room_event_cache, event_cache_drop) = room.event_cache().await?;
164        let (_, event_subscriber) = room_event_cache.subscribe().await;
165
166        let is_room_encrypted = room
167            .latest_encryption_state()
168            .await
169            .map(|state| state.is_encrypted())
170            .ok()
171            .unwrap_or_default();
172
173        let controller = TimelineController::new(
174            room.clone(),
175            focus.clone(),
176            internal_id_prefix.clone(),
177            unable_to_decrypt_hook,
178            is_room_encrypted,
179            settings,
180        );
181
182        let has_events = controller.init_focus(&focus, &room_event_cache).await?;
183
184        let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents { .. }) {
185            Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
186        } else {
187            None
188        };
189
190        let room_update_join_handle = spawn({
191            let span = info_span!(
192                parent: Span::none(),
193                "live_update_handler",
194                room_id = ?room.room_id(),
195                focus = focus.debug_string(),
196                prefix = internal_id_prefix
197            );
198            span.follows_from(Span::current());
199
200            room_event_cache_updates_task(
201                room_event_cache.clone(),
202                controller.clone(),
203                event_subscriber,
204                focus.clone(),
205            )
206            .instrument(span)
207        });
208
209        let thread_update_join_handle = if let Some(root) = controller.thread_root() {
210            Some({
211                let span = info_span!(
212                    parent: Span::none(),
213                    "thread_live_update_handler",
214                    room_id = ?room.room_id(),
215                    focus = focus.debug_string(),
216                    prefix = internal_id_prefix
217                );
218                span.follows_from(Span::current());
219
220                // Note: must be done here *before* spawning the task, to avoid race conditions
221                // with event cache updates happening in the background.
222                let (_events, receiver) = room_event_cache.subscribe_to_thread(root.clone()).await;
223
224                spawn(
225                    thread_updates_task(
226                        receiver,
227                        room_event_cache.clone(),
228                        controller.clone(),
229                        root,
230                    )
231                    .instrument(span),
232                )
233            })
234        } else {
235            None
236        };
237
238        let local_echo_listener_handle = {
239            let timeline_controller = controller.clone();
240            let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
241
242            spawn({
243                // Handles existing local echoes first.
244                for echo in local_echoes {
245                    timeline_controller.handle_local_echo(echo).await;
246                }
247
248                let span = info_span!(
249                    parent: Span::none(),
250                    "local_echo_handler",
251                    room_id = ?room.room_id(),
252                    focus = focus.debug_string(),
253                    prefix = internal_id_prefix
254                );
255                span.follows_from(Span::current());
256
257                room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
258            })
259        };
260
261        let crypto_drop_handles = spawn_crypto_tasks(room, controller.clone()).await;
262
263        let timeline = Timeline {
264            controller,
265            event_cache: room_event_cache,
266            drop_handle: Arc::new(TimelineDropHandle {
267                _crypto_drop_handles: crypto_drop_handles,
268                room_update_join_handle,
269                thread_update_join_handle,
270                pinned_events_join_handle,
271                local_echo_listener_handle,
272                _event_cache_drop_handle: event_cache_drop,
273            }),
274        };
275
276        if has_events {
277            // The events we're injecting might be encrypted events, but we might
278            // have received the room key to decrypt them while nobody was listening to the
279            // `m.room_key` event, let's retry now.
280            timeline.retry_decryption_for_all_events().await;
281        }
282
283        Ok(timeline)
284    }
285}