1use std::{
42 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43 fmt::Debug,
44 ops::Deref,
45 pin::pin,
46 sync::{atomic::Ordering, Arc},
47 time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55 encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
56 OwnedUserId, RoomId, UserId,
57};
58use serde::{de::DeserializeOwned, Serialize};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{error, info, instrument, trace, warn};
63use types::RoomKeyBundleInfo;
64use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
65
66use self::types::{
67 Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68 PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, UserKeyQueryResult,
69};
70#[cfg(doc)]
71use crate::{backups::BackupMachine, identities::OwnUserIdentity};
72use crate::{
73 gossiping::GossippedSecret,
74 identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
75 olm::{
76 Account, ExportedRoomKey, InboundGroupSession, PrivateCrossSigningIdentity, SenderData,
77 Session, StaticAccountData,
78 },
79 types::{
80 BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
81 SecretsBundle,
82 },
83 verification::VerificationMachine,
84 CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
85};
86
87pub mod caches;
88mod crypto_store_wrapper;
89mod error;
90mod memorystore;
91mod traits;
92pub mod types;
93
94#[cfg(any(test, feature = "testing"))]
95#[macro_use]
96#[allow(missing_docs)]
97pub mod integration_tests;
98
99pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
100pub use error::{CryptoStoreError, Result};
101use matrix_sdk_common::{
102 deserialized_responses::WithheldCode, store_locks::CrossProcessStoreLock, timeout::timeout,
103};
104pub use memorystore::MemoryStore;
105pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
106
107use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
108use crate::types::{
109 events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
110};
111pub use crate::{
112 dehydrated_devices::DehydrationError,
113 gossiping::{GossipRequest, SecretInfo},
114};
115
/// A clonable handle to the crypto store and the state built around it.
///
/// All state lives in a shared [`StoreInner`] behind an [`Arc`], so cloning a
/// `Store` yields another handle to the same state.
#[derive(Debug, Clone)]
pub struct Store {
    /// The shared state every clone of this handle points to.
    inner: Arc<StoreInner>,
}
126
/// Keeps track of the users that have a pending key query and lets tasks wait
/// until such a query has been processed.
#[derive(Debug, Default)]
pub(crate) struct KeyQueryManager {
    /// The users that currently need a key query, guarded by an async mutex.
    users_for_key_query: Mutex<UsersForKeyQuery>,

