1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::{VectorDiff, VectorSubscriberStream};
20use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::crypto::OlmMachine;
25use matrix_sdk::{
26 deserialized_responses::TimelineEvent,
27 event_cache::{RoomEventCache, RoomPaginationStatus},
28 paginators::{PaginationResult, Paginator},
29 send_queue::{
30 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31 },
32 Result, Room,
33};
34use ruma::{
35 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
36 events::{
37 poll::unstable_start::UnstablePollStartEventContent,
38 reaction::ReactionEventContent,
39 receipt::{Receipt, ReceiptThread, ReceiptType},
40 relation::Annotation,
41 room::message::{MessageType, Relation},
42 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
43 AnySyncTimelineEvent, MessageLikeEventType,
44 },
45 serde::Raw,
46 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
47 TransactionId, UserId,
48};
49#[cfg(test)]
50use ruma::{events::receipt::ReceiptEventContent, OwnedRoomId, RoomId};
51use tokio::sync::{RwLock, RwLockWriteGuard};
52use tracing::{debug, error, field::debug, info, instrument, trace, warn};
53
54pub(super) use self::{
55 metadata::{RelativePosition, TimelineMetadata},
56 observable_items::{
57 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
58 ObservableItemsTransactionEntry,
59 },
60 state::TimelineState,
61 state_transaction::TimelineStateTransaction,
62};
63use super::{
64 algorithms::{rfind_event_by_id, rfind_event_item},
65 event_item::{ReactionStatus, RemoteEventOrigin},
66 item::TimelineUniqueId,
67 subscriber::TimelineSubscriber,
68 traits::{Decryptor, RoomDataProvider},
69 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
70 PaginationError, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem,
71 TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
72};
73use crate::{
74 timeline::{
75 algorithms::rfind_event_by_item_id,
76 date_dividers::DateDividerAdjuster,
77 event_item::TimelineItemHandle,
78 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
79 MsgLikeContent, MsgLikeKind, TimelineEventFilterFn,
80 },
81 unable_to_decrypt_hook::UtdHookManager,
82};
83
84pub(in crate::timeline) mod aggregations;
85mod decryption_retry_task;
86mod metadata;
87mod observable_items;
88mod read_receipts;
89mod state;
90mod state_transaction;
91
92pub(super) use aggregations::*;
93pub(super) use decryption_retry_task::{spawn_crypto_tasks, CryptoDropHandles};
94
95#[derive(Debug)]
101pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
102 Live {
104 hide_threaded_events: bool,
106 },
107
108 Event {
111 paginator: Paginator<P>,
113
114 hide_threaded_events: bool,
116 },
117
118 Thread {
120 root_event_id: OwnedEventId,
122 },
123
124 PinnedEvents {
125 loader: PinnedEventsLoader,
126 },
127}
128
129impl<P: RoomDataProvider> TimelineFocusKind<P> {
130 pub(super) fn receipt_thread(&self) -> ReceiptThread {
137 match self {
138 TimelineFocusKind::Live { hide_threaded_events }
139 | TimelineFocusKind::Event { hide_threaded_events, .. } => {
140 if *hide_threaded_events {
141 ReceiptThread::Main
142 } else {
143 ReceiptThread::Unthreaded
144 }
145 }
146 TimelineFocusKind::Thread { root_event_id } => {
147 ReceiptThread::Thread(root_event_id.clone())
148 }
149 TimelineFocusKind::PinnedEvents { .. } => ReceiptThread::Unthreaded,
150 }
151 }
152}
153
154#[derive(Clone, Debug)]
155pub(super) struct TimelineController<P: RoomDataProvider = Room, D: Decryptor = Room> {
156 state: Arc<RwLock<TimelineState<P>>>,
158
159 focus: Arc<TimelineFocusKind<P>>,
161
162 pub(crate) room_data_provider: P,
166
167 pub(super) settings: TimelineSettings,
169
170 decryption_retry_task: DecryptionRetryTask<P, D>,
173}
174
175#[derive(Clone)]
176pub(super) struct TimelineSettings {
177 pub(super) track_read_receipts: bool,
179
180 pub(super) event_filter: Arc<TimelineEventFilterFn>,
183
184 pub(super) add_failed_to_parse: bool,
186
187 pub(super) date_divider_mode: DateDividerMode,
189}
190
191#[cfg(not(tarpaulin_include))]
192impl fmt::Debug for TimelineSettings {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 f.debug_struct("TimelineSettings")
195 .field("track_read_receipts", &self.track_read_receipts)
196 .field("add_failed_to_parse", &self.add_failed_to_parse)
197 .finish_non_exhaustive()
198 }
199}
200
201impl Default for TimelineSettings {
202 fn default() -> Self {
203 Self {
204 track_read_receipts: false,
205 event_filter: Arc::new(default_event_filter),
206 add_failed_to_parse: true,
207 date_divider_mode: DateDividerMode::Daily,
208 }
209 }
210}
211
212pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
222 match event {
223 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
224 if ev.redacts(room_version).is_some() {
225 false
228 } else {
229 ev.event_type() != MessageLikeEventType::Reaction
232 }
233 }
234
235 AnySyncTimelineEvent::MessageLike(msg) => {
236 match msg.original_content() {
237 None => {
238 msg.event_type() != MessageLikeEventType::Reaction
241 }
242
243 Some(original_content) => {
244 match original_content {
245 AnyMessageLikeEventContent::RoomMessage(content) => {
246 if content
247 .relates_to
248 .as_ref()
249 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
250 {
251 return false;
253 }
254
255 match content.msgtype {
256 MessageType::Audio(_)
257 | MessageType::Emote(_)
258 | MessageType::File(_)
259 | MessageType::Image(_)
260 | MessageType::Location(_)
261 | MessageType::Notice(_)
262 | MessageType::ServerNotice(_)
263 | MessageType::Text(_)
264 | MessageType::Video(_)
265 | MessageType::VerificationRequest(_) => true,
266 #[cfg(feature = "unstable-msc4274")]
267 MessageType::Gallery(_) => true,
268 _ => false,
269 }
270 }
271
272 AnyMessageLikeEventContent::Sticker(_)
273 | AnyMessageLikeEventContent::UnstablePollStart(
274 UnstablePollStartEventContent::New(_),
275 )
276 | AnyMessageLikeEventContent::CallInvite(_)
277 | AnyMessageLikeEventContent::CallNotify(_)
278 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
279
280 _ => false,
281 }
282 }
283 }
284 }
285
286 AnySyncTimelineEvent::State(_) => {
287 true
289 }
290 }
291}
292
293impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
294 pub(super) fn new(
295 room_data_provider: P,
296 focus: TimelineFocus,
297 internal_id_prefix: Option<String>,
298 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
299 is_room_encrypted: bool,
300 settings: TimelineSettings,
301 ) -> Self {
302 let focus = match focus {
303 TimelineFocus::Live { hide_threaded_events } => {
304 TimelineFocusKind::Live { hide_threaded_events }
305 }
306
307 TimelineFocus::Event { hide_threaded_events, .. } => {
308 let paginator = Paginator::new(room_data_provider.clone());
309 TimelineFocusKind::Event { paginator, hide_threaded_events }
310 }
311
312 TimelineFocus::Thread { root_event_id, .. } => {
313 TimelineFocusKind::Thread { root_event_id }
314 }
315
316 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
317 TimelineFocusKind::PinnedEvents {
318 loader: PinnedEventsLoader::new(
319 Arc::new(room_data_provider.clone()),
320 max_events_to_load as usize,
321 max_concurrent_requests as usize,
322 ),
323 }
324 }
325 };
326
327 let focus = Arc::new(focus);
328 let state = Arc::new(RwLock::new(TimelineState::new(
329 focus.clone(),
330 room_data_provider.own_user_id().to_owned(),
331 room_data_provider.room_version(),
332 internal_id_prefix,
333 unable_to_decrypt_hook,
334 is_room_encrypted,
335 )));
336
337 let decryption_retry_task =
338 DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
339
340 Self { state, focus, room_data_provider, settings, decryption_retry_task }
341 }
342
343 pub(super) async fn init_focus(
350 &self,
351 focus: &TimelineFocus,
352 room_event_cache: &RoomEventCache,
353 ) -> Result<bool, Error> {
354 match focus {
355 TimelineFocus::Live { .. } => {
356 let events = room_event_cache.events().await;
358
359 let has_events = !events.is_empty();
360
361 self.replace_with_initial_remote_events(
362 events.into_iter(),
363 RemoteEventOrigin::Cache,
364 )
365 .await;
366
367 match room_event_cache.pagination().status().get() {
368 RoomPaginationStatus::Idle { hit_timeline_start } => {
369 if hit_timeline_start {
370 self.insert_timeline_start_if_missing().await;
373 }
374 }
375 RoomPaginationStatus::Paginating => {}
376 }
377
378 Ok(has_events)
379 }
380
381 TimelineFocus::Event { target: event_id, num_context_events, .. } => {
382 let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
383 unreachable!();
385 };
386
387 let start_from_result = paginator
389 .start_from(event_id, (*num_context_events).into())
390 .await
391 .map_err(PaginationError::Paginator)?;
392
393 let has_events = !start_from_result.events.is_empty();
394
395 self.replace_with_initial_remote_events(
396 start_from_result.events.into_iter(),
397 RemoteEventOrigin::Pagination,
398 )
399 .await;
400
401 Ok(has_events)
402 }
403
404 TimelineFocus::Thread { root_event_id, .. } => {
405 let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
406 let has_events = !events.is_empty();
407
408 let mut related_events = Vector::new();
412 for event_id in events.iter().filter_map(|event| event.event_id()) {
413 if let Some((_original, related)) =
414 room_event_cache.find_event_with_relations(&event_id, None).await
415 {
416 related_events.extend(related);
417 }
418 }
419
420 self.replace_with_initial_remote_events(
421 events.into_iter(),
422 RemoteEventOrigin::Cache,
423 )
424 .await;
425
426 if !related_events.is_empty() {
428 self.handle_remote_aggregations(
429 vec![VectorDiff::Append { values: related_events }],
430 RemoteEventOrigin::Cache,
431 )
432 .await;
433 }
434
435 Ok(has_events)
436 }
437
438 TimelineFocus::PinnedEvents { .. } => {
439 let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
440 unreachable!();
442 };
443
444 let Some(loaded_events) =
445 loader.load_events().await.map_err(Error::PinnedEventsError)?
446 else {
447 return Ok(false);
449 };
450
451 let has_events = !loaded_events.is_empty();
452
453 self.replace_with_initial_remote_events(
454 loaded_events.into_iter(),
455 RemoteEventOrigin::Pagination,
456 )
457 .await;
458
459 Ok(has_events)
460 }
461 }
462 }
463
464 pub async fn handle_encryption_state_changes(&self) {
469 let mut room_info = self.room_data_provider.room_info();
470
471 let mark_encrypted = || async {
473 let mut state = self.state.write().await;
474 state.meta.is_room_encrypted = true;
475 state.mark_all_events_as_encrypted();
476 };
477
478 if room_info.get().encryption_state().is_encrypted() {
479 mark_encrypted().await;
482 return;
483 }
484
485 while let Some(info) = room_info.next().await {
486 if info.encryption_state().is_encrypted() {
487 mark_encrypted().await;
488 break;
491 }
492 }
493 }
494
495 pub(crate) async fn reload_pinned_events(
496 &self,
497 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
498 if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
499 loader.load_events().await
500 } else {
501 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
502 }
503 }
504
505 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
514 let state = self.state.read().await;
515
516 let (count, needs) = state
517 .meta
518 .subscriber_skip_count
519 .compute_next_when_paginating_backwards(num_events.into());
520
521 let is_live_timeline = true;
523 state.meta.subscriber_skip_count.update(count, is_live_timeline);
524
525 needs
526 }
527
528 pub(super) async fn focused_paginate_backwards(
533 &self,
534 num_events: u16,
535 ) -> Result<bool, PaginationError> {
536 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
537 TimelineFocusKind::Live { .. }
538 | TimelineFocusKind::PinnedEvents { .. }
539 | TimelineFocusKind::Thread { .. } => {
540 return Err(PaginationError::NotSupported);
541 }
542 TimelineFocusKind::Event { paginator, .. } => paginator
543 .paginate_backward(num_events.into())
544 .await
545 .map_err(PaginationError::Paginator)?,
546 };
547
548 self.handle_remote_events_with_diffs(
551 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
552 RemoteEventOrigin::Pagination,
553 )
554 .await;
555
556 Ok(hit_end_of_timeline)
557 }
558
559 pub(super) async fn focused_paginate_forwards(
564 &self,
565 num_events: u16,
566 ) -> Result<bool, PaginationError> {
567 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
568 TimelineFocusKind::Live { .. }
569 | TimelineFocusKind::PinnedEvents { .. }
570 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
571
572 TimelineFocusKind::Event { paginator, .. } => paginator
573 .paginate_forward(num_events.into())
574 .await
575 .map_err(PaginationError::Paginator)?,
576 };
577
578 self.handle_remote_events_with_diffs(
581 vec![VectorDiff::Append { values: events.into() }],
582 RemoteEventOrigin::Pagination,
583 )
584 .await;
585
586 Ok(hit_end_of_timeline)
587 }
588
589 pub(super) fn is_live(&self) -> bool {
591 matches!(&*self.focus, TimelineFocusKind::Live { .. })
592 }
593
594 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
595 as_variant!(&*self.focus, TimelineFocusKind::Thread { root_event_id } => root_event_id.clone())
596 }
597
598 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
602 self.state.read().await.items.clone_items()
603 }
604
605 #[cfg(test)]
606 pub(super) async fn subscribe_raw(
607 &self,
608 ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
609 self.state.read().await.items.subscribe().into_values_and_stream()
610 }
611
612 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
613 let state = self.state.read().await;
614
615 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
616 }
617
618 pub(super) async fn subscribe_filter_map<U, F>(
619 &self,
620 f: F,
621 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
622 where
623 U: Clone,
624 F: Fn(Arc<TimelineItem>) -> Option<U>,
625 {
626 self.state.read().await.items.subscribe().filter_map(f)
627 }
628
629 #[instrument(skip_all)]
633 pub(super) async fn toggle_reaction_local(
634 &self,
635 item_id: &TimelineEventItemId,
636 key: &str,
637 ) -> Result<bool, Error> {
638 let mut state = self.state.write().await;
639
640 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
641 warn!("Timeline item not found, can't add reaction");
642 return Err(Error::FailedToToggleReaction);
643 };
644
645 let user_id = self.room_data_provider.own_user_id();
646 let prev_status = item
647 .content()
648 .reactions()
649 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
650
651 let Some(prev_status) = prev_status else {
652 match item.handle() {
654 TimelineItemHandle::Local(send_handle) => {
655 if send_handle
656 .react(key.to_owned())
657 .await
658 .map_err(|err| Error::SendQueueError(err.into()))?
659 .is_some()
660 {
661 trace!("adding a reaction to a local echo");
662 return Ok(true);
663 }
664
665 warn!("couldn't toggle reaction for local echo");
666 return Ok(false);
667 }
668
669 TimelineItemHandle::Remote(event_id) => {
670 trace!("adding a reaction to a remote echo");
674 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
675 self.room_data_provider
676 .send(ReactionEventContent::from(annotation).into())
677 .await?;
678 return Ok(true);
679 }
680 }
681 };
682
683 trace!("removing a previous reaction");
684 match prev_status {
685 ReactionStatus::LocalToLocal(send_reaction_handle) => {
686 if let Some(handle) = send_reaction_handle {
687 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
688 warn!("unexpectedly unable to abort sending of local reaction");
691 }
692 } else {
693 warn!("no send reaction handle (this should only happen in testing contexts)");
694 }
695 }
696
697 ReactionStatus::LocalToRemote(send_handle) => {
698 trace!("aborting send of the previous reaction that was a local echo");
701 if let Some(handle) = send_handle {
702 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
703 warn!("unexpectedly unable to abort sending of local reaction");
706 }
707 } else {
708 warn!("no send handle (this should only happen in testing contexts)");
709 }
710 }
711
712 ReactionStatus::RemoteToRemote(event_id) => {
713 let Some(annotated_event_id) =
715 item.as_remote().map(|event_item| event_item.event_id.clone())
716 else {
717 warn!("remote reaction to remote event, but the associated item isn't remote");
718 return Ok(false);
719 };
720
721 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
722 let reaction_info = reactions.remove_reaction(user_id, key);
723
724 if reaction_info.is_some() {
725 let new_item = item.with_reactions(reactions);
726 state.items.replace(item_pos, new_item);
727 } else {
728 warn!(
729 "reaction is missing on the item, not removing it locally, \
730 but sending redaction."
731 );
732 }
733
734 drop(state);
736
737 trace!("sending redact for a previous reaction");
738 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
739 if let Some(reaction_info) = reaction_info {
740 debug!("sending redact failed, adding the reaction back to the list");
741
742 let mut state = self.state.write().await;
743 if let Some((item_pos, item)) =
744 rfind_event_by_id(&state.items, &annotated_event_id)
745 {
746 let mut reactions =
748 item.content().reactions().cloned().unwrap_or_default();
749 reactions
750 .entry(key.to_owned())
751 .or_default()
752 .insert(user_id.to_owned(), reaction_info);
753 let new_item = item.with_reactions(reactions);
754 state.items.replace(item_pos, new_item);
755 } else {
756 warn!(
757 "couldn't find item to re-add reaction anymore; \
758 maybe it's been redacted?"
759 );
760 }
761 }
762
763 return Err(err);
764 }
765 }
766 }
767
768 Ok(false)
769 }
770
771 pub(super) async fn handle_remote_events_with_diffs(
773 &self,
774 diffs: Vec<VectorDiff<TimelineEvent>>,
775 origin: RemoteEventOrigin,
776 ) {
777 if diffs.is_empty() {
778 return;
779 }
780
781 let mut state = self.state.write().await;
782 state
783 .handle_remote_events_with_diffs(
784 diffs,
785 origin,
786 &self.room_data_provider,
787 &self.settings,
788 )
789 .await
790 }
791
792 pub(super) async fn handle_remote_aggregations(
794 &self,
795 diffs: Vec<VectorDiff<TimelineEvent>>,
796 origin: RemoteEventOrigin,
797 ) {
798 if diffs.is_empty() {
799 return;
800 }
801
802 let mut state = self.state.write().await;
803 state
804 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
805 .await
806 }
807
808 pub(super) async fn clear(&self) {
809 self.state.write().await.clear();
810 }
811
812 pub(super) async fn replace_with_initial_remote_events<Events>(
820 &self,
821 events: Events,
822 origin: RemoteEventOrigin,
823 ) where
824 Events: IntoIterator + ExactSizeIterator,
825 <Events as IntoIterator>::Item: Into<TimelineEvent>,
826 {
827 let mut state = self.state.write().await;
828
829 let track_read_markers = self.settings.track_read_receipts;
830 if track_read_markers {
831 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
832 state
833 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
834 .await;
835 }
836
837 if !state.items.is_empty() || events.len() > 0 {
843 state
844 .replace_with_remote_events(
845 events,
846 origin,
847 &self.room_data_provider,
848 &self.settings,
849 )
850 .await;
851 }
852
853 if track_read_markers {
854 if let Some(fully_read_event_id) =
855 self.room_data_provider.load_fully_read_marker().await
856 {
857 state.handle_fully_read_marker(fully_read_event_id);
858 }
859 }
860 }
861
862 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
863 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
864 }
865
866 pub(super) async fn handle_ephemeral_events(
867 &self,
868 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
869 ) {
870 let mut state = self.state.write().await;
871 state.handle_ephemeral_events(events, &self.room_data_provider).await;
872 }
873
874 #[instrument(skip_all)]
876 pub(super) async fn handle_local_event(
877 &self,
878 txn_id: OwnedTransactionId,
879 content: AnyMessageLikeEventContent,
880 send_handle: Option<SendHandle>,
881 ) {
882 let sender = self.room_data_provider.own_user_id().to_owned();
883 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
884
885 let date_divider_mode = self.settings.date_divider_mode.clone();
886
887 let mut state = self.state.write().await;
888 state
889 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
890 .await;
891 }
892
893 #[instrument(skip(self))]
898 pub(super) async fn update_event_send_state(
899 &self,
900 txn_id: &TransactionId,
901 send_state: EventSendState,
902 ) {
903 let mut state = self.state.write().await;
904 let mut txn = state.transaction();
905
906 let new_event_id: Option<&EventId> =
907 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
908
909 if rfind_event_item(&txn.items, |it| {
912 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
913 })
914 .is_some()
915 {
916 trace!("Remote echo received before send-event response");
918
919 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
920
921 if let Some((idx, _)) = local_echo {
925 warn!("Message echo got duplicated, removing the local one");
926 txn.items.remove(idx);
927
928 let mut adjuster =
930 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
931 adjuster.run(&mut txn.items, &mut txn.meta);
932 }
933
934 txn.commit();
935 return;
936 }
937
938 let result = rfind_event_item(&txn.items, |it| {
940 it.transaction_id() == Some(txn_id)
941 || new_event_id.is_some()
942 && it.event_id() == new_event_id
943 && it.as_local().is_some()
944 });
945
946 let Some((idx, item)) = result else {
947 if let Some(new_event_id) = new_event_id {
952 if txn.meta.aggregations.mark_aggregation_as_sent(
953 txn_id.to_owned(),
954 new_event_id.to_owned(),
955 &mut txn.items,
956 &txn.meta.room_version,
957 ) {
958 trace!("Aggregation marked as sent");
959 txn.commit();
960 return;
961 }
962
963 trace!("Sent aggregation was not found");
964 }
965
966 warn!("Timeline item not found, can't update send state");
967 return;
968 };
969
970 let Some(local_item) = item.as_local() else {
971 warn!("We looked for a local item, but it transitioned to remote.");
972 return;
973 };
974
975 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
978 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
979 }
980
981 if let Some(new_event_id) = new_event_id {
984 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
985 }
986
987 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
988 txn.items.replace(idx, new_item);
989
990 txn.commit();
991 }
992
993 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
994 let mut state = self.state.write().await;
995
996 if let Some((idx, _)) =
997 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
998 {
999 let mut txn = state.transaction();
1000
1001 txn.items.remove(idx);
1002
1003 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1006 adjuster.run(&mut txn.items, &mut txn.meta);
1007
1008 txn.meta.update_read_marker(&mut txn.items);
1009
1010 txn.commit();
1011
1012 debug!("discarded local echo");
1013 return true;
1014 }
1015
1016 let mut txn = state.transaction();
1019
1020 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1022 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1023 &mut txn.items,
1024 ) {
1025 Ok(val) => val,
1026 Err(err) => {
1027 warn!("error when discarding local echo for an aggregation: {err}");
1028 true
1030 }
1031 };
1032
1033 if found_aggregation {
1034 txn.commit();
1035 }
1036
1037 found_aggregation
1038 }
1039
1040 pub(super) async fn replace_local_echo(
1041 &self,
1042 txn_id: &TransactionId,
1043 content: AnyMessageLikeEventContent,
1044 ) -> bool {
1045 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1046 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1051 return false;
1052 };
1053
1054 let mut state = self.state.write().await;
1055 let mut txn = state.transaction();
1056
1057 let Some((idx, prev_item)) =
1058 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1059 else {
1060 debug!("Can't find local echo to replace");
1061 return false;
1062 };
1063
1064 let ti_kind = {
1067 let Some(prev_local_item) = prev_item.as_local() else {
1068 warn!("We looked for a local item, but it transitioned as remote??");
1069 return false;
1070 };
1071 prev_local_item.with_send_state(EventSendState::NotSentYet)
1072 };
1073
1074 let new_item = TimelineItem::new(
1076 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1077 content.msgtype,
1078 content.mentions,
1079 prev_item.content().reactions().cloned().unwrap_or_default(),
1080 prev_item.content().thread_root(),
1081 prev_item.content().in_reply_to(),
1082 prev_item.content().thread_summary(),
1083 )),
1084 prev_item.internal_id.to_owned(),
1085 );
1086
1087 txn.items.replace(idx, new_item);
1088
1089 txn.commit();
1093
1094 debug!("Replaced local echo");
1095 true
1096 }
1097
1098 async fn retry_event_decryption_inner(
1099 &self,
1100 decryptor: D,
1101 session_ids: Option<BTreeSet<String>>,
1102 ) {
1103 self.decryption_retry_task.decrypt(decryptor, session_ids, self.settings.clone()).await;
1104 }
1105
1106 pub(super) async fn set_sender_profiles_pending(&self) {
1107 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1108 }
1109
1110 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1111 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1112 }
1113
1114 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1115 self.state.write().await.items.for_each(|mut entry| {
1116 let Some(event_item) = entry.as_event() else { return };
1117 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1118 let new_item = entry.with_kind(TimelineItemKind::Event(
1119 event_item.with_sender_profile(profile_state.clone()),
1120 ));
1121 ObservableItemsEntry::replace(&mut entry, new_item);
1122 }
1123 });
1124 }
1125
1126 pub(super) async fn update_missing_sender_profiles(&self) {
1127 trace!("Updating missing sender profiles");
1128
1129 let mut state = self.state.write().await;
1130 let mut entries = state.items.entries();
1131 while let Some(mut entry) = entries.next() {
1132 let Some(event_item) = entry.as_event() else { continue };
1133 let event_id = event_item.event_id().map(debug);
1134 let transaction_id = event_item.transaction_id().map(debug);
1135
1136 if event_item.sender_profile().is_ready() {
1137 trace!(event_id, transaction_id, "Profile already set");
1138 continue;
1139 }
1140
1141 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1142 Some(profile) => {
1143 trace!(event_id, transaction_id, "Adding profile");
1144 let updated_item =
1145 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1146 let new_item = entry.with_kind(updated_item);
1147 ObservableItemsEntry::replace(&mut entry, new_item);
1148 }
1149 None => {
1150 if !event_item.sender_profile().is_unavailable() {
1151 trace!(event_id, transaction_id, "Marking profile unavailable");
1152 let updated_item =
1153 event_item.with_sender_profile(TimelineDetails::Unavailable);
1154 let new_item = entry.with_kind(updated_item);
1155 ObservableItemsEntry::replace(&mut entry, new_item);
1156 } else {
1157 debug!(event_id, transaction_id, "Profile already marked unavailable");
1158 }
1159 }
1160 }
1161 }
1162
1163 trace!("Done updating missing sender profiles");
1164 }
1165
1166 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1168 trace!("Forcing update of sender profiles: {sender_ids:?}");
1169
1170 let mut state = self.state.write().await;
1171 let mut entries = state.items.entries();
1172 while let Some(mut entry) = entries.next() {
1173 let Some(event_item) = entry.as_event() else { continue };
1174 if !sender_ids.contains(event_item.sender()) {
1175 continue;
1176 }
1177
1178 let event_id = event_item.event_id().map(debug);
1179 let transaction_id = event_item.transaction_id().map(debug);
1180
1181 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1182 Some(profile) => {
1183 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1184 {
1185 debug!(event_id, transaction_id, "Profile already up-to-date");
1186 } else {
1187 trace!(event_id, transaction_id, "Updating profile");
1188 let updated_item =
1189 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1190 let new_item = entry.with_kind(updated_item);
1191 ObservableItemsEntry::replace(&mut entry, new_item);
1192 }
1193 }
1194 None => {
1195 if !event_item.sender_profile().is_unavailable() {
1196 trace!(event_id, transaction_id, "Marking profile unavailable");
1197 let updated_item =
1198 event_item.with_sender_profile(TimelineDetails::Unavailable);
1199 let new_item = entry.with_kind(updated_item);
1200 ObservableItemsEntry::replace(&mut entry, new_item);
1201 } else {
1202 debug!(event_id, transaction_id, "Profile already marked unavailable");
1203 }
1204 }
1205 }
1206 }
1207
1208 trace!("Done forcing update of sender profiles");
1209 }
1210
1211 #[cfg(test)]
1212 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1213 let own_user_id = self.room_data_provider.own_user_id();
1214 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1215 }
1216
1217 pub(super) async fn latest_user_read_receipt(
1221 &self,
1222 user_id: &UserId,
1223 ) -> Option<(OwnedEventId, Receipt)> {
1224 let receipt_thread = self.focus.receipt_thread();
1225
1226 self.state
1227 .read()
1228 .await
1229 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1230 .await
1231 }
1232
1233 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1236 &self,
1237 user_id: &UserId,
1238 ) -> Option<OwnedEventId> {
1239 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1240 }
1241
1242 pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
1244 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1245 }
1246
1247 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1249 match echo.content {
1250 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1251 let content = match serialized_event.deserialize() {
1252 Ok(d) => d,
1253 Err(err) => {
1254 warn!("error deserializing local echo: {err}");
1255 return;
1256 }
1257 };
1258
1259 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1260 .await;
1261
1262 if let Some(send_error) = send_error {
1263 self.update_event_send_state(
1264 &echo.transaction_id,
1265 EventSendState::SendingFailed {
1266 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1267 send_error,
1268 ))),
1269 is_recoverable: false,
1270 },
1271 )
1272 .await;
1273 }
1274 }
1275
1276 LocalEchoContent::React { key, send_handle, applies_to } => {
1277 self.handle_local_reaction(key, send_handle, applies_to).await;
1278 }
1279 }
1280 }
1281
1282 #[instrument(skip(self, send_handle))]
1284 async fn handle_local_reaction(
1285 &self,
1286 reaction_key: String,
1287 send_handle: SendReactionHandle,
1288 applies_to: OwnedTransactionId,
1289 ) {
1290 let mut state = self.state.write().await;
1291 let mut tr = state.transaction();
1292
1293 let target = TimelineEventItemId::TransactionId(applies_to);
1294
1295 let reaction_txn_id = send_handle.transaction_id().to_owned();
1296 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1297 let aggregation = Aggregation::new(
1298 TimelineEventItemId::TransactionId(reaction_txn_id),
1299 AggregationKind::Reaction {
1300 key: reaction_key.clone(),
1301 sender: self.room_data_provider.own_user_id().to_owned(),
1302 timestamp: MilliSecondsSinceUnixEpoch::now(),
1303 reaction_status,
1304 },
1305 );
1306
1307 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1308 find_item_and_apply_aggregation(
1309 &tr.meta.aggregations,
1310 &mut tr.items,
1311 &target,
1312 aggregation,
1313 &tr.meta.room_version,
1314 );
1315
1316 tr.commit();
1317 }
1318
1319 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1321 match update {
1322 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1323 self.handle_local_echo(echo).await;
1324 }
1325
1326 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1327 if !self.discard_local_echo(&transaction_id).await {
1328 warn!("couldn't find the local echo to discard");
1329 }
1330 }
1331
1332 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1333 let content = match new_content.deserialize() {
1334 Ok(d) => d,
1335 Err(err) => {
1336 warn!("error deserializing local echo (upon edit): {err}");
1337 return;
1338 }
1339 };
1340
1341 if !self.replace_local_echo(&transaction_id, content).await {
1342 warn!("couldn't find the local echo to replace");
1343 }
1344 }
1345
1346 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1347 self.update_event_send_state(
1348 &transaction_id,
1349 EventSendState::SendingFailed { error, is_recoverable },
1350 )
1351 .await;
1352 }
1353
1354 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1355 self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
1356 }
1357
1358 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1359 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1360 .await;
1361 }
1362
1363 RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
1364 info!(txn_id = %related_to, "some media for a media event has been uploaded");
1366 }
1367 }
1368 }
1369
1370 pub async fn insert_timeline_start_if_missing(&self) {
1373 let mut state = self.state.write().await;
1374 let mut txn = state.transaction();
1375 txn.items.push_timeline_start_if_missing(
1376 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1377 );
1378 txn.commit();
1379 }
1380
1381 pub(super) async fn make_replied_to(
1387 &self,
1388 event: TimelineEvent,
1389 ) -> Result<Option<EmbeddedEvent>, Error> {
1390 let state = self.state.read().await;
1391 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1392 }
1393}
1394
1395impl TimelineController {
1396 pub(super) fn room(&self) -> &Room {
1397 &self.room_data_provider
1398 }
1399
1400 #[instrument(skip(self))]
1403 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1404 let state_guard = self.state.write().await;
1405 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1406 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1407 let remote_item = item
1408 .as_remote()
1409 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1410 .clone();
1411
1412 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1413 debug!("Event is not a message");
1414 return Ok(());
1415 };
1416 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1417 debug!("Event is not a reply");
1418 return Ok(());
1419 };
1420 if let TimelineDetails::Pending = &in_reply_to.event {
1421 debug!("Replied-to event is already being fetched");
1422 return Ok(());
1423 }
1424 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1425 debug!("Replied-to event has already been fetched");
1426 return Ok(());
1427 }
1428
1429 let internal_id = item.internal_id.to_owned();
1430 let item = item.clone();
1431 let event = fetch_replied_to_event(
1432 state_guard,
1433 &self.state,
1434 index,
1435 &item,
1436 internal_id,
1437 &msglike,
1438 &in_reply_to.event_id,
1439 self.room(),
1440 )
1441 .await?;
1442
1443 let mut state = self.state.write().await;
1446 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1447 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1448
1449 let TimelineItemContent::MsgLike(MsgLikeContent {
1452 kind: MsgLikeKind::Message(message),
1453 reactions,
1454 thread_root,
1455 in_reply_to,
1456 thread_summary,
1457 }) = item.content().clone()
1458 else {
1459 info!("Event is no longer a message (redacted?)");
1460 return Ok(());
1461 };
1462 let Some(in_reply_to) = in_reply_to else {
1463 warn!("Event no longer has a reply (bug?)");
1464 return Ok(());
1465 };
1466
1467 trace!("Updating in-reply-to details");
1470 let internal_id = item.internal_id.to_owned();
1471 let mut item = item.clone();
1472 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1473 kind: MsgLikeKind::Message(message),
1474 reactions,
1475 thread_root,
1476 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1477 thread_summary,
1478 }));
1479 state.items.replace(index, TimelineItem::new(item, internal_id));
1480
1481 Ok(())
1482 }
1483
1484 pub(super) fn infer_thread_for_read_receipt(
1490 &self,
1491 receipt_type: &SendReceiptType,
1492 ) -> ReceiptThread {
1493 if matches!(receipt_type, SendReceiptType::FullyRead) {
1494 ReceiptThread::Unthreaded
1495 } else {
1496 self.focus.receipt_thread()
1497 }
1498 }
1499
1500 pub(super) async fn should_send_receipt(
1504 &self,
1505 receipt_type: &SendReceiptType,
1506 receipt_thread: &ReceiptThread,
1507 event_id: &EventId,
1508 ) -> bool {
1509 let own_user_id = self.room().own_user_id();
1510 let state = self.state.read().await;
1511 let room = self.room();
1512
1513 match receipt_type {
1514 SendReceiptType::Read => {
1515 if let Some((old_pub_read, _)) = state
1516 .meta
1517 .user_receipt(
1518 own_user_id,
1519 ReceiptType::Read,
1520 receipt_thread.clone(),
1521 room,
1522 state.items.all_remote_events(),
1523 )
1524 .await
1525 {
1526 trace!(%old_pub_read, "found a previous public receipt");
1527 if let Some(relative_pos) = state.meta.compare_events_positions(
1528 &old_pub_read,
1529 event_id,
1530 state.items.all_remote_events(),
1531 ) {
1532 trace!(
1533 "event referred to new receipt is {relative_pos:?} the previous receipt"
1534 );
1535 return relative_pos == RelativePosition::After;
1536 }
1537 }
1538 }
1539
1540 SendReceiptType::ReadPrivate => {
1543 if let Some((old_priv_read, _)) =
1544 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1545 {
1546 trace!(%old_priv_read, "found a previous private receipt");
1547 if let Some(relative_pos) = state.meta.compare_events_positions(
1548 &old_priv_read,
1549 event_id,
1550 state.items.all_remote_events(),
1551 ) {
1552 trace!(
1553 "event referred to new receipt is {relative_pos:?} the previous receipt"
1554 );
1555 return relative_pos == RelativePosition::After;
1556 }
1557 }
1558 }
1559
1560 SendReceiptType::FullyRead => {
1561 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1562 {
1563 if let Some(relative_pos) = state.meta.compare_events_positions(
1564 &prev_event_id,
1565 event_id,
1566 state.items.all_remote_events(),
1567 ) {
1568 return relative_pos == RelativePosition::After;
1569 }
1570 }
1571 }
1572
1573 _ => {}
1574 }
1575
1576 true
1578 }
1579
1580 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1583 let state = self.state.read().await;
1584 state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1585 }
1586
1587 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1588 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1589 self.retry_event_decryption_inner(self.room().clone(), session_ids).await
1590 }
1591
1592 pub(super) async fn map_pagination_status(
1600 &self,
1601 status: RoomPaginationStatus,
1602 ) -> RoomPaginationStatus {
1603 match status {
1604 RoomPaginationStatus::Idle { hit_timeline_start } => {
1605 if hit_timeline_start {
1606 let state = self.state.read().await;
1607 if state.meta.subscriber_skip_count.get() > 0 {
1611 return RoomPaginationStatus::Idle { hit_timeline_start: false };
1612 }
1613 }
1614 }
1615 RoomPaginationStatus::Paginating => {}
1616 }
1617
1618 status
1620 }
1621}
1622
1623#[cfg(test)]
1624impl<P: RoomDataProvider> TimelineController<P, (OlmMachine, OwnedRoomId)> {
1625 pub(super) async fn retry_event_decryption_test(
1626 &self,
1627 room_id: &RoomId,
1628 olm_machine: OlmMachine,
1629 session_ids: Option<BTreeSet<String>>,
1630 ) {
1631 self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
1632 }
1633}
1634
1635#[allow(clippy::too_many_arguments)]
1636async fn fetch_replied_to_event<P: RoomDataProvider>(
1637 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1638 state_lock: &RwLock<TimelineState<P>>,
1639 index: usize,
1640 item: &EventTimelineItem,
1641 internal_id: TimelineUniqueId,
1642 msglike: &MsgLikeContent,
1643 in_reply_to: &EventId,
1644 room: &Room,
1645) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1646 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1647 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1648 trace!("Found replied-to event locally");
1649 return Ok(details);
1650 }
1651
1652 trace!("Setting in-reply-to details to pending");
1655 let in_reply_to_details =
1656 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1657
1658 let event_item = item
1659 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1660
1661 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1662 state_guard.items.replace(index, new_timeline_item);
1663
1664 drop(state_guard);
1666
1667 trace!("Fetching replied-to event");
1668 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1669 Ok(timeline_event) => {
1670 let state = state_lock.read().await;
1671
1672 let replied_to_item =
1673 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1674
1675 if let Some(item) = replied_to_item {
1676 TimelineDetails::Ready(Box::new(item))
1677 } else {
1678 return Err(Error::UnsupportedEvent);
1680 }
1681 }
1682
1683 Err(e) => TimelineDetails::Error(Arc::new(e)),
1684 };
1685
1686 Ok(res)
1687}