Coverage for src / kdbxtool / parsing / kdbx3.py: 87%
95 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
1"""KDBX3 payload encryption and decryption.
3This module handles the cryptographic operations for KDBX3 files:
4- Master key derivation from credentials (AES-KDF)
5- Payload decryption and encryption
6- Content hashed block verification
7- Synthetic inner header creation from outer header
9KDBX3 structure:
101. Outer header (plaintext, with 2-byte length fields)
112. Encrypted payload (content hashed blocks format)
12 - Stream start bytes (32 bytes, for verification)
13 - Compressed/uncompressed XML database content
15Key differences from KDBX4:
16- No header hash or HMAC verification
17- Protected stream key is in outer header (not inner)
18- Uses content hashed blocks instead of HMAC block stream
19- No inner header inside the encrypted payload
20"""
22from __future__ import annotations
24import gzip
25import hashlib
26import logging
28from kdbxtool.exceptions import (
29 AuthenticationError,
30 CorruptedDataError,
31 KdfError,
32 UnsupportedVersionError,
33)
34from kdbxtool.security import (
35 CipherContext,
36 SecureBytes,
37 constant_time_compare,
38 derive_composite_key,
39 derive_key_aes_kdf,
40)
41from kdbxtool.security.kdf import AesKdfConfig
43from .context import ParseContext
44from .header import CompressionType, KdbxHeader, KdbxVersion
45from .kdbx4 import DecryptedPayload, InnerHeader
47logger = logging.getLogger(__name__)
50class Kdbx3Reader:
51 """Reader for KDBX3 format databases.
53 KDBX3 uses AES-KDF for key derivation and content hashed blocks
54 for payload integrity verification.
55 """
57 def __init__(self, data: bytes) -> None:
58 """Initialize reader with KDBX3 file data.
60 Args:
61 data: Complete KDBX3 file contents
62 """
63 self._data = data
64 self._ctx = ParseContext(data)
66 def decrypt(
67 self,
68 password: str | None = None,
69 keyfile_data: bytes | None = None,
70 transformed_key: bytes | None = None,
71 ) -> DecryptedPayload:
72 """Decrypt the KDBX3 file.
74 Args:
75 password: Optional password
76 keyfile_data: Optional keyfile contents
77 transformed_key: Optional pre-computed transformed key (skips KDF)
79 Returns:
80 DecryptedPayload with header, synthetic inner header, and XML
82 Raises:
83 AuthenticationError: If credentials are wrong
84 CorruptedDataError: If file is corrupted
85 """
86 logger.debug("Starting KDBX3 decryption")
88 # Parse outer header
89 header, header_end = KdbxHeader.parse(self._ctx.data)
91 if header.version != KdbxVersion.KDBX3:
92 raise UnsupportedVersionError(header.version.value, 0)
94 self._ctx.offset = header_end
96 # Use pre-computed transformed key if provided, otherwise derive it
97 if transformed_key is not None:
98 master_key = SecureBytes(transformed_key)
99 else:
100 # Derive composite key from credentials
101 composite_key = derive_composite_key(
102 password=password,
103 keyfile_data=keyfile_data,
104 )
106 # Derive master key using AES-KDF
107 master_key = self._derive_master_key(header, composite_key)
109 # Derive cipher key
110 cipher_key = self._derive_cipher_key(master_key.data, header.master_seed)
112 # Read encrypted payload (everything after header)
113 encrypted_payload = self._ctx.data[self._ctx.offset :]
115 # Decrypt payload
116 ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv)
117 decrypted = ctx.decrypt(encrypted_payload)
119 # Remove PKCS7 padding for block ciphers
120 if header.cipher.iv_size == 16: # AES-CBC or Twofish-CBC
121 decrypted = self._remove_pkcs7_padding(decrypted)
123 # Verify stream start bytes (first 32 bytes)
124 if len(decrypted) < 32:
125 raise CorruptedDataError("Decrypted payload too short")
127 stream_start = decrypted[:32]
128 if header.stream_start_bytes is None:
129 raise CorruptedDataError("Missing stream start bytes in header")
131 if not constant_time_compare(stream_start, header.stream_start_bytes):
132 raise AuthenticationError()
133 logger.debug("Stream start bytes verified")
135 # Read content hashed blocks (after stream start bytes)
136 payload_data = self._read_hashed_blocks(decrypted[32:])
138 # Decompress if needed
139 if header.compression == CompressionType.GZIP:
140 payload_data = gzip.decompress(payload_data)
141 logger.debug("Payload decompressed, %d bytes", len(payload_data))
143 # Create synthetic inner header from outer header fields
144 inner_header = self._create_synthetic_inner_header(header)
146 return DecryptedPayload(
147 header=header,
148 inner_header=inner_header,
149 xml_data=payload_data,
150 transformed_key=master_key.data,
151 )
153 def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
154 """Derive master key using AES-KDF."""
155 if header.aes_kdf_rounds is None:
156 raise KdfError("Missing AES-KDF rounds in header")
158 aes_config = AesKdfConfig(
159 salt=header.kdf_salt,
160 rounds=header.aes_kdf_rounds,
161 )
163 return derive_key_aes_kdf(composite_key.data, aes_config)
165 def _derive_cipher_key(self, master_key: bytes, master_seed: bytes) -> bytes:
166 """Derive the cipher key from master key and seed.
168 KDBX3 key derivation:
169 - cipher_key = SHA256(master_seed || transformed_key)
170 """
171 return hashlib.sha256(master_seed + master_key).digest()
173 def _read_hashed_blocks(self, data: bytes) -> bytes:
174 """Read and verify content hashed blocks.
176 KDBX3 uses content hashed blocks for integrity:
177 - 4 bytes: block index (sequential, starting at 0)
178 - 32 bytes: SHA-256 hash of block data
179 - 4 bytes: block data length
180 - N bytes: block data
182 Blocks continue until block data length is 0.
183 """
184 result = bytearray()
185 ctx = ParseContext(data)
186 expected_index = 0
188 with ctx.scope("hashed_blocks"):
189 while not ctx.exhausted:
190 with ctx.scope(f"block[{expected_index}]"):
191 # Read block index
192 block_index = ctx.read_u32("index")
194 if block_index != expected_index:
195 raise CorruptedDataError(
196 f"Block index mismatch: expected {expected_index}, got {block_index}"
197 )
199 # Read block hash
200 block_hash = ctx.read(32, "hash")
202 # Read block data length
203 block_len = ctx.read_u32("length")
205 # Check for end block
206 if block_len == 0:
207 # Verify empty block has zero hash
208 if block_hash != b"\x00" * 32:
209 raise CorruptedDataError("Invalid end block hash")
210 break
212 # Read block data
213 block_data = ctx.read(block_len, "data")
215 # Verify block hash
216 computed_hash = hashlib.sha256(block_data).digest()
217 if not constant_time_compare(computed_hash, block_hash):
218 raise CorruptedDataError(f"Block {block_index} hash mismatch")
220 result.extend(block_data)
221 expected_index += 1
223 logger.debug("Verified %d hashed blocks", expected_index)
224 return bytes(result)
226 def _create_synthetic_inner_header(self, header: KdbxHeader) -> InnerHeader:
227 """Create a synthetic inner header from KDBX3 outer header fields.
229 KDBX3 stores the protected stream configuration in the outer header,
230 while KDBX4 has an inner header. We create a synthetic inner header
231 so the rest of the code can work uniformly.
232 """
233 # Default to Salsa20 (stream_id=2) if not specified
234 stream_id = header.inner_random_stream_id or 2
236 # Protected stream key from outer header
237 if header.protected_stream_key is None:
238 raise CorruptedDataError("Missing protected stream key in KDBX3 header")
240 return InnerHeader(
241 random_stream_id=stream_id,
242 random_stream_key=header.protected_stream_key,
243 binaries={}, # KDBX3 stores binaries in XML, not inner header
244 )
246 @staticmethod
247 def _remove_pkcs7_padding(data: bytes) -> bytes:
248 """Remove PKCS7 padding from decrypted data."""
249 if not data:
250 raise CorruptedDataError("Empty decrypted data")
252 pad_len = data[-1]
253 if pad_len == 0 or pad_len > 16:
254 raise CorruptedDataError(f"Invalid PKCS7 padding length: {pad_len}")
256 # Verify all padding bytes are correct
257 for i in range(1, pad_len + 1):
258 if data[-i] != pad_len:
259 raise CorruptedDataError("Invalid PKCS7 padding bytes")
261 return data[:-pad_len]
264def read_kdbx3(
265 data: bytes,
266 password: str | None = None,
267 keyfile_data: bytes | None = None,
268 transformed_key: bytes | None = None,
269) -> DecryptedPayload:
270 """Read and decrypt a KDBX3 database.
272 Args:
273 data: Complete KDBX3 file contents
274 password: Optional password
275 keyfile_data: Optional keyfile contents
276 transformed_key: Optional pre-computed transformed key (skips KDF)
278 Returns:
279 DecryptedPayload containing header, inner header, and XML data
281 Raises:
282 AuthenticationError: If credentials are wrong
283 CorruptedDataError: If file is corrupted
284 UnsupportedVersionError: If file is not KDBX3
285 """
286 reader = Kdbx3Reader(data)
287 return reader.decrypt(
288 password=password,
289 keyfile_data=keyfile_data,
290 transformed_key=transformed_key,
291 )