    /// Notified by `mark_tracked_users_as_up_to_date` once users have been
    /// processed; `wait_if_user_key_query_pending` waits on it.
    users_for_key_query_notify: Notify,
}
135
impl KeyQueryManager {
    /// Makes sure the tracked users have been loaded into `cache` and returns a
    /// [`SyncedKeyQueryManager`] borrowing both this manager and the cache.
    ///
    /// # Errors
    ///
    /// Fails if loading the tracked users from the store fails.
    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
        self.ensure_sync_tracked_users(cache).await?;
        Ok(SyncedKeyQueryManager { cache, manager: self })
    }

    /// Loads the tracked users from the store into `cache`, at most once.
    ///
    /// The `loaded_tracked_users` flag is first checked under a read lock. If
    /// it is not set, the write lock is taken and the flag is checked again,
    /// since another task may have finished loading in between.
    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
        // Fast path: only a read lock is needed when the users are already loaded.
        let loaded = cache.loaded_tracked_users.read().await;
        if *loaded {
            return Ok(());
        }

        // Release the read guard before asking for the write lock on the same
        // `RwLock`.
        drop(loaded);
        let mut loaded = cache.loaded_tracked_users.write().await;

        // Somebody else may have loaded the users while we waited for the
        // write lock.
        if *loaded {
            return Ok(());
        }

        let tracked_users = cache.store.load_tracked_users().await?;

        let mut query_users_lock = self.users_for_key_query.lock().await;
        let mut tracked_users_cache = cache.tracked_users.write();
        for user in tracked_users {
            tracked_users_cache.insert(user.user_id.to_owned());

            // Users stored as dirty are queued for a key query again.
            if user.dirty {
                query_users_lock.insert_user(&user.user_id);
            }
        }

        *loaded = true;

        Ok(())
    }

    /// Waits, for at most `timeout_duration`, until a pending key query for
    /// `user` has completed.
    ///
    /// Returns:
    /// * [`UserKeyQueryResult::WasNotPending`] if no waiter could be registered
    ///   for the user,
    /// * [`UserKeyQueryResult::WasPending`] if the wait finished before the
    ///   timeout,
    /// * [`UserKeyQueryResult::TimeoutExpired`] if the timeout elapsed first
    ///   (a warning is logged in that case).
    ///
    /// The `cache` guard is consumed and dropped before waiting starts.
    pub async fn wait_if_user_key_query_pending(
        &self,
        cache: StoreCacheGuard,
        timeout_duration: Duration,
        user: &UserId,
    ) -> Result<UserKeyQueryResult> {
        {
            self.ensure_sync_tracked_users(&cache).await?;
            // The cache guard is not needed past this point; release it so it
            // is not held for the duration of the wait below.
            drop(cache);
        }

        let mut users_for_key_query = self.users_for_key_query.lock().await;
        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
            return Ok(UserKeyQueryResult::WasNotPending);
        };

        let wait_for_completion = async {
            while !waiter.completed.load(Ordering::Relaxed) {
                // Create and enable the `Notified` future while the mutex is
                // still held, and only then release the mutex, so that a
                // `notify_waiters()` call issued right after the release is
                // not missed.
                let mut notified = pin!(self.users_for_key_query_notify.notified());
                notified.as_mut().enable();
                drop(users_for_key_query);

                notified.await;

                // Re-acquire the mutex before `completed` is checked again.
                users_for_key_query = self.users_for_key_query.lock().await;
            }
        };

        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
            Err(_) => {
                warn!(
                    user_id = ?user,
                    "The user has a pending `/keys/query` request which did \
                    not finish yet, some devices might be missing."
                );

                Ok(UserKeyQueryResult::TimeoutExpired)
            }
            _ => Ok(UserKeyQueryResult::WasPending),
        }
    }
}
243
/// A view of a [`KeyQueryManager`] paired with the [`StoreCache`] it operates
/// on.
///
/// Values of this type are created by [`KeyQueryManager::synced`], which
/// ensures the tracked users have been loaded into the cache first.
pub(crate) struct SyncedKeyQueryManager<'a> {
    /// The cache holding the set of tracked users and the backing store.
    cache: &'a StoreCache,
    /// The manager holding the set of users pending a key query.
    manager: &'a KeyQueryManager,
}
248
249impl SyncedKeyQueryManager<'_> {
250 pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
255 let mut store_updates = Vec::new();
256 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
257
258 {
259 let mut tracked_users = self.cache.tracked_users.write();
260 for user_id in users {
261 if tracked_users.insert(user_id.to_owned()) {
262 key_query_lock.insert_user(user_id);
263 store_updates.push((user_id, true))
264 }
265 }
266 }
267
268 self.cache.store.save_tracked_users(&store_updates).await
269 }
270
271 pub async fn mark_tracked_users_as_changed(
278 &self,
279 users: impl Iterator<Item = &UserId>,
280 ) -> Result<()> {
281 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
282 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
283
284 {
285 let tracked_users = &self.cache.tracked_users.read();
286 for user_id in users {
287 if tracked_users.contains(user_id) {
288 key_query_lock.insert_user(user_id);
289 store_updates.push((user_id, true));
290 }
291 }
292 }
293
294 self.cache.store.save_tracked_users(&store_updates).await
295 }
296
297 pub async fn mark_tracked_users_as_up_to_date(
303 &self,
304 users: impl Iterator<Item = &UserId>,
305 sequence_number: SequenceNumber,
306 ) -> Result<()> {
307 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
308 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
309
310 {
311 let tracked_users = self.cache.tracked_users.read();
312 for user_id in users {
313 if tracked_users.contains(user_id) {
314 let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
315 store_updates.push((user_id, !clean));
316 }
317 }
318 }
319
320 self.cache.store.save_tracked_users(&store_updates).await?;
321 self.manager.users_for_key_query_notify.notify_waiters();
323
324 Ok(())
325 }
326
327 pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
339 self.manager.users_for_key_query.lock().await.users_for_key_query()
340 }
341
342 pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
344 self.cache.tracked_users.read().iter().cloned().collect()
345 }
346
347 pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
353 self.manager.users_for_key_query.lock().await.insert_user(user);
354 self.cache.tracked_users.write().insert(user.to_owned());
355
356 self.cache.store.save_tracked_users(&[(user, true)]).await
357 }
358}
359
360fn collect_device_updates(
366 verification_machine: VerificationMachine,
367 own_identity: Option<OwnUserIdentityData>,
368 identities: IdentityChanges,
369 devices: DeviceChanges,
370) -> DeviceUpdates {
371 let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
372 let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
373
374 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
375
376 let map_device = |device: DeviceData| {
377 let device_owner_identity = new_identities
378 .get(device.user_id())
379 .or_else(|| changed_identities.get(device.user_id()))
380 .or_else(|| unchanged_identities.get(device.user_id()))
381 .cloned();
382
383 Device {
384 inner: device,
385 verification_machine: verification_machine.to_owned(),
386 own_identity: own_identity.to_owned(),
387 device_owner_identity,
388 }
389 };
390
391 for device in devices.new {
392 let device = map_device(device);
393
394 new.entry(device.user_id().to_owned())
395 .or_default()
396 .insert(device.device_id().to_owned(), device);
397 }
398
399 for device in devices.changed {
400 let device = map_device(device);
401
402 changed
403 .entry(device.user_id().to_owned())
404 .or_default()
405 .insert(device.device_id().to_owned(), device.to_owned());
406 }
407
408 DeviceUpdates { new, changed }
409}
410
/// A set of pending changes that is applied to the store on
/// [`StoreTransaction::commit`].
///
/// The transaction owns a write guard on the [`StoreCache`] for its whole
/// lifetime.
#[allow(missing_debug_implementations)]
pub struct StoreTransaction {
    /// The store the changes are saved to on commit.
    store: Store,
    /// The changes accumulated so far.
    changes: PendingChanges,
    /// Write guard on the store cache, held until the transaction is dropped
    /// or committed.
    cache: OwnedRwLockWriteGuard<StoreCache>,
}
419
420impl StoreTransaction {
421 async fn new(store: Store) -> Self {
423 let cache = store.inner.cache.clone();
424
425 Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
426 }
427
428 pub(crate) fn cache(&self) -> &StoreCache {
429 &self.cache
430 }
431
432 pub fn store(&self) -> &Store {
434 &self.store
435 }
436
437 pub async fn account(&mut self) -> Result<&mut Account> {
444 if self.changes.account.is_none() {
445 let _ = self.cache.account().await?;
447 self.changes.account = self.cache.account.lock().await.take();
448 }
449 Ok(self.changes.account.as_mut().unwrap())
450 }
451
452 pub async fn commit(self) -> Result<()> {
455 if self.changes.is_empty() {
456 return Ok(());
457 }
458
459 let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
461
462 self.store.save_pending_changes(self.changes).await?;
463
464 if let Some(account) = account {
466 *self.cache.account.lock().await = Some(account);
467 }
468
469 Ok(())
470 }
471}
472
/// The state shared by all clones of a [`Store`].
#[derive(Debug)]
struct StoreInner {
    /// The private cross-signing identity, shared behind an async mutex.
    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The wrapped crypto store all persistence calls are forwarded to.
    store: Arc<CryptoStoreWrapper>,

