1use std::sync::Arc;
16
17use matrix_sdk::{executor::spawn, Room};
18use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
19use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
20use tracing::{info_span, Instrument, Span};
21
22use super::{
23 controller::{TimelineController, TimelineSettings},
24 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
25};
26use crate::{
27 timeline::{
28 controller::spawn_crypto_tasks,
29 tasks::{
30 pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task,
31 thread_updates_task,
32 },
33 },
34 unable_to_decrypt_hook::UtdHookManager,
35};
36
/// Builder collecting the options used to construct a [`Timeline`] for a
/// room.
///
/// Configure it through the chained setter methods, then call
/// [`TimelineBuilder::build`] to obtain the timeline.
#[must_use]
#[derive(Debug)]
pub struct TimelineBuilder {
    /// The room the timeline is built for.
    room: Room,
    /// Settings handed to the timeline controller when building.
    settings: TimelineSettings,
    /// The focus the timeline is created with; [`TimelineBuilder::new`]
    /// starts with a live focus.
    focus: TimelineFocus,

    /// Optional hook manager handed to the timeline controller when building.
    // NOTE(review): presumably notified about unable-to-decrypt events —
    // confirm against `UtdHookManager`.
    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,

    /// Optional prefix handed to the timeline controller and recorded as the
    /// `prefix` field of the tracing spans of the background tasks.
    internal_id_prefix: Option<String>,
}
53
54impl TimelineBuilder {
55 pub fn new(room: &Room) -> Self {
56 Self {
57 room: room.clone(),
58 settings: TimelineSettings::default(),
59 unable_to_decrypt_hook: None,
60 focus: TimelineFocus::Live { hide_threaded_events: false },
61 internal_id_prefix: None,
62 }
63 }
64
65 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
69 self.focus = focus;
70 self
71 }
72
73 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
78 self.unable_to_decrypt_hook = Some(hook);
79 self
80 }
81
82 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
87 self.internal_id_prefix = Some(prefix);
88 self
89 }
90
91 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
94 self.settings.date_divider_mode = mode;
95 self
96 }
97
98 pub fn track_read_marker_and_receipts(mut self) -> Self {
101 self.settings.track_read_receipts = true;
102 self
103 }
104
105 pub fn event_filter<F>(mut self, filter: F) -> Self
131 where
132 F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool
133 + SendOutsideWasm
134 + SyncOutsideWasm
135 + 'static,
136 {
137 self.settings.event_filter = Arc::new(filter);
138 self
139 }
140
141 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
145 self.settings.add_failed_to_parse = add;
146 self
147 }
148
149 #[tracing::instrument(
151 skip(self),
152 fields(
153 room_id = ?self.room.room_id(),
154 track_read_receipts = self.settings.track_read_receipts,
155 )
156 )]
157 pub async fn build(self) -> Result<Timeline, Error> {
158 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
159
160 room.client().event_cache().subscribe()?;
162
163 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
164 let (_, event_subscriber) = room_event_cache.subscribe().await;
165
166 let is_room_encrypted = room
167 .latest_encryption_state()
168 .await
169 .map(|state| state.is_encrypted())
170 .ok()
171 .unwrap_or_default();
172
173 let controller = TimelineController::new(
174 room.clone(),
175 focus.clone(),
176 internal_id_prefix.clone(),
177 unable_to_decrypt_hook,
178 is_room_encrypted,
179 settings,
180 );
181
182 let has_events = controller.init_focus(&focus, &room_event_cache).await?;
183
184 let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents { .. }) {
185 Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
186 } else {
187 None
188 };
189
190 let room_update_join_handle = spawn({
191 let span = info_span!(
192 parent: Span::none(),
193 "live_update_handler",
194 room_id = ?room.room_id(),
195 focus = focus.debug_string(),
196 prefix = internal_id_prefix
197 );
198 span.follows_from(Span::current());
199
200 room_event_cache_updates_task(
201 room_event_cache.clone(),
202 controller.clone(),
203 event_subscriber,
204 focus.clone(),
205 )
206 .instrument(span)
207 });
208
209 let thread_update_join_handle = if let Some(root) = controller.thread_root() {
210 Some({
211 let span = info_span!(
212 parent: Span::none(),
213 "thread_live_update_handler",
214 room_id = ?room.room_id(),
215 focus = focus.debug_string(),
216 prefix = internal_id_prefix
217 );
218 span.follows_from(Span::current());
219
220 let (_events, receiver) = room_event_cache.subscribe_to_thread(root.clone()).await;
223
224 spawn(
225 thread_updates_task(
226 receiver,
227 room_event_cache.clone(),
228 controller.clone(),
229 root,
230 )
231 .instrument(span),
232 )
233 })
234 } else {
235 None
236 };
237
238 let local_echo_listener_handle = {
239 let timeline_controller = controller.clone();
240 let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
241
242 spawn({
243 for echo in local_echoes {
245 timeline_controller.handle_local_echo(echo).await;
246 }
247
248 let span = info_span!(
249 parent: Span::none(),
250 "local_echo_handler",
251 room_id = ?room.room_id(),
252 focus = focus.debug_string(),
253 prefix = internal_id_prefix
254 );
255 span.follows_from(Span::current());
256
257 room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
258 })
259 };
260
261 let crypto_drop_handles = spawn_crypto_tasks(room, controller.clone()).await;
262
263 let timeline = Timeline {
264 controller,
265 event_cache: room_event_cache,
266 drop_handle: Arc::new(TimelineDropHandle {
267 _crypto_drop_handles: crypto_drop_handles,
268 room_update_join_handle,
269 thread_update_join_handle,
270 pinned_events_join_handle,
271 local_echo_listener_handle,
272 _event_cache_drop_handle: event_cache_drop,
273 }),
274 };
275
276 if has_events {
277 timeline.retry_decryption_for_all_events().await;
281 }
282
283 Ok(timeline)
284 }
285}