Coverage for src / kdbxtool / parsing / kdbx3.py: 87%
89 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-19 21:22 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-19 21:22 +0000
1"""KDBX3 payload encryption and decryption.

This module handles the cryptographic operations for KDBX3 files:
- Master key derivation from credentials (AES-KDF)
- Payload decryption and encryption
- Content hashed block verification
- Synthetic inner header creation from outer header

KDBX3 structure:
1. Outer header (plaintext, with 2-byte length fields)
2. Encrypted payload (content hashed blocks format)
   - Stream start bytes (32 bytes, for verification)
   - Compressed/uncompressed XML database content

Key differences from KDBX4:
- No header hash or HMAC verification
- Protected stream key is in outer header (not inner)
- Uses content hashed blocks instead of HMAC block stream
- No inner header inside the encrypted payload
20"""
22from __future__ import annotations
24import gzip
25import hashlib
27from kdbxtool.exceptions import (
28 AuthenticationError,
29 CorruptedDataError,
30 KdfError,
31 UnsupportedVersionError,
32)
33from kdbxtool.security import (
34 CipherContext,
35 SecureBytes,
36 constant_time_compare,
37 derive_composite_key,
38 derive_key_aes_kdf,
39)
40from kdbxtool.security.kdf import AesKdfConfig
42from .context import ParseContext
43from .header import CompressionType, KdbxHeader, KdbxVersion
44from .kdbx4 import DecryptedPayload, InnerHeader
class Kdbx3Reader:
    """Reader for KDBX3 format databases.

    KDBX3 uses AES-KDF for key derivation and content hashed blocks
    for payload integrity verification.
    """

    def __init__(self, data: bytes) -> None:
        """Initialize reader with KDBX3 file data.

        Args:
            data: Complete KDBX3 file contents
        """
        self._data = data
        self._ctx = ParseContext(data)

    def decrypt(
        self,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
    ) -> DecryptedPayload:
        """Decrypt the KDBX3 file.

        Args:
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional pre-computed transformed key (skips KDF)

        Returns:
            DecryptedPayload with header, synthetic inner header, and XML

        Raises:
            UnsupportedVersionError: If the parsed header is not KDBX3
            KdfError: If the header carries no AES-KDF round count
            AuthenticationError: If credentials are wrong
            CorruptedDataError: If file is corrupted
        """
        # Parse outer header
        header, header_end = KdbxHeader.parse(self._ctx.data)

        if header.version != KdbxVersion.KDBX3:
            raise UnsupportedVersionError(header.version.value, 0)

        self._ctx.offset = header_end

        # Use pre-computed transformed key if provided, otherwise derive it
        if transformed_key is not None:
            master_key = SecureBytes(transformed_key)
        else:
            # Derive composite key from credentials
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
            )

            # Derive master key using AES-KDF
            master_key = self._derive_master_key(header, composite_key)

        # Derive cipher key
        cipher_key = self._derive_cipher_key(master_key.data, header.master_seed)

        # Read encrypted payload (everything after header)
        encrypted_payload = self._ctx.data[self._ctx.offset :]

        # Decrypt payload
        ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        decrypted = ctx.decrypt(encrypted_payload)

        if header.stream_start_bytes is None:
            raise CorruptedDataError("Missing stream start bytes in header")

        # Verify stream start bytes (first 32 bytes) BEFORE stripping padding.
        # A wrong key produces garbage whose trailing padding is almost never
        # valid, so unpadding first would report wrong credentials as
        # CorruptedDataError instead of AuthenticationError. The leading 32
        # bytes are unaffected by padding removal whenever the unpadded
        # payload is long enough to be accepted below.
        if len(decrypted) >= 32 and not constant_time_compare(
            decrypted[:32], header.stream_start_bytes
        ):
            raise AuthenticationError()

        # Remove PKCS7 padding for block ciphers
        if header.cipher.iv_size == 16:  # AES-CBC or Twofish-CBC
            decrypted = self._remove_pkcs7_padding(decrypted)

        # The payload must still hold the full stream start marker
        if len(decrypted) < 32:
            raise CorruptedDataError("Decrypted payload too short")

        # Read content hashed blocks (after stream start bytes)
        payload_data = self._read_hashed_blocks(decrypted[32:])

        # Decompress if needed
        if header.compression == CompressionType.GZIP:
            payload_data = gzip.decompress(payload_data)

        # Create synthetic inner header from outer header fields
        inner_header = self._create_synthetic_inner_header(header)

        return DecryptedPayload(
            header=header,
            inner_header=inner_header,
            xml_data=payload_data,
            transformed_key=master_key.data,
        )

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Derive master key using AES-KDF.

        Args:
            header: Parsed outer header supplying the KDF salt and round count
            composite_key: Composite key built from the user's credentials

        Returns:
            The result of derive_key_aes_kdf for the composite key

        Raises:
            KdfError: If the header has no AES-KDF round count
        """
        if header.aes_kdf_rounds is None:
            raise KdfError("Missing AES-KDF rounds in header")

        aes_config = AesKdfConfig(
            salt=header.kdf_salt,
            rounds=header.aes_kdf_rounds,
        )

        return derive_key_aes_kdf(composite_key.data, aes_config)

    def _derive_cipher_key(self, master_key: bytes, master_seed: bytes) -> bytes:
        """Derive the cipher key from master key and seed.

        KDBX3 key derivation:
        - cipher_key = SHA256(master_seed || transformed_key)

        Args:
            master_key: Transformed key bytes
            master_seed: Master seed from the outer header

        Returns:
            32-byte SHA-256 digest used as the payload cipher key
        """
        return hashlib.sha256(master_seed + master_key).digest()

    def _read_hashed_blocks(self, data: bytes) -> bytes:
        """Read and verify content hashed blocks.

        KDBX3 uses content hashed blocks for integrity:
        - 4 bytes: block index (sequential, starting at 0)
        - 32 bytes: SHA-256 hash of block data
        - 4 bytes: block data length
        - N bytes: block data

        Blocks continue until block data length is 0. If the parse context
        reports it is exhausted before such a block is seen, reading stops
        and the blocks collected so far are returned.

        Args:
            data: Decrypted payload following the stream start bytes

        Returns:
            Concatenation of all verified block payloads

        Raises:
            CorruptedDataError: On an out-of-sequence block index, a non-zero
                hash on the terminating block, or a block hash mismatch
        """
        result = bytearray()
        ctx = ParseContext(data)
        expected_index = 0

        with ctx.scope("hashed_blocks"):
            while not ctx.exhausted:
                with ctx.scope(f"block[{expected_index}]"):
                    # Read block index
                    block_index = ctx.read_u32("index")

                    if block_index != expected_index:
                        raise CorruptedDataError(
                            f"Block index mismatch: expected {expected_index}, got {block_index}"
                        )

                    # Read block hash
                    block_hash = ctx.read(32, "hash")

                    # Read block data length
                    block_len = ctx.read_u32("length")

                    # Check for end block
                    if block_len == 0:
                        # Verify empty block has zero hash
                        if block_hash != b"\x00" * 32:
                            raise CorruptedDataError("Invalid end block hash")
                        break

                    # Read block data
                    block_data = ctx.read(block_len, "data")

                    # Verify block hash
                    computed_hash = hashlib.sha256(block_data).digest()
                    if not constant_time_compare(computed_hash, block_hash):
                        raise CorruptedDataError(f"Block {block_index} hash mismatch")

                    result.extend(block_data)
                    expected_index += 1

        return bytes(result)

    def _create_synthetic_inner_header(self, header: KdbxHeader) -> InnerHeader:
        """Create a synthetic inner header from KDBX3 outer header fields.

        KDBX3 stores the protected stream configuration in the outer header,
        while KDBX4 has an inner header. We create a synthetic inner header
        so the rest of the code can work uniformly.

        Args:
            header: Parsed KDBX3 outer header

        Returns:
            InnerHeader carrying the stream id, the protected stream key and
            an empty binaries mapping

        Raises:
            CorruptedDataError: If the header has no protected stream key
        """
        # Default to Salsa20 (stream_id=2) if not specified.
        # NOTE(review): `or` also maps any falsy id (e.g. 0) to 2 - confirm
        # that is intended for headers that explicitly store 0.
        stream_id = header.inner_random_stream_id or 2

        # Protected stream key from outer header
        if header.protected_stream_key is None:
            raise CorruptedDataError("Missing protected stream key in KDBX3 header")

        return InnerHeader(
            random_stream_id=stream_id,
            random_stream_key=header.protected_stream_key,
            binaries={},  # KDBX3 stores binaries in XML, not inner header
        )

    @staticmethod
    def _remove_pkcs7_padding(data: bytes) -> bytes:
        """Remove PKCS7 padding from decrypted data.

        Args:
            data: Decrypted bytes ending in PKCS7 padding (block size 16)

        Returns:
            The input with the trailing padding bytes stripped

        Raises:
            CorruptedDataError: If the data is empty, the padding length is
                0, greater than 16 or greater than the data length, or the
                padding bytes are not all equal to the padding length
        """
        if not data:
            raise CorruptedDataError("Empty decrypted data")

        pad_len = data[-1]
        # The length must also fit inside the buffer; otherwise the check
        # below would index past the start of a short input.
        if pad_len == 0 or pad_len > 16 or pad_len > len(data):
            raise CorruptedDataError(f"Invalid PKCS7 padding length: {pad_len}")

        # Verify all padding bytes are correct
        if data[-pad_len:] != bytes([pad_len]) * pad_len:
            raise CorruptedDataError("Invalid PKCS7 padding bytes")

        return data[:-pad_len]
def read_kdbx3(
    data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
) -> DecryptedPayload:
    """Decrypt a KDBX3 database held entirely in memory.

    Convenience wrapper that builds a Kdbx3Reader over ``data`` and
    immediately calls its ``decrypt`` method with the given credentials.

    Args:
        data: Complete KDBX3 file contents
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional pre-computed transformed key (skips KDF)

    Returns:
        DecryptedPayload containing header, inner header, and XML data

    Raises:
        AuthenticationError: If credentials are wrong
        CorruptedDataError: If file is corrupted
        UnsupportedVersionError: If file is not KDBX3
    """
    credentials = {
        "password": password,
        "keyfile_data": keyfile_data,
        "transformed_key": transformed_key,
    }
    return Kdbx3Reader(data).decrypt(**credentials)