    /// The store cache. Read guards are handed out by `Store::cache`, while a
    /// [`StoreTransaction`] holds a write guard.
    cache: Arc<RwLock<StoreCache>>,

    /// The verification machine that is attached to devices and identities
    /// created by the store.
    verification_machine: VerificationMachine,

    /// Static account data; `Store::user_id` reads our own user ID from it.
    static_account: StaticAccountData,
}
489
/// Errors that can happen while importing a secret into the store.
#[derive(Debug, Error)]
pub enum SecretImportError {
    /// A key error reported by vodozemac.
    #[error(transparent)]
    Key(#[from] vodozemac::KeyError),
    /// The public key derived from the imported private key differs from the
    /// public key that was uploaded to the server.
    #[error(
        "The public key of the imported private key doesn't match to the \
        public key that was uploaded to the server"
    )]
    MismatchedPublicKeys,
    /// The underlying crypto store reported an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
}
508
/// Errors that can happen while exporting a [`SecretsBundle`] from the store.
#[derive(Debug, Error)]
pub enum SecretsBundleExportError {
    /// The underlying crypto store reported an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
    /// Some cross-signing keys are present, but the one with the given
    /// [`KeyUsage`] is missing.
    #[error("The store is missing one or multiple cross-signing keys")]
    MissingCrossSigningKey(KeyUsage),
    /// None of the cross-signing keys are present in the store.
    #[error("The store doesn't contain any cross-signing keys")]
    MissingCrossSigningKeys,
    /// A backup decryption key is present, but no backup version is stored
    /// alongside it.
    #[error("The store contains a backup key, but no backup version")]
    MissingBackupVersion,
}
529
530impl Store {
531 pub(crate) fn new(
533 account: StaticAccountData,
534 identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
535 store: Arc<CryptoStoreWrapper>,
536 verification_machine: VerificationMachine,
537 ) -> Self {
538 Self {
539 inner: Arc::new(StoreInner {
540 static_account: account,
541 identity,
542 store: store.clone(),
543 verification_machine,
544 cache: Arc::new(RwLock::new(StoreCache {
545 store,
546 tracked_users: Default::default(),
547 loaded_tracked_users: Default::default(),
548 account: Default::default(),
549 })),
550 }),
551 }
552 }
553
    /// Returns our own user ID, as stored in the static account data.
    pub(crate) fn user_id(&self) -> &UserId {
        &self.inner.static_account.user_id
    }
558
    /// Returns our own device ID, as reported by the verification machine.
    pub(crate) fn device_id(&self) -> &DeviceId {
        self.inner.verification_machine.own_device_id()
    }
563
    /// Returns the static account data this store was created with.
    pub(crate) fn static_account(&self) -> &StaticAccountData {
        &self.inner.static_account
    }
568
569 pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
570 Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
575 }
576
    /// Starts a new [`StoreTransaction`] on a clone of this store.
    ///
    /// This waits until the write lock on the store cache has been acquired.
    pub(crate) async fn transaction(&self) -> StoreTransaction {
        StoreTransaction::new(self.clone()).await
    }
580
581 pub(crate) async fn with_transaction<
584 T,
585 Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
586 F: FnOnce(StoreTransaction) -> Fut,
587 >(
588 &self,
589 func: F,
590 ) -> Result<T, crate::OlmError> {
591 let tr = self.transaction().await;
592 let (tr, res) = func(tr).await?;
593 tr.commit().await?;
594 Ok(res)
595 }
596
    /// Test helper: resets the private cross-signing identity held by the
    /// store.
    #[cfg(test)]
    pub(crate) async fn reset_cross_signing_identity(&self) {
        self.inner.identity.lock().await.reset();
    }
602
603 pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
605 self.inner.identity.clone()
606 }
607
608 pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
610 let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
611
612 self.save_changes(changes).await
613 }
614
    /// Looks up the sessions stored for `sender_key` in the underlying store.
    ///
    /// Returns `None` if the store has no entry for that key.
    pub(crate) async fn get_sessions(
        &self,
        sender_key: &str,
    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
        self.inner.store.get_sessions(sender_key).await
    }
621
    /// Forwards the given [`Changes`] to the underlying store wrapper.
    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
        self.inner.store.save_changes(changes).await
    }
625
626 pub(crate) async fn compare_group_session(
633 &self,
634 session: &InboundGroupSession,
635 ) -> Result<SessionOrdering> {
636 let old_session = self
637 .inner
638 .store
639 .get_inbound_group_session(session.room_id(), session.session_id())
640 .await?;
641
642 Ok(if let Some(old_session) = old_session {
643 session.compare(&old_session).await
644 } else {
645 SessionOrdering::Better
646 })
647 }
648
649 #[cfg(test)]
650 pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
652 use types::DeviceChanges;
653
654 let changes = Changes {
655 devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
656 ..Default::default()
657 };
658
659 self.save_changes(changes).await
660 }
661
662 pub(crate) async fn save_inbound_group_sessions(
664 &self,
665 sessions: &[InboundGroupSession],
666 ) -> Result<()> {
667 let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
668
669 self.save_changes(changes).await
670 }
671
672 pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
674 Ok(self
675 .inner
676 .store
677 .get_device(self.user_id(), self.device_id())
678 .await?
679 .and_then(|d| d.display_name().map(|d| d.to_owned())))
680 }
681
    /// Looks up the [`DeviceData`] stored for the given user and device ID.
    ///
    /// Returns `None` if the store does not know the device.
    pub(crate) async fn get_device_data(
        &self,
        user_id: &UserId,
        device_id: &DeviceId,
    ) -> Result<Option<DeviceData>> {
        self.inner.store.get_device(user_id, device_id).await
    }
