From 6ddc0fd348513dbec555b0955870b50b1fa5d126 Mon Sep 17 00:00:00 2001 From: Arpit Jain Date: Fri, 5 Jun 2026 20:43:27 +0900 Subject: [PATCH] fix: preserve root-cause errors in KMSMasterKey exception chaining KMSMasterKey caught boto3 ClientError (and KeyError) on the generate_data_key, encrypt, and decrypt calls and re-raised its own GenerateKeyError / EncryptKeyError / DecryptKeyError without chaining the original exception. Because the re-raise used a bare "raise SomeError(...)" instead of "raise SomeError(...) from error", the re-raised exception's __cause__ was None and the underlying KMS failure (for example an AccessDeniedException) was dropped from the traceback. Callers were left debugging in the dark with only the generic "unable to ... data key" message. Add "from error" at the three boto-call catch sites so the original exception is preserved as __cause__. The raised exception type and message are unchanged, so this is backward compatible; it only adds the chained cause to the traceback. Includes regression tests asserting __cause__ is the original ClientError for the generate, encrypt, and decrypt paths. Fixes #774 Signed-off-by: Arpit Jain --- src/aws_encryption_sdk/key_providers/kms.py | 12 +++--- test/unit/test_providers_kms_master_key.py | 47 +++++++++++++++++++++ 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/src/aws_encryption_sdk/key_providers/kms.py b/src/aws_encryption_sdk/key_providers/kms.py index 45402077d..b34e0b8fc 100644 --- a/src/aws_encryption_sdk/key_providers/kms.py +++ b/src/aws_encryption_sdk/key_providers/kms.py @@ -199,10 +199,10 @@ def _generate_data_key(self, algorithm, encryption_context=None): # //# ciphertext for the encrypted data key in the output. ciphertext = response["CiphertextBlob"] key_id = response["KeyId"] - except (ClientError, KeyError): + except (ClientError, KeyError) as error: error_message = "Master Key {key_id} unable to generate data key".format(key_id=self._key_id) _LOGGER.exception(error_message) - raise GenerateKeyError(error_message) + raise GenerateKeyError(error_message) from error # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.10 # //# The response's "KeyId" MUST be valid. @@ -242,10 +242,10 @@ def _encrypt_data_key(self, data_key, algorithm, encryption_context=None): # //# encrypted data key. ciphertext = response["CiphertextBlob"] key_id = response["KeyId"] - except (ClientError, KeyError): + except (ClientError, KeyError) as error: error_message = "Master Key {key_id} unable to encrypt data key".format(key_id=self._key_id) _LOGGER.exception(error_message) - raise EncryptKeyError(error_message) + raise EncryptKeyError(error_message) from error # //= compliance/framework/aws-kms/aws-kms-mrk-aware-master-key.txt#2.11 # //# The AWS KMS Encrypt response MUST contain a valid "KeyId". @@ -315,10 +315,10 @@ def _decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context=No ) raise DecryptKeyError(error_message) - except (ClientError, KeyError): + except (ClientError, KeyError) as error: error_message = "Master Key {key_id} unable to decrypt data key".format(key_id=self._key_id) _LOGGER.exception(error_message) - raise DecryptKeyError(error_message) + raise DecryptKeyError(error_message) from error return DataKey( key_provider=self.key_provider, data_key=plaintext, encrypted_data_key=encrypted_data_key.encrypted_data_key ) diff --git a/test/unit/test_providers_kms_master_key.py b/test/unit/test_providers_kms_master_key.py index 01a87d4fd..c38a0c853 100644 --- a/test/unit/test_providers_kms_master_key.py +++ b/test/unit/test_providers_kms_master_key.py @@ -229,6 +229,21 @@ def test_generate_data_key_unsuccessful_clienterror(self, config_class, key_clas test._generate_data_key(self.mock_algorithm) excinfo.match("Master Key .* unable to generate data key") + @pytest.mark.parametrize( + "config_class, key_class", + [(KMSMasterKeyConfig, KMSMasterKey), (MRKAwareKMSMasterKeyConfig, MRKAwareKMSMasterKey)], + ) + @pytest.mark.parametrize("key_id", [VALUES["mrk_arn_region1"], VALUES["arn"]]) + def test_generate_data_key_clienterror_preserves_cause(self, config_class, key_class, key_id): + """The original KMS error must be chained as __cause__ (see GitHub issue #774).""" + root_error = ClientError({"Error": {}}, "This is an error!") + self.mock_client.generate_data_key.side_effect = root_error + config = config_class(key_id=key_id, client=self.mock_client) + test = key_class(config=config) + with pytest.raises(GenerateKeyError) as excinfo: + test._generate_data_key(self.mock_algorithm) + assert excinfo.value.__cause__ is root_error + @pytest.mark.parametrize( "config_class, key_class", [(KMSMasterKeyConfig, KMSMasterKey), (MRKAwareKMSMasterKeyConfig, MRKAwareKMSMasterKey)], @@ -344,6 +359,21 @@ def test_encrypt_data_key_unsuccessful_clienterror(self, config_class, key_class test._encrypt_data_key(self.mock_data_key, self.mock_algorithm) excinfo.match("Master Key .* unable to encrypt data key") + @pytest.mark.parametrize( + "config_class, key_class", + [(KMSMasterKeyConfig, KMSMasterKey), (MRKAwareKMSMasterKeyConfig, MRKAwareKMSMasterKey)], + ) + @pytest.mark.parametrize("key_id", [VALUES["mrk_arn_region1"], VALUES["arn"]]) + def test_encrypt_data_key_clienterror_preserves_cause(self, config_class, key_class, key_id): + """The original KMS error must be chained as __cause__ (see GitHub issue #774).""" + root_error = ClientError({"Error": {}}, "This is an error!") + self.mock_client.encrypt.side_effect = root_error + config = config_class(key_id=key_id, client=self.mock_client) + test = key_class(config=config) + with pytest.raises(EncryptKeyError) as excinfo: + test._encrypt_data_key(self.mock_data_key, self.mock_algorithm) + assert excinfo.value.__cause__ is root_error + @pytest.mark.parametrize( "config_class, key_class", [(KMSMasterKeyConfig, KMSMasterKey), (MRKAwareKMSMasterKeyConfig, MRKAwareKMSMasterKey)], @@ -498,6 +528,23 @@ def test_decrypt_data_key_unsuccessful_clienterror(self, config_class, key_class test._decrypt_data_key(encrypted_data_key=self.mock_encrypted_data_key, algorithm=sentinel.algorithm) excinfo.match("Master Key .* unable to decrypt data key") + @pytest.mark.parametrize( + "config_class, key_class", + [(KMSMasterKeyConfig, KMSMasterKey), (MRKAwareKMSMasterKeyConfig, MRKAwareKMSMasterKey)], + ) + @pytest.mark.parametrize("key_id", [VALUES["mrk_arn_region1"], VALUES["arn"]]) + def test_decrypt_data_key_clienterror_preserves_cause(self, config_class, key_class, key_id): + """The original KMS error must be chained as __cause__ (see GitHub issue #774).""" + root_error = ClientError({"Error": {}}, "This is an error!") + self.mock_client.decrypt.side_effect = root_error + config = config_class(key_id=key_id, client=self.mock_client) + test = key_class(config=config) + self.mock_encrypted_data_key.key_provider.key_info = key_id + self.mock_client.decrypt.return_value["KeyId"] = key_id.decode("ascii") + with pytest.raises(DecryptKeyError) as excinfo: + test._decrypt_data_key(encrypted_data_key=self.mock_encrypted_data_key, algorithm=sentinel.algorithm) + assert excinfo.value.__cause__ is root_error + @pytest.mark.parametrize( "config_class, key_class", [(KMSMasterKeyConfig, KMSMasterKey), (MRKAwareKMSMasterKeyConfig, MRKAwareKMSMasterKey)],