matrix_sdk/event_cache/room/mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All event cache types for a single room.
16
17use std::{
18    collections::BTreeMap,
19    fmt,
20    ops::{Deref, DerefMut},
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26
27use events::{sort_positions_descending, Gap};
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31    deserialized_responses::{AmbiguityChange, TimelineEvent},
32    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
33};
34use ruma::{
35    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
36    serde::Raw,
37    EventId, OwnedEventId, OwnedRoomId,
38};
39use tokio::sync::{
40    broadcast::{Receiver, Sender},
41    mpsc, Notify, RwLock,
42};
43use tracing::{error, instrument, trace, warn};
44
45use super::{
46    deduplicator::DeduplicationOutcome, AllEventsCache, AutoShrinkChannelPayload, EventsOrigin,
47    Result, RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
48};
49use crate::{client::WeakClient, room::WeakRoom};
50
51pub(super) mod events;
52
53/// A subset of an event cache, for a room.
54///
55/// Cloning is shallow, and thus is cheap to do.
56#[derive(Clone)]
57pub struct RoomEventCache {
58    pub(super) inner: Arc<RoomEventCacheInner>,
59}
60
61impl fmt::Debug for RoomEventCache {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        f.debug_struct("RoomEventCache").finish_non_exhaustive()
64    }
65}
66
67/// Thin wrapper for a room event cache listener, so as to trigger side-effects
68/// when all listeners are gone.
69#[allow(missing_debug_implementations)]
70pub struct RoomEventCacheListener {
71    /// Underlying receiver of the room event cache's updates.
72    recv: Receiver<RoomEventCacheUpdate>,
73
74    /// To which room are we listening?
75    room_id: OwnedRoomId,
76
77    /// Sender to the auto-shrink channel.
78    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
79
80    /// Shared instance of the auto-shrinker.
81    listener_count: Arc<AtomicUsize>,
82}
83
84impl Drop for RoomEventCacheListener {
85    fn drop(&mut self) {
86        let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);
87
88        trace!("dropping a room event cache listener; previous count: {previous_listener_count}");
89
90        if previous_listener_count == 1 {
91            // We were the last instance of the listener; let the auto-shrinker know by
92            // notifying it of our room id.
93
94            let mut room_id = self.room_id.clone();
95
96            // Try to send without waiting for channel capacity, and restart in a spin-loop
97            // if it failed (until a maximum number of attempts is reached, or
98            // the send was successful). The channel shouldn't be super busy in
99            // general, so this should resolve quickly enough.
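            //
            // Note (added for clarity): `Drop::drop` is synchronous, so we cannot
            // `.await` the async `send` here; `try_send` plus a bounded retry loop
            // is the next best thing.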
100
101            let mut num_attempts = 0;
102
103            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
104                num_attempts += 1;
105
106                if num_attempts > 1024 {
107                    // If we've tried too many times, just give up with a warning; after all, this
108                    // is only an optimization.
109                    warn!("couldn't send notification to the auto-shrink channel after 1024 attempts; giving up");
110                    return;
111                }
112
113                match err {
114                    mpsc::error::TrySendError::Full(stolen_room_id) => {
115                        room_id = stolen_room_id;
116                    }
117                    mpsc::error::TrySendError::Closed(_) => return,
118                }
119            }
120
121            trace!("sent notification to the parent channel that we were the last listener");
122        }
123    }
124}
125
126impl Deref for RoomEventCacheListener {
127    type Target = Receiver<RoomEventCacheUpdate>;
128
129    fn deref(&self) -> &Self::Target {
130        &self.recv
131    }
132}
133
134impl DerefMut for RoomEventCacheListener {
135    fn deref_mut(&mut self) -> &mut Self::Target {
136        &mut self.recv
137    }
138}
139
140impl RoomEventCache {
141    /// Create a new [`RoomEventCache`] using the given room and store.
142    pub(super) fn new(
143        client: WeakClient,
144        state: RoomEventCacheState,
145        pagination_status: SharedObservable<RoomPaginationStatus>,
146        room_id: OwnedRoomId,
147        all_events_cache: Arc<RwLock<AllEventsCache>>,
148        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
149    ) -> Self {
150        Self {
151            inner: Arc::new(RoomEventCacheInner::new(
152                client,
153                state,
154                pagination_status,
155                room_id,
156                all_events_cache,
157                auto_shrink_sender,
158            )),
159        }
160    }
161
162    /// Subscribe to this room's updates, after getting the initial list of
163    /// events.
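    ///
    /// A minimal usage sketch (not compiled as a doc-test), assuming a
    /// `RoomEventCache` handle named `room_cache` obtained elsewhere:
    ///
    /// ```ignore
    /// let (initial_events, mut listener) = room_cache.subscribe().await;
    ///
    /// // The listener derefs to a broadcast `Receiver`, so updates can be
    /// // awaited on it directly.
    /// while let Ok(update) = listener.recv().await {
    ///     // Handle the `RoomEventCacheUpdate`...
    /// }
    /// ```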
164    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, RoomEventCacheListener) {
165        let state = self.inner.state.read().await;
166        let events = state.events().events().map(|(_position, item)| item.clone()).collect();
167
168        let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
169        trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);
170
171        let recv = self.inner.sender.subscribe();
172        let listener = RoomEventCacheListener {
173            recv,
174            room_id: self.inner.room_id.clone(),
175            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
176            listener_count: state.listener_count.clone(),
177        };
178
179        (events, listener)
180    }
181
182    /// Return a [`RoomPagination`] API object useful for running
183    /// back-pagination queries in the current room.
184    pub fn pagination(&self) -> RoomPagination {
185        RoomPagination { inner: self.inner.clone() }
186    }
187
188    /// Try to find an event by id in this room.
189    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
190        // Search in all loaded or stored events.
191        let Ok(maybe_position_and_event) = self.inner.state.read().await.find_event(event_id).await
192        else {
193            error!("Failed to find the event");
194
195            return None;
196        };
197
198        // Search in `AllEventsCache` for known events that are not stored.
199        if let Some(event) = maybe_position_and_event.map(|(_location, _position, event)| event) {
200            Some(event)
201        } else if let Some((room_id, event)) =
202            self.inner.all_events.read().await.events.get(event_id).cloned()
203        {
204            (room_id == self.inner.room_id).then_some(event)
205        } else {
206            None
207        }
208    }
209
210    /// Try to find an event by id in this room, along with its related events.
211    ///
212    /// You can filter which types of related events to retrieve using
213    /// `filter`. `None` will retrieve related events of any type.
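    ///
    /// A hedged usage sketch (not compiled as a doc-test), assuming a
    /// `RoomEventCache` handle named `room_cache` and an `event_id` in scope:
    ///
    /// ```ignore
    /// use ruma::events::relation::RelationType;
    ///
    /// // Only fetch edits (`m.replace`) related to the target event.
    /// let filter = Some(vec![RelationType::Replacement]);
    /// if let Some((event, related)) = room_cache.event_with_relations(&event_id, filter).await {
    ///     // `event` is the target event, `related` its filtered related events.
    /// }
    /// ```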
214    pub async fn event_with_relations(
215        &self,
216        event_id: &EventId,
217        filter: Option<Vec<RelationType>>,
218    ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
219        let cache = self.inner.all_events.read().await;
220        if let Some((_, event)) = cache.events.get(event_id) {
221            let related_events = cache.collect_related_events(event_id, filter.as_deref());
222            Some((event.clone(), related_events))
223        } else {
224            None
225        }
226    }
227
228    /// Clear all the storage for this [`RoomEventCache`].
229    ///
230    /// This will get rid of all the events from the linked chunk and persisted
231    /// storage.
232    pub async fn clear(&self) -> Result<()> {
233        // Clear the linked chunk and persisted storage.
234        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
235
236        // Clear the (temporary) events mappings.
237        self.inner.all_events.write().await.clear();
238
239        // Notify observers about the update.
240        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
241            diffs: updates_as_vector_diffs,
242            origin: EventsOrigin::Cache,
243        });
244
245        Ok(())
246    }
247
248    /// Save a single event in the event cache, for further retrieval with
249    /// [`Self::event`].
250    // TODO: This doesn't insert the event into the linked chunk. In the future
251    // there'll be no distinction between the linked chunk and the separate
252    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
253    pub(crate) async fn save_event(&self, event: TimelineEvent) {
254        if let Some(event_id) = event.event_id() {
255            let mut cache = self.inner.all_events.write().await;
256
257            cache.append_related_event(&event);
258            cache.events.insert(event_id, (self.inner.room_id.clone(), event));
259        } else {
260            warn!("couldn't save event without event id in the event cache");
261        }
262    }
263
264    /// Save some events in the event cache, for further retrieval with
265    /// [`Self::event`]. This function will save them using a single lock,
266    /// as opposed to [`Self::save_event`].
267    // TODO: This doesn't insert the event into the linked chunk. In the future
268    // there'll be no distinction between the linked chunk and the separate
269    // cache. There is a discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/3886.
270    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
271        let mut cache = self.inner.all_events.write().await;
272        for event in events {
273            if let Some(event_id) = event.event_id() {
274                cache.append_related_event(&event);
275                cache.events.insert(event_id, (self.inner.room_id.clone(), event));
276            } else {
277                warn!("couldn't save event without event id in the event cache");
278            }
279        }
280    }
281
282    /// Return a nice debug string (a vector of lines) for the linked chunk of
283    /// events for this room.
284    pub async fn debug_string(&self) -> Vec<String> {
285        self.inner.state.read().await.events().debug_string()
286    }
287}
288
289/// The (non-cloneable) details of the `RoomEventCache`.
290pub(super) struct RoomEventCacheInner {
291    /// The room id for this room.
292    room_id: OwnedRoomId,
293
294    pub weak_room: WeakRoom,
295
296    /// Sender part for subscribers to this room.
297    pub sender: Sender<RoomEventCacheUpdate>,
298
299    /// State for this room's event cache.
300    pub state: RwLock<RoomEventCacheState>,
301
302    /// See comment of [`super::EventCacheInner::all_events`].
303    ///
304    /// This is shared between the [`super::EventCacheInner`] singleton and all
305    /// [`RoomEventCacheInner`] instances.
306    all_events: Arc<RwLock<AllEventsCache>>,
307
308    /// A notifier that we received a new pagination token.
309    pub pagination_batch_token_notifier: Notify,
310
311    pub pagination_status: SharedObservable<RoomPaginationStatus>,
312
313    /// Sender to the auto-shrink channel.
314    ///
315    /// See doc comment around [`EventCache::auto_shrink_linked_chunk_task`] for
316    /// more details.
317    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
318}
319
320impl RoomEventCacheInner {
321    /// Creates a new cache for a room, and subscribes to room updates, so as
322    /// to handle new timeline events.
323    fn new(
324        client: WeakClient,
325        state: RoomEventCacheState,
326        pagination_status: SharedObservable<RoomPaginationStatus>,
327        room_id: OwnedRoomId,
328        all_events_cache: Arc<RwLock<AllEventsCache>>,
329        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
330    ) -> Self {
331        let sender = Sender::new(32);
332        let weak_room = WeakRoom::new(client, room_id);
333        Self {
334            room_id: weak_room.room_id().to_owned(),
335            weak_room,
336            state: RwLock::new(state),
337            all_events: all_events_cache,
338            sender,
339            pagination_batch_token_notifier: Default::default(),
340            auto_shrink_sender,
341            pagination_status,
342        }
343    }
344
345    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
346        if account_data.is_empty() {
347            return;
348        }
349
350        let mut handled_read_marker = false;
351
352        trace!("Handling account data");
353
354        for raw_event in account_data {
355            match raw_event.deserialize() {
356                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
357                    // If duplicated, do not forward the read marker multiple times,
358                    // to avoid cluttering the update channel.
359                    if handled_read_marker {
360                        continue;
361                    }
362
363                    handled_read_marker = true;
364
365                    // Propagate to observers. (We ignore the error if there aren't any.)
366                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
367                        event_id: ev.content.event_id,
368                    });
369                }
370
371                Ok(_) => {
372                    // We're not interested in other room account data updates,
373                    // at this point.
374                }
375
376                Err(e) => {
377                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
378                    warn!(event_type, "Failed to deserialize account data: {e}");
379                }
380            }
381        }
382    }
383
384    #[instrument(skip_all, fields(room_id = %self.room_id))]
385    pub(super) async fn handle_joined_room_update(
386        &self,
387        has_storage: bool,
388        updates: JoinedRoomUpdate,
389    ) -> Result<()> {
390        self.handle_timeline(
391            has_storage,
392            updates.timeline,
393            updates.ephemeral.clone(),
394            updates.ambiguity_changes,
395        )
396        .await?;
397
398        self.handle_account_data(updates.account_data);
399
400        Ok(())
401    }
402
403    async fn handle_timeline(
404        &self,
405        has_storage: bool,
406        timeline: Timeline,
407        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
408        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
409    ) -> Result<()> {
410        if !has_storage && timeline.limited {
411            // Ideally we'd try to reconcile existing events against those received in the
412            // timeline, but we're not there yet. In the meanwhile, clear the
413            // items from the room. TODO: implement Smart Matching™.
414            trace!("limited timeline, clearing all previous events and pushing new events");
415
416            self.replace_all_events_by(
417                timeline.events,
418                timeline.prev_batch,
419                ephemeral_events,
420                ambiguity_changes,
421                EventsOrigin::Sync,
422            )
423            .await?;
424        } else {
425            // Add all the events to the backend.
426            trace!("adding new events");
427
428            // If we have storage, only keep the previous-batch token if we have a limited
429            // timeline. Otherwise, we know about all the events, and we don't need to
430            // back-paginate, so we wouldn't make use of the given previous-batch token.
431            //
432            // If we don't have storage, then even if the timeline isn't limited, the
433            // previous events may not be cached anywhere, so keep the previous-batch
434            // token so that we remain able to retrieve those events later.
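            //
            // In other words (a decision summary, added for clarity):
            //
            //   has_storage | limited timeline | keep `prev_batch`?
            //   ------------+------------------+-------------------
            //       yes     |       yes        |        yes
            //       yes     |       no         |        no
            //       no      |       yes        |        yes
            //       no      |       no         |        yes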
435            let prev_batch =
436                if has_storage && !timeline.limited { None } else { timeline.prev_batch };
437
438            let mut state = self.state.write().await;
439            self.append_events_locked(
440                &mut state,
441                timeline.events,
442                prev_batch,
443                ephemeral_events,
444                ambiguity_changes,
445            )
446            .await?;
447        }
448
449        Ok(())
450    }
451
452    #[instrument(skip_all, fields(room_id = %self.room_id))]
453    pub(super) async fn handle_left_room_update(
454        &self,
455        has_storage: bool,
456        updates: LeftRoomUpdate,
457    ) -> Result<()> {
458        self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
459            .await?;
460        Ok(())
461    }
462
463    /// Remove existing events, and append a set of events to the room cache and
464    /// storage, notifying observers.
465    pub(super) async fn replace_all_events_by(
466        &self,
467        timeline_events: Vec<TimelineEvent>,
468        prev_batch: Option<String>,
469        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
470        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
471        events_origin: EventsOrigin,
472    ) -> Result<()> {
473        // Acquire the lock.
474        let mut state = self.state.write().await;
475
476        // Reset the room's state.
477        let updates_as_vector_diffs = state.reset().await?;
478
479        // Propagate to observers.
480        let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
481            diffs: updates_as_vector_diffs,
482            origin: events_origin,
483        });
484
485        // Push the new events.
486        self.append_events_locked(
487            &mut state,
488            timeline_events,
489            prev_batch.clone(),
490            ephemeral_events,
491            ambiguity_changes,
492        )
493        .await?;
494
495        Ok(())
496    }
497
498    /// Append a set of events to the room cache and storage, notifying
499    /// observers.
500    ///
501    /// This is a private implementation. It must not be exposed publicly.
502    async fn append_events_locked(
503        &self,
504        state: &mut RoomEventCacheState,
505        timeline_events: Vec<TimelineEvent>,
506        prev_batch: Option<String>,
507        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
508        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
509    ) -> Result<()> {
510        if timeline_events.is_empty()
511            && prev_batch.is_none()
512            && ephemeral_events.is_empty()
513            && ambiguity_changes.is_empty()
514        {
515            return Ok(());
516        }
517
518        let (
519            DeduplicationOutcome {
520                all_events: events,
521                in_memory_duplicated_event_ids,
522                in_store_duplicated_event_ids,
523            },
524            all_duplicates,
525        ) = state.collect_valid_and_duplicated_events(timeline_events).await?;
526
527        // During a sync, when a duplicated event is found, the old event is removed and
528        // the new event is added.
529        //
530        // Let's remove the old events that are duplicated.
531        let timeline_event_diffs = if all_duplicates {
532            // No new events, thus no need to change the room events.
533            vec![]
534        } else {
535            // Remove the old duplicated events.
536            //
537            // We don't have to worry about the removals changing the positions of
538            // the existing events, because we are pushing all _new_
539            // `events` at the back.
540            let mut timeline_event_diffs = state
541                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
542                .await?;
543
544            // Add the previous back-pagination token (if present), followed by the timeline
545            // events themselves.
546            let new_timeline_event_diffs = state
547                .with_events_mut(|room_events| {
548                    // If we only received duplicated events, we don't need to store the gap: if
549                    // there was a gap, we'd have received an unknown event at the tail of
550                    // the room's timeline (unless the server reordered sync events since the last
551                    // time we sync'd).
552                    if !all_duplicates {
553                        if let Some(prev_token) = &prev_batch {
554                            // As a tiny optimization: remove the last chunk if it's an empty items
555                            // chunk, as it's not useful to keep it before a gap.
556                            let prev_chunk_to_remove =
557                                room_events.rchunks().next().and_then(|chunk| {
558                                    (chunk.is_items() && chunk.num_items() == 0)
559                                        .then_some(chunk.identifier())
560                                });
561
562                            room_events.push_gap(Gap { prev_token: prev_token.clone() });
563
564                            if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
565                                room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
566                                    "we just checked the chunk is there, and it's an empty item chunk",
567                                );
568                            }
569                        }
570                    }
571
572                    room_events.push_events(events.clone());
573
574                    events.clone()
575                })
576                .await?;
577
578            timeline_event_diffs.extend(new_timeline_event_diffs);
579
580            if prev_batch.is_some() && !all_duplicates {
581                // If there was a previous-batch token, and there's at least one non-duplicated
582                // new event, unload the chunks so that the linked chunk only contains the last
583                // one; otherwise, there might be a valid gap in between, and
584                // observers may not render it (yet).
585                //
586                // We must do this *after* the above call to `.with_events_mut`, so the new
587                // events and gaps are properly persisted to storage.
588                if let Some(diffs) = state.shrink_to_last_chunk().await? {
589                    // Override the diffs with the new ones, as per `shrink_to_last_chunk`'s API
590                    // contract.
591                    timeline_event_diffs = diffs;
592                }
593            }
594
595            {
596                // Fill the AllEventsCache.
597                let mut all_events_cache = self.all_events.write().await;
598
599                for event in events {
600                    if let Some(event_id) = event.event_id() {
601                        all_events_cache.append_related_event(&event);
602                        all_events_cache
603                            .events
604                            .insert(event_id.to_owned(), (self.room_id.clone(), event));
605                    }
606                }
607            }
608
609            timeline_event_diffs
610        };
611
612        // Now that all events have been added, we can trigger the
613        // `pagination_token_notifier`.
614        if prev_batch.is_some() {
615            self.pagination_batch_token_notifier.notify_one();
616        }
617
618        // The order of `RoomEventCacheUpdate`s is **really** important here.
619        {
620            if !timeline_event_diffs.is_empty() {
621                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
622                    diffs: timeline_event_diffs,
623                    origin: EventsOrigin::Sync,
624                });
625            }
626
627            if !ephemeral_events.is_empty() {
628                let _ = self
629                    .sender
630                    .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
631            }
632
633            if !ambiguity_changes.is_empty() {
634                let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
635            }
636        }
637
638        Ok(())
639    }
640}
641
642/// Internal type to represent the output of
643/// [`RoomEventCacheState::load_more_events_backwards`].
644#[derive(Debug)]
645pub(super) enum LoadMoreEventsBackwardsOutcome {
646    /// A gap has been inserted.
647    Gap {
648        /// The previous batch token to be used as the "end" parameter in the
649        /// back-pagination request.
650        prev_token: Option<String>,
651    },
652
653    /// The start of the timeline has been reached.
654    StartOfTimeline,
655
656    /// Events have been inserted.
657    Events {
658        events: Vec<TimelineEvent>,
659        timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
660        reached_start: bool,
661    },
662
663    /// The caller must wait for the initial previous-batch token, and retry.
664    WaitForInitialPrevToken,
665}
666
667// Use a private module to hide `events` from this parent module.
668mod private {
669    use std::sync::{atomic::AtomicUsize, Arc};
670
671    use eyeball::SharedObservable;
672    use eyeball_im::VectorDiff;
673    use matrix_sdk_base::{
674        apply_redaction,
675        deserialized_responses::{TimelineEvent, TimelineEventKind},
676        event_cache::{store::EventCacheStoreLock, Event, Gap},
677        linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Position, Update},
678    };
679    use matrix_sdk_common::executor::spawn;
680    use once_cell::sync::OnceCell;
681    use ruma::{
682        events::{
683            room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent, MessageLikeEventType,
684        },
685        serde::Raw,
686        EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
687    };
688    use tracing::{debug, error, instrument, trace, warn};
689
690    use super::{
691        super::{
692            deduplicator::{DeduplicationOutcome, Deduplicator},
693            EventCacheError,
694        },
695        events::RoomEvents,
696        sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
697    };
698    use crate::event_cache::RoomPaginationStatus;
699
700    /// State for a single room's event cache.
701    ///
702    /// This contains all the inner mutable states that ought to be updated at
703    /// the same time.
704    pub struct RoomEventCacheState {
705        /// The room this state relates to.
706        room: OwnedRoomId,
707
708        /// The room version for this room.
709        room_version: RoomVersionId,
710
711        /// Reference to the underlying backing store.
712        ///
713        /// Set to none if the room shouldn't read the linked chunk from
714        /// storage, and shouldn't store updates to storage.
715        store: Arc<OnceCell<EventCacheStoreLock>>,
716
717        /// The events of the room.
718        events: RoomEvents,
719
720        /// The events deduplicator instance to help finding duplicates.
721        deduplicator: Deduplicator,
722
723        /// Have we ever waited for a previous-batch-token to come from sync, in
724        /// the context of pagination? We do this at most once per room,
725        /// the first time we try to run backward pagination. We reset
726        /// that upon clearing the timeline events.
727        pub waited_for_initial_prev_token: bool,
728
729        pagination_status: SharedObservable<RoomPaginationStatus>,
730
731        /// An atomic count of the current number of listeners of the
732        /// [`super::RoomEventCache`].
733        pub(super) listener_count: Arc<AtomicUsize>,
734    }
735
736    impl RoomEventCacheState {
737        /// Create a new state, or reload it from storage if it's been enabled.
738        ///
739        /// Not all events are going to be loaded. Only a portion of them. The
740        /// [`RoomEvents`] relies on a [`LinkedChunk`] to store all events. Only
741        /// the last chunk will be loaded. It means the events are loaded from
742        /// the most recent to the oldest. To load more events, see
743        /// [`Self::load_more_events_backwards`].
744        ///
745        /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
746        pub async fn new(
747            room_id: OwnedRoomId,
748            room_version: RoomVersionId,
749            store: Arc<OnceCell<EventCacheStoreLock>>,
750            pagination_status: SharedObservable<RoomPaginationStatus>,
751        ) -> Result<Self, EventCacheError> {
752            let (events, deduplicator) = if let Some(store) = store.get() {
753                let store_lock = store.lock().await?;
754
755                let linked_chunk = match store_lock
756                    .load_last_chunk(&room_id)
757                    .await
758                    .map_err(EventCacheError::from)
759                    .and_then(|(last_chunk, chunk_identifier_generator)| {
760                        lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
761                            .map_err(EventCacheError::from)
762                    }) {
763                    Ok(linked_chunk) => linked_chunk,
764
765                    Err(err) => {
766                        error!("error when reloading a linked chunk from memory: {err}");
767
768                        // Clear storage for this room.
769                        store_lock
770                            .handle_linked_chunk_updates(&room_id, vec![Update::Clear])
771                            .await?;
772
773                        // Restart with an empty linked chunk.
774                        None
775                    }
776                };
777
778                (
779                    RoomEvents::with_initial_linked_chunk(linked_chunk),
780                    Deduplicator::new_store_based(room_id.clone(), store.clone()),
781                )
782            } else {
783                (RoomEvents::default(), Deduplicator::new_memory_based())
784            };
785
786            Ok(Self {
787                room: room_id,
788                room_version,
789                store,
790                events,
791                deduplicator,
792                waited_for_initial_prev_token: false,
793                listener_count: Default::default(),
794                pagination_status,
795            })
796        }
797
798        /// Deduplicate `events` considering all events in `Self::events`.
799        ///
800        /// The returned tuple contains:
801        /// - all events (duplicated or not) with an ID
802        /// - all the duplicated event IDs with their position,
803        /// - a boolean indicating whether all events (at least one) are duplicates.
804        ///
805        /// This last boolean is useful to know whether we need to store a
806        /// previous-batch token (gap) we received from a server-side
807        /// request (sync or back-pagination), or if we should
808        /// *not* store it.
809        ///
810        /// Since there can be empty back-paginations with a previous-batch
811        /// token (that is, they don't contain any events), we need to
812        /// make sure that there is *at least* one new event that has
813        /// been added. Otherwise, we might conclude something wrong
814        /// because a subsequent back-pagination might
815        /// return non-duplicated events.
816        ///
817        /// If we had already seen all the duplicated events that we're trying
818        /// to add, then it would be wasteful to store a previous-batch
819        /// token, or even touch the linked chunk: we would repeat
820        /// back-paginations for events that we have already seen, and
821        /// possibly misplace them. And we should not be missing
822        /// events either: the already-known events would have their own
823        /// previous-batch token (it might already be consumed).
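        ///
        /// As an illustration: if a back-pagination response only contains
        /// events we have already seen, the returned boolean is `true`, and the
        /// accompanying previous-batch token should *not* be stored as a gap.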
824        pub async fn collect_valid_and_duplicated_events(
825            &mut self,
826            events: Vec<Event>,
827        ) -> Result<(DeduplicationOutcome, bool), EventCacheError> {
828            let deduplication_outcome =
829                self.deduplicator.filter_duplicate_events(events, &self.events).await?;
830
831            let number_of_events = deduplication_outcome.all_events.len();
832            let number_of_deduplicated_events =
833                deduplication_outcome.in_memory_duplicated_event_ids.len()
834                    + deduplication_outcome.in_store_duplicated_event_ids.len();
835
836            let all_duplicates =
837                number_of_events > 0 && number_of_events == number_of_deduplicated_events;
838
839            Ok((deduplication_outcome, all_duplicates))
840        }
841
842        /// Given a fully-loaded linked chunk with no gaps, return the
843        /// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
844        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
845            // If we never received events for this room, this means we've never
846            // received a sync for that room, because every room must have at least a
847            // room creation event. Otherwise, we have reached the start of the
848            // timeline.
849            if self.events.events().next().is_some() {
850                // If there's at least one event, this means we've reached the start of the
851                // timeline, since the chunk is fully loaded.
852                LoadMoreEventsBackwardsOutcome::StartOfTimeline
853            } else if !self.waited_for_initial_prev_token {
854                // There are no events. Since we haven't done so yet, wait for an initial previous-batch token.
855                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
856            } else {
857                // Otherwise, we've already waited, *and* received no previous-batch token from
858                // the sync, *and* there are still no events in the fully-loaded
859                // chunk: start back-pagination from the end of the room.
860                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
861            }
862        }
863
864        /// Load more events backwards if the last chunk is **not** a gap.
865        pub(in super::super) async fn load_more_events_backwards(
866            &mut self,
867        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
868            let Some(store) = self.store.get() else {
869                // No store to reload events from. The caller has to act as if a gap was
870                // present. Limited syncs will always clear and push a gap, in this mode.
871                // There's no lazy-loading.
872
873                // Look for a gap in the in-memory chunk, iterating in reverse so as to get the
874                // most recent one.
875                if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
876                    return Ok(LoadMoreEventsBackwardsOutcome::Gap {
877                        prev_token: Some(prev_token),
878                    });
879                }
880
881                return Ok(self.conclude_load_more_for_fully_loaded_chunk());
882            };
883
884            // If any in-memory chunk is a gap, don't load more events, and let the caller
885            // resolve the gap.
886            if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
887                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
888            }
889
890            // Because `first_chunk` is not `Send`, get this information before the
891            // `.await` point, so that this `Future` can implement `Send`.
892            let first_chunk_identifier =
893                self.events.chunks().next().expect("a linked chunk is never empty").identifier();
894
895            let store = store.lock().await?;
896
897            // The first chunk is not a gap, we can load its previous chunk.
898            let new_first_chunk =
899                match store.load_previous_chunk(&self.room, first_chunk_identifier).await {
900                    Ok(Some(new_first_chunk)) => {
901                        // All good, let's continue with this chunk.
902                        new_first_chunk
903                    }
904
905                    Ok(None) => {
906                        // There's no previous chunk. The chunk is now fully-loaded. Conclude.
907                        return Ok(self.conclude_load_more_for_fully_loaded_chunk());
908                    }
909
910                    Err(err) => {
911                        error!("error when loading the previous chunk of a linked chunk: {err}");
912
913                        // Clear storage for this room.
914                        store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
915
916                        // Return the error.
917                        return Err(err.into());
918                    }
919                };
920
921            let chunk_content = new_first_chunk.content.clone();
922
923            // We've reached the start on disk if, and only if, there was no chunk prior to
924            // the one we just loaded.
925            let reached_start = new_first_chunk.previous.is_none();
926
927            if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
928                error!("error when inserting the previous chunk into its linked chunk: {err}");
929
930                // Clear storage for this room.
931                store.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
932
933                // Return the error.
934                return Err(err.into());
935            };
936
937            // ⚠️ Let's not propagate the updates to the store! We already have this data
938            // in the store! Let's drain them.
939            let _ = self.events.store_updates().take();
940
941            // However, we want to get updates as `VectorDiff`s.
942            let timeline_event_diffs = self.events.updates_as_vector_diffs();
943
944            Ok(match chunk_content {
945                ChunkContent::Gap(gap) => {
946                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
947                }
948
949                ChunkContent::Items(events) => LoadMoreEventsBackwardsOutcome::Events {
950                    events,
951                    timeline_event_diffs,
952                    reached_start,
953                },
954            })
955        }
956
957        /// If storage is enabled, unload all the chunks, then reload only the
958        /// last one.
959        ///
960        /// In that case, return a diff update that starts with a clear
961        /// of all events; as a result, the caller may override any
962        /// pending diff updates with the result of this function.
963        ///
964        /// Otherwise, returns `None`.
965        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
966        pub(super) async fn shrink_to_last_chunk(
967            &mut self,
968        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
969            let Some(store) = self.store.get() else {
970                // No need to do anything if there's no storage; we already reset the
971                // timeline after a limited response.
972                return Ok(None);
973            };
974
975            let store_lock = store.lock().await?;
976
977            // Attempt to load the last chunk.
978            let (last_chunk, chunk_identifier_generator) = match store_lock
979                .load_last_chunk(&self.room)
980                .await
981            {
982                Ok(pair) => pair,
983
984                Err(err) => {
985                    // If loading the last chunk failed, clear the entire linked chunk.
986                    error!("error when reloading a linked chunk from memory: {err}");
987
988                    // Clear storage for this room.
989                    store_lock.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
990
991                    // Restart with an empty linked chunk.
992                    (None, ChunkIdentifierGenerator::new_from_scratch())
993                }
994            };
995
996            debug!("unloading the linked chunk, and resetting it to its last chunk");
997
998            // Remove all the chunks from the linked chunk, except for the last one, and
999            // update the chunk identifier generator.
1000            if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
1001                error!("error when replacing the linked chunk: {err}");
1002                return self.reset().await.map(Some);
1003            }
1004
1005            // Let pagination observers know that we may have not reached the start of the
1006            // timeline.
1007            // TODO: likely need to cancel any ongoing pagination.
1008            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1009
1010            // Don't propagate those updates to the store; we're only reshaping the
1011            // in-memory representation here. Let's drain those store updates.
1012            let _ = self.events.store_updates().take();
1013
1014            // However, we want to get updates as `VectorDiff`s, for the external listeners.
1015            // Check we're respecting the contract defined in the doc comment.
1016            let diffs = self.events.updates_as_vector_diffs();
1017            assert!(matches!(diffs[0], VectorDiff::Clear));
1018
1019            Ok(Some(diffs))
1020        }
1021
1022        /// Automatically shrink the room if there are no listeners, as
1023        /// indicated by the atomic number of active listeners.
1024        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1025        pub(crate) async fn auto_shrink_if_no_listeners(
1026            &mut self,
1027        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
1028            let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);
1029
1030            trace!(listener_count, "received request to auto-shrink");
1031
1032            if listener_count == 0 {
1033                // There are no more listeners: we can shrink the
1034                // events data structure to its last chunk.
1035                self.shrink_to_last_chunk().await
1036            } else {
1037                Ok(None)
1038            }
1039        }
1040
1041        /// Removes the bundled relations from an event, if they were present.
1042        ///
1043        /// Only replaces the raw event if it contained bundled relations.
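        ///
        /// Concretely, this drops the bundled aggregations found under
        /// `unsigned.m.relations` in the raw JSON, leaving the rest of the
        /// event payload untouched.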
1044        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
1045            // We're going to get rid of the `unsigned`/`m.relations` field, if it's
1046            // present.
1047            // Use a closure that returns an option so we can quickly short-circuit.
1048            let mut closure = || -> Option<()> {
1049                let mut val: serde_json::Value = event.deserialize_as().ok()?;
1050                let unsigned = val.get_mut("unsigned")?;
1051                let unsigned_obj = unsigned.as_object_mut()?;
1052                if unsigned_obj.remove("m.relations").is_some() {
1053                    *event = Raw::new(&val).ok()?.cast();
1054                }
1055                None
1056            };
1057            let _ = closure();
1058        }
1059
1060        fn strip_relations_from_event(ev: &mut TimelineEvent) {
1061            match &mut ev.kind {
1062                TimelineEventKind::Decrypted(decrypted) => {
1063                    // Remove all information about encryption info for
1064                    // the bundled events.
1065                    decrypted.unsigned_encryption_info = None;
1066
1067                    // Remove the `unsigned`/`m.relations` field, if needs be.
1068                    Self::strip_relations_if_present(&mut decrypted.event);
1069                }
1070
1071                TimelineEventKind::UnableToDecrypt { event, .. }
1072                | TimelineEventKind::PlainText { event } => {
1073                    Self::strip_relations_if_present(event);
1074                }
1075            }
1076        }
1077
1078        /// Strips the bundled relations from a collection of events.
1079        fn strip_relations_from_events(items: &mut [TimelineEvent]) {
1080            for ev in items.iter_mut() {
1081                Self::strip_relations_from_event(ev);
1082            }
1083        }
1084
1085        /// Remove events by their position, in `RoomEvents` and in
1086        /// `EventCacheStore`.
1087        ///
1088        /// This method is purposely isolated because it must ensure that
1089        /// positions are sorted appropriately or it can be disastrous.
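        ///
        /// (The in-store removals are sorted in descending order of position
        /// first, so removing one item never invalidates the positions of the
        /// items still waiting to be removed.)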
1090        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1091        pub(crate) async fn remove_events(
1092            &mut self,
1093            in_memory_events: Vec<(OwnedEventId, Position)>,
1094            in_store_events: Vec<(OwnedEventId, Position)>,
1095        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
1096            // In-store events.
1097            {
1098                let mut positions = in_store_events
1099                    .into_iter()
1100                    .map(|(_event_id, position)| position)
1101                    .collect::<Vec<_>>();
1102
1103                sort_positions_descending(&mut positions);
1104
1105                self.send_updates_to_store(
1106                    positions
1107                        .into_iter()
1108                        .map(|position| Update::RemoveItem { at: position })
1109                        .collect(),
1110                )
1111                .await?;
1112            }
1113
1114            // In-memory events.
1115            let timeline_event_diffs = self
1116                .with_events_mut(|room_events| {
1117                    // `remove_events_by_position` sorts the positions by itself.
1118                    room_events
1119                        .remove_events_by_position(
1120                            in_memory_events
1121                                .into_iter()
1122                                .map(|(_event_id, position)| position)
1123                                .collect(),
1124                        )
1125                        .expect("failed to remove an event");
1126
1127                    vec![]
1128                })
1129                .await?;
1130
1131            Ok(timeline_event_diffs)
1132        }
1133
1134        /// Propagate changes to the underlying storage.
1135        #[instrument(skip_all)]
1136        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1137            let updates = self.events.store_updates().take();
1138            self.send_updates_to_store(updates).await
1139        }
1140
1141        pub async fn send_updates_to_store(
1142            &mut self,
1143            mut updates: Vec<Update<TimelineEvent, Gap>>,
1144        ) -> Result<(), EventCacheError> {
1145            let Some(store) = self.store.get() else {
1146                return Ok(());
1147            };
1148
1149            if updates.is_empty() {
1150                return Ok(());
1151            }
1152
1153            // Strip relations from updates which insert or replace items.
1154            for update in updates.iter_mut() {
1155                match update {
1156                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
1157                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
1158                    // Other update kinds don't involve adding new events.
1159                    Update::NewItemsChunk { .. }
1160                    | Update::NewGapChunk { .. }
1161                    | Update::RemoveChunk(_)
1162                    | Update::RemoveItem { .. }
1163                    | Update::DetachLastItems { .. }
1164                    | Update::StartReattachItems
1165                    | Update::EndReattachItems
1166                    | Update::Clear => {}
1167                }
1168            }
1169
1170            // Spawn a task to make sure that all the changes are effectively forwarded to
1171            // the store, even if the call to this method gets aborted.
1172            //
1173            // The store cross-process locking involves an actual mutex, which ensures that
1174            // storing updates happens in the expected order.
1175
1176            let store = store.clone();
1177            let room_id = self.room.clone();
1178
1179            spawn(async move {
1180                let store = store.lock().await?;
1181
1182                trace!(%room_id, ?updates, "sending linked chunk updates to the store");
1183                store.handle_linked_chunk_updates(&room_id, updates).await?;
1184                trace!("linked chunk updates applied");
1185
1186                super::Result::Ok(())
1187            })
1188            .await
1189            .expect("joining failed")?;
1190
1191            Ok(())
1192        }
1193
1194        /// Reset this data structure as if it were brand new.
1195        ///
1196        /// Return a single diff update that is a clear of all events; as a
1197        /// result, the caller may override any pending diff updates
1198        /// with the result of this function.
1199        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1200        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
1201            self.events.reset();
1202
1203            self.propagate_changes().await?;
1204
1205            // Reset the pagination state too: pretend we never waited for the initial
1206            // prev-batch token, and indicate that we're not at the start of the
1207            // timeline, since we don't know about that anymore.
1208            self.waited_for_initial_prev_token = false;
1209            // TODO: likely must cancel any ongoing back-paginations too
1210            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1211
1212            let diff_updates = self.events.updates_as_vector_diffs();
1213
1214            // Ensure the contract defined in the doc comment is true:
1215            debug_assert_eq!(diff_updates.len(), 1);
1216            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1217
1218            Ok(diff_updates)
1219        }
1220
1221        /// Returns a read-only reference to the underlying events.
1222        pub fn events(&self) -> &RoomEvents {
1223            &self.events
1224        }
1225
1226        /// Find a single event in this room.
1227        ///
1228        /// It starts by looking into loaded events in `RoomEvents` before
1229        /// looking inside the storage if it is enabled.
1230        pub async fn find_event(
1231            &self,
1232            event_id: &EventId,
1233        ) -> Result<Option<(EventLocation, Position, TimelineEvent)>, EventCacheError> {
1234            let room_id = self.room.as_ref();
1235
1236            // There are supposedly fewer events loaded in memory than in the store. Let's
1237            // start by looking up in the `RoomEvents`.
1238            for (position, event) in self.events().revents() {
1239                if event.event_id().as_deref() == Some(event_id) {
1240                    return Ok(Some((EventLocation::Memory, position, event.clone())));
1241                }
1242            }
1243
1244            let Some(store) = self.store.get() else {
1245                // No store, event is not present.
1246                return Ok(None);
1247            };
1248
1249            let store = store.lock().await?;
1250
1251            Ok(store
1252                .find_event(room_id, event_id)
1253                .await?
1254                .map(|(position, event)| (EventLocation::Store, position, event)))
1255        }
1256
1257        /// Gives a temporary mutable handle to the underlying in-memory events,
1258        /// and will propagate changes to the storage once done.
1259        ///
1260        /// Returns the updates to the linked chunk, as vector diffs, so the
1261        /// caller may propagate such updates, if needs be.
1262        ///
1263        /// The function `func` takes a mutable reference to `RoomEvents`. It
1264        /// returns a set of events that will be post-processed. At the time of
1265        /// writing, all these events are passed to
1266        /// `Self::maybe_apply_new_redaction`.
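        ///
        /// A hedged sketch of a typical call (not compiled as a doc-test),
        /// assuming `new_events: Vec<TimelineEvent>` is in scope:
        ///
        /// ```ignore
        /// let diffs = state
        ///     .with_events_mut(|room_events| {
        ///         // Append the new events at the back of the linked chunk, and hand
        ///         // them back so they get post-processed (e.g. redactions).
        ///         room_events.push_events(new_events.clone());
        ///         new_events
        ///     })
        ///     .await?;
        /// ```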
1267        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
1268        pub async fn with_events_mut<F>(
1269            &mut self,
1270            func: F,
1271        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError>
1272        where
1273            F: FnOnce(&mut RoomEvents) -> Vec<TimelineEvent>,
1274        {
1275            let events_to_post_process = func(&mut self.events);
1276
1277            // Update the store before doing the post-processing.
1278            self.propagate_changes().await?;
1279
1280            for event in &events_to_post_process {
1281                self.maybe_apply_new_redaction(event).await?;
1282            }
1283
1284            // If we've never waited for an initial previous-batch token, and we now have at
1285            // least one gap in the chunk, no need to wait for a previous-batch token later.
1286            if !self.waited_for_initial_prev_token
1287                && self.events.chunks().any(|chunk| chunk.is_gap())
1288            {
1289                self.waited_for_initial_prev_token = true;
1290            }
1291
1292            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
1293
1294            Ok(updates_as_vector_diffs)
1295        }
1296
1297        /// If the given event is a redaction, try to retrieve the
1298        /// to-be-redacted event in the chunk, and replace it by the
1299        /// redacted form.
1300        #[instrument(skip_all)]
1301        async fn maybe_apply_new_redaction(
1302            &mut self,
1303            event: &Event,
1304        ) -> Result<(), EventCacheError> {
1305            let raw_event = event.raw();
1306
1307            // Do not deserialize the entire event if we aren't certain it's an
1308            // `m.room.redaction`. It saves a non-negligible amount of computation.
1309            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
1310                raw_event.get_field::<MessageLikeEventType>("type")
1311            else {
1312                return Ok(());
1313            };
1314
1315            // It is an `m.room.redaction`! We can deserialize it entirely.
1316
1317            let Ok(AnySyncTimelineEvent::MessageLike(
1318                ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction),
1319            )) = event.raw().deserialize()
1320            else {
1321                return Ok(());
1322            };
1323
1324            let Some(event_id) = redaction.redacts(&self.room_version) else {
1325                warn!("missing target event id from the redaction event");
1326                return Ok(());
1327            };
1328
1329            // Replace the to-be-redacted event with its redacted form, if we know about it.
1330            if let Some((location, position, target_event)) = self.find_event(event_id).await? {
1331                // Don't redact already redacted events.
1332                if let Ok(deserialized) = target_event.raw().deserialize() {
1333                    match deserialized {
1334                        AnySyncTimelineEvent::MessageLike(ev) => {
1335                            if ev.is_redacted() {
1336                                return Ok(());
1337                            }
1338                        }
1339                        AnySyncTimelineEvent::State(ev) => {
1340                            if ev.is_redacted() {
1341                                return Ok(());
1342                            }
1343                        }
1344                    }
1345                }
1346
1347                if let Some(redacted_event) = apply_redaction(
1348                    target_event.raw(),
1349                    event.raw().cast_ref::<SyncRoomRedactionEvent>(),
1350                    &self.room_version,
1351                ) {
1352                    let mut copy = target_event.clone();
1353
1354                    // It's safe to cast `redacted_event` here:
1355                    // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
1356                    //   when calling .raw(), so it's still one under the hood.
1357                    // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
1358                    copy.replace_raw(redacted_event.cast());
1359
1360                    match location {
1361                        EventLocation::Memory => {
1362                            self.events
1363                                .replace_event_at(position, copy)
1364                                .expect("should have been a valid position of an item");
1365                        }
1366                        EventLocation::Store => {
1367                            self.send_updates_to_store(vec![Update::ReplaceItem {
1368                                at: position,
1369                                item: copy,
1370                            }])
1371                            .await?;
1372                        }
1373                    }
1374                }
1375            } else {
1376                trace!("redacted event is missing from the linked chunk");
1377            }
1378
1379            // TODO: remove all related events too!
1380
1381            Ok(())
1382        }
1383    }
1384}
1385
1386/// An enum representing where an event has been found.
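///
/// Callers typically match on the location to decide whether a change applies to
/// the in-memory linked chunk or must be sent to the store. A minimal,
/// illustrative sketch (not a doctest; `events`, `send_updates_to_store` and
/// `new_event` stand in for the state handled by `RoomEventCacheState`):
///
/// ```ignore
/// match location {
///     EventLocation::Memory => {
///         // Replace the event in the in-memory linked chunk.
///         events.replace_event_at(position, new_event)?;
///     }
///     EventLocation::Store => {
///         // The event isn't loaded in memory: update the store directly.
///         send_updates_to_store(vec![Update::ReplaceItem { at: position, item: new_event }])
///             .await?;
///     }
/// }
/// ```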
1387pub(super) enum EventLocation {
1388    /// Event lives in memory (and likely in the store!).
1389    Memory,
1390
1391    /// Event lives in the store only, it has not been loaded in memory yet.
1392    Store,
1393}
1394
1395pub(super) use private::RoomEventCacheState;
1396
1397#[cfg(test)]
1398mod tests {
1399    use std::sync::Arc;
1400
1401    use assert_matches::assert_matches;
1402    use assert_matches2::assert_let;
1403    use matrix_sdk_base::{
1404        event_cache::{
1405            store::{EventCacheStore as _, MemoryStore},
1406            Gap,
1407        },
1408        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1409        store::StoreConfig,
1410        sync::{JoinedRoomUpdate, Timeline},
1411    };
1412    use matrix_sdk_common::deserialized_responses::TimelineEvent;
1413    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
1414    use ruma::{
1415        event_id,
1416        events::{
1417            relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
1418            AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1419        },
1420        room_id, user_id, RoomId,
1421    };
1422
1423    use crate::test_utils::{client::MockClientBuilder, logged_in_client};
1424
1425    #[async_test]
1426    async fn test_event_with_redaction_relation() {
1427        let original_id = event_id!("$original");
1428        let related_id = event_id!("$related");
1429        let room_id = room_id!("!galette:saucisse.bzh");
1430        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1431
1432        assert_relations(
1433            room_id,
1434            f.text_msg("Original event").event_id(original_id).into(),
1435            f.redaction(original_id).event_id(related_id).into(),
1436            f,
1437        )
1438        .await;
1439    }
1440
1441    #[async_test]
1442    async fn test_event_with_edit_relation() {
1443        let original_id = event_id!("$original");
1444        let related_id = event_id!("$related");
1445        let room_id = room_id!("!galette:saucisse.bzh");
1446        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1447
1448        assert_relations(
1449            room_id,
1450            f.text_msg("Original event").event_id(original_id).into(),
1451            f.text_msg("* An edited event")
1452                .edit(
1453                    original_id,
1454                    RoomMessageEventContentWithoutRelation::text_plain("An edited event"),
1455                )
1456                .event_id(related_id)
1457                .into(),
1458            f,
1459        )
1460        .await;
1461    }
1462
1463    #[async_test]
1464    async fn test_event_with_reply_relation() {
1465        let original_id = event_id!("$original");
1466        let related_id = event_id!("$related");
1467        let room_id = room_id!("!galette:saucisse.bzh");
1468        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1469
1470        assert_relations(
1471            room_id,
1472            f.text_msg("Original event").event_id(original_id).into(),
1473            f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
1474            f,
1475        )
1476        .await;
1477    }
1478
1479    #[async_test]
1480    async fn test_event_with_thread_reply_relation() {
1481        let original_id = event_id!("$original");
1482        let related_id = event_id!("$related");
1483        let room_id = room_id!("!galette:saucisse.bzh");
1484        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1485
1486        assert_relations(
1487            room_id,
1488            f.text_msg("Original event").event_id(original_id).into(),
1489            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
1490            f,
1491        )
1492        .await;
1493    }
1494
1495    #[async_test]
1496    async fn test_event_with_reaction_relation() {
1497        let original_id = event_id!("$original");
1498        let related_id = event_id!("$related");
1499        let room_id = room_id!("!galette:saucisse.bzh");
1500        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1501
1502        assert_relations(
1503            room_id,
1504            f.text_msg("Original event").event_id(original_id).into(),
1505            f.reaction(original_id, ":D").event_id(related_id).into(),
1506            f,
1507        )
1508        .await;
1509    }
1510
1511    #[async_test]
1512    async fn test_event_with_poll_response_relation() {
1513        let original_id = event_id!("$original");
1514        let related_id = event_id!("$related");
1515        let room_id = room_id!("!galette:saucisse.bzh");
1516        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1517
1518        assert_relations(
1519            room_id,
1520            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1521                .event_id(original_id)
1522                .into(),
1523            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
1524            f,
1525        )
1526        .await;
1527    }
1528
1529    #[async_test]
1530    async fn test_event_with_poll_end_relation() {
1531        let original_id = event_id!("$original");
1532        let related_id = event_id!("$related");
1533        let room_id = room_id!("!galette:saucisse.bzh");
1534        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1535
1536        assert_relations(
1537            room_id,
1538            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
1539                .event_id(original_id)
1540                .into(),
1541            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
1542            f,
1543        )
1544        .await;
1545    }
1546
1547    #[async_test]
1548    async fn test_event_with_filtered_relationships() {
1549        let original_id = event_id!("$original");
1550        let related_id = event_id!("$related");
1551        let associated_related_id = event_id!("$recursive_related");
1552        let room_id = room_id!("!galette:saucisse.bzh");
1553        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1554
1555        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1556        let related_event = event_factory
1557            .text_msg("* Edited event")
1558            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1559            .event_id(related_id)
1560            .into();
1561        let associated_related_event =
1562            event_factory.redaction(related_id).event_id(associated_related_id).into();
1563
1564        let client = logged_in_client(None).await;
1565
1566        let event_cache = client.event_cache();
1567        event_cache.subscribe().unwrap();
1568
1569        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1570        let room = client.get_room(room_id).unwrap();
1571
1572        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1573
1574        // Save the original event.
1575        room_event_cache.save_event(original_event).await;
1576
1577        // Save the related event.
1578        room_event_cache.save_event(related_event).await;
1579
1580        // Save the associated related event, which redacts the related event.
1581        room_event_cache.save_event(associated_related_event).await;
1582
1583        let filter = Some(vec![RelationType::Replacement]);
1584        let (event, related_events) =
1585            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1586        // Fetched event is the right one.
1587        let cached_event_id = event.event_id().unwrap();
1588        assert_eq!(cached_event_id, original_id);
1589
1590        // There are both the related id and the recursively related id.
1591        assert_eq!(related_events.len(), 2);
1592
1593        let related_event_id = related_events[0].event_id().unwrap();
1594        assert_eq!(related_event_id, related_id);
1595        let related_event_id = related_events[1].event_id().unwrap();
1596        assert_eq!(related_event_id, associated_related_id);
1597
1598        // Now we'll filter by thread instead; there should be no related events.
1599        let filter = Some(vec![RelationType::Thread]);
1600        let (event, related_events) =
1601            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1602        // Fetched event is the right one.
1603        let cached_event_id = event.event_id().unwrap();
1604        assert_eq!(cached_event_id, original_id);
1605        // No thread-related events are found.
1606        assert!(related_events.is_empty());
1607    }
1608
1609    #[async_test]
1610    async fn test_event_with_recursive_relation() {
1611        let original_id = event_id!("$original");
1612        let related_id = event_id!("$related");
1613        let associated_related_id = event_id!("$recursive_related");
1614        let room_id = room_id!("!galette:saucisse.bzh");
1615        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1616
1617        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1618        let related_event = event_factory
1619            .text_msg("* Edited event")
1620            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1621            .event_id(related_id)
1622            .into();
1623        let associated_related_event =
1624            event_factory.redaction(related_id).event_id(associated_related_id).into();
1625
1626        let client = logged_in_client(None).await;
1627
1628        let event_cache = client.event_cache();
1629        event_cache.subscribe().unwrap();
1630
1631        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1632        let room = client.get_room(room_id).unwrap();
1633
1634        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1635
1636        // Save the original event.
1637        room_event_cache.save_event(original_event).await;
1638
1639        // Save the related event.
1640        room_event_cache.save_event(related_event).await;
1641
1642        // Save the associated related event, which redacts the related event.
1643        room_event_cache.save_event(associated_related_event).await;
1644
1645        let (event, related_events) =
1646            room_event_cache.event_with_relations(original_id, None).await.unwrap();
1647        // Fetched event is the right one.
1648        let cached_event_id = event.event_id().unwrap();
1649        assert_eq!(cached_event_id, original_id);
1650
1651        // There are both the related id and the recursively related id.
1652        assert_eq!(related_events.len(), 2);
1653
1654        let related_event_id = related_events[0].event_id().unwrap();
1655        assert_eq!(related_event_id, related_id);
1656        let related_event_id = related_events[1].event_id().unwrap();
1657        assert_eq!(related_event_id, associated_related_id);
1658    }
1659
1660    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1661    #[async_test]
1662    async fn test_write_to_storage() {
1663        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1664
1665        let room_id = room_id!("!galette:saucisse.bzh");
1666        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1667
1668        let event_cache_store = Arc::new(MemoryStore::new());
1669
1670        let client = MockClientBuilder::new("http://localhost".to_owned())
1671            .store_config(
1672                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1673            )
1674            .build()
1675            .await;
1676
1677        let event_cache = client.event_cache();
1678
1679        // Don't forget to subscribe and like^W enable storage!
1680        event_cache.subscribe().unwrap();
1681        event_cache.enable_storage().unwrap();
1682
1683        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1684        let room = client.get_room(room_id).unwrap();
1685
1686        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1687
1688        // Propagate an update for a message and a prev-batch token.
1689        let timeline = Timeline {
1690            limited: true,
1691            prev_batch: Some("raclette".to_owned()),
1692            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
1693        };
1694
1695        room_event_cache
1696            .inner
1697            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1698            .await
1699            .unwrap();
1700
1701        let linked_chunk =
1702            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1703                .unwrap()
1704                .unwrap();
1705
1706        assert_eq!(linked_chunk.chunks().count(), 2);
1707
1708        let mut chunks = linked_chunk.chunks();
1709
1710        // We start with the gap.
1711        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
1712            assert_eq!(gap.prev_token, "raclette");
1713        });
1714
1715        // Then we have the stored event.
1716        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1717            assert_eq!(events.len(), 1);
1718            let deserialized = events[0].raw().deserialize().unwrap();
1719            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1720            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1721        });
1722
1723        // That's all, folks!
1724        assert!(chunks.next().is_none());
1725    }
1726
1727    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1728    #[async_test]
1729    async fn test_write_to_storage_strips_bundled_relations() {
1730        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1731        use ruma::events::BundledMessageLikeRelations;
1732
1733        let room_id = room_id!("!galette:saucisse.bzh");
1734        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1735
1736        let event_cache_store = Arc::new(MemoryStore::new());
1737
1738        let client = MockClientBuilder::new("http://localhost".to_owned())
1739            .store_config(
1740                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1741            )
1742            .build()
1743            .await;
1744
1745        let event_cache = client.event_cache();
1746
1747        // Don't forget to subscribe and like^W enable storage!
1748        event_cache.subscribe().unwrap();
1749        event_cache.enable_storage().unwrap();
1750
1751        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1752        let room = client.get_room(room_id).unwrap();
1753
1754        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1755
1756        // Propagate an update for a message with bundled relations.
1757        let mut relations = BundledMessageLikeRelations::new();
1758        relations.replace =
1759            Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
1760        let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();
1761
1762        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1763
1764        room_event_cache
1765            .inner
1766            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1767            .await
1768            .unwrap();
1769
1770        // The in-memory linked chunk keeps the bundled relation.
1771        {
1772            let (events, _) = room_event_cache.subscribe().await;
1773
1774            assert_eq!(events.len(), 1);
1775
1776            let ev = events[0].raw().deserialize().unwrap();
1777            assert_let!(
1778                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1779            );
1780
1781            let original = msg.as_original().unwrap();
1782            assert_eq!(original.content.body(), "hey yo");
1783            assert!(original.unsigned.relations.replace.is_some());
1784        }
1785
1786        // The one in storage does not.
1787        let linked_chunk =
1788            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1789                .unwrap()
1790                .unwrap();
1791
1792        assert_eq!(linked_chunk.chunks().count(), 1);
1793
1794        let mut chunks = linked_chunk.chunks();
1795        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1796            assert_eq!(events.len(), 1);
1797
1798            let ev = events[0].raw().deserialize().unwrap();
1799            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1800
1801            let original = msg.as_original().unwrap();
1802            assert_eq!(original.content.body(), "hey yo");
1803            assert!(original.unsigned.relations.replace.is_none());
1804        });
1805
1806        // That's all, folks!
1807        assert!(chunks.next().is_none());
1808    }
1809
1810    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1811    #[async_test]
1812    async fn test_clear() {
1813        use eyeball_im::VectorDiff;
1814        use matrix_sdk_base::linked_chunk::lazy_loader::from_all_chunks;
1815
1816        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
1817
1818        let room_id = room_id!("!galette:saucisse.bzh");
1819        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1820
1821        let event_cache_store = Arc::new(MemoryStore::new());
1822
1823        let event_id1 = event_id!("$1");
1824        let event_id2 = event_id!("$2");
1825
1826        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1827        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1828
1829        // Prefill the store with some data.
1830        event_cache_store
1831            .handle_linked_chunk_updates(
1832                room_id,
1833                vec![
1834                    // An empty items chunk.
1835                    Update::NewItemsChunk {
1836                        previous: None,
1837                        new: ChunkIdentifier::new(0),
1838                        next: None,
1839                    },
1840                    // A gap chunk.
1841                    Update::NewGapChunk {
1842                        previous: Some(ChunkIdentifier::new(0)),
1843                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1844                        new: ChunkIdentifier::new(42),
1845                        next: None,
1846                        gap: Gap { prev_token: "comté".to_owned() },
1847                    },
1848                    // Another items chunk, non-empty this time.
1849                    Update::NewItemsChunk {
1850                        previous: Some(ChunkIdentifier::new(42)),
1851                        new: ChunkIdentifier::new(1),
1852                        next: None,
1853                    },
1854                    Update::PushItems {
1855                        at: Position::new(ChunkIdentifier::new(1), 0),
1856                        items: vec![ev1.clone()],
1857                    },
1858                    // And another items chunk, non-empty again.
1859                    Update::NewItemsChunk {
1860                        previous: Some(ChunkIdentifier::new(1)),
1861                        new: ChunkIdentifier::new(2),
1862                        next: None,
1863                    },
1864                    Update::PushItems {
1865                        at: Position::new(ChunkIdentifier::new(2), 0),
1866                        items: vec![ev2.clone()],
1867                    },
1868                ],
1869            )
1870            .await
1871            .unwrap();
1872
1873        let client = MockClientBuilder::new("http://localhost".to_owned())
1874            .store_config(
1875                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1876            )
1877            .build()
1878            .await;
1879
1880        let event_cache = client.event_cache();
1881
1882        // Don't forget to subscribe and like^W enable storage!
1883        event_cache.subscribe().unwrap();
1884        event_cache.enable_storage().unwrap();
1885
1886        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1887        let room = client.get_room(room_id).unwrap();
1888
1889        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1890
1891        let (items, mut stream) = room_event_cache.subscribe().await;
1892
1893        // The room knows about all cached events.
1894        {
1895            assert!(room_event_cache.event(event_id1).await.is_some());
1896            assert!(room_event_cache.event(event_id2).await.is_some());
1897        }
1898
1899        // But only some of the events are loaded from the store.
1900        {
1901            // The room must contain only one event because only one chunk has been loaded.
1902            assert_eq!(items.len(), 1);
1903            assert_eq!(items[0].event_id().unwrap(), event_id2);
1904
1905            assert!(stream.is_empty());
1906        }
1907
1908        // Let's load more chunks to load all events.
1909        {
1910            room_event_cache.pagination().run_backwards_once(20).await.unwrap();
1911
1912            assert_let_timeout!(
1913                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1914            );
1915            assert_eq!(diffs.len(), 1);
1916            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
1917                // Here you are `event_id1`!
1918                assert_eq!(event.event_id().unwrap(), event_id1);
1919            });
1920
1921            assert!(stream.is_empty());
1922        }
1923
1924        // After clearing,…
1925        room_event_cache.clear().await.unwrap();
1926
1927        //… we get an update that the content has been cleared.
1928        assert_let_timeout!(
1929            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
1930        );
1931        assert_eq!(diffs.len(), 1);
1932        assert_let!(VectorDiff::Clear = &diffs[0]);
1933
1934        // The room event cache has forgotten about the events.
1935        assert!(room_event_cache.event(event_id1).await.is_none());
1936
1937        let (items, _) = room_event_cache.subscribe().await;
1938        assert!(items.is_empty());
1939
1940        // The event cache store too.
1941        let linked_chunk =
1942            from_all_chunks::<3, _, _>(event_cache_store.load_all_chunks(room_id).await.unwrap())
1943                .unwrap()
1944                .unwrap();
1945
1946        // Note: while the event cache store could return `None` here, clearing it will
1947        // reset it to its initial form, maintaining the invariant that it
1948        // contains a single items chunk that's empty.
1949        assert_eq!(linked_chunk.num_items(), 0);
1950    }
1951
1952    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
1953    #[async_test]
1954    async fn test_load_from_storage() {
1955        use eyeball_im::VectorDiff;
1956
1957        use super::RoomEventCacheUpdate;
1958        use crate::assert_let_timeout;
1959
1960        let room_id = room_id!("!galette:saucisse.bzh");
1961        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1962
1963        let event_cache_store = Arc::new(MemoryStore::new());
1964
1965        let event_id1 = event_id!("$1");
1966        let event_id2 = event_id!("$2");
1967
1968        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1969        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1970
1971        // Prefill the store with some data.
1972        event_cache_store
1973            .handle_linked_chunk_updates(
1974                room_id,
1975                vec![
1976                    // An empty items chunk.
1977                    Update::NewItemsChunk {
1978                        previous: None,
1979                        new: ChunkIdentifier::new(0),
1980                        next: None,
1981                    },
1982                    // A gap chunk.
1983                    Update::NewGapChunk {
1984                        previous: Some(ChunkIdentifier::new(0)),
1985                        // Chunk IDs aren't supposed to be ordered, so use a random value here.
1986                        new: ChunkIdentifier::new(42),
1987                        next: None,
1988                        gap: Gap { prev_token: "cheddar".to_owned() },
1989                    },
1990                    // Another items chunk, non-empty this time.
1991                    Update::NewItemsChunk {
1992                        previous: Some(ChunkIdentifier::new(42)),
1993                        new: ChunkIdentifier::new(1),
1994                        next: None,
1995                    },
1996                    Update::PushItems {
1997                        at: Position::new(ChunkIdentifier::new(1), 0),
1998                        items: vec![ev1.clone()],
1999                    },
2000                    // And another items chunk, non-empty again.
2001                    Update::NewItemsChunk {
2002                        previous: Some(ChunkIdentifier::new(1)),
2003                        new: ChunkIdentifier::new(2),
2004                        next: None,
2005                    },
2006                    Update::PushItems {
2007                        at: Position::new(ChunkIdentifier::new(2), 0),
2008                        items: vec![ev2.clone()],
2009                    },
2010                ],
2011            )
2012            .await
2013            .unwrap();
2014
2015        let client = MockClientBuilder::new("http://localhost".to_owned())
2016            .store_config(
2017                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
2018            )
2019            .build()
2020            .await;
2021
2022        let event_cache = client.event_cache();
2023
2024        // Don't forget to subscribe and like^W enable storage!
2025        event_cache.subscribe().unwrap();
2026        event_cache.enable_storage().unwrap();
2027
2028        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2029        let room = client.get_room(room_id).unwrap();
2030
2031        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2032
2033        let (items, mut stream) = room_event_cache.subscribe().await;
2034
2035        // The initial items contain one event because only the last chunk is loaded by
2036        // default.
2037        assert_eq!(items.len(), 1);
2038        assert_eq!(items[0].event_id().unwrap(), event_id2);
2039        assert!(stream.is_empty());
2040
2041        // The event cache knows about all the events though, even if they aren't loaded.
2042        assert!(room_event_cache.event(event_id1).await.is_some());
2043        assert!(room_event_cache.event(event_id2).await.is_some());
2044
2045        // Let's paginate to load more events.
2046        room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2047
2048        assert_let_timeout!(
2049            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2050        );
2051        assert_eq!(diffs.len(), 1);
2052        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2053            assert_eq!(event.event_id().unwrap(), event_id1);
2054        });
2055
2056        assert!(stream.is_empty());
2057
2058        // A new update with one of these events leads to deduplication.
2059        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
2060        room_event_cache
2061            .inner
2062            .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
2063            .await
2064            .unwrap();
2065
2066        // The stream doesn't report these changes *yet*. Use the items vector given
2067        // when subscribing, to check that the items correspond to their new
2068        // positions. The duplicated item is removed (so it's not the first
2069        // element anymore), and it's added to the back of the list.
2070        let (items, _stream) = room_event_cache.subscribe().await;
2071        assert_eq!(items.len(), 2);
2072        assert_eq!(items[0].event_id().unwrap(), event_id1);
2073        assert_eq!(items[1].event_id().unwrap(), event_id2);
2074    }
2075
2076    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2077    #[async_test]
2078    async fn test_load_from_storage_resilient_to_failure() {
2079        let room_id = room_id!("!fondue:patate.ch");
2080        let event_cache_store = Arc::new(MemoryStore::new());
2081
2082        let event = EventFactory::new()
2083            .room(room_id)
2084            .sender(user_id!("@ben:saucisse.bzh"))
2085            .text_msg("foo")
2086            .event_id(event_id!("$42"))
2087            .into_event();
2088
2089        // Prefill the store with invalid data: two chunks that form a cycle.
2090        event_cache_store
2091            .handle_linked_chunk_updates(
2092                room_id,
2093                vec![
2094                    Update::NewItemsChunk {
2095                        previous: None,
2096                        new: ChunkIdentifier::new(0),
2097                        next: None,
2098                    },
2099                    Update::PushItems {
2100                        at: Position::new(ChunkIdentifier::new(0), 0),
2101                        items: vec![event],
2102                    },
2103                    Update::NewItemsChunk {
2104                        previous: Some(ChunkIdentifier::new(0)),
2105                        new: ChunkIdentifier::new(1),
2106                        next: Some(ChunkIdentifier::new(0)),
2107                    },
2108                ],
2109            )
2110            .await
2111            .unwrap();
2112
2113        let client = MockClientBuilder::new("http://localhost".to_owned())
2114            .store_config(
2115                StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
2116            )
2117            .build()
2118            .await;
2119
2120        let event_cache = client.event_cache();
2121
2122        // Don't forget to subscribe and like^W enable storage!
2123        event_cache.subscribe().unwrap();
2124        event_cache.enable_storage().unwrap();
2125
2126        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2127        let room = client.get_room(room_id).unwrap();
2128
2129        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2130
2131        let (items, _stream) = room_event_cache.subscribe().await;
2132
2133        // Because the persisted content was invalid, the room store is reset: there are
2134        // no events in the cache.
2135        assert!(items.is_empty());
2136
2137        // Storage doesn't contain anything. It would also be valid for it to contain a
2138        // single, empty, initial items chunk.
2139        let raw_chunks = event_cache_store.load_all_chunks(room_id).await.unwrap();
2140        assert!(raw_chunks.is_empty());
2141    }
2142
2143    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2144    #[async_test]
2145    async fn test_no_useless_gaps() {
2146        use crate::event_cache::room::LoadMoreEventsBackwardsOutcome;
2147
2148        let room_id = room_id!("!galette:saucisse.bzh");
2149
2150        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2151
2152        let event_cache = client.event_cache();
2153        event_cache.subscribe().unwrap();
2154
2155        let has_storage = true; // for testing purposes only
2156        event_cache.enable_storage().unwrap();
2157
2158        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2159        let room = client.get_room(room_id).unwrap();
2160        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2161
2162        let f = EventFactory::new().room(room_id).sender(*ALICE);
2163
2164        // Propagate an update including a limited timeline with one message and a
2165        // prev-batch token.
2166        room_event_cache
2167            .inner
2168            .handle_joined_room_update(
2169                has_storage,
2170                JoinedRoomUpdate {
2171                    timeline: Timeline {
2172                        limited: true,
2173                        prev_batch: Some("raclette".to_owned()),
2174                        events: vec![f.text_msg("hey yo").into_event()],
2175                    },
2176                    ..Default::default()
2177                },
2178            )
2179            .await
2180            .unwrap();
2181
2182        {
2183            let mut state = room_event_cache.inner.state.write().await;
2184
2185            let mut num_gaps = 0;
2186            let mut num_events = 0;
2187
2188            for c in state.events().chunks() {
2189                match c.content() {
2190                    ChunkContent::Items(items) => num_events += items.len(),
2191                    ChunkContent::Gap(_) => num_gaps += 1,
2192                }
2193            }
2194
2195            // The limited sync unloads the chunk, so it will appear as if there are only
2196            // the events.
2197            assert_eq!(num_gaps, 0);
2198            assert_eq!(num_events, 1);
2199
2200            // But if I manually reload more of the chunk, the gap will be present.
2201            assert_matches!(
2202                state.load_more_events_backwards().await.unwrap(),
2203                LoadMoreEventsBackwardsOutcome::Gap { .. }
2204            );
2205
2206            num_gaps = 0;
2207            num_events = 0;
2208            for c in state.events().chunks() {
2209                match c.content() {
2210                    ChunkContent::Items(items) => num_events += items.len(),
2211                    ChunkContent::Gap(_) => num_gaps += 1,
2212                }
2213            }
2214
2215            // The gap must have been stored.
2216            assert_eq!(num_gaps, 1);
2217            assert_eq!(num_events, 1);
2218        }
2219
2220        // Now, propagate an update for another message, but the timeline isn't limited
2221        // this time.
2222        room_event_cache
2223            .inner
2224            .handle_joined_room_update(
2225                has_storage,
2226                JoinedRoomUpdate {
2227                    timeline: Timeline {
2228                        limited: false,
2229                        prev_batch: Some("fondue".to_owned()),
2230                        events: vec![f.text_msg("sup").into_event()],
2231                    },
2232                    ..Default::default()
2233                },
2234            )
2235            .await
2236            .unwrap();
2237
2238        {
2239            let state = room_event_cache.inner.state.read().await;
2240
2241            let mut num_gaps = 0;
2242            let mut num_events = 0;
2243
2244            for c in state.events().chunks() {
2245                match c.content() {
2246                    ChunkContent::Items(items) => num_events += items.len(),
2247                    ChunkContent::Gap(gap) => {
2248                        assert_eq!(gap.prev_token, "raclette");
2249                        num_gaps += 1;
2250                    }
2251                }
2252            }
2253
2254            // There's only the previous gap, no new ones.
2255            assert_eq!(num_gaps, 1);
2256            assert_eq!(num_events, 2);
2257        }
2258    }
2259
2260    async fn assert_relations(
2261        room_id: &RoomId,
2262        original_event: TimelineEvent,
2263        related_event: TimelineEvent,
2264        event_factory: EventFactory,
2265    ) {
2266        let client = logged_in_client(None).await;
2267
2268        let event_cache = client.event_cache();
2269        event_cache.subscribe().unwrap();
2270
2271        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2272        let room = client.get_room(room_id).unwrap();
2273
2274        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2275
2276        // Save the original event.
2277        let original_event_id = original_event.event_id().unwrap();
2278        room_event_cache.save_event(original_event).await;
2279
2280        // Save an unrelated event to check it's not in the related events list.
2281        let unrelated_id = event_id!("$2");
2282        room_event_cache
2283            .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
2284            .await;
2285
2286        // Save the related event.
2287        let related_id = related_event.event_id().unwrap();
2288        room_event_cache.save_event(related_event).await;
2289
2290        let (event, related_events) =
2291            room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
2292        // Fetched event is the right one.
2293        let cached_event_id = event.event_id().unwrap();
2294        assert_eq!(cached_event_id, original_event_id);
2295
2296        // Only the directly related event is in the related events list.
2297        assert_eq!(related_events.len(), 1);
2298        let related_event_id = related_events[0].event_id().unwrap();
2299        assert_eq!(related_event_id, related_id);
2300    }
2301
2302    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2303    #[async_test]
2304    async fn test_shrink_to_last_chunk() {
2305        use eyeball_im::VectorDiff;
2306
2307        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
2308
2309        let room_id = room_id!("!galette:saucisse.bzh");
2310
2311        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2312
2313        let f = EventFactory::new().room(room_id);
2314
2315        let evid1 = event_id!("$1");
2316        let evid2 = event_id!("$2");
2317
2318        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2319        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2320
2321        // Fill the event cache store with an initial linked chunk with 2 events chunks.
2322        {
2323            let store = client.event_cache_store();
2324            let store = store.lock().await.unwrap();
2325            store
2326                .handle_linked_chunk_updates(
2327                    room_id,
2328                    vec![
2329                        Update::NewItemsChunk {
2330                            previous: None,
2331                            new: ChunkIdentifier::new(0),
2332                            next: None,
2333                        },
2334                        Update::PushItems {
2335                            at: Position::new(ChunkIdentifier::new(0), 0),
2336                            items: vec![ev1],
2337                        },
2338                        Update::NewItemsChunk {
2339                            previous: Some(ChunkIdentifier::new(0)),
2340                            new: ChunkIdentifier::new(1),
2341                            next: None,
2342                        },
2343                        Update::PushItems {
2344                            at: Position::new(ChunkIdentifier::new(1), 0),
2345                            items: vec![ev2],
2346                        },
2347                    ],
2348                )
2349                .await
2350                .unwrap();
2351        }
2352
2353        let event_cache = client.event_cache();
2354        event_cache.subscribe().unwrap();
2355        event_cache.enable_storage().unwrap();
2356
2357        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2358        let room = client.get_room(room_id).unwrap();
2359        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2360
2361        // Sanity check: lazily loaded, so only includes one item at start.
2362        let (events, mut stream) = room_event_cache.subscribe().await;
2363        assert_eq!(events.len(), 1);
2364        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2365        assert!(stream.is_empty());
2366
2367        // Force loading the full linked chunk by back-paginating.
2368        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2369        assert_eq!(outcome.events.len(), 1);
2370        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2371        assert!(outcome.reached_start);
2372
2373        // We also get an update about the loading from the store.
2374        assert_let_timeout!(
2375            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2376        );
2377        assert_eq!(diffs.len(), 1);
2378        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2379            assert_eq!(value.event_id().as_deref(), Some(evid1));
2380        });
2381
2382        assert!(stream.is_empty());
2383
2384        // Shrink the linked chunk to the last chunk.
2385        let diffs = room_event_cache
2386            .inner
2387            .state
2388            .write()
2389            .await
2390            .shrink_to_last_chunk()
2391            .await
2392            .expect("shrinking should succeed")
2393            .unwrap();
2394
2395        // We receive updates about the changes to the linked chunk.
2396        assert_eq!(diffs.len(), 2);
2397        assert_matches!(&diffs[0], VectorDiff::Clear);
2398        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
2399            assert_eq!(values.len(), 1);
2400            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
2401        });
2402
2403        assert!(stream.is_empty());
2404
2405        // When reading the events, we do get only the last one.
2406        let (events, _) = room_event_cache.subscribe().await;
2407        assert_eq!(events.len(), 1);
2408        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
2409
2410        // But if we back-paginate, we don't need access to the network to find out
2411        // about the previous event.
2412        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2413        assert_eq!(outcome.events.len(), 1);
2414        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2415        assert!(outcome.reached_start);
2416    }
2417
2418    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
2419    #[async_test]
2420    async fn test_auto_shrink_after_all_subscribers_are_gone() {
2421        use eyeball_im::VectorDiff;
2422        use tokio::task::yield_now;
2423
2424        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
2425
2426        let room_id = room_id!("!galette:saucisse.bzh");
2427
2428        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
2429
2430        let f = EventFactory::new().room(room_id);
2431
2432        let evid1 = event_id!("$1");
2433        let evid2 = event_id!("$2");
2434
2435        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
2436        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
2437
2438        // Fill the event cache store with an initial linked chunk with 2 events chunks.
2439        {
2440            let store = client.event_cache_store();
2441            let store = store.lock().await.unwrap();
2442            store
2443                .handle_linked_chunk_updates(
2444                    room_id,
2445                    vec![
2446                        Update::NewItemsChunk {
2447                            previous: None,
2448                            new: ChunkIdentifier::new(0),
2449                            next: None,
2450                        },
2451                        Update::PushItems {
2452                            at: Position::new(ChunkIdentifier::new(0), 0),
2453                            items: vec![ev1],
2454                        },
2455                        Update::NewItemsChunk {
2456                            previous: Some(ChunkIdentifier::new(0)),
2457                            new: ChunkIdentifier::new(1),
2458                            next: None,
2459                        },
2460                        Update::PushItems {
2461                            at: Position::new(ChunkIdentifier::new(1), 0),
2462                            items: vec![ev2],
2463                        },
2464                    ],
2465                )
2466                .await
2467                .unwrap();
2468        }
2469
2470        let event_cache = client.event_cache();
2471        event_cache.subscribe().unwrap();
2472        event_cache.enable_storage().unwrap();
2473
2474        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2475        let room = client.get_room(room_id).unwrap();
2476        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2477
2478        // Sanity check: lazily loaded, so only includes one item at start.
2479        let (events1, mut stream1) = room_event_cache.subscribe().await;
2480        assert_eq!(events1.len(), 1);
2481        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
2482        assert!(stream1.is_empty());
2483
2484        // Force loading the full linked chunk by back-paginating.
2485        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2486        assert_eq!(outcome.events.len(), 1);
2487        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
2488        assert!(outcome.reached_start);
2489
2490        // We also get an update about the loading from the store. Ignore it, for this
2491        // test's sake.
2492        assert_let_timeout!(
2493            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
2494        );
2495        assert_eq!(diffs.len(), 1);
2496        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
2497            assert_eq!(value.event_id().as_deref(), Some(evid1));
2498        });
2499
2500        assert!(stream1.is_empty());
2501
2502        // Have another listener subscribe to the event cache.
2503        // Since it's not the first one, and the previous one loaded some more events,
2504        // the second listener sees them all.
2505        let (events2, stream2) = room_event_cache.subscribe().await;
2506        assert_eq!(events2.len(), 2);
2507        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
2508        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
2509        assert!(stream2.is_empty());
2510
2511        // Drop the first stream, and wait a bit.
2512        drop(stream1);
2513        yield_now().await;
2514
2515        // The second stream remains undisturbed.
2516        assert!(stream2.is_empty());
2517
2518        // Now drop the second stream, and wait a bit.
2519        drop(stream2);
2520        yield_now().await;
2521
2522        // The linked chunk must have auto-shrunk by now.
2523
2524        {
2525            // Check the inner state: there's no more shared auto-shrinker.
2526            let state = room_event_cache.inner.state.read().await;
2527            assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
2528        }
2529
2530        // Getting the events will only give us the latest chunk.
2531        let (events3, _stream2) = room_event_cache.subscribe().await;
2532        assert_eq!(events3.len(), 1);
2533        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
2534    }
2535}