693
694 pub(crate) async fn get_device_data_for_user_filtered(
702 &self,
703 user_id: &UserId,
704 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
705 self.inner.store.get_user_devices(user_id).await.map(|mut d| {
706 if user_id == self.user_id() {
707 d.remove(self.device_id());
708 }
709 d
710 })
711 }
712
    /// Returns all stored devices of `user_id`, keyed by device ID.
    ///
    /// Unlike [`Self::get_device_data_for_user_filtered`], our own device is
    /// not removed from the result.
    pub(crate) async fn get_device_data_for_user(
        &self,
        user_id: &UserId,
    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
        self.inner.store.get_user_devices(user_id).await
    }
727
728 pub(crate) async fn get_device_from_curve_key(
734 &self,
735 user_id: &UserId,
736 curve_key: Curve25519PublicKey,
737 ) -> Result<Option<Device>> {
738 self.get_user_devices(user_id)
739 .await
740 .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
741 }
742
743 pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
753 let devices = self.get_device_data_for_user(user_id).await?;
754
755 let own_identity = self
756 .inner
757 .store
758 .get_user_identity(self.user_id())
759 .await?
760 .and_then(|i| i.own().cloned());
761 let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
762
763 Ok(UserDevices {
764 inner: devices,
765 verification_machine: self.inner.verification_machine.clone(),
766 own_identity,
767 device_owner_identity,
768 })
769 }
770
771 pub(crate) async fn get_device(
781 &self,
782 user_id: &UserId,
783 device_id: &DeviceId,
784 ) -> Result<Option<Device>> {
785 if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
786 Ok(Some(self.wrap_device_data(device_data).await?))
787 } else {
788 Ok(None)
789 }
790 }
791
792 pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
797 let own_identity = self
798 .inner
799 .store
800 .get_user_identity(self.user_id())
801 .await?
802 .and_then(|i| i.own().cloned());
803
804 let device_owner_identity =
805 self.inner.store.get_user_identity(device_data.user_id()).await?;
806
807 Ok(Device {
808 inner: device_data,
809 verification_machine: self.inner.verification_machine.clone(),
810 own_identity,
811 device_owner_identity,
812 })
813 }
814
815 pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
817 let own_identity = self
818 .inner
819 .store
820 .get_user_identity(self.user_id())
821 .await?
822 .and_then(as_variant!(UserIdentityData::Own));
823
824 Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
825 UserIdentity::new(
826 self.clone(),
827 i,
828 self.inner.verification_machine.to_owned(),
829 own_identity,
830 )
831 }))
832 }
833
834 pub async fn export_secret(
843 &self,
844 secret_name: &SecretName,
845 ) -> Result<Option<String>, CryptoStoreError> {
846 Ok(match secret_name {
847 SecretName::CrossSigningMasterKey
848 | SecretName::CrossSigningUserSigningKey
849 | SecretName::CrossSigningSelfSigningKey => {
850 self.inner.identity.lock().await.export_secret(secret_name).await
851 }
852 SecretName::RecoveryKey => {
853 if let Some(key) = self.load_backup_keys().await?.decryption_key {
854 let exported = key.to_base64();
855 Some(exported)
856 } else {
857 None
858 }
859 }
860 name => {
861 warn!(secret = ?name, "Unknown secret was requested");
862 None
863 }
864 })
865 }
866
867 pub async fn export_cross_signing_keys(
875 &self,
876 ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
877 let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
878 let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
879 let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
880
881 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
882 None
883 } else {
884 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
885 })
886 }
887
    /// Imports the private cross-signing keys from `export` and returns the
    /// resulting [`CrossSigningStatus`].
    ///
    /// The import only takes place if the store holds our own public identity.
    /// Otherwise a warning is logged and the current status is returned
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Fails if the identity lookup, the import of the secrets, or saving the
    /// changes fails.
    pub async fn import_cross_signing_keys(
        &self,
        export: CrossSigningKeyExport,
    ) -> Result<CrossSigningStatus, SecretImportError> {
        if let Some(public_identity) =
            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
        {
            // This guard lives until the end of the `if` block. It has to be
            // released before the identity is locked again for the final
            // status lookup at the bottom of the function.
            let identity = self.inner.identity.lock().await;

            identity
                .import_secrets(
                    public_identity.to_owned(),
                    export.master_key.as_deref(),
                    export.self_signing_key.as_deref(),
                    export.user_signing_key.as_deref(),
                )
                .await?;

            let status = identity.status().await;

            let diff = identity.get_public_identity_diff(&public_identity.inner).await;

            let mut changes =
                Changes { private_identity: Some(identity.clone()), ..Default::default() };

            // The public identity is only marked as verified, and saved as
            // changed, when the diff reports no differences.
            if diff.none_differ() {
                public_identity.mark_as_verified();
                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
            }

            info!(?status, "Successfully imported the private cross-signing keys");

            self.save_changes(changes).await?;
        } else {
            warn!(
                "No public identity found while importing cross-signing keys, \
                a /keys/query needs to be done"
            );
        }

        Ok(self.inner.identity.lock().await.status().await)
    }
