1use std::{
18 collections::BTreeMap,
19 fmt,
20 ops::{Deref, DerefMut},
21 sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc,
24 },
25};
26
27use events::{sort_positions_descending, Gap};
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31 deserialized_responses::{AmbiguityChange, TimelineEvent},
32 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
33};
34use ruma::{
35 events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
36 serde::Raw,
37 EventId, OwnedEventId, OwnedRoomId,
38};
39use tokio::sync::{
40 broadcast::{Receiver, Sender},
41 mpsc, Notify, RwLock,
42};
43use tracing::{error, instrument, trace, warn};
44
45use super::{
46 deduplicator::DeduplicationOutcome, AllEventsCache, AutoShrinkChannelPayload, EventsOrigin,
47 Result, RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
48};
49use crate::{client::WeakClient, room::WeakRoom};
50
51pub(super) mod events;
52
/// A view over the event cache for a single room.
///
/// Cloning is shallow (it clones an `Arc`), and thus cheap.
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

/// Thin wrapper around a receiver of [`RoomEventCacheUpdate`]s, which
/// decrements the room's listener count when dropped (possibly triggering an
/// auto-shrink of the room's linked chunk).
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheListener {
    /// Underlying receiver of the room event cache's updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// The room this listener relates to.
    room_id: OwnedRoomId,

    /// Sender to the auto-shrink channel, used to notify the parent event
    /// cache when the last listener for this room goes away.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared count of the current listeners for this room.
    listener_count: Arc<AtomicUsize>,
}
83
impl Drop for RoomEventCacheListener {
    fn drop(&mut self) {
        let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);

        trace!("dropping a room event cache listener; previous count: {previous_listener_count}");

        if previous_listener_count == 1 {
            // We were the last listener for this room; let the auto-shrinker know by
            // sending our room id over the dedicated channel.
            let mut room_id = self.room_id.clone();

            // We can't `.await` in a `Drop` impl, so use `try_send` and retry while the
            // channel is full, up to a bounded number of attempts.
            let mut num_attempts = 0;

            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
                num_attempts += 1;

                if num_attempts > 1024 {
                    // If we've tried too many times, give up with a warning; after all, the
                    // auto-shrink is only an optimization.
                    warn!("couldn't send notification to the auto-shrink channel after 1024 attempts; giving up");
                    return;
                }

                match err {
                    mpsc::error::TrySendError::Full(stolen_room_id) => {
                        // The channel was full: take the room id back and try again.
                        room_id = stolen_room_id;
                    }
                    mpsc::error::TrySendError::Closed(_) => return,
                }
            }

            trace!("sent notification to the parent channel that we were the last listener");
        }
    }
}
125
126impl Deref for RoomEventCacheListener {
127 type Target = Receiver<RoomEventCacheUpdate>;
128
129 fn deref(&self) -> &Self::Target {
130 &self.recv
131 }
132}
133
134impl DerefMut for RoomEventCacheListener {
135 fn deref_mut(&mut self) -> &mut Self::Target {
136 &mut self.recv
137 }
138}
139
140impl RoomEventCache {
141 pub(super) fn new(
143 client: WeakClient,
144 state: RoomEventCacheState,
145 pagination_status: SharedObservable<RoomPaginationStatus>,
146 room_id: OwnedRoomId,
147 all_events_cache: Arc<RwLock<AllEventsCache>>,
148 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
149 ) -> Self {
150 Self {
151 inner: Arc::new(RoomEventCacheInner::new(
152 client,
153 state,
154 pagination_status,
155 room_id,
156 all_events_cache,
157 auto_shrink_sender,
158 )),
159 }
160 }
161
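    /// Subscribe to this room's updates.
    ///
    /// Returns the events already loaded in memory for this room, plus a
    /// listener yielding subsequent [`RoomEventCacheUpdate`]s.
    ///
    /// A minimal usage sketch (illustrative only; it assumes an already
    /// logged-in `client` and a known `room_id`):
    ///
    /// ```ignore
    /// let room = client.get_room(&room_id).expect("the room should be known");
    /// let (room_event_cache, _drop_handles) = room.event_cache().await?;
    ///
    /// let (initial_events, mut listener) = room_event_cache.subscribe().await;
    /// println!("{} events already in the cache", initial_events.len());
    ///
    /// while let Ok(update) = listener.recv().await {
    ///     // Handle RoomEventCacheUpdate::UpdateTimelineEvents and friends.
    /// }
    /// ```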
162 pub async fn subscribe(&self) -> (Vec<TimelineEvent>, RoomEventCacheListener) {
165 let state = self.inner.state.read().await;
166 let events = state.events().events().map(|(_position, item)| item.clone()).collect();
167
168 let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
169 trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);
170
171 let recv = self.inner.sender.subscribe();
172 let listener = RoomEventCacheListener {
173 recv,
174 room_id: self.inner.room_id.clone(),
175 auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
176 listener_count: state.listener_count.clone(),
177 };
178
179 (events, listener)
180 }
181
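    /// Return a [`RoomPagination`] API object useful for running
    /// back-pagination queries in the current room.
    ///
    /// A hedged sketch of a typical back-pagination call (mirroring the tests
    /// below; the batch size of 20 is arbitrary):
    ///
    /// ```ignore
    /// let outcome = room_event_cache.pagination().run_backwards_once(20).await?;
    /// if outcome.reached_start {
    ///     // Nothing older to fetch for this room.
    /// }
    /// ```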
182 pub fn pagination(&self) -> RoomPagination {
185 RoomPagination { inner: self.inner.clone() }
186 }
187
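    /// Try to find an event by its id in this room's cache.
    ///
    /// This first looks into the loaded linked chunk (in memory, then in the
    /// store), and falls back to the shared all-events cache, only returning
    /// an event that belongs to this room.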
188 pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
190 let Ok(maybe_position_and_event) = self.inner.state.read().await.find_event(event_id).await
192 else {
193 error!("Failed to find the event");
194
195 return None;
196 };
197
198 if let Some(event) = maybe_position_and_event.map(|(_location, _position, event)| event) {
200 Some(event)
201 } else if let Some((room_id, event)) =
202 self.inner.all_events.read().await.events.get(event_id).cloned()
203 {
204 (room_id == self.inner.room_id).then_some(event)
205 } else {
206 None
207 }
208 }
209
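    /// Try to find an event by its id, together with its related events, in
    /// this room's cache.
    ///
    /// The optional `filter` restricts which relation kinds are followed. A
    /// short sketch (mirroring the tests below; `event_id` is assumed to be a
    /// `&EventId` you already know about):
    ///
    /// ```ignore
    /// use ruma::events::relation::RelationType;
    ///
    /// // Only collect edits of the target event.
    /// let filter = Some(vec![RelationType::Replacement]);
    /// if let Some((event, related)) =
    ///     room_event_cache.event_with_relations(event_id, filter).await
    /// {
    ///     println!("found {} related events", related.len());
    /// }
    /// ```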
210 pub async fn event_with_relations(
215 &self,
216 event_id: &EventId,
217 filter: Option<Vec<RelationType>>,
218 ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
219 let cache = self.inner.all_events.read().await;
220 if let Some((_, event)) = cache.events.get(event_id) {
221 let related_events = cache.collect_related_events(event_id, filter.as_deref());
222 Some((event.clone(), related_events))
223 } else {
224 None
225 }
226 }
227
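    /// Clear this room's event cache.
    ///
    /// This resets the linked chunk (in memory and, if storage is enabled, on
    /// disk), clears the in-memory all-events cache, and notifies subscribers
    /// with a `Clear` diff.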
228 pub async fn clear(&self) -> Result<()> {
233 let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
235
236 self.inner.all_events.write().await.clear();
238
239 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
241 diffs: updates_as_vector_diffs,
242 origin: EventsOrigin::Cache,
243 });
244
245 Ok(())
246 }
247
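    /// Save a single event in the all-events cache, so it can be retrieved
    /// later by its id (e.g. via [`Self::event`]).
    ///
    /// Events without an event id are ignored, with a warning.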
248 pub(crate) async fn save_event(&self, event: TimelineEvent) {
254 if let Some(event_id) = event.event_id() {
255 let mut cache = self.inner.all_events.write().await;
256
257 cache.append_related_event(&event);
258 cache.events.insert(event_id, (self.inner.room_id.clone(), event));
259 } else {
260 warn!("couldn't save event without event id in the event cache");
261 }
262 }
263
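    /// Save several events at once in the all-events cache.
    ///
    /// Same as [`Self::save_event`], but acquires the cache lock only once
    /// for the whole batch.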
264 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
271 let mut cache = self.inner.all_events.write().await;
272 for event in events {
273 if let Some(event_id) = event.event_id() {
274 cache.append_related_event(&event);
275 cache.events.insert(event_id, (self.inner.room_id.clone(), event));
276 } else {
277 warn!("couldn't save event without event id in the event cache");
278 }
279 }
280 }
281
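    /// Return a list of debug strings describing the current linked chunk of
    /// events for this room.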
282 pub async fn debug_string(&self) -> Vec<String> {
285 self.inner.state.read().await.events().debug_string()
286 }
287}
288
/// The (non-cloneable) internals of a [`RoomEventCache`].
pub(super) struct RoomEventCacheInner {
    /// The room id of the room this event cache applies to.
    room_id: OwnedRoomId,

    /// A weak reference to the room, useful for getting a handle on the
    /// owning client.
    pub weak_room: WeakRoom,

    /// Sender part for subscribers to this room's updates.
    pub sender: Sender<RoomEventCacheUpdate>,

    /// State for this room's event cache.
    pub state: RwLock<RoomEventCacheState>,

    /// A cache of all events seen by the owning `EventCache`, shared across
    /// rooms and keyed by event id.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// A notifier that we received a new pagination token.
    pub pagination_batch_token_notifier: Notify,

    /// The current back-pagination status for this room.
    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    /// Sender to the auto-shrink channel, handed out to every listener
    /// returned by [`RoomEventCache::subscribe`].
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
}
319
320impl RoomEventCacheInner {
321 fn new(
324 client: WeakClient,
325 state: RoomEventCacheState,
326 pagination_status: SharedObservable<RoomPaginationStatus>,
327 room_id: OwnedRoomId,
328 all_events_cache: Arc<RwLock<AllEventsCache>>,
329 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
330 ) -> Self {
331 let sender = Sender::new(32);
332 let weak_room = WeakRoom::new(client, room_id);
333 Self {
334 room_id: weak_room.room_id().to_owned(),
335 weak_room,
336 state: RwLock::new(state),
337 all_events: all_events_cache,
338 sender,
339 pagination_batch_token_notifier: Default::default(),
340 auto_shrink_sender,
341 pagination_status,
342 }
343 }
344
345 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
346 if account_data.is_empty() {
347 return;
348 }
349
350 let mut handled_read_marker = false;
351
352 trace!("Handling account data");
353
354 for raw_event in account_data {
355 match raw_event.deserialize() {
356 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
357 if handled_read_marker {
360 continue;
361 }
362
363 handled_read_marker = true;
364
365 let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
367 event_id: ev.content.event_id,
368 });
369 }
370
371 Ok(_) => {
372 }
375
376 Err(e) => {
377 let event_type = raw_event.get_field::<String>("type").ok().flatten();
378 warn!(event_type, "Failed to deserialize account data: {e}");
379 }
380 }
381 }
382 }
383
384 #[instrument(skip_all, fields(room_id = %self.room_id))]
385 pub(super) async fn handle_joined_room_update(
386 &self,
387 has_storage: bool,
388 updates: JoinedRoomUpdate,
389 ) -> Result<()> {
390 self.handle_timeline(
391 has_storage,
392 updates.timeline,
393 updates.ephemeral.clone(),
394 updates.ambiguity_changes,
395 )
396 .await?;
397
398 self.handle_account_data(updates.account_data);
399
400 Ok(())
401 }
402
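    /// Handle the timeline part of a sync response for this room.
    ///
    /// When there is no storage and the sync was limited, all previously
    /// known events are dropped and replaced by the new ones; otherwise the
    /// new events are appended, keeping the previous-batch token only when it
    /// may still be needed (i.e. unless we have storage and the sync wasn't
    /// limited).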
403 async fn handle_timeline(
404 &self,
405 has_storage: bool,
406 timeline: Timeline,
407 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
408 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
409 ) -> Result<()> {
410 if !has_storage && timeline.limited {
411 trace!("limited timeline, clearing all previous events and pushing new events");
415
416 self.replace_all_events_by(
417 timeline.events,
418 timeline.prev_batch,
419 ephemeral_events,
420 ambiguity_changes,
421 EventsOrigin::Sync,
422 )
423 .await?;
424 } else {
425 trace!("adding new events");
427
428 let prev_batch =
436 if has_storage && !timeline.limited { None } else { timeline.prev_batch };
437
438 let mut state = self.state.write().await;
439 self.append_events_locked(
440 &mut state,
441 timeline.events,
442 prev_batch,
443 ephemeral_events,
444 ambiguity_changes,
445 )
446 .await?;
447 }
448
449 Ok(())
450 }
451
452 #[instrument(skip_all, fields(room_id = %self.room_id))]
453 pub(super) async fn handle_left_room_update(
454 &self,
455 has_storage: bool,
456 updates: LeftRoomUpdate,
457 ) -> Result<()> {
458 self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
459 .await?;
460 Ok(())
461 }
462
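    /// Remove all known events for the room and replace them with the given
    /// ones, notifying observers of both the clear and the subsequent
    /// additions.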
463 pub(super) async fn replace_all_events_by(
466 &self,
467 timeline_events: Vec<TimelineEvent>,
468 prev_batch: Option<String>,
469 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
470 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
471 events_origin: EventsOrigin,
472 ) -> Result<()> {
473 let mut state = self.state.write().await;
475
476 let updates_as_vector_diffs = state.reset().await?;
478
479 let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
481 diffs: updates_as_vector_diffs,
482 origin: events_origin,
483 });
484
485 self.append_events_locked(
487 &mut state,
488 timeline_events,
489 prev_batch.clone(),
490 ephemeral_events,
491 ambiguity_changes,
492 )
493 .await?;
494
495 Ok(())
496 }
497
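    /// Append a set of sync events and associated room data to the cache.
    ///
    /// This is a private implementation detail: the caller must already hold
    /// the `state` write lock, which is threaded through explicitly to avoid
    /// taking it twice.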
498 async fn append_events_locked(
503 &self,
504 state: &mut RoomEventCacheState,
505 timeline_events: Vec<TimelineEvent>,
506 prev_batch: Option<String>,
507 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
508 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
509 ) -> Result<()> {
510 if timeline_events.is_empty()
511 && prev_batch.is_none()
512 && ephemeral_events.is_empty()
513 && ambiguity_changes.is_empty()
514 {
515 return Ok(());
516 }
517
518 let (
519 DeduplicationOutcome {
520 all_events: events,
521 in_memory_duplicated_event_ids,
522 in_store_duplicated_event_ids,
523 },
524 all_duplicates,
525 ) = state.collect_valid_and_duplicated_events(timeline_events).await?;
526
527 let timeline_event_diffs = if all_duplicates {
532 vec![]
534 } else {
535 let mut timeline_event_diffs = state
541 .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
542 .await?;
543
544 let new_timeline_event_diffs = state
547 .with_events_mut(|room_events| {
548 if !all_duplicates {
553 if let Some(prev_token) = &prev_batch {
554 let prev_chunk_to_remove =
557 room_events.rchunks().next().and_then(|chunk| {
558 (chunk.is_items() && chunk.num_items() == 0)
559 .then_some(chunk.identifier())
560 });
561
562 room_events.push_gap(Gap { prev_token: prev_token.clone() });
563
564 if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
565 room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
566 "we just checked the chunk is there, and it's an empty item chunk",
567 );
568 }
569 }
570 }
571
572 room_events.push_events(events.clone());
573
574 events.clone()
575 })
576 .await?;
577
578 timeline_event_diffs.extend(new_timeline_event_diffs);
579
580 if prev_batch.is_some() && !all_duplicates {
581 if let Some(diffs) = state.shrink_to_last_chunk().await? {
589 timeline_event_diffs = diffs;
592 }
593 }
594
595 {
596 let mut all_events_cache = self.all_events.write().await;
598
599 for event in events {
600 if let Some(event_id) = event.event_id() {
601 all_events_cache.append_related_event(&event);
602 all_events_cache
603 .events
604 .insert(event_id.to_owned(), (self.room_id.clone(), event));
605 }
606 }
607 }
608
609 timeline_event_diffs
610 };
611
612 if prev_batch.is_some() {
615 self.pagination_batch_token_notifier.notify_one();
616 }
617
618 {
620 if !timeline_event_diffs.is_empty() {
621 let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
622 diffs: timeline_event_diffs,
623 origin: EventsOrigin::Sync,
624 });
625 }
626
627 if !ephemeral_events.is_empty() {
628 let _ = self
629 .sender
630 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
631 }
632
633 if !ambiguity_changes.is_empty() {
634 let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
635 }
636 }
637
638 Ok(())
639 }
640}
641
/// Internal type describing what a backwards load from the event cache
/// resulted in, and what the caller should do next.
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap has been found: back-paginate against the server, using this
    /// previous-batch token if there is one.
    Gap {
        /// The previous-batch token to use in the pagination request, if any.
        prev_token: Option<String>,
    },

    /// The start of the timeline has been reached: there is nothing left to
    /// load.
    StartOfTimeline,

    /// Events have been loaded from the store.
    Events {
        events: Vec<TimelineEvent>,
        timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
        reached_start: bool,
    },

    /// The caller should wait for the initial previous-batch token to be
    /// received from sync before paginating backwards.
    WaitForInitialPrevToken,
}
666
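/// Private state module: it keeps the fields of `RoomEventCacheState` out of
/// reach of the rest of the event cache, so mutations must go through methods
/// that also propagate updates to the store.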
667mod private {
669 use std::sync::{atomic::AtomicUsize, Arc};
670
671 use eyeball::SharedObservable;
672 use eyeball_im::VectorDiff;
673 use matrix_sdk_base::{
674 apply_redaction,
675 deserialized_responses::{TimelineEvent, TimelineEventKind},
676 event_cache::{store::EventCacheStoreLock, Event, Gap},
677 linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Position, Update},
678 };
679 use matrix_sdk_common::executor::spawn;
680 use once_cell::sync::OnceCell;
681 use ruma::{
682 events::{
683 room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent, MessageLikeEventType,
684 },
685 serde::Raw,
686 EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
687 };
688 use tracing::{debug, error, instrument, trace, warn};
689
690 use super::{
691 super::{
692 deduplicator::{DeduplicationOutcome, Deduplicator},
693 EventCacheError,
694 },
695 events::RoomEvents,
696 sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
697 };
698 use crate::event_cache::RoomPaginationStatus;
699
    /// Mutable state for a single room's event cache.
    ///
    /// This contains all the inner mutable states that ought to be updated at
    /// the same time.
    pub struct RoomEventCacheState {
        /// The room this state relates to.
        room: OwnedRoomId,

        /// The room version for this room.
        room_version: RoomVersionId,

        /// Reference to the underlying backing store.
        ///
        /// Left empty if persistent storage hasn't been enabled for the event
        /// cache; in that case everything stays in memory.
        store: Arc<OnceCell<EventCacheStoreLock>>,

        /// The in-memory events of this room, as a (possibly lazily loaded)
        /// linked chunk.
        events: RoomEvents,

        /// The deduplicator instance, used to spot already-known events.
        deduplicator: Deduplicator,

        /// Have we ever waited for a previous-batch token to come from sync,
        /// in the context of pagination? We do this at most once per room.
        pub waited_for_initial_prev_token: bool,

        /// The current pagination status, shared with the room's pagination
        /// API.
        pagination_status: SharedObservable<RoomPaginationStatus>,

        /// An atomic count of the current number of listeners of the parent
        /// `RoomEventCache`.
        pub(super) listener_count: Arc<AtomicUsize>,
    }
735
736 impl RoomEventCacheState {
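        /// Create a new state, reloading the last chunk of the linked chunk
        /// from storage if it has been enabled; if the persisted linked chunk
        /// is unusable, it is cleared and we start afresh.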
737 pub async fn new(
747 room_id: OwnedRoomId,
748 room_version: RoomVersionId,
749 store: Arc<OnceCell<EventCacheStoreLock>>,
750 pagination_status: SharedObservable<RoomPaginationStatus>,
751 ) -> Result<Self, EventCacheError> {
752 let (events, deduplicator) = if let Some(store) = store.get() {
753 let store_lock = store.lock().await?;
754
755 let linked_chunk = match store_lock
756 .load_last_chunk(&room_id)
757 .await
758 .map_err(EventCacheError::from)
759 .and_then(|(last_chunk, chunk_identifier_generator)| {
760 lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
761 .map_err(EventCacheError::from)
762 }) {
763 Ok(linked_chunk) => linked_chunk,
764
765 Err(err) => {
766 error!("error when reloading a linked chunk from memory: {err}");
767
768 store_lock
770 .handle_linked_chunk_updates(&room_id, vec![Update::Clear])
771 .await?;
772
773 None
775 }
776 };
777
778 (
779 RoomEvents::with_initial_linked_chunk(linked_chunk),
780 Deduplicator::new_store_based(room_id.clone(), store.clone()),
781 )
782 } else {
783 (RoomEvents::default(), Deduplicator::new_memory_based())
784 };
785
786 Ok(Self {
787 room: room_id,
788 room_version,
789 store,
790 events,
791 deduplicator,
792 waited_for_initial_prev_token: false,
793 listener_count: Default::default(),
794 pagination_status,
795 })
796 }
797
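        /// Deduplicate `events` against the linked chunk (both in memory and
        /// in the store), returning the deduplication outcome plus a boolean
        /// indicating whether *all* the new events were already known.
        ///
        /// An empty input is never considered "all duplicates", so that
        /// gap-only sync responses still get processed.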
798 pub async fn collect_valid_and_duplicated_events(
825 &mut self,
826 events: Vec<Event>,
827 ) -> Result<(DeduplicationOutcome, bool), EventCacheError> {
828 let deduplication_outcome =
829 self.deduplicator.filter_duplicate_events(events, &self.events).await?;
830
831 let number_of_events = deduplication_outcome.all_events.len();
832 let number_of_deduplicated_events =
833 deduplication_outcome.in_memory_duplicated_event_ids.len()
834 + deduplication_outcome.in_store_duplicated_event_ids.len();
835
836 let all_duplicates =
837 number_of_events > 0 && number_of_events == number_of_deduplicated_events;
838
839 Ok((deduplication_outcome, all_duplicates))
840 }
841
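        /// Decide what a backwards load should return once the linked chunk
        /// is known to be fully loaded and gap-free: if it has events, we've
        /// hit the start of the timeline; otherwise, either wait for the
        /// initial previous-batch token from sync (at most once), or paginate
        /// from the end of the room with no token.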
842 fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
845 if self.events.events().next().is_some() {
850 LoadMoreEventsBackwardsOutcome::StartOfTimeline
853 } else if !self.waited_for_initial_prev_token {
854 LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
856 } else {
857 LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
861 }
862 }
863
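        /// Load more events backwards, either by reusing a gap already known
        /// in memory, or by lazy-loading the previous chunk of the linked
        /// chunk from the store.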
864 pub(in super::super) async fn load_more_events_backwards(
866 &mut self,
867 ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
868 let Some(store) = self.store.get() else {
869 if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
876 return Ok(LoadMoreEventsBackwardsOutcome::Gap {
877 prev_token: Some(prev_token),
878 });
879 }
880
881 return Ok(self.conclude_load_more_for_fully_loaded_chunk());
882 };
883
884 if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
887 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
888 }
889
890 let first_chunk_identifier =
893 self.events.chunks().next().expect("a linked chunk is never empty").identifier();
894
895 let store = store.lock().await?;
896
897 let new_first_chunk =
899 match store.load_previous_chunk(&self.room, first_chunk_identifier).await {
900 Ok(Some(new_first_chunk)) => {
901 new_first_chunk
903 }
904
905 Ok(None) => {
906 return Ok(self.conclude_load_more_for_fully_loaded_chunk());
908 }
909
910 Err(err) => {
911 error!("error when loading the previous chunk of a linked chunk: {err}");
912
913 store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
915
916 return Err(err.into());
918 }
919 };
920
921 let chunk_content = new_first_chunk.content.clone();
922
923 let reached_start = new_first_chunk.previous.is_none();
926
927 if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
928 error!("error when inserting the previous chunk into its linked chunk: {err}");
929
930 store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
932
933 return Err(err.into());
935 };
936
937 let _ = self.events.store_updates().take();
940
941 let timeline_event_diffs = self.events.updates_as_vector_diffs();
943
944 Ok(match chunk_content {
945 ChunkContent::Gap(gap) => {
946 LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
947 }
948
949 ChunkContent::Items(events) => LoadMoreEventsBackwardsOutcome::Events {
950 events,
951 timeline_event_diffs,
952 reached_start,
953 },
954 })
955 }
956
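        /// Unload the linked chunk and reload only its last chunk from the
        /// store.
        ///
        /// Returns the diffs to propagate to observers, or `None` if there is
        /// no storage.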
957 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
966 pub(super) async fn shrink_to_last_chunk(
967 &mut self,
968 ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
969 let Some(store) = self.store.get() else {
970 return Ok(None);
973 };
974
975 let store_lock = store.lock().await?;
976
977 let (last_chunk, chunk_identifier_generator) = match store_lock
979 .load_last_chunk(&self.room)
980 .await
981 {
982 Ok(pair) => pair,
983
984 Err(err) => {
985 error!("error when reloading a linked chunk from memory: {err}");
987
988 store_lock.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
990
991 (None, ChunkIdentifierGenerator::new_from_scratch())
993 }
994 };
995
996 debug!("unloading the linked chunk, and resetting it to its last chunk");
997
998 if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
1001 error!("error when replacing the linked chunk: {err}");
1002 return self.reset().await.map(Some);
1003 }
1004
1005 self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1009
1010 let _ = self.events.store_updates().take();
1013
1014 let diffs = self.events.updates_as_vector_diffs();
1017 assert!(matches!(diffs[0], VectorDiff::Clear));
1018
1019 Ok(Some(diffs))
1020 }
1021
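        /// If the listener count has dropped to zero, shrink the linked chunk
        /// to its last chunk to reclaim memory; returns the diffs to
        /// propagate if a shrink happened.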
1022 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1025 pub(crate) async fn auto_shrink_if_no_listeners(
1026 &mut self,
1027 ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
1028 let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);
1029
1030 trace!(listener_count, "received request to auto-shrink");
1031
1032 if listener_count == 0 {
1033 self.shrink_to_last_chunk().await
1036 } else {
1037 Ok(None)
1038 }
1039 }
1040
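        /// Remove the bundled `m.relations` object from the `unsigned` field
        /// of a raw event, if present.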
1041 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
1045 let mut closure = || -> Option<()> {
1049 let mut val: serde_json::Value = event.deserialize_as().ok()?;
1050 let unsigned = val.get_mut("unsigned")?;
1051 let unsigned_obj = unsigned.as_object_mut()?;
1052 if unsigned_obj.remove("m.relations").is_some() {
1053 *event = Raw::new(&val).ok()?.cast();
1054 }
1055 None
1056 };
1057 let _ = closure();
1058 }
1059
1060 fn strip_relations_from_event(ev: &mut TimelineEvent) {
1061 match &mut ev.kind {
1062 TimelineEventKind::Decrypted(decrypted) => {
1063 decrypted.unsigned_encryption_info = None;
1066
1067 Self::strip_relations_if_present(&mut decrypted.event);
1069 }
1070
1071 TimelineEventKind::UnableToDecrypt { event, .. }
1072 | TimelineEventKind::PlainText { event } => {
1073 Self::strip_relations_if_present(event);
1074 }
1075 }
1076 }
1077
1078 fn strip_relations_from_events(items: &mut [TimelineEvent]) {
1080 for ev in items.iter_mut() {
1081 Self::strip_relations_from_event(ev);
1082 }
1083 }
1084
1085 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1091 pub(crate) async fn remove_events(
1092 &mut self,
1093 in_memory_events: Vec<(OwnedEventId, Position)>,
1094 in_store_events: Vec<(OwnedEventId, Position)>,
1095 ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
1096 {
1098 let mut positions = in_store_events
1099 .into_iter()
1100 .map(|(_event_id, position)| position)
1101 .collect::<Vec<_>>();
1102
1103 sort_positions_descending(&mut positions);
1104
1105 self.send_updates_to_store(
1106 positions
1107 .into_iter()
1108 .map(|position| Update::RemoveItem { at: position })
1109 .collect(),
1110 )
1111 .await?;
1112 }
1113
1114 let timeline_event_diffs = self
1116 .with_events_mut(|room_events| {
1117 room_events
1119 .remove_events_by_position(
1120 in_memory_events
1121 .into_iter()
1122 .map(|(_event_id, position)| position)
1123 .collect(),
1124 )
1125 .expect("failed to remove an event");
1126
1127 vec![]
1128 })
1129 .await?;
1130
1131 Ok(timeline_event_diffs)
1132 }
1133
1134 #[instrument(skip_all)]
1136 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1137 let updates = self.events.store_updates().take();
1138 self.send_updates_to_store(updates).await
1139 }
1140
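        /// Apply a set of linked-chunk updates to the store, stripping
        /// bundled relations first; the store call runs on a background task
        /// so it isn't cancelled if the caller's future is dropped.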
1141 pub async fn send_updates_to_store(
1142 &mut self,
1143 mut updates: Vec<Update<TimelineEvent, Gap>>,
1144 ) -> Result<(), EventCacheError> {
1145 let Some(store) = self.store.get() else {
1146 return Ok(());
1147 };
1148
1149 if updates.is_empty() {
1150 return Ok(());
1151 }
1152
1153 for update in updates.iter_mut() {
1155 match update {
1156 Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
1157 Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
1158 Update::NewItemsChunk { .. }
1160 | Update::NewGapChunk { .. }
1161 | Update::RemoveChunk(_)
1162 | Update::RemoveItem { .. }
1163 | Update::DetachLastItems { .. }
1164 | Update::StartReattachItems
1165 | Update::EndReattachItems
1166 | Update::Clear => {}
1167 }
1168 }
1169
1170 let store = store.clone();
1177 let room_id = self.room.clone();
1178
1179 spawn(async move {
1180 let store = store.lock().await?;
1181
1182 trace!(%room_id, ?updates, "sending linked chunk updates to the store");
1183 store.handle_linked_chunk_updates(&room_id, updates).await?;
1184 trace!("linked chunk updates applied");
1185
1186 super::Result::Ok(())
1187 })
1188 .await
1189 .expect("joining failed")?;
1190
1191 Ok(())
1192 }
1193
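        /// Reset this room's linked chunk (in memory and in the store) and
        /// its pagination state, returning the corresponding `Clear` diff to
        /// propagate to observers.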
1194 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1200 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
1201 self.events.reset();
1202
1203 self.propagate_changes().await?;
1204
1205 self.waited_for_initial_prev_token = false;
1209 self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1211
1212 let diff_updates = self.events.updates_as_vector_diffs();
1213
1214 debug_assert_eq!(diff_updates.len(), 1);
1216 debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1217
1218 Ok(diff_updates)
1219 }
1220
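        /// Return a reference to the in-memory events (the linked chunk) for
        /// this room.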
1221 pub fn events(&self) -> &RoomEvents {
1223 &self.events
1224 }
1225
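        /// Find a single event in this room, looking into the in-memory
        /// linked chunk first, then into the store.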
1226 pub async fn find_event(
1231 &self,
1232 event_id: &EventId,
1233 ) -> Result<Option<(EventLocation, Position, TimelineEvent)>, EventCacheError> {
1234 let room_id = self.room.as_ref();
1235
1236 for (position, event) in self.events().revents() {
1239 if event.event_id().as_deref() == Some(event_id) {
1240 return Ok(Some((EventLocation::Memory, position, event.clone())));
1241 }
1242 }
1243
1244 let Some(store) = self.store.get() else {
1245 return Ok(None);
1247 };
1248
1249 let store = store.lock().await?;
1250
1251 Ok(store
1252 .find_event(room_id, event_id)
1253 .await?
1254 .map(|(position, event)| (EventLocation::Store, position, event)))
1255 }
1256
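        /// Give a temporary mutable handle to the underlying in-memory
        /// events, propagating the resulting changes to storage once done;
        /// redactions carried by the returned events are applied as a
        /// post-processing step.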
1257 #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1268 pub async fn with_events_mut<F>(
1269 &mut self,
1270 func: F,
1271 ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError>
1272 where
1273 F: FnOnce(&mut RoomEvents) -> Vec<TimelineEvent>,
1274 {
1275 let events_to_post_process = func(&mut self.events);
1276
1277 self.propagate_changes().await?;
1279
1280 for event in &events_to_post_process {
1281 self.maybe_apply_new_redaction(event).await?;
1282 }
1283
1284 if !self.waited_for_initial_prev_token
1287 && self.events.chunks().any(|chunk| chunk.is_gap())
1288 {
1289 self.waited_for_initial_prev_token = true;
1290 }
1291
1292 let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
1293
1294 Ok(updates_as_vector_diffs)
1295 }
1296
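        /// If the given event is a redaction, try to retrieve the
        /// to-be-redacted event (in the linked chunk or in the store) and
        /// replace it with its redacted form.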
1297 #[instrument(skip_all)]
1301 async fn maybe_apply_new_redaction(
1302 &mut self,
1303 event: &Event,
1304 ) -> Result<(), EventCacheError> {
1305 let raw_event = event.raw();
1306
1307 let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1310 raw_event.get_field::<MessageLikeEventType>("type")
1311 else {
1312 return Ok(());
1313 };
1314
1315 let Ok(AnySyncTimelineEvent::MessageLike(
1318 ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction),
1319 )) = event.raw().deserialize()
1320 else {
1321 return Ok(());
1322 };
1323
1324 let Some(event_id) = redaction.redacts(&self.room_version) else {
1325 warn!("missing target event id from the redaction event");
1326 return Ok(());
1327 };
1328
1329 if let Some((location, position, target_event)) = self.find_event(event_id).await? {
1331 if let Ok(deserialized) = target_event.raw().deserialize() {
1333 match deserialized {
1334 AnySyncTimelineEvent::MessageLike(ev) => {
1335 if ev.is_redacted() {
1336 return Ok(());
1337 }
1338 }
1339 AnySyncTimelineEvent::State(ev) => {
1340 if ev.is_redacted() {
1341 return Ok(());
1342 }
1343 }
1344 }
1345 }
1346
1347 if let Some(redacted_event) = apply_redaction(
1348 target_event.raw(),
1349 event.raw().cast_ref::<SyncRoomRedactionEvent>(),
1350 &self.room_version,
1351 ) {
1352 let mut copy = target_event.clone();
1353
1354 copy.replace_raw(redacted_event.cast());
1359
1360 match location {
1361 EventLocation::Memory => {
1362 self.events
1363 .replace_event_at(position, copy)
1364 .expect("should have been a valid position of an item");
1365 }
1366 EventLocation::Store => {
1367 self.send_updates_to_store(vec![Update::ReplaceItem {
1368 at: position,
1369 item: copy,
1370 }])
1371 .await?;
1372 }
1373 }
1374 }
1375 } else {
1376 trace!("redacted event is missing from the linked chunk");
1377 }
1378
1379 Ok(())
1382 }
1383 }
1384}
1385
/// Where a single event was found by `RoomEventCacheState::find_event`.
pub(super) enum EventLocation {
    /// The event was found in the in-memory linked chunk.
    Memory,

    /// The event was found in the store.
    Store,
}
1394
1395pub(super) use private::RoomEventCacheState;
1396
1397#[cfg(test)]
1398mod tests {
1399 use std::sync::Arc;
1400
1401 use assert_matches::assert_matches;
1402 use assert_matches2::assert_let;
1403 use matrix_sdk_base::{
1404 event_cache::{
1405 store::{EventCacheStore as _, MemoryStore},
1406 Gap,
1407 },
1408 linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1409 store::StoreConfig,
1410 sync::{JoinedRoomUpdate, Timeline},
1411 };
1412 use matrix_sdk_common::deserialized_responses::TimelineEvent;
1413 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
1414 use ruma::{
1415 event_id,
1416 events::{
1417 relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
1418 AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1419 },
1420 room_id, user_id, RoomId,
1421 };
1422
1423 use crate::test_utils::{client::MockClientBuilder, logged_in_client};
1424
1425 #[async_test]
1426 async fn test_event_with_redaction_relation() {
1427 let original_id = event_id!("$original");
1428 let related_id = event_id!("$related");
1429 let room_id = room_id!("!galette:saucisse.bzh");
1430 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1431
1432 assert_relations(
1433 room_id,
1434 f.text_msg("Original event").event_id(original_id).into(),
1435 f.redaction(original_id).event_id(related_id).into(),
1436 f,
1437 )
1438 .await;
1439 }
1440
1441 #[async_test]
1442 async fn test_event_with_edit_relation() {
1443 let original_id = event_id!("$original");
1444 let related_id = event_id!("$related");
1445 let room_id = room_id!("!galette:saucisse.bzh");
1446 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1447
1448 assert_relations(
1449 room_id,
1450 f.text_msg("Original event").event_id(original_id).into(),
1451 f.text_msg("* An edited event")
1452 .edit(
1453 original_id,
1454 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
1455 )
1456 .event_id(related_id)
1457 .into(),
1458 f,
1459 )
1460 .await;
1461 }
1462
1463 #[async_test]
1464 async fn test_event_with_reply_relation() {
1465 let original_id = event_id!("$original");
1466 let related_id = event_id!("$related");
1467 let room_id = room_id!("!galette:saucisse.bzh");
1468 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1469
1470 assert_relations(
1471 room_id,
1472 f.text_msg("Original event").event_id(original_id).into(),
1473 f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
1474 f,
1475 )
1476 .await;
1477 }
1478
1479 #[async_test]
1480 async fn test_event_with_thread_reply_relation() {
1481 let original_id = event_id!("$original");
1482 let related_id = event_id!("$related");
1483 let room_id = room_id!("!galette:saucisse.bzh");
1484 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1485
1486 assert_relations(
1487 room_id,
1488 f.text_msg("Original event").event_id(original_id).into(),
1489 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
1490 f,
1491 )
1492 .await;
1493 }
1494
1495 #[async_test]
1496 async fn test_event_with_reaction_relation() {
1497 let original_id = event_id!("$original");
1498 let related_id = event_id!("$related");
1499 let room_id = room_id!("!galette:saucisse.bzh");
1500 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1501
1502 assert_relations(
1503 room_id,
1504 f.text_msg("Original event").event_id(original_id).into(),
1505 f.reaction(original_id, ":D").event_id(related_id).into(),
1506 f,
1507 )
1508 .await;
1509 }
1510
1511 #[async_test]
1512 async fn test_event_with_poll_response_relation() {
1513 let original_id = event_id!("$original");
1514 let related_id = event_id!("$related");
1515 let room_id = room_id!("!galette:saucisse.bzh");
1516 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1517
1518 assert_relations(
1519 room_id,
1520 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1521 .event_id(original_id)
1522 .into(),
1523 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
1524 f,
1525 )
1526 .await;
1527 }
1528
1529 #[async_test]
1530 async fn test_event_with_poll_end_relation() {
1531 let original_id = event_id!("$original");
1532 let related_id = event_id!("$related");
1533 let room_id = room_id!("!galette:saucisse.bzh");
1534 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1535
1536 assert_relations(
1537 room_id,
1538 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1539 .event_id(original_id)
1540 .into(),
1541 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
1542 f,
1543 )
1544 .await;
1545 }
1546
1547 #[async_test]
1548 async fn test_event_with_filtered_relationships() {
1549 let original_id = event_id!("$original");
1550 let related_id = event_id!("$related");
1551 let associated_related_id = event_id!("$recursive_related");
1552 let room_id = room_id!("!galette:saucisse.bzh");
1553 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1554
1555 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1556 let related_event = event_factory
1557 .text_msg("* Edited event")
1558 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1559 .event_id(related_id)
1560 .into();
1561 let associated_related_event =
1562 event_factory.redaction(related_id).event_id(associated_related_id).into();
1563
1564 let client = logged_in_client(None).await;
1565
1566 let event_cache = client.event_cache();
1567 event_cache.subscribe().unwrap();
1568
1569 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1570 let room = client.get_room(room_id).unwrap();
1571
1572 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1573
1574 room_event_cache.save_event(original_event).await;
1576
1577 room_event_cache.save_event(related_event).await;
1579
1580 room_event_cache.save_event(associated_related_event).await;
1582
1583 let filter = Some(vec![RelationType::Replacement]);
1584 let (event, related_events) =
1585 room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1586 let cached_event_id = event.event_id().unwrap();
1588 assert_eq!(cached_event_id, original_id);
1589
1590 assert_eq!(related_events.len(), 2);
1592
1593 let related_event_id = related_events[0].event_id().unwrap();
1594 assert_eq!(related_event_id, related_id);
1595 let related_event_id = related_events[1].event_id().unwrap();
1596 assert_eq!(related_event_id, associated_related_id);
1597
1598 let filter = Some(vec![RelationType::Thread]);
1600 let (event, related_events) =
1601 room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1602 let cached_event_id = event.event_id().unwrap();
1604 assert_eq!(cached_event_id, original_id);
1605 assert!(related_events.is_empty());
1607 }
1608
1609 #[async_test]
1610 async fn test_event_with_recursive_relation() {
1611 let original_id = event_id!("$original");
1612 let related_id = event_id!("$related");
1613 let associated_related_id = event_id!("$recursive_related");
1614 let room_id = room_id!("!galette:saucisse.bzh");
1615 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1616
1617 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1618 let related_event = event_factory
1619 .text_msg("* Edited event")
1620 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1621 .event_id(related_id)
1622 .into();
1623 let associated_related_event =
1624 event_factory.redaction(related_id).event_id(associated_related_id).into();
1625
1626 let client = logged_in_client(None).await;
1627
1628 let event_cache = client.event_cache();
1629 event_cache.subscribe().unwrap();
1630
1631 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1632 let room = client.get_room(room_id).unwrap();
1633
1634 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1635
1636 room_event_cache.save_event(original_event).await;
1638
1639 room_event_cache.save_event(related_event).await;
1641
1642 room_event_cache.save_event(associated_related_event).await;
1644
1645 let (event, related_events) =
1646 room_event_cache.event_with_relations(original_id, None).await.unwrap();
1647 let cached_event_id = event.event_id().unwrap();
1649 assert_eq!(cached_event_id, original_id);
1650
1651 assert_eq!(related_events.len(), 2);
1653
1654 let related_event_id = related_events[0].event_id().unwrap();
1655 assert_eq!(related_event_id, related_id);
1656 let related_event_id = related_events[1].event_id().unwrap();
1657 assert_eq!(related_event_id, associated_related_id);
1658 }
1659
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
1662 async fn test_write_to_storage() {
1663 use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1664
1665 let room_id = room_id!("!galette:saucisse.bzh");
1666 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1667
1668 let event_cache_store = Arc::new(MemoryStore::new());
1669
1670 let client = MockClientBuilder::new("http://localhost".to_owned())
1671 .store_config(
1672 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1673 )
1674 .build()
1675 .await;
1676
1677 let event_cache = client.event_cache();
1678
1679 event_cache.subscribe().unwrap();
1681 event_cache.enable_storage().unwrap();
1682
1683 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1684 let room = client.get_room(room_id).unwrap();
1685
1686 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1687
1688 let timeline = Timeline {
1690 limited: true,
1691 prev_batch: Some("raclette".to_owned()),
1692 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
1693 };
1694
1695 room_event_cache
1696 .inner
1697 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1698 .await
1699 .unwrap();
1700
1701 let linked_chunk =
1702 from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1703 .unwrap()
1704 .unwrap();
1705
1706 assert_eq!(linked_chunk.chunks().count(), 2);
1707
1708 let mut chunks = linked_chunk.chunks();
1709
1710 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
1712 assert_eq!(gap.prev_token, "raclette");
1713 });
1714
1715 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1717 assert_eq!(events.len(), 1);
1718 let deserialized = events[0].raw().deserialize().unwrap();
1719 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1720 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1721 });
1722
1723 assert!(chunks.next().is_none());
1725 }
1726
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
1729 async fn test_write_to_storage_strips_bundled_relations() {
1730 use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1731 use ruma::events::BundledMessageLikeRelations;
1732
1733 let room_id = room_id!("!galette:saucisse.bzh");
1734 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1735
1736 let event_cache_store = Arc::new(MemoryStore::new());
1737
1738 let client = MockClientBuilder::new("http://localhost".to_owned())
1739 .store_config(
1740 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1741 )
1742 .build()
1743 .await;
1744
1745 let event_cache = client.event_cache();
1746
1747 event_cache.subscribe().unwrap();
1749 event_cache.enable_storage().unwrap();
1750
1751 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1752 let room = client.get_room(room_id).unwrap();
1753
1754 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1755
1756 let mut relations = BundledMessageLikeRelations::new();
1758 relations.replace =
1759 Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
1760 let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();
1761
1762 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1763
1764 room_event_cache
1765 .inner
1766 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1767 .await
1768 .unwrap();
1769
1770 {
1772 let (events, _) = room_event_cache.subscribe().await;
1773
1774 assert_eq!(events.len(), 1);
1775
1776 let ev = events[0].raw().deserialize().unwrap();
1777 assert_let!(
1778 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1779 );
1780
1781 let original = msg.as_original().unwrap();
1782 assert_eq!(original.content.body(), "hey yo");
1783 assert!(original.unsigned.relations.replace.is_some());
1784 }
1785
1786 let linked_chunk =
1788 from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1789 .unwrap()
1790 .unwrap();
1791
1792 assert_eq!(linked_chunk.chunks().count(), 1);
1793
1794 let mut chunks = linked_chunk.chunks();
1795 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1796 assert_eq!(events.len(), 1);
1797
1798 let ev = events[0].raw().deserialize().unwrap();
1799 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1800
1801 let original = msg.as_original().unwrap();
1802 assert_eq!(original.content.body(), "hey yo");
1803 assert!(original.unsigned.relations.replace.is_none());
1804 });
1805
1806 assert!(chunks.next().is_none());
1808 }
1809
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
1812 async fn test_clear() {
1813 use eyeball_im::VectorDiff;
1814 use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1815
1816 use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
1817
1818 let room_id = room_id!("!galette:saucisse.bzh");
1819 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1820
1821 let event_cache_store = Arc::new(MemoryStore::new());
1822
1823 let event_id1 = event_id!("$1");
1824 let event_id2 = event_id!("$2");
1825
1826 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1827 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1828
1829 event_cache_store
1831 .handle_linked_chunk_updates(
1832 room_id,
1833 vec![
1834 Update::NewItemsChunk {
1836 previous: None,
1837 new: ChunkIdentifier::new(0),
1838 next: None,
1839 },
1840 Update::NewGapChunk {
1842 previous: Some(ChunkIdentifier::new(0)),
1843 new: ChunkIdentifier::new(42),
1845 next: None,
1846 gap: Gap { prev_token: "comté".to_owned() },
1847 },
1848 Update::NewItemsChunk {
1850 previous: Some(ChunkIdentifier::new(42)),
1851 new: ChunkIdentifier::new(1),
1852 next: None,
1853 },
1854 Update::PushItems {
1855 at: Position::new(ChunkIdentifier::new(1), 0),
1856 items: vec![ev1.clone()],
1857 },
1858 Update::NewItemsChunk {
1860 previous: Some(ChunkIdentifier::new(1)),
1861 new: ChunkIdentifier::new(2),
1862 next: None,
1863 },
1864 Update::PushItems {
1865 at: Position::new(ChunkIdentifier::new(2), 0),
1866 items: vec![ev2.clone()],
1867 },
1868 ],
1869 )
1870 .await
1871 .unwrap();
1872
1873 let client = MockClientBuilder::new("http://localhost".to_owned())
1874 .store_config(
1875 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1876 )
1877 .build()
1878 .await;
1879
1880 let event_cache = client.event_cache();
1881
1882 event_cache.subscribe().unwrap();
1884 event_cache.enable_storage().unwrap();
1885
1886 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1887 let room = client.get_room(room_id).unwrap();
1888
1889 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1890
1891 let (items, mut stream) = room_event_cache.subscribe().await;
1892
1893 {
1895 assert!(room_event_cache.event(event_id1).await.is_some());
1896 assert!(room_event_cache.event(event_id2).await.is_some());
1897 }
1898
1899 {
1901 assert_eq!(items.len(), 1);
1903 assert_eq!(items[0].event_id().unwrap(), event_id2);
1904
1905 assert!(stream.is_empty());
1906 }
1907
1908 {
1910 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1911
1912 assert_let_timeout!(
1913 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1914 );
1915 assert_eq!(diffs.len(), 1);
1916 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1917 assert_eq!(event.event_id().unwrap(), event_id1);
1919 });
1920
1921 assert!(stream.is_empty());
1922 }
1923
1924 room_event_cache.clear().await.unwrap();
1926
1927 assert_let_timeout!(
1929 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1930 );
1931 assert_eq!(diffs.len(), 1);
1932 assert_let!(VectorDiff::Clear = &diffs[0]);
1933
1934 assert!(room_event_cache.event(event_id1).await.is_none());
1936
1937 let (items, _) = room_event_cache.subscribe().await;
1938 assert!(items.is_empty());
1939
1940 let linked_chunk =
1942 from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1943 .unwrap()
1944 .unwrap();
1945
1946 assert_eq!(linked_chunk.num_items(), 0);
1950 }
1951
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
1954 async fn test_load_from_storage() {
1955 use eyeball_im::VectorDiff;
1956
1957 use super::RoomEventCacheUpdate;
1958 use crate::assert_let_timeout;
1959
1960 let room_id = room_id!("!galette:saucisse.bzh");
1961 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1962
1963 let event_cache_store = Arc::new(MemoryStore::new());
1964
1965 let event_id1 = event_id!("$1");
1966 let event_id2 = event_id!("$2");
1967
1968 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1969 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1970
1971 event_cache_store
1973 .handle_linked_chunk_updates(
1974 room_id,
1975 vec![
1976 Update::NewItemsChunk {
1978 previous: None,
1979 new: ChunkIdentifier::new(0),
1980 next: None,
1981 },
1982 Update::NewGapChunk {
1984 previous: Some(ChunkIdentifier::new(0)),
1985 new: ChunkIdentifier::new(42),
1987 next: None,
1988 gap: Gap { prev_token: "cheddar".to_owned() },
1989 },
1990 Update::NewItemsChunk {
1992 previous: Some(ChunkIdentifier::new(42)),
1993 new: ChunkIdentifier::new(1),
1994 next: None,
1995 },
1996 Update::PushItems {
1997 at: Position::new(ChunkIdentifier::new(1), 0),
1998 items: vec![ev1.clone()],
1999 },
2000 Update::NewItemsChunk {
2002 previous: Some(ChunkIdentifier::new(1)),
2003 new: ChunkIdentifier::new(2),
2004 next: None,
2005 },
2006 Update::PushItems {
2007 at: Position::new(ChunkIdentifier::new(2), 0),
2008 items: vec![ev2.clone()],
2009 },
2010 ],
2011 )
2012 .await
2013 .unwrap();
2014
2015 let client = MockClientBuilder::new("http://localhost".to_owned())
2016 .store_config(
2017 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
2018 )
2019 .build()
2020 .await;
2021
2022 let event_cache = client.event_cache();
2023
2024 event_cache.subscribe().unwrap();
2026 event_cache.enable_storage().unwrap();
2027
2028 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2029 let room = client.get_room(room_id).unwrap();
2030
2031 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2032
2033 let (items, mut stream) = room_event_cache.subscribe().await;
2034
2035 assert_eq!(items.len(), 1);
2038 assert_eq!(items[0].event_id().unwrap(), event_id2);
2039 assert!(stream.is_empty());
2040
2041 assert!(room_event_cache.event(event_id1).await.is_some());
2043 assert!(room_event_cache.event(event_id2).await.is_some());
2044
2045 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2047
2048 assert_let_timeout!(
2049 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2050 );
2051 assert_eq!(diffs.len(), 1);
2052 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2053 assert_eq!(event.event_id().unwrap(), event_id1);
2054 });
2055
2056 assert!(stream.is_empty());
2057
2058 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2060 room_event_cache
2061 .inner
2062 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
2063 .await
2064 .unwrap();
2065
2066 let (items, _stream) = room_event_cache.subscribe().await;
2071 assert_eq!(items.len(), 2);
2072 assert_eq!(items[0].event_id().unwrap(), event_id1);
2073 assert_eq!(items[1].event_id().unwrap(), event_id2);
2074 }
2075
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
2078 async fn test_load_from_storage_resilient_to_failure() {
2079 let room_id = room_id!("!fondue:patate.ch");
2080 let event_cache_store = Arc::new(MemoryStore::new());
2081
2082 let event = EventFactory::new()
2083 .room(room_id)
2084 .sender(user_id!("@ben:saucisse.bzh"))
2085 .text_msg("foo")
2086 .event_id(event_id!("$42"))
2087 .into_event();
2088
2089 event_cache_store
2091 .handle_linked_chunk_updates(
2092 room_id,
2093 vec![
2094 Update::NewItemsChunk {
2095 previous: None,
2096 new: ChunkIdentifier::new(0),
2097 next: None,
2098 },
2099 Update::PushItems {
2100 at: Position::new(ChunkIdentifier::new(0), 0),
2101 items: vec![event],
2102 },
2103 Update::NewItemsChunk {
2104 previous: Some(ChunkIdentifier::new(0)),
2105 new: ChunkIdentifier::new(1),
2106 next: Some(ChunkIdentifier::new(0)),
2107 },
2108 ],
2109 )
2110 .await
2111 .unwrap();
2112
2113 let client = MockClientBuilder::new("http://localhost".to_owned())
2114 .store_config(
2115 StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
2116 )
2117 .build()
2118 .await;
2119
2120 let event_cache = client.event_cache();
2121
2122 event_cache.subscribe().unwrap();
2124 event_cache.enable_storage().unwrap();
2125
2126 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2127 let room = client.get_room(room_id).unwrap();
2128
2129 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2130
2131 let (items, _stream) = room_event_cache.subscribe().await;
2132
2133 assert!(items.is_empty());
2136
2137 let raw_chunks = event_cache_store.load_all_chunks(room_id).await.unwrap();
2140 assert!(raw_chunks.is_empty());
2141 }
2142
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
2145 async fn test_no_useless_gaps() {
2146 use crate::event_cache::room::LoadMoreEventsBackwardsOutcome;
2147
2148 let room_id = room_id!("!galette:saucisse.bzh");
2149
2150 let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2151
2152 let event_cache = client.event_cache();
2153 event_cache.subscribe().unwrap();
2154
        let has_storage = true;
        event_cache.enable_storage().unwrap();
2157
2158 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2159 let room = client.get_room(room_id).unwrap();
2160 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2161
2162 let f = EventFactory::new().room(room_id).sender(*ALICE);
2163
2164 room_event_cache
2167 .inner
2168 .handle_joined_room_update(
2169 has_storage,
2170 JoinedRoomUpdate {
2171 timeline: Timeline {
2172 limited: true,
2173 prev_batch: Some("raclette".to_owned()),
2174 events: vec![f.text_msg("hey yo").into_event()],
2175 },
2176 ..Default::default()
2177 },
2178 )
2179 .await
2180 .unwrap();
2181
2182 {
2183 let mut state = room_event_cache.inner.state.write().await;
2184
2185 let mut num_gaps = 0;
2186 let mut num_events = 0;
2187
2188 for c in state.events().chunks() {
2189 match c.content() {
2190 ChunkContent::Items(items) => num_events += items.len(),
2191 ChunkContent::Gap(_) => num_gaps += 1,
2192 }
2193 }
2194
2195 assert_eq!(num_gaps, 0);
2198 assert_eq!(num_events, 1);
2199
2200 assert_matches!(
2202 state.load_more_events_backwards().await.unwrap(),
2203 LoadMoreEventsBackwardsOutcome::Gap { .. }
2204 );
2205
2206 num_gaps = 0;
2207 num_events = 0;
2208 for c in state.events().chunks() {
2209 match c.content() {
2210 ChunkContent::Items(items) => num_events += items.len(),
2211 ChunkContent::Gap(_) => num_gaps += 1,
2212 }
2213 }
2214
2215 assert_eq!(num_gaps, 1);
2217 assert_eq!(num_events, 1);
2218 }
2219
2220 room_event_cache
2223 .inner
2224 .handle_joined_room_update(
2225 has_storage,
2226 JoinedRoomUpdate {
2227 timeline: Timeline {
2228 limited: false,
2229 prev_batch: Some("fondue".to_owned()),
2230 events: vec![f.text_msg("sup").into_event()],
2231 },
2232 ..Default::default()
2233 },
2234 )
2235 .await
2236 .unwrap();
2237
2238 {
2239 let state = room_event_cache.inner.state.read().await;
2240
2241 let mut num_gaps = 0;
2242 let mut num_events = 0;
2243
2244 for c in state.events().chunks() {
2245 match c.content() {
2246 ChunkContent::Items(items) => num_events += items.len(),
2247 ChunkContent::Gap(gap) => {
2248 assert_eq!(gap.prev_token, "raclette");
2249 num_gaps += 1;
2250 }
2251 }
2252 }
2253
2254 assert_eq!(num_gaps, 1);
2256 assert_eq!(num_events, 2);
2257 }
2258 }
2259
2260 async fn assert_relations(
2261 room_id: &RoomId,
2262 original_event: TimelineEvent,
2263 related_event: TimelineEvent,
2264 event_factory: EventFactory,
2265 ) {
2266 let client = logged_in_client(None).await;
2267
2268 let event_cache = client.event_cache();
2269 event_cache.subscribe().unwrap();
2270
2271 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2272 let room = client.get_room(room_id).unwrap();
2273
2274 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2275
2276 let original_event_id = original_event.event_id().unwrap();
2278 room_event_cache.save_event(original_event).await;
2279
2280 let unrelated_id = event_id!("$2");
2282 room_event_cache
2283 .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
2284 .await;
2285
2286 let related_id = related_event.event_id().unwrap();
2288 room_event_cache.save_event(related_event).await;
2289
2290 let (event, related_events) =
2291 room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
2292 let cached_event_id = event.event_id().unwrap();
2294 assert_eq!(cached_event_id, original_event_id);
2295
2296 assert_eq!(related_events.len(), 1);
2298 let related_event_id = related_events[0].event_id().unwrap();
2299 assert_eq!(related_event_id, related_id);
2300 }
2301
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
2304 async fn test_shrink_to_last_chunk() {
2305 use eyeball_im::VectorDiff;
2306
2307 use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
2308
2309 let room_id = room_id!("!galette:saucisse.bzh");
2310
2311 let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2312
2313 let f = EventFactory::new().room(room_id);
2314
2315 let evid1 = event_id!("$1");
2316 let evid2 = event_id!("$2");
2317
2318 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2319 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2320
2321 {
2323 let store = client.event_cache_store();
2324 let store = store.lock().await.unwrap();
2325 store
2326 .handle_linked_chunk_updates(
2327 room_id,
2328 vec![
2329 Update::NewItemsChunk {
2330 previous: None,
2331 new: ChunkIdentifier::new(0),
2332 next: None,
2333 },
2334 Update::PushItems {
2335 at: Position::new(ChunkIdentifier::new(0), 0),
2336 items: vec![ev1],
2337 },
2338 Update::NewItemsChunk {
2339 previous: Some(ChunkIdentifier::new(0)),
2340 new: ChunkIdentifier::new(1),
2341 next: None,
2342 },
2343 Update::PushItems {
2344 at: Position::new(ChunkIdentifier::new(1), 0),
2345 items: vec![ev2],
2346 },
2347 ],
2348 )
2349 .await
2350 .unwrap();
2351 }
2352
2353 let event_cache = client.event_cache();
2354 event_cache.subscribe().unwrap();
2355 event_cache.enable_storage().unwrap();
2356
2357 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2358 let room = client.get_room(room_id).unwrap();
2359 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2360
2361 let (events, mut stream) = room_event_cache.subscribe().await;
2363 assert_eq!(events.len(), 1);
2364 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2365 assert!(stream.is_empty());
2366
2367 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2369 assert_eq!(outcome.events.len(), 1);
2370 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2371 assert!(outcome.reached_start);
2372
2373 assert_let_timeout!(
2375 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2376 );
2377 assert_eq!(diffs.len(), 1);
2378 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2379 assert_eq!(value.event_id().as_deref(), Some(evid1));
2380 });
2381
2382 assert!(stream.is_empty());
2383
2384 let diffs = room_event_cache
2386 .inner
2387 .state
2388 .write()
2389 .await
2390 .shrink_to_last_chunk()
2391 .await
2392 .expect("shrinking should succeed")
2393 .unwrap();
2394
2395 assert_eq!(diffs.len(), 2);
2397 assert_matches!(&diffs[0], VectorDiff::Clear);
2398 assert_matches!(&diffs[1], VectorDiff::Append { values} => {
2399 assert_eq!(values.len(), 1);
2400 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
2401 });
2402
2403 assert!(stream.is_empty());
2404
2405 let (events, _) = room_event_cache.subscribe().await;
2407 assert_eq!(events.len(), 1);
2408 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2409
2410 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2413 assert_eq!(outcome.events.len(), 1);
2414 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2415 assert!(outcome.reached_start);
2416 }
2417
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
2420 async fn test_auto_shrink_after_all_subscribers_are_gone() {
2421 use eyeball_im::VectorDiff;
2422 use tokio::task::yield_now;
2423
2424 use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
2425
2426 let room_id = room_id!("!galette:saucisse.bzh");
2427
2428 let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2429
2430 let f = EventFactory::new().room(room_id);
2431
2432 let evid1 = event_id!("$1");
2433 let evid2 = event_id!("$2");
2434
2435 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2436 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2437
2438 {
2440 let store = client.event_cache_store();
2441 let store = store.lock().await.unwrap();
2442 store
2443 .handle_linked_chunk_updates(
2444 room_id,
2445 vec![
2446 Update::NewItemsChunk {
2447 previous: None,
2448 new: ChunkIdentifier::new(0),
2449 next: None,
2450 },
2451 Update::PushItems {
2452 at: Position::new(ChunkIdentifier::new(0), 0),
2453 items: vec![ev1],
2454 },
2455 Update::NewItemsChunk {
2456 previous: Some(ChunkIdentifier::new(0)),
2457 new: ChunkIdentifier::new(1),
2458 next: None,
2459 },
2460 Update::PushItems {
2461 at: Position::new(ChunkIdentifier::new(1), 0),
2462 items: vec![ev2],
2463 },
2464 ],
2465 )
2466 .await
2467 .unwrap();
2468 }
2469
2470 let event_cache = client.event_cache();
2471 event_cache.subscribe().unwrap();
2472 event_cache.enable_storage().unwrap();
2473
2474 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2475 let room = client.get_room(room_id).unwrap();
2476 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2477
2478 let (events1, mut stream1) = room_event_cache.subscribe().await;
2480 assert_eq!(events1.len(), 1);
2481 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
2482 assert!(stream1.is_empty());
2483
2484 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2486 assert_eq!(outcome.events.len(), 1);
2487 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2488 assert!(outcome.reached_start);
2489
2490 assert_let_timeout!(
2493 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
2494 );
2495 assert_eq!(diffs.len(), 1);
2496 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2497 assert_eq!(value.event_id().as_deref(), Some(evid1));
2498 });
2499
2500 assert!(stream1.is_empty());
2501
2502 let (events2, stream2) = room_event_cache.subscribe().await;
2506 assert_eq!(events2.len(), 2);
2507 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
2508 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
2509 assert!(stream2.is_empty());
2510
2511 drop(stream1);
2513 yield_now().await;
2514
2515 assert!(stream2.is_empty());
2517
2518 drop(stream2);
2520 yield_now().await;
2521
2522 {
2525 let state = room_event_cache.inner.state.read().await;
2527 assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
2528 }
2529
2530 let (events3, _stream2) = room_event_cache.subscribe().await;
2532 assert_eq!(events3.len(), 1);
2533 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2534 }
2535}