1use std::{
24 collections::{BTreeMap, BTreeSet, HashMap},
25 fmt,
26 ops::Deref,
27 result::Result as StdResult,
28 str::Utf8Error,
29 sync::{Arc, RwLock as StdRwLock},
30};
31
32use eyeball_im::{Vector, VectorDiff};
33use futures_util::Stream;
34use once_cell::sync::OnceCell;
35
36#[cfg(any(test, feature = "testing"))]
37#[macro_use]
38pub mod integration_tests;
39mod observable_map;
40mod traits;
41
42#[cfg(feature = "e2e-encryption")]
43use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
44pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
45use observable_map::ObservableMap;
46use ruma::{
47 events::{
48 presence::PresenceEvent,
49 receipt::ReceiptEventContent,
50 room::{member::StrippedRoomMemberEvent, redaction::SyncRoomRedactionEvent},
51 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52 AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
53 },
54 serde::Raw,
55 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
56};
57use tokio::sync::{broadcast, Mutex, RwLock};
58use tracing::warn;
59
60use crate::{
61 deserialized_responses::DisplayName,
62 event_cache::store as event_cache_store,
63 rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
64 MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
65};
66
67pub(crate) mod ambiguity_map;
68mod memory_store;
69pub mod migration_helpers;
70mod send_queue;
71
72#[cfg(any(test, feature = "testing"))]
73pub use self::integration_tests::StateStoreIntegrationTests;
74#[cfg(feature = "unstable-msc4274")]
75pub use self::send_queue::AccumulatedSentMediaInfo;
76pub use self::{
77 memory_store::MemoryStore,
78 send_queue::{
79 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
80 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
81 SentMediaInfo, SentRequestKey, SerializableEventContent,
82 },
83 traits::{
84 ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
85 StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
86 },
87};
88
/// Errors that can be returned when interacting with a state store.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An arbitrary boxed error; [`StoreError::backend`] builds this variant
    /// from any suitable error type.
    #[error(transparent)]
    Backend(Box<dyn std::error::Error + Send + Sync>),
    /// A `serde_json` error, displayed transparently.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// A Matrix identifier failed to parse.
    #[error(transparent)]
    Identifier(#[from] ruma::IdParseError),
    /// The store could not be unlocked.
    #[error("The store failed to be unlocked")]
    StoreLocked,
    /// A passphrase was supplied for a store that is not encrypted.
    #[error("The store is not encrypted but was tried to be opened with a passphrase")]
    UnencryptedStore,
    /// Encrypting or decrypting data of the store failed.
    #[error("Error encrypting or decrypting data from the store: {0}")]
    Encryption(#[from] StoreEncryptionError),

    /// Encoding or decoding data of the store failed; wraps a UTF-8 error.
    #[error("Error encoding or decoding data from the store: {0}")]
    Codec(#[from] Utf8Error),

    /// The database format is incompatible. The first field is the current
    /// version, the second one is the latest version.
    #[error(
        "The database format changed in an incompatible way, current \
        version: {0}, latest version: {1}"
    )]
    UnsupportedDatabaseVersion(usize, usize),
    /// A redaction could not be applied.
    #[error("Redaction failed: {0}")]
    Redaction(#[source] ruma::canonical_json::RedactionError),
}
129
130impl StoreError {
131 #[inline]
135 pub fn backend<E>(error: E) -> Self
136 where
137 E: std::error::Error + Send + Sync + 'static,
138 {
139 Self::Backend(Box::new(error))
140 }
141}
142
/// A `Result` alias whose error type defaults to [`StoreError`].
pub type Result<T, E = StoreError> = std::result::Result<T, E>;
145
/// A wrapper around a [`DynStateStore`] that additionally keeps the session
/// metadata, the sync token and the known rooms in memory.
///
/// Every field lives behind an `Arc`, so clones of this type share their
/// state.
#[derive(Clone)]
pub(crate) struct BaseStateStore {
    /// The wrapped state store; this is also the `Deref` target of this type.
    pub(super) inner: Arc<DynStateStore>,
    /// The metadata of the session; it can be set only once.
    session_meta: Arc<OnceCell<SessionMeta>>,
    /// The settings given to the most recent `load_rooms` call.
    room_load_settings: Arc<RwLock<RoomLoadSettings>>,
    /// The current sync token, if any; `load_sync_token` fills it from the
    /// store.
    pub(super) sync_token: Arc<RwLock<Option<String>>>,
    /// The rooms held in memory, keyed by their room ID.
    rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
    /// The lock handed out by `sync_lock()`.
    // NOTE(review): presumably held while a sync is being processed — confirm
    // against the callers.
    sync_lock: Arc<Mutex<()>>,
}
163
164impl BaseStateStore {
165 pub fn new(inner: Arc<DynStateStore>) -> Self {
167 Self {
168 inner,
169 session_meta: Default::default(),
170 room_load_settings: Default::default(),
171 sync_token: Default::default(),
172 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
173 sync_lock: Default::default(),
174 }
175 }
176
177 pub fn sync_lock(&self) -> &Mutex<()> {
179 &self.sync_lock
180 }
181
182 pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
188 self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
189 }
190
191 pub(crate) async fn load_rooms(
194 &self,
195 user_id: &UserId,
196 room_load_settings: RoomLoadSettings,
197 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
198 ) -> Result<()> {
199 *self.room_load_settings.write().await = room_load_settings.clone();
200
201 let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
202
203 let mut rooms = self.rooms.write().unwrap();
204
205 for room_info in room_infos {
206 let new_room = Room::restore(
207 user_id,
208 self.inner.clone(),
209 room_info,
210 room_info_notable_update_sender.clone(),
211 );
212 let new_room_id = new_room.room_id().to_owned();
213
214 rooms.insert(new_room_id, new_room);
215 }
216
217 Ok(())
218 }
219
220 async fn load_and_migrate_room_infos(
223 &self,
224 room_load_settings: RoomLoadSettings,
225 ) -> Result<Vec<RoomInfo>> {
226 let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
227 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
228
229 for room_info in room_infos.iter_mut() {
230 if room_info.apply_migrations(self.inner.clone()).await {
231 migrated_room_infos.push(room_info.clone());
232 }
233 }
234
235 if !migrated_room_infos.is_empty() {
236 let changes = StateChanges {
237 room_infos: migrated_room_infos
238 .into_iter()
239 .map(|room_info| (room_info.room_id.clone(), room_info))
240 .collect(),
241 ..Default::default()
242 };
243
244 if let Err(error) = self.inner.save_changes(&changes).await {
245 warn!("Failed to save migrated room infos: {error}");
246 }
247 }
248
249 Ok(room_infos)
250 }
251
252 pub(crate) async fn load_sync_token(&self) -> Result<()> {
255 let token =
256 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
257 *self.sync_token.write().await = token;
258
259 Ok(())
260 }
261
262 #[cfg(any(feature = "e2e-encryption", test))]
265 pub(crate) async fn derive_from_other(
266 &self,
267 other: &Self,
268 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
269 ) -> Result<()> {
270 let Some(session_meta) = other.session_meta.get() else {
271 return Ok(());
272 };
273
274 let room_load_settings = other.room_load_settings.read().await.clone();
275
276 self.load_rooms(&session_meta.user_id, room_load_settings, room_info_notable_update_sender)
277 .await?;
278 self.load_sync_token().await?;
279 self.set_session_meta(session_meta.clone());
280
281 Ok(())
282 }
283
284 pub fn session_meta(&self) -> Option<&SessionMeta> {
286 self.session_meta.get()
287 }
288
289 pub fn rooms(&self) -> Vec<Room> {
291 self.rooms.read().unwrap().iter().cloned().collect()
292 }
293
294 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
296 self.rooms
297 .read()
298 .unwrap()
299 .iter()
300 .filter(|room| filter.matches(room.state()))
301 .cloned()
302 .collect()
303 }
304
305 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
308 self.rooms.read().unwrap().stream()
309 }
310
311 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
313 self.rooms.read().unwrap().get(room_id).cloned()
314 }
315
316 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
318 self.rooms.read().unwrap().get(room_id).is_some()
319 }
320
321 pub fn get_or_create_room(
324 &self,
325 room_id: &RoomId,
326 room_state: RoomState,
327 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
328 ) -> Room {
329 let user_id =
330 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
331
332 self.rooms
333 .write()
334 .unwrap()
335 .get_or_create(room_id, || {
336 Room::new(
337 user_id,
338 self.inner.clone(),
339 room_id,
340 room_state,
341 room_info_notable_update_sender,
342 )
343 })
344 .clone()
345 }
346
347 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
353 self.inner.remove_room(room_id).await?;
354 self.rooms.write().unwrap().remove(room_id);
355 Ok(())
356 }
357}
358
359#[cfg(not(tarpaulin_include))]
360impl fmt::Debug for BaseStateStore {
361 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
362 f.debug_struct("Store")
363 .field("inner", &self.inner)
364 .field("session_meta", &self.session_meta)
365 .field("sync_token", &self.sync_token)
366 .field("rooms", &self.rooms)
367 .finish_non_exhaustive()
368 }
369}
370
371impl Deref for BaseStateStore {
372 type Target = DynStateStore;
373
374 fn deref(&self) -> &Self::Target {
375 self.inner.deref()
376 }
377}
378
/// Selects which rooms are loaded from the state store into memory.
///
/// The value is forwarded to the `get_room_infos` method of the state store.
#[derive(Clone, Debug, Default)]
pub enum RoomLoadSettings {
    /// Load all the rooms. This is the default.
    #[default]
    All,

