matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_arch = "wasm32", allow(unused_imports))]
18
19use std::{
20    collections::{BTreeMap, HashSet},
21    io::{Cursor, Read, Write},
22    iter,
23    path::PathBuf,
24    sync::Arc,
25};
26
27use eyeball::{SharedObservable, Subscriber};
28use futures_core::Stream;
29use futures_util::{
30    future::try_join,
31    stream::{self, StreamExt},
32};
33use matrix_sdk_base::crypto::{
34    store::RoomKeyInfo,
35    types::requests::{
36        OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
37    },
38    CrossSigningBootstrapRequests, OlmMachine,
39};
40use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
41use ruma::{
42    api::client::{
43        keys::{
44            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
45            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
46        },
47        message::send_message_event,
48        to_device::send_event_to_device::v3::{
49            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
50        },
51        uiaa::{AuthData, UiaaInfo},
52    },
53    assign,
54    events::{
55        direct::DirectUserIdentifier,
56        room::{MediaSource, ThumbnailInfo},
57    },
58    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
59};
60use serde::Deserialize;
61use tokio::sync::{Mutex, RwLockReadGuard};
62use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
63use tracing::{debug, error, instrument, trace, warn};
64use url::Url;
65use vodozemac::Curve25519PublicKey;
66
67use self::{
68    backups::{types::BackupClientState, Backups},
69    futures::UploadEncryptedFile,
70    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
71    recovery::{Recovery, RecoveryState},
72    secret_storage::SecretStorage,
73    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
74    verification::{SasVerification, Verification, VerificationRequest},
75};
76use crate::{
77    attachment::Thumbnail,
78    client::{ClientInner, WeakClient},
79    error::HttpResult,
80    store_locks::CrossProcessStoreLockGuard,
81    Client, Error, HttpError, Result, Room, TransmissionProgress,
82};
83
84pub mod backups;
85pub mod futures;
86pub mod identities;
87pub mod recovery;
88pub mod secret_storage;
89pub(crate) mod tasks;
90pub mod verification;
91
92pub use matrix_sdk_base::crypto::{
93    olm::{
94        SessionCreationError as MegolmSessionCreationError,
95        SessionExportError as OlmSessionExportError,
96    },
97    vodozemac, CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError,
98    LocalTrust, MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
99    SessionCreationError, SignatureError, VERSION,
100};
101
102pub use crate::error::RoomKeyImportError;
103
104/// All the data related to the encryption state.
105pub(crate) struct EncryptionData {
106    /// Background tasks related to encryption (key backup, initialization
107    /// tasks, etc.).
108    pub tasks: StdMutex<ClientTasks>,
109
110    /// End-to-end encryption settings.
111    pub encryption_settings: EncryptionSettings,
112
113    /// All state related to key backup.
114    pub backup_state: BackupClientState,
115
116    /// All state related to secret storage recovery.
117    pub recovery_state: SharedObservable<RecoveryState>,
118}
119
120impl EncryptionData {
121    pub fn new(encryption_settings: EncryptionSettings) -> Self {
122        Self {
123            encryption_settings,
124
125            tasks: StdMutex::new(Default::default()),
126            backup_state: Default::default(),
127            recovery_state: Default::default(),
128        }
129    }
130
131    pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
132        let weak_client = WeakClient::from_inner(client);
133
134        let mut tasks = self.tasks.lock();
135        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
136
137        if self.encryption_settings.backup_download_strategy
138            == BackupDownloadStrategy::AfterDecryptionFailure
139        {
140            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
141        }
142    }
143
144    /// Initialize the background task which listens for changes in the
145    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
146    ///
147    /// This should happen after the usual tasks have been set up and after the
148    /// E2EE initialization tasks have been set up.
149    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
150        let mut guard = self.tasks.lock();
151
152        let future = Recovery::update_state_after_backup_state_change(client);
153        let join_handle = spawn(future);
154
155        guard.update_recovery_state_after_backup = Some(join_handle);
156    }
157}
158
159/// Settings for end-to-end encryption features.
160#[derive(Clone, Copy, Debug, Default)]
161pub struct EncryptionSettings {
162    /// Automatically bootstrap cross-signing for a user once they're logged, in
163    /// case it's not already done yet.
164    ///
165    /// This requires to login with a username and password, or that MSC3967 is
166    /// enabled on the server, as of 2023-10-20.
167    pub auto_enable_cross_signing: bool,
168
169    /// Select a strategy to download room keys from the backup, by default room
170    /// keys won't be downloaded from the backup automatically.
171    ///
172    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
173    pub backup_download_strategy: BackupDownloadStrategy,
174
175    /// Automatically create a backup version if no backup exists.
176    pub auto_enable_backups: bool,
177}
178
179/// Settings for end-to-end encryption features.
180#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
181#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
182pub enum BackupDownloadStrategy {
183    /// Automatically download all room keys from the backup when the backup
184    /// recovery key has been received. The backup recovery key can be received
185    /// in two ways:
186    ///
187    /// 1. Received as a `m.secret.send` to-device event, after a successful
188    ///    interactive verification.
189    /// 2. Imported from secret storage (4S) using the
190    ///    [`SecretStore::import_secrets()`] method.
191    ///
192    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
193    OneShot,
194
195    /// Attempt to download a single room key if an event fails to be decrypted.
196    AfterDecryptionFailure,
197
198    /// Don't download any room keys automatically. The user can manually
199    /// download room keys using the [`Backups::download_room_key()`] methods.
200    ///
201    /// This is the default option.
202    #[default]
203    Manual,
204}
205
206/// The verification state of our own device
207///
208/// This enum tells us if our own user identity trusts these devices, in other
209/// words it tells us if the user identity has signed the device.
210#[derive(Clone, Copy, Debug, Eq, PartialEq)]
211pub enum VerificationState {
212    /// The verification state is unknown for now.
213    Unknown,
214    /// The device is considered to be verified, it has been signed by its user
215    /// identity.
216    Verified,
217    /// The device is unverified.
218    Unverified,
219}
220
221/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
222#[derive(Debug)]
223pub struct CrossProcessLockStoreGuardWithGeneration {
224    _guard: CrossProcessStoreLockGuard,
225    generation: u64,
226}
227
228impl CrossProcessLockStoreGuardWithGeneration {
229    /// Return the Crypto Store generation associated with this store lock.
230    pub fn generation(&self) -> u64 {
231        self.generation
232    }
233}
234
235/// A stateful struct remembering the cross-signing keys we need to upload.
236///
237/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
238/// additional authentication, this struct will contain information on the type
239/// of authentication the user needs to complete before the upload might be
240/// continued.
241///
242/// More info can be found in the [spec].
243///
244/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
245#[derive(Debug)]
246pub struct CrossSigningResetHandle {
247    client: Client,
248    upload_request: UploadSigningKeysRequest,
249    signatures_request: UploadSignaturesRequest,
250    auth_type: CrossSigningResetAuthType,
251    is_cancelled: Mutex<bool>,
252}
253
254impl CrossSigningResetHandle {
255    /// Set up a new `CrossSigningResetHandle`.
256    pub fn new(
257        client: Client,
258        upload_request: UploadSigningKeysRequest,
259        signatures_request: UploadSignaturesRequest,
260        auth_type: CrossSigningResetAuthType,
261    ) -> Self {
262        Self {
263            client,
264            upload_request,
265            signatures_request,
266            auth_type,
267            is_cancelled: Mutex::new(false),
268        }
269    }
270
271    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
272    /// is using.
273    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
274        &self.auth_type
275    }
276
277    /// Continue the cross-signing reset by either waiting for the
278    /// authentication to be done on the side of the OIDC issuer or by
279    /// providing additional [`AuthData`] the homeserver requires.
280    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
281        let mut upload_request = self.upload_request.clone();
282        upload_request.auth = auth;
283
284        while let Err(e) = self.client.send(upload_request.clone()).await {
285            if *self.is_cancelled.lock().await {
286                return Ok(());
287            }
288
289            if e.as_uiaa_response().is_none() {
290                return Err(e.into());
291            }
292        }
293
294        self.client.send(self.signatures_request.clone()).await?;
295
296        Ok(())
297    }
298
299    /// Cancel the ongoing identity reset process
300    pub async fn cancel(&self) {
301        *self.is_cancelled.lock().await = true;
302    }
303}
304
305/// information about the additional authentication that is required before the
306/// cross-signing keys can be uploaded.
307#[derive(Debug, Clone)]
308pub enum CrossSigningResetAuthType {
309    /// The homeserver requires user-interactive authentication.
310    Uiaa(UiaaInfo),
311    /// OIDC is used for authentication and the user needs to open a URL to
312    /// approve the upload of cross-signing keys.
313    Oidc(OidcCrossSigningResetInfo),
314}
315
316impl CrossSigningResetAuthType {
317    #[allow(clippy::unused_async)]
318    async fn new(
319        #[allow(unused_variables)] client: &Client,
320        error: &HttpError,
321    ) -> Result<Option<Self>> {
322        if let Some(auth_info) = error.as_uiaa_response() {
323            #[cfg(feature = "experimental-oidc")]
324            if client.oidc().issuer().is_some() {
325                OidcCrossSigningResetInfo::from_auth_info(client, auth_info)
326                    .map(|t| Some(CrossSigningResetAuthType::Oidc(t)))
327            } else {
328                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
329            }
330
331            #[cfg(not(feature = "experimental-oidc"))]
332            Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
333        } else {
334            Ok(None)
335        }
336    }
337}
338
339/// OIDC specific information about the required authentication for the upload
340/// of cross-signing keys.
341#[derive(Debug, Clone, Deserialize)]
342pub struct OidcCrossSigningResetInfo {
343    /// The URL where the user can approve the reset of the cross-signing keys.
344    pub approval_url: Url,
345}
346
347impl OidcCrossSigningResetInfo {
348    #[cfg(feature = "experimental-oidc")]
349    fn from_auth_info(
350        // This is used if the OIDC feature is enabled.
351        #[allow(unused_variables)] client: &Client,
352        auth_info: &UiaaInfo,
353    ) -> Result<Self> {
354        let parameters =
355            serde_json::from_str::<OidcCrossSigningResetUiaaParameters>(auth_info.params.get())?;
356
357        Ok(OidcCrossSigningResetInfo { approval_url: parameters.reset.url })
358    }
359}
360
361/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
362/// response
363#[cfg(feature = "experimental-oidc")]
364#[derive(Debug, Deserialize)]
365struct OidcCrossSigningResetUiaaParameters {
366    /// The URL where the user can approve the reset of the cross-signing keys.
367    #[serde(rename = "org.matrix.cross_signing_reset")]
368    reset: OidcCrossSigningResetUiaaResetParameter,
369}
370
371/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters``
372/// dictionary.
373#[cfg(feature = "experimental-oidc")]
374#[derive(Debug, Deserialize)]
375struct OidcCrossSigningResetUiaaResetParameter {
376    /// The URL where the user can approve the reset of the cross-signing keys.
377    url: Url,
378}
379
380impl Client {
381    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
382        self.base_client().olm_machine().await
383    }
384
385    pub(crate) async fn mark_request_as_sent(
386        &self,
387        request_id: &TransactionId,
388        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
389    ) -> Result<(), matrix_sdk_base::Error> {
390        Ok(self
391            .olm_machine()
392            .await
393            .as_ref()
394            .expect(
395                "We should have an olm machine once we try to mark E2EE related requests as sent",
396            )
397            .mark_request_as_sent(request_id, response)
398            .await?)
399    }
400
401    /// Query the server for users device keys.
402    ///
403    /// # Panics
404    ///
405    /// Panics if no key query needs to be done.
406    #[instrument(skip(self))]
407    pub(crate) async fn keys_query(
408        &self,
409        request_id: &TransactionId,
410        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
411    ) -> Result<get_keys::v3::Response> {
412        let request = assign!(get_keys::v3::Request::new(), { device_keys });
413
414        let response = self.send(request).await?;
415        self.mark_request_as_sent(request_id, &response).await?;
416        self.encryption().update_state_after_keys_query(&response).await;
417
418        Ok(response)
419    }
420
421    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
422    /// encrypting and uploading a provided reader.
423    ///
424    /// # Arguments
425    ///
426    /// * `content_type` - The content type of the file.
427    /// * `reader` - The reader that should be encrypted and uploaded.
428    ///
429    /// # Examples
430    ///
431    /// ```no_run
432    /// # use matrix_sdk::Client;
433    /// # use url::Url;
434    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
435    /// use serde::{Deserialize, Serialize};
436    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
437    ///
438    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
439    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
440    /// struct CustomEventContent {
441    ///     encrypted_file: EncryptedFile,
442    /// }
443    ///
444    /// # async {
445    /// # let homeserver = Url::parse("http://example.com")?;
446    /// # let client = Client::new(homeserver).await?;
447    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
448    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
449    /// let encrypted_file = client.upload_encrypted_file(&mime::TEXT_PLAIN, &mut reader).await?;
450    ///
451    /// room.send(CustomEventContent { encrypted_file }).await?;
452    /// # anyhow::Ok(()) };
453    /// ```
454    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
455        &'a self,
456        content_type: &'a mime::Mime,
457        reader: &'a mut R,
458    ) -> UploadEncryptedFile<'a, R> {
459        UploadEncryptedFile::new(self, content_type, reader)
460    }
461
462    /// Encrypt and upload the file and thumbnails, and return the source
463    /// information.
464    pub(crate) async fn upload_encrypted_media_and_thumbnail(
465        &self,
466        content_type: &mime::Mime,
467        data: &[u8],
468        thumbnail: Option<Thumbnail>,
469        send_progress: SharedObservable<TransmissionProgress>,
470    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
471        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
472
473        let upload_attachment = async {
474            let mut cursor = Cursor::new(data);
475            self.upload_encrypted_file(content_type, &mut cursor)
476                .with_send_progress_observable(send_progress)
477                .await
478        };
479
480        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
481
482        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
483    }
484
485    /// Uploads an encrypted thumbnail to the media repository, and returns
486    /// its source and extra information.
487    async fn upload_encrypted_thumbnail(
488        &self,
489        thumbnail: Option<Thumbnail>,
490        send_progress: SharedObservable<TransmissionProgress>,
491    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
492        let Some(thumbnail) = thumbnail else {
493            return Ok(None);
494        };
495
496        let (data, content_type, thumbnail_info) = thumbnail.into_parts();
497        let mut cursor = Cursor::new(data);
498
499        let file = self
500            .upload_encrypted_file(&content_type, &mut cursor)
501            .with_send_progress_observable(send_progress)
502            .await?;
503
504        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
505    }
506
507    /// Claim one-time keys creating new Olm sessions.
508    ///
509    /// # Arguments
510    ///
511    /// * `users` - The list of user/device pairs that we should claim keys for.
512    pub(crate) async fn claim_one_time_keys(
513        &self,
514        users: impl Iterator<Item = &UserId>,
515    ) -> Result<()> {
516        let _lock = self.locks().key_claim_lock.lock().await;
517
518        if let Some((request_id, request)) = self
519            .olm_machine()
520            .await
521            .as_ref()
522            .ok_or(Error::NoOlmMachine)?
523            .get_missing_sessions(users)
524            .await?
525        {
526            let response = self.send(request).await?;
527            self.mark_request_as_sent(&request_id, &response).await?;
528        }
529
530        Ok(())
531    }
532
533    /// Upload the E2E encryption keys.
534    ///
535    /// This uploads the long lived device keys as well as the required amount
536    /// of one-time keys.
537    ///
538    /// # Panics
539    ///
540    /// Panics if the client isn't logged in, or if no encryption keys need to
541    /// be uploaded.
542    #[instrument(skip(self, request))]
543    pub(crate) async fn keys_upload(
544        &self,
545        request_id: &TransactionId,
546        request: &upload_keys::v3::Request,
547    ) -> Result<upload_keys::v3::Response> {
548        debug!(
549            device_keys = request.device_keys.is_some(),
550            one_time_key_count = request.one_time_keys.len(),
551            "Uploading public encryption keys",
552        );
553
554        let response = self.send(request.clone()).await?;
555        self.mark_request_as_sent(request_id, &response).await?;
556
557        Ok(response)
558    }
559
560    pub(crate) async fn room_send_helper(
561        &self,
562        request: &RoomMessageRequest,
563    ) -> Result<send_message_event::v3::Response> {
564        let content = request.content.clone();
565        let txn_id = request.txn_id.clone();
566        let room_id = &request.room_id;
567
568        self.get_room(room_id)
569            .expect("Can't send a message to a room that isn't known to the store")
570            .send(content)
571            .with_transaction_id(txn_id)
572            .await
573    }
574
575    pub(crate) async fn send_to_device(
576        &self,
577        request: &ToDeviceRequest,
578    ) -> HttpResult<ToDeviceResponse> {
579        let request = RumaToDeviceRequest::new_raw(
580            request.event_type.clone(),
581            request.txn_id.clone(),
582            request.messages.clone(),
583        );
584
585        self.send(request).await
586    }
587
588    pub(crate) async fn send_verification_request(
589        &self,
590        request: OutgoingVerificationRequest,
591    ) -> Result<()> {
592        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
593
594        match request {
595            ToDevice(t) => {
596                self.send_to_device(&t).await?;
597            }
598            InRoom(r) => {
599                self.room_send_helper(&r).await?;
600            }
601        }
602
603        Ok(())
604    }
605
606    /// Get the existing DM room with the given user, if any.
607    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
608        let rooms = self.joined_rooms();
609
610        // Find the room we share with the `user_id` and only with `user_id`
611        let room = rooms.into_iter().find(|r| {
612            let targets = r.direct_targets();
613            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
614        });
615
616        trace!(?room, "Found room");
617        room
618    }
619
620    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
621        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
622
623        match r.request() {
624            AnyOutgoingRequest::KeysQuery(request) => {
625                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
626            }
627            AnyOutgoingRequest::KeysUpload(request) => {
628                self.keys_upload(r.request_id(), request).await?;
629            }
630            AnyOutgoingRequest::ToDeviceRequest(request) => {
631                let response = self.send_to_device(request).await?;
632                self.mark_request_as_sent(r.request_id(), &response).await?;
633            }
634            AnyOutgoingRequest::SignatureUpload(request) => {
635                let response = self.send(request.clone()).await?;
636                self.mark_request_as_sent(r.request_id(), &response).await?;
637            }
638            AnyOutgoingRequest::RoomMessage(request) => {
639                let response = self.room_send_helper(request).await?;
640                self.mark_request_as_sent(r.request_id(), &response).await?;
641            }
642            AnyOutgoingRequest::KeysClaim(request) => {
643                let response = self.send(request.clone()).await?;
644                self.mark_request_as_sent(r.request_id(), &response).await?;
645            }
646        }
647
648        Ok(())
649    }
650
651    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
652        const MAX_CONCURRENT_REQUESTS: usize = 20;
653
654        // This is needed because sometimes we need to automatically
655        // claim some one-time keys to unwedge an existing Olm session.
656        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
657            warn!("Error while claiming one-time keys {:?}", e);
658        }
659
660        let outgoing_requests = stream::iter(
661            self.olm_machine()
662                .await
663                .as_ref()
664                .ok_or(Error::NoOlmMachine)?
665                .outgoing_requests()
666                .await?,
667        )
668        .map(|r| self.send_outgoing_request(r));
669
670        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
671
672        requests
673            .for_each(|r| async move {
674                match r {
675                    Ok(_) => (),
676                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
677                }
678            })
679            .await;
680
681        Ok(())
682    }
683}
684
685#[cfg(any(feature = "testing", test))]
686impl Client {
687    /// Get the olm machine, for testing purposes only.
688    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
689        self.olm_machine().await
690    }
691}
692
693/// A high-level API to manage the client's encryption.
694///
695/// To get this, use [`Client::encryption()`].
696#[derive(Debug, Clone)]
697pub struct Encryption {
698    /// The underlying client.
699    client: Client,
700}
701
702impl Encryption {
703    pub(crate) fn new(client: Client) -> Self {
704        Self { client }
705    }
706
707    /// Returns the current encryption settings for this client.
708    pub(crate) fn settings(&self) -> EncryptionSettings {
709        self.client.inner.e2ee.encryption_settings
710    }
711
712    /// Get the public ed25519 key of our own device. This is usually what is
713    /// called the fingerprint of the device.
714    pub async fn ed25519_key(&self) -> Option<String> {
715        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
716    }
717
718    /// Get the public Curve25519 key of our own device.
719    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
720        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
721    }
722
723    /// Get the current device creation timestamp.
724    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
725        match self.get_own_device().await {
726            Ok(Some(device)) => device.first_time_seen_ts(),
727            // Should not happen, there should always be an own device
728            _ => MilliSecondsSinceUnixEpoch::now(),
729        }
730    }
731
732    #[cfg(feature = "experimental-oidc")]
733    pub(crate) async fn import_secrets_bundle(
734        &self,
735        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
736    ) -> Result<(), SecretImportError> {
737        let olm_machine = self.client.olm_machine().await;
738        let olm_machine =
739            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
740
741        olm_machine.store().import_secrets_bundle(bundle).await
742    }
743
744    /// Get the status of the private cross signing keys.
745    ///
746    /// This can be used to check which private cross signing keys we have
747    /// stored locally.
748    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
749        let olm = self.client.olm_machine().await;
750        let machine = olm.as_ref()?;
751        Some(machine.cross_signing_status().await)
752    }
753
754    /// Get all the tracked users we know about
755    ///
756    /// Tracked users are users for which we keep the device list of E2EE
757    /// capable devices up to date.
758    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
759        if let Some(machine) = self.client.olm_machine().await.as_ref() {
760            machine.tracked_users().await
761        } else {
762            Ok(HashSet::new())
763        }
764    }
765
766    /// Get a [`Subscriber`] for the [`VerificationState`].
767    ///
768    /// # Examples
769    ///
770    /// ```no_run
771    /// use matrix_sdk::{encryption, Client};
772    /// use url::Url;
773    ///
774    /// # async {
775    /// let homeserver = Url::parse("http://example.com")?;
776    /// let client = Client::new(homeserver).await?;
777    /// let mut subscriber = client.encryption().verification_state();
778    ///
779    /// let current_value = subscriber.get();
780    ///
781    /// println!("The current verification state is: {current_value:?}");
782    ///
783    /// if let Some(verification_state) = subscriber.next().await {
784    ///     println!("Received verification state update {:?}", verification_state)
785    /// }
786    /// # anyhow::Ok(()) };
787    /// ```
788    pub fn verification_state(&self) -> Subscriber<VerificationState> {
789        self.client.inner.verification_state.subscribe_reset()
790    }
791
792    /// Get a verification object with the given flow id.
793    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
794        let olm = self.client.olm_machine().await;
795        let olm = olm.as_ref()?;
796        #[allow(clippy::bind_instead_of_map)]
797        olm.get_verification(user_id, flow_id).and_then(|v| match v {
798            matrix_sdk_base::crypto::Verification::SasV1(s) => {
799                Some(SasVerification { inner: s, client: self.client.clone() }.into())
800            }
801            #[cfg(feature = "qrcode")]
802            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
803                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
804            }
805            _ => None,
806        })
807    }
808
809    /// Get a `VerificationRequest` object for the given user with the given
810    /// flow id.
811    pub async fn get_verification_request(
812        &self,
813        user_id: &UserId,
814        flow_id: impl AsRef<str>,
815    ) -> Option<VerificationRequest> {
816        let olm = self.client.olm_machine().await;
817        let olm = olm.as_ref()?;
818
819        olm.get_verification_request(user_id, flow_id)
820            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
821    }
822
823    /// Get a specific device of a user.
824    ///
825    /// # Arguments
826    ///
827    /// * `user_id` - The unique id of the user that the device belongs to.
828    ///
829    /// * `device_id` - The unique id of the device.
830    ///
831    /// Returns a `Device` if one is found and the crypto store didn't throw an
832    /// error.
833    ///
834    /// This will always return None if the client hasn't been logged in.
835    ///
836    /// # Examples
837    ///
838    /// ```no_run
839    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
840    /// # use url::Url;
841    /// # async {
842    /// # let alice = user_id!("@alice:example.org");
843    /// # let homeserver = Url::parse("http://example.com")?;
844    /// # let client = Client::new(homeserver).await?;
845    /// if let Some(device) =
846    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
847    /// {
848    ///     println!("{:?}", device.is_verified());
849    ///
850    ///     if !device.is_verified() {
851    ///         let verification = device.request_verification().await?;
852    ///     }
853    /// }
854    /// # anyhow::Ok(()) };
855    /// ```
856    pub async fn get_device(
857        &self,
858        user_id: &UserId,
859        device_id: &DeviceId,
860    ) -> Result<Option<Device>, CryptoStoreError> {
861        let olm = self.client.olm_machine().await;
862        let Some(machine) = olm.as_ref() else { return Ok(None) };
863        let device = machine.get_device(user_id, device_id, None).await?;
864        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
865    }
866
867    /// A convenience method to retrieve your own device from the store.
868    ///
869    /// This is the same as calling [`Encryption::get_device()`] with your own
870    /// user and device ID.
871    ///
872    /// This will always return a device, unless you are not logged in.
873    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
874        let olm = self.client.olm_machine().await;
875        let Some(machine) = olm.as_ref() else { return Ok(None) };
876        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
877        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
878    }
879
880    /// Get a map holding all the devices of an user.
881    ///
882    /// This will always return an empty map if the client hasn't been logged
883    /// in.
884    ///
885    /// # Arguments
886    ///
887    /// * `user_id` - The unique id of the user that the devices belong to.
888    ///
889    /// # Examples
890    ///
891    /// ```no_run
892    /// # use matrix_sdk::{Client, ruma::user_id};
893    /// # use url::Url;
894    /// # async {
895    /// # let alice = user_id!("@alice:example.org");
896    /// # let homeserver = Url::parse("http://example.com")?;
897    /// # let client = Client::new(homeserver).await?;
898    /// let devices = client.encryption().get_user_devices(alice).await?;
899    ///
900    /// for device in devices.devices() {
901    ///     println!("{device:?}");
902    /// }
903    /// # anyhow::Ok(()) };
904    /// ```
905    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
906        let devices = self
907            .client
908            .olm_machine()
909            .await
910            .as_ref()
911            .ok_or(Error::NoOlmMachine)?
912            .get_user_devices(user_id, None)
913            .await?;
914
915        Ok(UserDevices { inner: devices, client: self.client.clone() })
916    }
917
918    /// Get the E2EE identity of a user from the crypto store.
919    ///
920    /// Usually, we only have the E2EE identity of a user locally if the user
921    /// is tracked, meaning that we are both members of the same encrypted room.
922    ///
923    /// To get the E2EE identity of a user even if it is not available locally
924    /// use [`Encryption::request_user_identity()`].
925    ///
926    /// # Arguments
927    ///
928    /// * `user_id` - The unique id of the user that the identity belongs to.
929    ///
930    /// Returns a `UserIdentity` if one is found and the crypto store
931    /// didn't throw an error.
932    ///
933    /// This will always return None if the client hasn't been logged in.
934    ///
935    /// # Examples
936    ///
937    /// ```no_run
938    /// # use matrix_sdk::{Client, ruma::user_id};
939    /// # use url::Url;
940    /// # async {
941    /// # let alice = user_id!("@alice:example.org");
942    /// # let homeserver = Url::parse("http://example.com")?;
943    /// # let client = Client::new(homeserver).await?;
944    /// let user = client.encryption().get_user_identity(alice).await?;
945    ///
946    /// if let Some(user) = user {
947    ///     println!("{:?}", user.is_verified());
948    ///
949    ///     let verification = user.request_verification().await?;
950    /// }
951    /// # anyhow::Ok(()) };
952    /// ```
953    pub async fn get_user_identity(
954        &self,
955        user_id: &UserId,
956    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
957        let olm = self.client.olm_machine().await;
958        let Some(olm) = olm.as_ref() else { return Ok(None) };
959        let identity = olm.get_identity(user_id, None).await?;
960
961        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
962    }
963
964    /// Get the E2EE identity of a user from the homeserver.
965    ///
966    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
967    /// E2EE identity is not found, it should mean that the user did not set
968    /// up cross-signing.
969    ///
970    /// If you want the E2EE identity of a user without making a request to the
971    /// homeserver, use [`Encryption::get_user_identity()`] instead.
972    ///
973    /// # Arguments
974    ///
975    /// * `user_id` - The ID of the user that the identity belongs to.
976    ///
977    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
978    /// was an issue with the crypto store or with the request to the
979    /// homeserver.
980    ///
981    /// This will always return `None` if the client hasn't been logged in.
982    ///
983    /// # Examples
984    ///
985    /// ```no_run
986    /// # use matrix_sdk::{Client, ruma::user_id};
987    /// # use url::Url;
988    /// # async {
989    /// # let alice = user_id!("@alice:example.org");
990    /// # let homeserver = Url::parse("http://example.com")?;
991    /// # let client = Client::new(homeserver).await?;
992    /// let user = client.encryption().request_user_identity(alice).await?;
993    ///
994    /// if let Some(user) = user {
995    ///     println!("User is verified: {:?}", user.is_verified());
996    ///
997    ///     let verification = user.request_verification().await?;
998    /// }
999    /// # anyhow::Ok(()) };
1000    /// ```
1001    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1002        let olm = self.client.olm_machine().await;
1003        let Some(olm) = olm.as_ref() else { return Ok(None) };
1004
1005        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1006        self.client.keys_query(&request_id, request.device_keys).await?;
1007
1008        let identity = olm.get_identity(user_id, None).await?;
1009        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1010    }
1011
1012    /// Returns a stream of device updates, allowing users to listen for
1013    /// notifications about new or changed devices.
1014    ///
1015    /// The stream produced by this method emits updates whenever a new device
1016    /// is discovered or when an existing device's information is changed. Users
1017    /// can subscribe to this stream and receive updates in real-time.
1018    ///
1019    /// # Examples
1020    ///
1021    /// ```no_run
1022    /// # use matrix_sdk::Client;
1023    /// # use ruma::{device_id, user_id};
1024    /// # use futures_util::{pin_mut, StreamExt};
1025    /// # let client: Client = unimplemented!();
1026    /// # async {
1027    /// let devices_stream = client.encryption().devices_stream().await?;
1028    /// let user_id = client
1029    ///     .user_id()
1030    ///     .expect("We should know our user id after we have logged in");
1031    /// pin_mut!(devices_stream);
1032    ///
1033    /// for device_updates in devices_stream.next().await {
1034    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1035    ///         for device in user_devices.values() {
1036    ///             println!("A new device has been added {}", device.device_id());
1037    ///         }
1038    ///     }
1039    /// }
1040    /// # anyhow::Ok(()) };
1041    /// ```
1042    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
1043        let olm = self.client.olm_machine().await;
1044        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1045        let client = self.client.to_owned();
1046
1047        Ok(olm
1048            .store()
1049            .devices_stream()
1050            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1051    }
1052
1053    /// Returns a stream of user identity updates, allowing users to listen for
1054    /// notifications about new or changed user identities.
1055    ///
1056    /// The stream produced by this method emits updates whenever a new user
1057    /// identity is discovered or when an existing identities information is
1058    /// changed. Users can subscribe to this stream and receive updates in
1059    /// real-time.
1060    ///
1061    /// # Examples
1062    ///
1063    /// ```no_run
1064    /// # use matrix_sdk::Client;
1065    /// # use ruma::{device_id, user_id};
1066    /// # use futures_util::{pin_mut, StreamExt};
1067    /// # let client: Client = unimplemented!();
1068    /// # async {
1069    /// let identities_stream =
1070    ///     client.encryption().user_identities_stream().await?;
1071    /// pin_mut!(identities_stream);
1072    ///
1073    /// for identity_updates in identities_stream.next().await {
1074    ///     for (_, identity) in identity_updates.new {
1075    ///         println!("A new identity has been added {}", identity.user_id());
1076    ///     }
1077    /// }
1078    /// # anyhow::Ok(()) };
1079    /// ```
1080    pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
1081        let olm = self.client.olm_machine().await;
1082        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1083        let client = self.client.to_owned();
1084
1085        Ok(olm
1086            .store()
1087            .user_identities_stream()
1088            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1089    }
1090
1091    /// Create and upload a new cross signing identity.
1092    ///
1093    /// # Arguments
1094    ///
1095    /// * `auth_data` - This request requires user interactive auth, the first
1096    ///   request needs to set this to `None` and will always fail with an
1097    ///   `UiaaResponse`. The response will contain information for the
1098    ///   interactive auth and the same request needs to be made but this time
1099    ///   with some `auth_data` provided.
1100    ///
1101    /// # Examples
1102    ///
1103    /// ```no_run
1104    /// # use std::collections::BTreeMap;
1105    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1106    /// # use url::Url;
1107    /// # use serde_json::json;
1108    /// # async {
1109    /// # let homeserver = Url::parse("http://example.com")?;
1110    /// # let client = Client::new(homeserver).await?;
1111    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1112    ///     if let Some(response) = e.as_uiaa_response() {
1113    ///         let mut password = uiaa::Password::new(
1114    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1115    ///             "wordpass".to_owned(),
1116    ///         );
1117    ///         password.session = response.session.clone();
1118    ///
1119    ///         client
1120    ///             .encryption()
1121    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1122    ///             .await
1123    ///             .expect("Couldn't bootstrap cross signing")
1124    ///     } else {
1125    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1126    ///     }
1127    /// }
1128    /// # anyhow::Ok(()) };
1129    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1130        let olm = self.client.olm_machine().await;
1131        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1132
1133        let CrossSigningBootstrapRequests {
1134            upload_signing_keys_req,
1135            upload_keys_req,
1136            upload_signatures_req,
1137        } = olm.bootstrap_cross_signing(false).await?;
1138
1139        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1140            auth: auth_data,
1141            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1142            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1143            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1144        });
1145
1146        if let Some(req) = upload_keys_req {
1147            self.client.send_outgoing_request(req).await?;
1148        }
1149        self.client.send(upload_signing_keys_req).await?;
1150        self.client.send(upload_signatures_req).await?;
1151
1152        Ok(())
1153    }
1154
1155    /// Reset the cross-signing keys.
1156    ///
1157    /// # Example
1158    ///
1159    /// ```no_run
1160    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1161    /// # use url::Url;
1162    /// # async {
1163    /// # let homeserver = Url::parse("http://example.com")?;
1164    /// # let client = Client::new(homeserver).await?;
1165    /// # let user_id = unimplemented!();
1166    /// let encryption = client.encryption();
1167    ///
1168    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1169    ///     match handle.auth_type() {
1170    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1171    ///             use matrix_sdk::ruma::api::client::uiaa;
1172    ///
1173    ///             let password = "1234".to_owned();
1174    ///             let mut password = uiaa::Password::new(user_id, password);
1175    ///             password.session = uiaa.session;
1176    ///
1177    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1178    ///         }
1179    ///         CrossSigningResetAuthType::Oidc(o) => {
1180    ///             println!(
1181    ///                 "To reset your end-to-end encryption cross-signing identity, \
1182    ///                 you first need to approve it at {}",
1183    ///                 o.approval_url
1184    ///             );
1185    ///             handle.auth(None).await?;
1186    ///         }
1187    ///     }
1188    /// }
1189    /// # anyhow::Ok(()) };
1190    /// ```
1191    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1192        let olm = self.client.olm_machine().await;
1193        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1194
1195        let CrossSigningBootstrapRequests {
1196            upload_keys_req,
1197            upload_signing_keys_req,
1198            upload_signatures_req,
1199        } = olm.bootstrap_cross_signing(true).await?;
1200
1201        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1202            auth: None,
1203            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1204            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1205            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1206        });
1207
1208        if let Some(req) = upload_keys_req {
1209            self.client.send_outgoing_request(req).await?;
1210        }
1211
1212        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1213            if let Some(auth_type) = CrossSigningResetAuthType::new(&self.client, &error).await? {
1214                let client = self.client.clone();
1215
1216                Ok(Some(CrossSigningResetHandle::new(
1217                    client,
1218                    upload_signing_keys_req,
1219                    upload_signatures_req,
1220                    auth_type,
1221                )))
1222            } else {
1223                Err(error.into())
1224            }
1225        } else {
1226            self.client.send(upload_signatures_req).await?;
1227
1228            Ok(None)
1229        }
1230    }
1231
1232    /// Query the user's own device keys, if, and only if, we didn't have their
1233    /// identity in the first place.
1234    async fn ensure_initial_key_query(&self) -> Result<()> {
1235        let olm_machine = self.client.olm_machine().await;
1236        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1237
1238        let user_id = olm_machine.user_id();
1239
1240        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1241            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1242            self.client.keys_query(&request_id, request.device_keys).await?;
1243        }
1244
1245        Ok(())
1246    }
1247
1248    /// Create and upload a new cross signing identity, if that has not been
1249    /// done yet.
1250    ///
1251    /// This will only create a new cross-signing identity if the user had never
1252    /// done it before. If the user did it before, then this is a no-op.
1253    ///
1254    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1255    /// behavior of this function.
1256    ///
1257    /// # Arguments
1258    ///
1259    /// * `auth_data` - This request requires user interactive auth, the first
1260    ///   request needs to set this to `None` and will always fail with an
1261    ///   `UiaaResponse`. The response will contain information for the
1262    ///   interactive auth and the same request needs to be made but this time
1263    ///   with some `auth_data` provided.
1264    ///
1265    /// # Examples
1266    /// ```no_run
1267    /// # use std::collections::BTreeMap;
1268    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1269    /// # use url::Url;
1270    /// # use serde_json::json;
1271    /// # async {
1272    /// # let homeserver = Url::parse("http://example.com")?;
1273    /// # let client = Client::new(homeserver).await?;
1274    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1275    ///     if let Some(response) = e.as_uiaa_response() {
1276    ///         let mut password = uiaa::Password::new(
1277    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1278    ///             "wordpass".to_owned(),
1279    ///         );
1280    ///         password.session = response.session.clone();
1281    ///
1282    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1283    ///         // avoid checks.
1284    ///         client
1285    ///             .encryption()
1286    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1287    ///             .await
1288    ///             .expect("Couldn't bootstrap cross signing")
1289    ///     } else {
1290    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1291    ///     }
1292    /// }
1293    /// # anyhow::Ok(()) };
1294    pub async fn bootstrap_cross_signing_if_needed(
1295        &self,
1296        auth_data: Option<AuthData>,
1297    ) -> Result<()> {
1298        let olm_machine = self.client.olm_machine().await;
1299        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1300        let user_id = olm_machine.user_id();
1301
1302        self.ensure_initial_key_query().await?;
1303
1304        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1305            self.bootstrap_cross_signing(auth_data).await?;
1306        }
1307
1308        Ok(())
1309    }
1310
1311    /// Export E2EE keys that match the given predicate encrypting them with the
1312    /// given passphrase.
1313    ///
1314    /// # Arguments
1315    ///
1316    /// * `path` - The file path where the exported key file will be saved.
1317    ///
1318    /// * `passphrase` - The passphrase that will be used to encrypt the
1319    ///   exported room keys.
1320    ///
1321    /// * `predicate` - A closure that will be called for every known
1322    ///   `InboundGroupSession`, which represents a room key. If the closure
1323    ///   returns `true` the `InboundGroupSessoin` will be included in the
1324    ///   export, if the closure returns `false` it will not be included.
1325    ///
1326    /// # Panics
1327    ///
1328    /// This method will panic if it isn't run on a Tokio runtime.
1329    ///
1330    /// This method will panic if it can't get enough randomness from the OS to
1331    /// encrypt the exported keys securely.
1332    ///
1333    /// # Examples
1334    ///
1335    /// ```no_run
1336    /// # use std::{path::PathBuf, time::Duration};
1337    /// # use matrix_sdk::{
1338    /// #     Client, config::SyncSettings,
1339    /// #     ruma::room_id,
1340    /// # };
1341    /// # use url::Url;
1342    /// # async {
1343    /// # let homeserver = Url::parse("http://localhost:8080")?;
1344    /// # let mut client = Client::new(homeserver).await?;
1345    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1346    /// // Export all room keys.
1347    /// client
1348    ///     .encryption()
1349    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1350    ///     .await?;
1351    ///
1352    /// // Export only the room keys for a certain room.
1353    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1354    /// let room_id = room_id!("!test:localhost");
1355    ///
1356    /// client
1357    ///     .encryption()
1358    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1359    ///     .await?;
1360    /// # anyhow::Ok(()) };
1361    /// ```
1362    #[cfg(not(target_arch = "wasm32"))]
1363    pub async fn export_room_keys(
1364        &self,
1365        path: PathBuf,
1366        passphrase: &str,
1367        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1368    ) -> Result<()> {
1369        let olm = self.client.olm_machine().await;
1370        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1371
1372        let keys = olm.store().export_room_keys(predicate).await?;
1373        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1374
1375        let encrypt = move || -> Result<()> {
1376            let export: String =
1377                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1378            let mut file = std::fs::File::create(path)?;
1379            file.write_all(&export.into_bytes())?;
1380            Ok(())
1381        };
1382
1383        let task = tokio::task::spawn_blocking(encrypt);
1384        task.await.expect("Task join error")
1385    }
1386
1387    /// Import E2EE keys from the given file path.
1388    ///
1389    /// # Arguments
1390    ///
1391    /// * `path` - The file path where the exported key file will can be found.
1392    ///
1393    /// * `passphrase` - The passphrase that should be used to decrypt the
1394    ///   exported room keys.
1395    ///
1396    /// Returns a tuple of numbers that represent the number of sessions that
1397    /// were imported and the total number of sessions that were found in the
1398    /// key export.
1399    ///
1400    /// # Panics
1401    ///
1402    /// This method will panic if it isn't run on a Tokio runtime.
1403    ///
1404    /// ```no_run
1405    /// # use std::{path::PathBuf, time::Duration};
1406    /// # use matrix_sdk::{
1407    /// #     Client, config::SyncSettings,
1408    /// #     ruma::room_id,
1409    /// # };
1410    /// # use url::Url;
1411    /// # async {
1412    /// # let homeserver = Url::parse("http://localhost:8080")?;
1413    /// # let mut client = Client::new(homeserver).await?;
1414    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1415    /// let result =
1416    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1417    ///
1418    /// println!(
1419    ///     "Imported {} room keys out of {}",
1420    ///     result.imported_count, result.total_count
1421    /// );
1422    /// # anyhow::Ok(()) };
1423    /// ```
1424    #[cfg(not(target_arch = "wasm32"))]
1425    pub async fn import_room_keys(
1426        &self,
1427        path: PathBuf,
1428        passphrase: &str,
1429    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1430        let olm = self.client.olm_machine().await;
1431        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1432        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1433
1434        let decrypt = move || {
1435            let file = std::fs::File::open(path)?;
1436            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1437        };
1438
1439        let task = tokio::task::spawn_blocking(decrypt);
1440        let import = task.await.expect("Task join error")?;
1441
1442        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1443
1444        self.backups().maybe_trigger_backup();
1445
1446        Ok(ret)
1447    }
1448
1449    /// Receive notifications of room keys being received as a [`Stream`].
1450    ///
1451    /// Each time a room key is updated in any way, an update will be sent to
1452    /// the stream. Updates that happen at the same time are batched into a
1453    /// [`Vec`].
1454    ///
1455    /// If the reader of the stream lags too far behind, an error is broadcast
1456    /// containing the number of skipped items.
1457    ///
1458    /// # Examples
1459    ///
1460    /// ```no_run
1461    /// # use matrix_sdk::Client;
1462    /// # use url::Url;
1463    /// # async {
1464    /// # let homeserver = Url::parse("http://example.com")?;
1465    /// # let client = Client::new(homeserver).await?;
1466    /// use futures_util::StreamExt;
1467    ///
1468    /// let Some(mut room_keys_stream) =
1469    ///     client.encryption().room_keys_received_stream().await
1470    /// else {
1471    ///     return Ok(());
1472    /// };
1473    ///
1474    /// while let Some(update) = room_keys_stream.next().await {
1475    ///     println!("Received room keys {update:?}");
1476    /// }
1477    /// # anyhow::Ok(()) };
1478    /// ```
1479    pub async fn room_keys_received_stream(
1480        &self,
1481    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
1482        let olm = self.client.olm_machine().await;
1483        let olm = olm.as_ref()?;
1484
1485        Some(olm.store().room_keys_received_stream())
1486    }
1487
1488    /// Get the secret storage manager of the client.
1489    pub fn secret_storage(&self) -> SecretStorage {
1490        SecretStorage { client: self.client.to_owned() }
1491    }
1492
1493    /// Get the backups manager of the client.
1494    pub fn backups(&self) -> Backups {
1495        Backups { client: self.client.to_owned() }
1496    }
1497
1498    /// Get the recovery manager of the client.
1499    pub fn recovery(&self) -> Recovery {
1500        Recovery { client: self.client.to_owned() }
1501    }
1502
1503    /// Enables the crypto-store cross-process lock.
1504    ///
1505    /// This may be required if there are multiple processes that may do writes
1506    /// to the same crypto store. In that case, it's necessary to create a
1507    /// lock, so that only one process writes to it, otherwise this may
1508    /// cause confusing issues because of stale data contained in in-memory
1509    /// caches.
1510    ///
1511    /// The provided `lock_value` must be a unique identifier for this process.
1512    /// Check [`Client::cross_process_store_locks_holder_name`] to
1513    /// get the global value.
1514    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1515        // If the lock has already been created, don't recreate it from scratch.
1516        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1517            let prev_holder = prev_lock.lock_holder();
1518            if prev_holder == lock_value {
1519                return Ok(());
1520            }
1521            warn!("Recreating cross-process store lock with a different holder value: prev was {prev_holder}, new is {lock_value}");
1522        }
1523
1524        let olm_machine = self.client.base_client().olm_machine().await;
1525        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1526
1527        let lock =
1528            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1529
1530        // Gently try to initialize the crypto store generation counter.
1531        //
1532        // If we don't get the lock immediately, then it is already acquired by another
1533        // process, and we'll get to reload next time we acquire the lock.
1534        {
1535            let guard = lock.try_lock_once().await?;
1536            if guard.is_some() {
1537                olm_machine
1538                    .initialize_crypto_store_generation(
1539                        &self.client.locks().crypto_store_generation,
1540                    )
1541                    .await?;
1542            }
1543        }
1544
1545        self.client
1546            .locks()
1547            .cross_process_crypto_store_lock
1548            .set(lock)
1549            .map_err(|_| Error::BadCryptoStoreState)?;
1550
1551        Ok(())
1552    }
1553
1554    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1555    /// time.
1556    ///
1557    /// Returns the current generation number.
1558    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1559        let olm_machine_guard = self.client.olm_machine().await;
1560        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1561            let (new_gen, generation_number) = olm_machine
1562                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1563                .await?;
1564            // If the crypto store generation has changed,
1565            if new_gen {
1566                // (get rid of the reference to the current crypto store first)
1567                drop(olm_machine_guard);
1568                // Recreate the OlmMachine.
1569                self.client.base_client().regenerate_olm(None).await?;
1570            }
1571            Ok(generation_number)
1572        } else {
1573            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1574            // been initialised by the time we get here. Ideally we'd panic, or return an
1575            // error, but for now I'm just adding some logging to check if it
1576            // happens, and returning the magic number 0.
1577            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1578            Ok(0)
1579        }
1580    }
1581
1582    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1583    /// spin-waits until the lock is available.
1584    ///
1585    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1586    /// first time.
1587    pub async fn spin_lock_store(
1588        &self,
1589        max_backoff: Option<u32>,
1590    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1591        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1592            let guard = lock.spin_lock(max_backoff).await?;
1593
1594            let generation = self.on_lock_newly_acquired().await?;
1595
1596            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1597        } else {
1598            Ok(None)
1599        }
1600    }
1601
1602    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1603    /// attempts to lock it once.
1604    ///
1605    /// Returns a guard to the lock, if it was obtained.
1606    pub async fn try_lock_store_once(
1607        &self,
1608    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1609        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1610            let maybe_guard = lock.try_lock_once().await?;
1611
1612            let Some(guard) = maybe_guard else {
1613                return Ok(None);
1614            };
1615
1616            let generation = self.on_lock_newly_acquired().await?;
1617
1618            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1619        } else {
1620            Ok(None)
1621        }
1622    }
1623
1624    /// Testing purposes only.
1625    #[cfg(any(test, feature = "testing"))]
1626    pub async fn uploaded_key_count(&self) -> Result<u64> {
1627        let olm_machine = self.client.olm_machine().await;
1628        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1629        Ok(olm_machine.uploaded_key_count().await?)
1630    }
1631
1632    /// Bootstrap encryption and enables event listeners for the E2EE support.
1633    ///
1634    /// Based on the `EncryptionSettings`, this call might:
1635    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1636    /// - Create a key backup if needed (POST `/room_keys/version`)
1637    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1638    ///
1639    /// As part of this process, and if needed, the current device keys would be
1640    /// uploaded to the server, new account data would be added, and cross
1641    /// signing keys and signatures might be uploaded.
1642    ///
1643    /// Should be called once we
1644    /// created a [`OlmMachine`], i.e. after logging in.
1645    ///
1646    /// # Arguments
1647    ///
1648    /// * `auth_data` - Some requests may require re-authentication. To prevent
1649    ///   the user from having to re-enter their password (or use other
1650    ///   methods), we can provide the authentication data here. This is
1651    ///   necessary for uploading cross-signing keys. However, please note that
1652    ///   there is a proposal (MSC3967) to remove this requirement, which would
1653    ///   allow for the initial upload of cross-signing keys without
1654    ///   authentication, rendering this parameter obsolete.
1655    pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1656        let mut tasks = self.client.inner.e2ee.tasks.lock();
1657
1658        let this = self.clone();
1659        tasks.setup_e2ee = Some(spawn(async move {
1660            // Update the current state first, so we don't have to wait for the result of
1661            // network requests
1662            this.update_verification_state().await;
1663
1664            if this.settings().auto_enable_cross_signing {
1665                if let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await {
1666                    error!("Couldn't bootstrap cross signing {e:?}");
1667                }
1668            }
1669
1670            if let Err(e) = this.backups().setup_and_resume().await {
1671                error!("Couldn't setup and resume backups {e:?}");
1672            }
1673            if let Err(e) = this.recovery().setup().await {
1674                error!("Couldn't setup and resume recovery {e:?}");
1675            }
1676        }));
1677    }
1678
1679    /// Waits for end-to-end encryption initialization tasks to finish, if any
1680    /// was running in the background.
1681    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1682        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1683
1684        if let Some(task) = task {
1685            if let Err(err) = task.await {
1686                warn!("Error when initializing backups: {err}");
1687            }
1688        }
1689    }
1690
1691    /// Upload the device keys and initial set of one-tim keys to the server.
1692    ///
1693    /// This should only be called when the user logs in for the first time,
1694    /// the method will ensure that other devices see our own device as an
1695    /// end-to-end encryption enabled one.
1696    ///
1697    /// **Warning**: Do not use this method if we're already calling
1698    /// [`Client::send_outgoing_request()`]. This method is intended for
1699    /// explicitly uploading the device keys before starting a sync.
1700    #[cfg(feature = "experimental-oidc")]
1701    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1702        let olm = self.client.olm_machine().await;
1703        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1704
1705        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1706            self.client.keys_upload(&request_id, &request).await?;
1707
1708            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1709            self.client.keys_query(&request_id, request.device_keys).await?;
1710        }
1711
1712        Ok(())
1713    }
1714
1715    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1716        self.recovery().update_state_after_keys_query(response).await;
1717
1718        // Only update the verification_state if our own devices changed
1719        if let Some(user_id) = self.client.user_id() {
1720            let contains_own_device = response.device_keys.contains_key(user_id);
1721
1722            if contains_own_device {
1723                self.update_verification_state().await;
1724            }
1725        }
1726    }
1727
1728    async fn update_verification_state(&self) {
1729        match self.get_own_device().await {
1730            Ok(device) => {
1731                if let Some(device) = device {
1732                    let is_verified = device.is_cross_signed_by_owner();
1733
1734                    if is_verified {
1735                        self.client.inner.verification_state.set(VerificationState::Verified);
1736                    } else {
1737                        self.client.inner.verification_state.set(VerificationState::Unverified);
1738                    }
1739                } else {
1740                    warn!("Couldn't find out own device in the store.");
1741                    self.client.inner.verification_state.set(VerificationState::Unknown);
1742                }
1743            }
1744            Err(error) => {
1745                warn!("Failed retrieving own device: {error}");
1746                self.client.inner.verification_state.set(VerificationState::Unknown);
1747            }
1748        }
1749    }
1750}
1751
1752#[cfg(all(test, not(target_arch = "wasm32")))]
1753mod tests {
1754    use std::{
1755        ops::Not,
1756        sync::{
1757            atomic::{AtomicBool, Ordering},
1758            Arc,
1759        },
1760        time::Duration,
1761    };
1762
1763    use matrix_sdk_base::SessionMeta;
1764    use matrix_sdk_test::{
1765        async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
1766        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1767    };
1768    use ruma::{
1769        device_id, event_id,
1770        events::{reaction::ReactionEventContent, relation::Annotation},
1771        user_id,
1772    };
1773    use serde_json::json;
1774    use wiremock::{
1775        matchers::{header, method, path_regex},
1776        Mock, MockServer, Request, ResponseTemplate,
1777    };
1778
1779    use crate::{
1780        assert_next_matches_with_timeout,
1781        authentication::matrix::{MatrixSession, MatrixSessionTokens},
1782        config::RequestConfig,
1783        encryption::VerificationState,
1784        test_utils::{logged_in_client, no_retry_test_client, set_client_session},
1785        Client,
1786    };
1787
1788    #[async_test]
1789    async fn test_reaction_sending() {
1790        let server = MockServer::start().await;
1791        let client = logged_in_client(Some(server.uri())).await;
1792
1793        let event_id = event_id!("$2:example.org");
1794
1795        Mock::given(method("GET"))
1796            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
1797            .and(header("authorization", "Bearer 1234"))
1798            .respond_with(
1799                ResponseTemplate::new(200)
1800                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
1801            )
1802            .mount(&server)
1803            .await;
1804
1805        Mock::given(method("PUT"))
1806            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
1807            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1808                "event_id": event_id,
1809            })))
1810            .mount(&server)
1811            .await;
1812
1813        let response = SyncResponseBuilder::default()
1814            .add_joined_room(
1815                JoinedRoomBuilder::default()
1816                    .add_state_event(StateTestEvent::Member)
1817                    .add_state_event(StateTestEvent::PowerLevels)
1818                    .add_state_event(StateTestEvent::Encryption),
1819            )
1820            .build_sync_response();
1821
1822        client.base_client().receive_sync_response(response).await.unwrap();
1823
1824        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
1825        assert!(room.is_encrypted().await.expect("Getting encryption state"));
1826
1827        let event_id = event_id!("$1:example.org");
1828        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
1829        room.send(reaction).await.expect("Sending the reaction should not fail");
1830
1831        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
1832    }
1833
1834    #[async_test]
1835    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
1836        let server = MockServer::start().await;
1837        let client = logged_in_client(Some(server.uri())).await;
1838        // This is the user ID that is inside MemberAdditional.
1839        // Note the confusing username, so we can share
1840        // GlobalAccountDataTestEvent::Direct with the invited test.
1841        let user_id = user_id!("@invited:localhost");
1842
1843        // When we receive a sync response saying "invited" is invited to a DM
1844        let response = SyncResponseBuilder::default()
1845            .add_joined_room(
1846                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
1847            )
1848            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1849            .build_sync_response();
1850        client.base_client().receive_sync_response(response).await.unwrap();
1851
1852        // Then get_dm_room finds this room
1853        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1854        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1855    }
1856
1857    #[async_test]
1858    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
1859        let server = MockServer::start().await;
1860        let client = logged_in_client(Some(server.uri())).await;
1861        // This is the user ID that is inside MemberInvite
1862        let user_id = user_id!("@invited:localhost");
1863
1864        // When we receive a sync response saying "invited" is invited to a DM
1865        let response = SyncResponseBuilder::default()
1866            .add_joined_room(
1867                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
1868            )
1869            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1870            .build_sync_response();
1871        client.base_client().receive_sync_response(response).await.unwrap();
1872
1873        // Then get_dm_room finds this room
1874        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1875        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1876    }
1877
1878    #[async_test]
1879    async fn test_get_dm_room_still_finds_left_room() {
1880        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
1881        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
1882
1883        let server = MockServer::start().await;
1884        let client = logged_in_client(Some(server.uri())).await;
1885        // This is the user ID that is inside MemberAdditional.
1886        // Note the confusing username, so we can share
1887        // GlobalAccountDataTestEvent::Direct with the invited test.
1888        let user_id = user_id!("@invited:localhost");
1889
1890        // When we receive a sync response saying "invited" is invited to a DM
1891        let response = SyncResponseBuilder::default()
1892            .add_joined_room(
1893                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
1894            )
1895            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1896            .build_sync_response();
1897        client.base_client().receive_sync_response(response).await.unwrap();
1898
1899        // Then get_dm_room finds this room
1900        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1901        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1902    }
1903
1904    #[cfg(feature = "sqlite")]
1905    #[async_test]
1906    async fn test_generation_counter_invalidates_olm_machine() {
1907        // Create two clients using the same sqlite database.
1908        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
1909        let session = MatrixSession {
1910            meta: SessionMeta {
1911                user_id: user_id!("@example:localhost").to_owned(),
1912                device_id: device_id!("DEVICEID").to_owned(),
1913            },
1914            tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
1915        };
1916
1917        let client1 = Client::builder()
1918            .homeserver_url("http://localhost:1234")
1919            .request_config(RequestConfig::new().disable_retry())
1920            .sqlite_store(&sqlite_path, None)
1921            .build()
1922            .await
1923            .unwrap();
1924        client1.matrix_auth().restore_session(session.clone()).await.unwrap();
1925
1926        let client2 = Client::builder()
1927            .homeserver_url("http://localhost:1234")
1928            .request_config(RequestConfig::new().disable_retry())
1929            .sqlite_store(sqlite_path, None)
1930            .build()
1931            .await
1932            .unwrap();
1933        client2.matrix_auth().restore_session(session).await.unwrap();
1934
1935        // When the lock isn't enabled, any attempt at locking won't return a guard.
1936        let guard = client1.encryption().try_lock_store_once().await.unwrap();
1937        assert!(guard.is_none());
1938
1939        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
1940        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
1941
1942        // One client can take the lock.
1943        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
1944        assert!(acquired1.is_some());
1945
1946        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
1947        let initial_olm_machine =
1948            client1.olm_machine().await.clone().expect("must have an olm machine");
1949
1950        // Also enable backup to check that new machine has the same backup keys.
1951        let decryption_key = matrix_sdk_base::crypto::store::BackupDecryptionKey::new()
1952            .expect("Can't create new recovery key");
1953        let backup_key = decryption_key.megolm_v1_public_key();
1954        backup_key.set_version("1".to_owned());
1955        initial_olm_machine
1956            .backup_machine()
1957            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
1958            .await
1959            .expect("Should save");
1960
1961        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
1962
1963        assert!(client1.encryption().backups().are_enabled().await);
1964
1965        // The other client can't take the lock too.
1966        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
1967        assert!(acquired2.is_none());
1968
1969        // Now have the first client release the lock,
1970        drop(acquired1);
1971        tokio::time::sleep(Duration::from_millis(100)).await;
1972
1973        // And re-take it.
1974        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
1975        assert!(acquired1.is_some());
1976
1977        // In that case, the Olm Machine shouldn't change.
1978        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
1979        assert!(initial_olm_machine.same_as(&olm_machine));
1980
1981        // Ok, release again.
1982        drop(acquired1);
1983        tokio::time::sleep(Duration::from_millis(100)).await;
1984
1985        // Client2 can acquire the lock.
1986        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
1987        assert!(acquired2.is_some());
1988
1989        // And then release it.
1990        drop(acquired2);
1991        tokio::time::sleep(Duration::from_millis(100)).await;
1992
1993        // Client1 can acquire it again,
1994        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
1995        assert!(acquired1.is_some());
1996
1997        // But now its olm machine has been invalidated and thus regenerated!
1998        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
1999
2000        assert!(!initial_olm_machine.same_as(&olm_machine));
2001
2002        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2003        assert!(backup_key_new.decryption_key.is_some());
2004        assert_eq!(
2005            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2006            backup_key.to_base64()
2007        );
2008        assert!(client1.encryption().backups().are_enabled().await);
2009    }
2010
2011    #[cfg(feature = "sqlite")]
2012    #[async_test]
2013    async fn test_generation_counter_no_spurious_invalidation() {
2014        // Create two clients using the same sqlite database.
2015        let sqlite_path =
2016            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2017        let session = MatrixSession {
2018            meta: SessionMeta {
2019                user_id: user_id!("@example:localhost").to_owned(),
2020                device_id: device_id!("DEVICEID").to_owned(),
2021            },
2022            tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
2023        };
2024
2025        let client = Client::builder()
2026            .homeserver_url("http://localhost:1234")
2027            .request_config(RequestConfig::new().disable_retry())
2028            .sqlite_store(&sqlite_path, None)
2029            .build()
2030            .await
2031            .unwrap();
2032        client.matrix_auth().restore_session(session.clone()).await.unwrap();
2033
2034        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2035
2036        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2037
2038        // Enabling the lock doesn't update the olm machine.
2039        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2040        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2041
2042        {
2043            // Simulate that another client hold the lock before.
2044            let client2 = Client::builder()
2045                .homeserver_url("http://localhost:1234")
2046                .request_config(RequestConfig::new().disable_retry())
2047                .sqlite_store(sqlite_path, None)
2048                .build()
2049                .await
2050                .unwrap();
2051            client2.matrix_auth().restore_session(session).await.unwrap();
2052
2053            client2
2054                .encryption()
2055                .enable_cross_process_store_lock("client2".to_owned())
2056                .await
2057                .unwrap();
2058
2059            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2060            assert!(guard.is_some());
2061
2062            drop(guard);
2063            tokio::time::sleep(Duration::from_millis(100)).await;
2064        }
2065
2066        {
2067            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2068            assert!(acquired.is_some());
2069        }
2070
2071        // Taking the lock the first time will update the olm machine.
2072        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2073        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2074
2075        {
2076            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2077            assert!(acquired.is_some());
2078        }
2079
2080        // Re-taking the lock doesn't update the olm machine.
2081        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2082        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2083    }
2084
2085    #[async_test]
2086    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2087        // Given a client and a server
2088        let client = no_retry_test_client(None).await;
2089        let server = MockServer::start().await;
2090
2091        // When we subscribe to its verification state
2092        let mut verification_state = client.encryption().verification_state();
2093
2094        // We can get its initial value, and it's Unknown
2095        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2096
2097        // We set up a mocked request to check this endpoint is not called before
2098        // reading the new state
2099        let keys_requested = Arc::new(AtomicBool::new(false));
2100        let inner_bool = keys_requested.clone();
2101
2102        Mock::given(method("GET"))
2103            .and(path_regex(
2104                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2105            ))
2106            .respond_with(move |_req: &Request| {
2107                inner_bool.fetch_or(true, Ordering::SeqCst);
2108                ResponseTemplate::new(200).set_body_json(json!({}))
2109            })
2110            .mount(&server)
2111            .await;
2112
2113        // When the session is initialised and the encryption tasks spawn
2114        set_client_session(&client).await;
2115
2116        // Then we can get an updated value without waiting for any network requests
2117        assert!(keys_requested.load(Ordering::SeqCst).not());
2118        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2119    }
2120}