934
935 pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
947 let Some(cross_signing) = self.export_cross_signing_keys().await? else {
948 return Err(SecretsBundleExportError::MissingCrossSigningKeys);
949 };
950
951 let Some(master_key) = cross_signing.master_key.clone() else {
952 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
953 };
954
955 let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
956 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
957 };
958
959 let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
960 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
961 };
962
963 let backup_keys = self.load_backup_keys().await?;
964
965 let backup = if let Some(key) = backup_keys.decryption_key {
966 if let Some(backup_version) = backup_keys.backup_version {
967 Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
968 MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
969 ))
970 } else {
971 return Err(SecretsBundleExportError::MissingBackupVersion);
972 }
973 } else {
974 None
975 };
976
977 Ok(SecretsBundle {
978 cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
979 backup,
980 })
981 }
982
983 pub async fn import_secrets_bundle(
996 &self,
997 bundle: &SecretsBundle,
998 ) -> Result<(), SecretImportError> {
999 let mut changes = Changes::default();
1000
1001 if let Some(backup_bundle) = &bundle.backup {
1002 match backup_bundle {
1003 BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1004 changes.backup_decryption_key = Some(bundle.key.clone());
1005 changes.backup_version = Some(bundle.backup_version.clone());
1006 }
1007 }
1008 }
1009
1010 let identity = self.inner.identity.lock().await;
1011
1012 identity
1013 .import_secrets_unchecked(
1014 Some(&bundle.cross_signing.master_key),
1015 Some(&bundle.cross_signing.self_signing_key),
1016 Some(&bundle.cross_signing.user_signing_key),
1017 )
1018 .await?;
1019
1020 let public_identity = identity.to_public_identity().await.expect(
1021 "We should be able to create a new public identity since we just imported \
1022 all the private cross-signing keys",
1023 );
1024
1025 changes.private_identity = Some(identity.clone());
1026 changes.identities.new.push(UserIdentityData::Own(public_identity));
1027
1028 Ok(self.save_changes(changes).await?)
1029 }
1030
1031 pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1033 match &secret.secret_name {
1034 SecretName::CrossSigningMasterKey
1035 | SecretName::CrossSigningUserSigningKey
1036 | SecretName::CrossSigningSelfSigningKey => {
1037 if let Some(public_identity) =
1038 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1039 {
1040 let identity = self.inner.identity.lock().await;
1041
1042 identity
1043 .import_secret(
1044 public_identity,
1045 &secret.secret_name,
1046 &secret.event.content.secret,
1047 )
1048 .await?;
1049 info!(
1050 secret_name = ?secret.secret_name,
1051 "Successfully imported a private cross signing key"
1052 );
1053
1054 let changes =
1055 Changes { private_identity: Some(identity.clone()), ..Default::default() };
1056
1057 self.save_changes(changes).await?;
1058 }
1059 }
1060 SecretName::RecoveryKey => {
1061 }
1067 name => {
1068 warn!(secret = ?name, "Tried to import an unknown secret");
1069 }
1070 }
1071
1072 Ok(())
1073 }
1074
1075 pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1078 let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1079 Ok(value)
1080 }
1081
    /// Stores `block_untrusted_devices` as the `only_allow_trusted_devices`
    /// setting in the custom value store.
    pub async fn set_only_allow_trusted_devices(
        &self,
        block_untrusted_devices: bool,
    ) -> Result<()> {
        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
    }
1090
1091 pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1093 let Some(value) = self.get_custom_value(key).await? else {
1094 return Ok(None);
1095 };
1096 let deserialized = self.deserialize_value(&value)?;
1097 Ok(Some(deserialized))
1098 }
1099
1100 pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1102 let serialized = self.serialize_value(value)?;
1103 self.set_custom_value(key, serialized).await?;
1104 Ok(())
1105 }
1106
1107 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1108 let serialized =
1109 rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1110 Ok(serialized)
1111 }
1112
1113 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1114 let deserialized =
1115 rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1116 Ok(deserialized)
1117 }
1118
    /// Returns the stream of received room keys provided by the underlying
    /// store wrapper.
    ///
    /// Each item is either a batch of [`RoomKeyInfo`]s or a
    /// [`BroadcastStreamRecvError`].
    pub fn room_keys_received_stream(
        &self,
    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
        self.inner.store.room_keys_received_stream()
    }
1135
    /// Returns the stream of received room key withheld notices provided by
    /// the underlying store wrapper.
    ///
    /// Each item is a batch of [`RoomKeyWithheldInfo`]s.
    pub fn room_keys_withheld_received_stream(
        &self,
    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
        self.inner.store.room_keys_withheld_received_stream()
    }
1149
1150 pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> {
1181 let verification_machine = self.inner.verification_machine.to_owned();
1182
1183 let this = self.clone();
1184 self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1185 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1186
1187 let map_identity = |(user_id, identity)| {
1188 (
1189 user_id,
1190 UserIdentity::new(
1191 this.clone(),
1192 identity,
1193 verification_machine.to_owned(),
1194 own_identity.to_owned(),
1195 ),
1196 )
1197 };
1198
1199 let new = new_identities.into_iter().map(map_identity).collect();
1200 let changed = changed_identities.into_iter().map(map_identity).collect();
1201 let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1202
1203 IdentityUpdates { new, changed, unchanged }
1204 })
1205 }
1206
1207 pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> {
1239 let verification_machine = self.inner.verification_machine.to_owned();
1240
1241 self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1242 collect_device_updates(
1243 verification_machine.to_owned(),
1244 own_identity,
1245 identities,
1246 devices,
1247 )
1248 })
1249 }
1250
    /// Returns a stream of the raw identity and device changes.
    ///
    /// This is the store wrapper's identities stream with the own identity
    /// (the first tuple element) dropped from every item.
    pub fn identities_stream_raw(&self) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> {
        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
    }