    /// Load a single room, identified by its room ID.
    One(OwnedRoomId),
}
429
/// A set of changes to be handed over to a state store (via its
/// `save_changes` method).
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
    /// The sync token attached to these changes, if any.
    pub sync_token: Option<String>,
    /// Global account data events, keyed by event type.
    pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
    /// Presence events, keyed by the user ID of their sender.
    pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,

    /// Member profiles, keyed by room ID, then by user ID.
    pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,

    /// User IDs whose profile must be deleted, keyed by room ID.
    pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,

    /// State events, keyed by room ID, then by event type, then by state key.
    pub state:
        BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
    /// Room account data events, keyed by room ID, then by event type.
    pub room_account_data:
        BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,

    /// Room infos, keyed by room ID.
    pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,

    /// Receipt event contents, keyed by room ID.
    pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,

    /// Redaction events, keyed by room ID, then by the ID of the redacted
    /// event.
    pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,

    /// Stripped state events, keyed by room ID, then by event type, then by
    /// state key.
    pub stripped_state: BTreeMap<
        OwnedRoomId,
        BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
    >,

    /// For each room, the set of user IDs using each display name.
    pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
}
478
479impl StateChanges {
480 pub fn new(sync_token: String) -> Self {
482 Self { sync_token: Some(sync_token), ..Default::default() }
483 }
484
485 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
487 self.presence.insert(event.sender, raw_event);
488 }
489
490 pub fn add_room(&mut self, room: RoomInfo) {
492 self.room_infos.insert(room.room_id.clone(), room);
493 }
494
495 pub fn add_room_account_data(
498 &mut self,
499 room_id: &RoomId,
500 event: AnyRoomAccountDataEvent,
501 raw_event: Raw<AnyRoomAccountDataEvent>,
502 ) {
503 self.room_account_data
504 .entry(room_id.to_owned())
505 .or_default()
506 .insert(event.event_type(), raw_event);
507 }
508
509 pub fn add_stripped_member(
512 &mut self,
513 room_id: &RoomId,
514 user_id: &UserId,
515 event: Raw<StrippedRoomMemberEvent>,
516 ) {
517 self.stripped_state
518 .entry(room_id.to_owned())
519 .or_default()
520 .entry(StateEventType::RoomMember)
521 .or_default()
522 .insert(user_id.into(), event.cast());
523 }
524
525 pub fn add_state_event(
528 &mut self,
529 room_id: &RoomId,
530 event: AnySyncStateEvent,
531 raw_event: Raw<AnySyncStateEvent>,
532 ) {
533 self.state
534 .entry(room_id.to_owned())
535 .or_default()
536 .entry(event.event_type())
537 .or_default()
538 .insert(event.state_key().to_owned(), raw_event);
539 }
540
541 pub fn add_redaction(
543 &mut self,
544 room_id: &RoomId,
545 redacted_event_id: &EventId,
546 redaction: Raw<SyncRoomRedactionEvent>,
547 ) {
548 self.redactions
549 .entry(room_id.to_owned())
550 .or_default()
551 .insert(redacted_event_id.to_owned(), redaction);
552 }
553
554 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
557 self.receipts.insert(room_id.to_owned(), event);
558 }
559}
560
/// The set of stores to use: a state store, an event cache store and, when
/// the `e2e-encryption` feature is enabled, a crypto store.
#[derive(Clone)]
pub struct StoreConfig {
    /// The crypto store; only present with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store: Arc<DynCryptoStore>,
    /// The state store.
    pub(crate) state_store: Arc<DynStateStore>,
    /// The event cache store, wrapped in its lock type.
    pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
    /// The holder name given to `EventCacheStoreLock::new` each time an event
    /// cache store is configured.
    cross_process_store_locks_holder_name: String,
}
583
584#[cfg(not(tarpaulin_include))]
585impl fmt::Debug for StoreConfig {
586 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
587 fmt.debug_struct("StoreConfig").finish()
588 }
589}
590
591impl StoreConfig {
592 #[must_use]
597 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
598 Self {
599 #[cfg(feature = "e2e-encryption")]
600 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
601 state_store: Arc::new(MemoryStore::new()),
602 event_cache_store: event_cache_store::EventCacheStoreLock::new(
603 event_cache_store::MemoryStore::new(),
604 cross_process_store_locks_holder_name.clone(),
605 ),
606 cross_process_store_locks_holder_name,
607 }
608 }
609
610 #[cfg(feature = "e2e-encryption")]
614 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
615 self.crypto_store = store.into_crypto_store();
616 self
617 }
618
619 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
621 self.state_store = store.into_state_store();
622 self
623 }
624
625 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
627 where
628 S: event_cache_store::IntoEventCacheStore,
629 {
630 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
631 event_cache_store,
632 self.cross_process_store_locks_holder_name.clone(),
633 );
634 self
635 }
636}
637
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use matrix_sdk_test::async_test;
    use ruma::{owned_device_id, owned_user_id, room_id, user_id};
    use tokio::sync::broadcast;

    use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
    use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};

    /// Build the session metadata used by the tests of this module.
    fn example_session_meta() -> SessionMeta {
        SessionMeta {
            user_id: owned_user_id!("@mnt_io:matrix.org"),
            device_id: owned_device_id!("HELLOYOU"),
        }
    }

    /// Create a memory store in which the joined rooms `!r0` and `!r1` have
    /// been saved.
    async fn memory_store_with_two_joined_rooms() -> Arc<MemoryStore> {
        let memory_state_store = Arc::new(MemoryStore::new());

        // The rooms are saved through a first, short-lived, `BaseStateStore`.
        let store = BaseStateStore::new(memory_state_store.clone());
        let mut changes = StateChanges::default();

        for id in [room_id!("!r0"), room_id!("!r1")] {
            changes.add_room(RoomInfo::new(id, RoomState::Joined));
        }

        store.inner.save_changes(&changes).await.unwrap();

        memory_state_store
    }

    #[async_test]
    async fn test_set_session_meta() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let expected = example_session_meta();

        // Nothing is set initially.
        assert!(store.session_meta.get().is_none());

        store.set_session_meta(expected.clone());

        // Now the session metadata is there.
        assert_eq!(store.session_meta.get(), Some(&expected));
    }

    #[async_test]
    #[should_panic]
    async fn test_set_session_meta_twice() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let meta = example_session_meta();

        store.set_session_meta(meta.clone());
        // Setting the session metadata a second time must panic.
        store.set_session_meta(meta);
    }

    #[async_test]
    async fn test_derive_from_other() {
        // The store that is used as the source.
        let other = BaseStateStore::new(Arc::new(MemoryStore::new()));

        let expected_meta = example_session_meta();
        let (room_info_notable_update_sender, _) = broadcast::channel(1);
        let room_id_0 = room_id!("!r0");

        other
            .load_rooms(
                &expected_meta.user_id,
                RoomLoadSettings::One(room_id_0.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();
        other.set_session_meta(expected_meta.clone());

        // The store that is derived from `other`.
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        store.derive_from_other(&other, &room_info_notable_update_sender).await.unwrap();

        // Both the session metadata and the room load settings come from
        // `other`.
        assert_eq!(store.session_meta.get(), Some(&expected_meta));
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref room_id) => {
                assert_eq!(room_id, room_id_0);
            }
        );
    }

    #[test]
    fn test_room_load_settings_default() {
        assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
    }

    #[async_test]
    async fn test_load_all_rooms() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        let memory_state_store = memory_store_with_two_joined_rooms().await;

        // A fresh `BaseStateStore` on top of the populated memory store.
        let store = BaseStateStore::new(memory_state_store.clone());
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // Default value before loading anything.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(user_id, RoomLoadSettings::All, &room_info_notable_update_sender)
            .await
            .unwrap();

        // The settings are unchanged.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // The rooms come back in no particular order: sort them first.
        let mut loaded_rooms = store.rooms();
        loaded_rooms.sort_by(|lhs, rhs| lhs.room_id().cmp(rhs.room_id()));

        assert_eq!(loaded_rooms.len(), 2);

        for (room, expected_room_id) in loaded_rooms.iter().zip([room_id_0, room_id_1]) {
            assert_eq!(room.room_id(), expected_room_id);
            assert_eq!(room.own_user_id(), user_id);
        }
    }

    #[async_test]
    async fn test_load_one_room() {
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        let memory_state_store = memory_store_with_two_joined_rooms().await;

        // A fresh `BaseStateStore` on top of the populated memory store.
        let store = BaseStateStore::new(memory_state_store.clone());
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // Default value before loading anything.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(
                user_id,
                RoomLoadSettings::One(room_id_1.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();

        // The settings have been updated.
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref room_id) => {
                assert_eq!(room_id, room_id_1);
            }
        );

        // Only the requested room has been loaded.
        let loaded_rooms = store.rooms();
        assert_eq!(loaded_rooms.len(), 1);

        let only_room = &loaded_rooms[0];
        assert_eq!(only_room.room_id(), room_id_1);
        assert_eq!(only_room.own_user_id(), user_id);
    }
}