1263
    /// Creates a [`CrossProcessStoreLock`] backed by the crypto store, using
    /// the given lock key and lock value.
    pub fn create_store_lock(
        &self,
        lock_key: String,
        lock_value: String,
    ) -> CrossProcessStoreLock<LockableCryptoStore> {
        self.inner.store.create_store_lock(lock_key, lock_value)
    }
1273
    /// Returns the stream of [`GossippedSecret`]s provided by the underlying
    /// store wrapper.
    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
        self.inner.store.secrets_stream()
    }
1316
    /// Returns the stream of [`RoomKeyBundleInfo`]s provided by the underlying
    /// store wrapper.
    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> {
        self.inner.store.historic_room_key_stream()
    }
1362
1363 pub async fn import_room_keys(
1376 &self,
1377 exported_keys: Vec<ExportedRoomKey>,
1378 from_backup_version: Option<&str>,
1379 progress_listener: impl Fn(usize, usize),
1380 ) -> Result<RoomKeyImportResult> {
1381 let exported_keys: Vec<&ExportedRoomKey> = exported_keys.iter().collect();
1382 self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1383 }
1384
    /// Imports the given exported room keys without a backup version.
    ///
    /// This is [`Self::import_room_keys`] with `from_backup_version` set to
    /// `None`, so the imported sessions are not marked as backed up.
    pub async fn import_exported_room_keys(
        &self,
        exported_keys: Vec<ExportedRoomKey>,
        progress_listener: impl Fn(usize, usize),
    ) -> Result<RoomKeyImportResult> {
        self.import_room_keys(exported_keys, None, progress_listener).await
    }
1418
1419 async fn import_sessions_impl<T>(
1420 &self,
1421 room_keys: Vec<T>,
1422 from_backup_version: Option<&str>,
1423 progress_listener: impl Fn(usize, usize),
1424 ) -> Result<RoomKeyImportResult>
1425 where
1426 T: TryInto<InboundGroupSession> + RoomKeyExport + Copy,
1427 T::Error: Debug,
1428 {
1429 let mut sessions = Vec::new();
1430
1431 async fn new_session_better(
1432 session: &InboundGroupSession,
1433 old_session: Option<InboundGroupSession>,
1434 ) -> bool {
1435 if let Some(old_session) = &old_session {
1436 session.compare(old_session).await == SessionOrdering::Better
1437 } else {
1438 true
1439 }
1440 }
1441
1442 let total_count = room_keys.len();
1443 let mut keys = BTreeMap::new();
1444
1445 for (i, key) in room_keys.into_iter().enumerate() {
1446 match key.try_into() {
1447 Ok(session) => {
1448 let old_session = self
1449 .inner
1450 .store
1451 .get_inbound_group_session(session.room_id(), session.session_id())
1452 .await?;
1453
1454 if new_session_better(&session, old_session).await {
1457 if from_backup_version.is_some() {
1458 session.mark_as_backed_up();
1459 }
1460
1461 keys.entry(session.room_id().to_owned())
1462 .or_insert_with(BTreeMap::new)
1463 .entry(session.sender_key().to_base64())
1464 .or_insert_with(BTreeSet::new)
1465 .insert(session.session_id().to_owned());
1466
1467 sessions.push(session);
1468 }
1469 }
1470 Err(e) => {
1471 warn!(
1472 sender_key = key.sender_key().to_base64(),
1473 room_id = ?key.room_id(),
1474 session_id = key.session_id(),
1475 error = ?e,
1476 "Couldn't import a room key from a file export."
1477 );
1478 }
1479 }
1480
1481 progress_listener(i, total_count);
1482 }
1483
1484 let imported_count = sessions.len();
1485
1486 self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1487
1488 info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1489
1490 Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1491 }
1492
1493 pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1494 self.inner.store.clone()
1495 }
1496
1497 pub async fn export_room_keys(
1520 &self,
1521 predicate: impl FnMut(&InboundGroupSession) -> bool,
1522 ) -> Result<Vec<ExportedRoomKey>> {
1523 let mut exported = Vec::new();
1524
1525 let mut sessions = self.get_inbound_group_sessions().await?;
1526 sessions.retain(predicate);
1527
1528 for session in sessions {
1529 let export = session.export().await;
1530 exported.push(export);
1531 }
1532
1533 Ok(exported)
1534 }
1535
    /// Like [`Self::export_room_keys`], but returns the exported keys as a
    /// stream.
    ///
    /// All inbound group sessions are loaded from the store before the stream
    /// is returned. Filtering with `predicate` and exporting happen lazily,
    /// one session at a time, as the stream is polled.
    ///
    /// # Errors
    ///
    /// Fails if loading the inbound group sessions fails.
    pub async fn export_room_keys_stream(
        &self,
        predicate: impl FnMut(&InboundGroupSession) -> bool,
    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
        let sessions = self.get_inbound_group_sessions().await?;
        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
            .then(|session| async move { session.export().await }))
    }
1577
1578 pub async fn build_room_key_bundle(
1583 &self,
1584 room_id: &RoomId,
1585 ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1586 let mut sessions = self.get_inbound_group_sessions().await?;
1589 sessions.retain(|session| session.room_id == room_id);
1590
1591 let mut bundle = RoomKeyBundle::default();
1592 for session in sessions {
1593 if session.shared_history() {
1594 bundle.room_keys.push(session.export().await.into());
1595 } else {
1596 bundle.withheld.push(RoomKeyWithheldContent::new(
1597 session.algorithm().to_owned(),
1598 WithheldCode::Unauthorised,
1599 session.room_id().to_owned(),
1600 session.session_id().to_owned(),
1601 session.sender_key().to_owned(),
1602 self.device_id().to_owned(),
1603 ));
1604 }
1605 }
1606
1607 Ok(bundle)
1608 }
1609
1610 #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len()))]
1623 pub async fn receive_room_key_bundle(
1624 &self,
1625 room_id: &RoomId,
1626 sender_user: &UserId,
1627 sender_data: &SenderData,
1628 bundle: RoomKeyBundle,
1629 progress_listener: impl Fn(usize, usize),
1630 ) -> Result<(), CryptoStoreError> {
1631 let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1632 if key.room_id != room_id {
1633 trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1634 Either::Right(key)
1635 } else {
1636 Either::Left(key)
1637 }
1638 });
1639
1640 match (bad.is_empty(), good.is_empty()) {
1641 (true, true) => {
1643 warn!("Received a completely empty room key bundle");
1644 }
1645
1646 (false, true) => {
1648 let bad_keys: Vec<_> =
1649 bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1650
1651 warn!(
1652 ?bad_keys,
1653 "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1654 );
1655 }
1656
1657 (_, false) => {
1659 if !bad.is_empty() {
1662 warn!(
1663 bad_key_count = bad.len(),
1664 "The room key bundle contained some room keys \
1665 that were meant for a different room"
1666 );
1667 }
1668
1669 self.import_sessions_impl(good, None, progress_listener).await?;
1670 }
1671 }
1672
1673 Ok(())
1674 }
1675}
1676
1677impl Deref for Store {
1678 type Target = DynCryptoStore;
1679
1680 fn deref(&self) -> &Self::Target {
1681 self.inner.store.deref().deref()
1682 }
1683}
1684
#[derive(Clone, Debug)]
/// Newtype around a shared, type-erased [`CryptoStore`] (with
/// [`CryptoStoreError`] as its error type).
///
/// It exists so that the store can implement
/// `matrix_sdk_common::store_locks::BackingStore`; see the trait impl for this
/// type. Cloning only clones the inner [`Arc`].
pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1688
1689impl matrix_sdk_common::store_locks::BackingStore for LockableCryptoStore {
1690 type LockError = CryptoStoreError;
1691
1692 async fn try_lock(
1693 &self,
1694 lease_duration_ms: u32,
1695 key: &str,
1696 holder: &str,
1697 ) -> std::result::Result<bool, Self::LockError> {
1698 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1699 }
1700}
1701
1702#[cfg(test)]
1703mod tests {
1704 use std::pin::pin;
1705
1706 use futures_util::StreamExt;
1707 use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1708 use matrix_sdk_test::async_test;
1709 use ruma::{device_id, room_id, user_id, RoomId};
1710 use vodozemac::megolm::SessionKey;
1711
1712 use crate::{
1713 machine::test_helpers::get_machine_pair,
1714 olm::{InboundGroupSession, SenderData},
1715 store::types::DehydratedDeviceKey,
1716 types::EventEncryptionAlgorithm,
1717 OlmMachine,
1718 };
1719
1720 #[async_test]
1721 async fn test_import_room_keys_notifies_stream() {
1722 use futures_util::FutureExt;
1723
1724 let (alice, bob, _) =
1725 get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1726
1727 let room1_id = room_id!("!room1:localhost");
1728 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1729 let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
1730
1731 let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
1732 bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
1733
1734 let room_keys = room_keys_received_stream
1735 .next()
1736 .now_or_never()
1737 .flatten()
1738 .expect("We should have received an update of room key infos")
1739 .unwrap();
1740 assert_eq!(room_keys.len(), 1);
1741 assert_eq!(room_keys[0].room_id, "!room1:localhost");
1742 }
1743
1744 #[async_test]
1745 async fn test_export_room_keys_provides_selected_keys() {
1746 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1748 let room1_id = room_id!("!room1:localhost");
1749 let room2_id = room_id!("!room2:localhost");
1750 let room3_id = room_id!("!room3:localhost");
1751 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1752 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1753 alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
1754
1755 let keys = alice
1757 .store()
1758 .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
1759 .await
1760 .unwrap();
1761
1762 assert_eq!(keys.len(), 2);
1764 assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1765 assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1766 assert_eq!(keys[0].room_id, "!room2:localhost");
1767 assert_eq!(keys[1].room_id, "!room3:localhost");
1768 assert_eq!(keys[0].session_key.to_base64().len(), 220);
1769 assert_eq!(keys[1].session_key.to_base64().len(), 220);
1770 }
1771
1772 #[async_test]
1773 async fn test_export_room_keys_stream_can_provide_all_keys() {
1774 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1776 let room1_id = room_id!("!room1:localhost");
1777 let room2_id = room_id!("!room2:localhost");
1778 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1779 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1780
1781 let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
1783
1784 let mut collected = vec![];
1786 while let Some(key) = keys.next().await {
1787 collected.push(key);
1788 }
1789
1790 assert_eq!(collected.len(), 2);
1792 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1793 assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1794 assert_eq!(collected[0].room_id, "!room1:localhost");
1795 assert_eq!(collected[1].room_id, "!room2:localhost");
1796 assert_eq!(collected[0].session_key.to_base64().len(), 220);
1797 assert_eq!(collected[1].session_key.to_base64().len(), 220);
1798 }
1799
1800 #[async_test]
1801 async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
1802 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1804 let room1_id = room_id!("!room1:localhost");
1805 let room2_id = room_id!("!room2:localhost");
1806 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1807 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1808
1809 let mut keys =
1811 pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
1812
1813 let mut collected = vec![];
1815 while let Some(key) = keys.next().await {
1816 collected.push(key);
1817 }
1818
1819 assert_eq!(collected.len(), 1);
1821 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1822 assert_eq!(collected[0].room_id, "!room1:localhost");
1823 assert_eq!(collected[0].session_key.to_base64().len(), 220);
1824 }
1825
1826 #[async_test]
1827 async fn test_export_secrets_bundle() {
1828 let user_id = user_id!("@alice:example.com");
1829 let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
1830
1831 let _ = first
1832 .bootstrap_cross_signing(false)
1833 .await
1834 .expect("We should be able to bootstrap cross-signing");
1835
1836 let bundle = first.store().export_secrets_bundle().await.expect(
1837 "We should be able to export the secrets bundle, now that we \
1838 have the cross-signing keys",
1839 );
1840
1841 assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
1842
1843 second
1844 .store()
1845 .import_secrets_bundle(&bundle)
1846 .await
1847 .expect("We should be able to import the secrets bundle");
1848
1849 let status = second.cross_signing_status().await;
1850 let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
1851
1852 assert!(identity.is_verified(), "The public identity should be marked as verified.");
1853
1854 assert!(status.is_complete(), "We should have imported all the cross-signing keys");
1855 }
1856
1857 #[async_test]
1858 async fn test_create_dehydrated_device_key() {
1859 let pickle_key = DehydratedDeviceKey::new()
1860 .expect("Should be able to create a random dehydrated device key");
1861
1862 let to_vec = pickle_key.inner.to_vec();
1863 let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
1864 .expect("Should be able to create a dehydrated device key from slice");
1865
1866 assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
1867 }
1868
1869 #[async_test]
1870 async fn test_create_dehydrated_errors() {
1871 let too_small = [0u8; 22];
1872 let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
1873
1874 assert!(pickle_key.is_err());
1875
1876 let too_big = [0u8; 40];
1877 let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
1878
1879 assert!(pickle_key.is_err());
1880 }
1881
/// `build_room_key_bundle` is checked against a JSON snapshot: the bundle is
/// built for room 1 from a store that also holds a session for room 2 and a
/// room 1 session that is not flagged as shared history.
#[async_test]
async fn test_build_room_key_bundle() {
    // Alice's identity keys are stamped onto the sessions below; Bob's store
    // is the one that holds them and builds the bundle.
    let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
    let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;

    let room1_id = room_id!("!room1:localhost");
    let room2_id = room_id!("!room2:localhost");

    // Fixed session keys, one per session created below.
    let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
    let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
    let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
    let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";

    // Four sessions: two shareable ones and one non-shareable one in room 1,
    // plus one shareable session in room 2.
    let sessions = [
        create_inbound_group_session_with_visibility(
            &alice,
            room1_id,
            &SessionKey::from_base64(session_key1).unwrap(),
            true,
        ),
        create_inbound_group_session_with_visibility(
            &alice,
            room1_id,
            &SessionKey::from_base64(session_key2).unwrap(),
            true,
        ),
        create_inbound_group_session_with_visibility(
            &alice,
            room1_id,
            &SessionKey::from_base64(session_key3).unwrap(),
            false,
        ),
        create_inbound_group_session_with_visibility(
            &alice,
            room2_id,
            &SessionKey::from_base64(session_key4).unwrap(),
            true,
        ),
    ];
    bob.store().save_inbound_group_sessions(&sessions).await.unwrap();

    // Build the bundle for room 1 only.
    let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();

    // Sort the exported keys by session ID before snapshotting; presumably
    // the store gives no ordering guarantee, so this keeps the snapshot
    // stable.
    bundle.room_keys.sort_by_key(|session| session.session_id.clone());

    // Redaction callbacks: each one checks that the value found in the
    // serialized bundle is Alice's actual key, then swaps in a fixed
    // placeholder so that the snapshot does not depend on Alice's keys.
    let alice_curve_key = alice.identity_keys().curve25519.to_base64();
    let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
        assert_eq!(value.as_str().unwrap(), alice_curve_key);
        "[alice curve key]"
    };
    let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
    let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
        assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
        "[alice ed25519 key]"
    };

    // The curve key redaction is needed twice (room keys and withheld
    // entries), hence the `clone()` on its first use.
    insta::with_settings!({ sort_maps => true }, {
        assert_json_snapshot!(bundle, {
            ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
            ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
            ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
        });
    });
}
1957
1958 fn create_inbound_group_session_with_visibility(
1963 olm_machine: &OlmMachine,
1964 room_id: &RoomId,
1965 session_key: &SessionKey,
1966 shared_history: bool,
1967 ) -> InboundGroupSession {
1968 let identity_keys = &olm_machine.store().static_account().identity_keys;
1969 InboundGroupSession::new(
1970 identity_keys.curve25519,
1971 identity_keys.ed25519,
1972 room_id,
1973 session_key,
1974 SenderData::unknown(),
1975 EventEncryptionAlgorithm::MegolmV1AesSha2,
1976 None,
1977 shared_history,
1978 )
1979 .unwrap()
1980 }
1981}