Coverage for src / kdbxtool / parsing / kdbx4.py: 94%
222 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-19 21:22 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-19 21:22 +0000
"""KDBX4 payload encryption and decryption.

This module handles the cryptographic operations for KDBX4 files:
- Master key derivation from credentials
- Header integrity verification (HMAC-SHA256)
- Payload decryption and encryption
- Block-based HMAC verification (HmacBlockStream)
- Inner header parsing

KDBX4 structure:
1. Outer header (plaintext)
2. SHA-256 hash of header
3. HMAC-SHA256 of header
4. Encrypted payload (HmacBlockStream format)
   - Inner header
   - XML database content
"""
19from __future__ import annotations
21import gzip
22import hashlib
23import struct
24import warnings
25from dataclasses import dataclass
26from typing import TYPE_CHECKING
28from kdbxtool.exceptions import (
29 AuthenticationError,
30 CorruptedDataError,
31 DecryptionError,
32 KdfError,
33 UnsupportedVersionError,
34)
35from kdbxtool.security import (
36 Argon2Config,
37 CipherContext,
38 SecureBytes,
39 compute_hmac_sha256,
40 constant_time_compare,
41 derive_composite_key,
42 derive_key_aes_kdf,
43 derive_key_argon2,
44)
45from kdbxtool.security.kdf import AesKdfConfig, KdfType
47from .context import BuildContext, ParseContext
48from .header import (
49 CompressionType,
50 InnerHeaderFieldType,
51 KdbxHeader,
52 KdbxVersion,
53)
55if TYPE_CHECKING:
56 pass
# Upper bound on one binary attachment: 512 MiB.
# Attachments larger than this are rejected while parsing the inner header,
# so a malicious KDBX file cannot force unbounded memory use.
MAX_BINARY_SIZE = 512 * 1024**2
63@dataclass(slots=True)
64class InnerHeader:
65 """KDBX4 inner header data.
67 The inner header appears after decryption, before the XML payload.
68 It contains the protected stream cipher settings and binary attachments.
69 """
71 # Random stream for protected values (e.g., passwords in XML)
72 random_stream_id: int
73 random_stream_key: bytes
75 # Binary attachments (id -> data with protection flag)
76 binaries: dict[int, tuple[bool, bytes]]
79@dataclass(slots=True)
80class DecryptedPayload:
81 """Result of decrypting a KDBX4 file.
83 Contains all data needed to work with the database.
84 """
86 header: KdbxHeader
87 inner_header: InnerHeader
88 xml_data: bytes
89 transformed_key: bytes | None = None # For caching to speed up repeated opens
class Kdbx4Reader:
    """Reader for KDBX4 database files.

    Holds the raw file bytes in a ``ParseContext`` and exposes ``decrypt``,
    which verifies the header, authenticates the HMAC block stream, decrypts
    the payload and splits it into inner header and XML.
    """

    def __init__(self, data: bytes) -> None:
        """Initialize reader with file data.

        Args:
            data: Complete KDBX4 file contents
        """
        self._ctx = ParseContext(data)

    def decrypt(
        self,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> DecryptedPayload:
        """Decrypt the KDBX4 file.

        Args:
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            DecryptedPayload with header, inner header, XML, and transformed_key

        Raises:
            UnsupportedVersionError: If the outer header is not KDBX4.
            CorruptedDataError: If the header hash does not match or the
                inner header is malformed.
            AuthenticationError: If the header HMAC or any block HMAC does
                not verify (wrong credentials or tampered file).
            DecryptionError: If the PKCS7 padding is invalid.
            KdfError: If KDF parameters are missing or the KDF is unsupported.
        """
        # Parse outer header
        header, header_end = KdbxHeader.parse(self._ctx.data)

        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        self._ctx.offset = header_end

        # Read header hash and HMAC (both stored right after the outer header)
        with self._ctx.scope("header_verification"):
            header_hash = self._ctx.read(32, "header_hash")
            header_hmac = self._ctx.read(32, "header_hmac")

        # Verify header hash (unkeyed integrity check, no credentials needed)
        computed_hash = hashlib.sha256(header.raw_header).digest()
        if not constant_time_compare(computed_hash, header_hash):
            raise CorruptedDataError("Header hash mismatch - file may be corrupted")

        # Get transformed key - either use provided one or derive via KDF
        if transformed_key is not None:
            # Use precomputed transformed key (skips expensive KDF)
            master_key_bytes = transformed_key
        else:
            # Derive composite key from credentials
            # KeePassXC: YubiKey response is incorporated into composite key
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            # Derive master key using KDF (slow)
            master_key = self._derive_master_key(header, composite_key)
            master_key_bytes = master_key.data

        # Derive keys for HMAC and encryption
        hmac_key, cipher_key = self._derive_keys(master_key_bytes, header.master_seed)

        # Verify header HMAC; the header uses the all-ones block index
        block_key = self._compute_block_hmac_key(hmac_key, 0xFFFFFFFFFFFFFFFF)
        computed_hmac = compute_hmac_sha256(block_key, header.raw_header)
        if not constant_time_compare(computed_hmac, header_hmac):
            raise AuthenticationError()

        # Read and verify HMAC block stream
        encrypted_payload = self._read_hmac_block_stream(hmac_key)

        # Decrypt payload
        ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        decrypted = ctx.decrypt(encrypted_payload)

        # Remove PKCS7 padding for AES-CBC
        if header.cipher.iv_size == 16:  # AES-CBC
            decrypted = self._remove_pkcs7_padding(decrypted)

        # Decompress if needed
        # NOTE(review): gzip.decompress errors (BadGzipFile, EOFError,
        # zlib.error) propagate unwrapped - consider mapping them to
        # CorruptedDataError.
        if header.compression == CompressionType.GZIP:
            decrypted = gzip.decompress(decrypted)

        # Parse inner header
        inner_header, xml_start = self._parse_inner_header(decrypted)

        # Extract XML (everything after the inner header's END field)
        xml_data = decrypted[xml_start:]

        return DecryptedPayload(
            header=header,
            inner_header=inner_header,
            xml_data=xml_data,
            transformed_key=master_key_bytes,
        )

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Derive master key using the KDF specified in header.

        Args:
            header: Parsed outer header carrying the KDF type and parameters
            composite_key: Composite key built from the user's credentials

        Returns:
            The key produced by the configured KDF

        Raises:
            KdfError: If required KDF parameters are missing or the KDF type
                is not supported.
        """
        if header.kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            if (
                header.argon2_memory_kib is None
                or header.argon2_iterations is None
                or header.argon2_parallelism is None
            ):
                raise KdfError("Missing Argon2 parameters in header")

            argon2_config = Argon2Config(
                memory_kib=header.argon2_memory_kib,
                iterations=header.argon2_iterations,
                parallelism=header.argon2_parallelism,
                salt=header.kdf_salt,
                variant=header.kdf_type,
            )
            # Warn if parameters are below security minimums
            try:
                argon2_config.validate_security()
            except KdfError as e:
                warnings.warn(
                    f"Database has weak KDF parameters: {e}. "
                    "Consider re-saving with stronger settings.",
                    UserWarning,
                    # Attribute the warning several frames up the call chain
                    # rather than to this private helper.
                    stacklevel=4,
                )
            # Don't enforce minimums when reading - accept what the file has
            return derive_key_argon2(composite_key.data, argon2_config, enforce_minimums=False)
        elif header.kdf_type == KdfType.AES_KDF:
            if header.aes_kdf_rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")
            aes_config = AesKdfConfig(
                rounds=header.aes_kdf_rounds,
                salt=header.kdf_salt,
            )
            return derive_key_aes_kdf(composite_key.data, aes_config)
        else:
            raise KdfError(f"Unsupported KDF: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Derive HMAC key and cipher key from transformed key.

        KDBX4 key derivation:
        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)

        Returns:
            ``(hmac_key, cipher_key)`` - note the HMAC key comes first.
        """
        seeded = master_seed + transformed_key
        cipher_key = hashlib.sha256(seeded).digest()
        hmac_key = hashlib.sha512(seeded + b"\x01").digest()
        return hmac_key, cipher_key

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Compute HMAC key for a specific block.

        Each block uses a different key derived from the master HMAC key.
        key = SHA512(block_index_le64 || hmac_key)
        """
        index_bytes = struct.pack("<Q", block_index)
        return hashlib.sha512(index_bytes + hmac_key).digest()

    def _read_hmac_block_stream(self, hmac_key: bytes) -> bytes:
        """Read and verify HMAC block stream.

        KDBX4 uses a block-based format with per-block HMAC:
        - 32 bytes: HMAC of (block_index || length || data)
        - 4 bytes: block length (little-endian)
        - N bytes: block data

        Last block has length 0.

        Args:
            hmac_key: Master HMAC key from which per-block keys are derived

        Returns:
            Concatenation of all authenticated block payloads

        Raises:
            AuthenticationError: If any block, including the terminating
                empty block, fails HMAC verification.
        """
        blocks = []
        block_index = 0

        with self._ctx.scope("hmac_blocks"):
            while True:
                with self._ctx.scope(f"block[{block_index}]"):
                    block_hmac = self._ctx.read(32, "hmac")
                    block_len = self._ctx.read_u32("length")

                    # The terminating block carries no data; it is still
                    # authenticated over (index || length) below.
                    block_data = self._ctx.read(block_len, "data") if block_len else b""

                    block_key = self._compute_block_hmac_key(hmac_key, block_index)
                    expected = compute_hmac_sha256(
                        block_key,
                        struct.pack("<QI", block_index, block_len) + block_data,
                    )
                    if not constant_time_compare(expected, block_hmac):
                        raise AuthenticationError("Block authentication failed")

                    if block_len == 0:
                        break

                    blocks.append(block_data)
                    block_index += 1

        return b"".join(blocks)

    def _remove_pkcs7_padding(self, data: bytes) -> bytes:
        """Remove PKCS7 padding from decrypted data.

        Note: Padding oracle attacks are not possible here because HMAC
        verification on the ciphertext occurs BEFORE decryption. Any
        ciphertext modification would fail HMAC verification first.
        We still use generic error messages for defense-in-depth.

        Raises:
            DecryptionError: If the data is empty or the padding is invalid,
                including a padding length larger than the data itself.
        """
        if not data:
            raise DecryptionError()
        padding_len = data[-1]
        # Reject lengths outside 1..16 and lengths longer than the buffer;
        # the latter would otherwise index past the start of the data.
        if padding_len == 0 or padding_len > 16 or padding_len > len(data):
            raise DecryptionError()
        # Verify all padding bytes are correct
        if data[-padding_len:] != bytes([padding_len]) * padding_len:
            raise DecryptionError()
        return data[:-padding_len]

    def _parse_inner_header(self, data: bytes) -> tuple[InnerHeader, int]:
        """Parse KDBX4 inner header.

        Fields are read as (1-byte type, 4-byte length, data) records until
        an END field is seen or the data is exhausted. Binary attachments are
        numbered in the order they appear, starting at 0.

        Args:
            data: Decrypted (and decompressed) payload

        Returns:
            The inner header and the offset where XML starts.

        Raises:
            CorruptedDataError: If the random stream ID is not 4 bytes, a
                binary field lacks its protection flag byte, or a binary
                attachment exceeds MAX_BINARY_SIZE.
        """
        ctx = ParseContext(data)
        random_stream_id = 0
        random_stream_key = b""
        binaries: dict[int, tuple[bool, bytes]] = {}

        with ctx.scope("inner_header"):
            while not ctx.exhausted:
                field_type = ctx.read_u8("type")
                field_len = ctx.read_u32("length")
                field_data = ctx.read(field_len, "data")

                if field_type == InnerHeaderFieldType.END:
                    break
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_ID:
                    # Guard the unpack so a bad length is reported as file
                    # corruption instead of a raw struct.error.
                    if len(field_data) != 4:
                        raise CorruptedDataError(
                            f"Invalid inner random stream ID length: {len(field_data)} bytes "
                            "(expected 4)"
                        )
                    random_stream_id = struct.unpack("<I", field_data)[0]
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY:
                    random_stream_key = field_data
                elif field_type == InnerHeaderFieldType.BINARY:
                    # First byte is protection flag; it must be present
                    if not field_data:
                        raise CorruptedDataError(
                            "Binary attachment is missing its protection flag byte"
                        )
                    # Check the size before slicing so an oversized
                    # attachment is rejected without being copied.
                    binary_size = len(field_data) - 1
                    if binary_size > MAX_BINARY_SIZE:
                        raise CorruptedDataError(
                            f"Binary attachment too large: {binary_size} bytes "
                            f"(max {MAX_BINARY_SIZE} bytes)"
                        )
                    protected = field_data[0] != 0
                    binaries[len(binaries)] = (protected, field_data[1:])

        return (
            InnerHeader(
                random_stream_id=random_stream_id,
                random_stream_key=random_stream_key,
                binaries=binaries,
            ),
            ctx.offset,
        )
class Kdbx4Writer:
    """Serializer that produces KDBX4 database files."""

    # Payload bytes per HMAC block (1 MiB)
    BLOCK_SIZE = 1024 * 1024

    def encrypt(
        self,
        header: KdbxHeader,
        inner_header: InnerHeader,
        xml_data: bytes,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> bytes:
        """Serialize and encrypt a database into KDBX4 format.

        Args:
            header: Outer header configuration
            inner_header: Inner header with stream cipher and binaries
            xml_data: XML database content
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            Complete KDBX4 file as bytes

        Raises:
            UnsupportedVersionError: If ``header`` is not a KDBX4 header.
            KdfError: If KDF parameters are missing or the KDF is unsupported.
        """
        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        # A caller-supplied transformed key lets us skip the expensive KDF.
        if transformed_key is None:
            # KeePassXC: YubiKey response is incorporated into composite key
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            master_key_bytes = self._derive_master_key(header, composite_key).data
        else:
            master_key_bytes = transformed_key

        hmac_key, cipher_key = self._derive_keys(master_key_bytes, header.master_seed)

        # Plaintext layout: inner header immediately followed by the XML.
        payload = self._build_inner_header(inner_header) + xml_data

        if header.compression == CompressionType.GZIP:
            payload = gzip.compress(payload, compresslevel=6)

        # A 16-byte IV marks AES-CBC, which needs block-aligned input.
        if header.cipher.iv_size == 16:
            payload = self._add_pkcs7_padding(payload)

        cipher = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        block_stream = self._build_hmac_block_stream(cipher.encrypt(payload), hmac_key)

        # The outer header is authenticated with the all-ones block index.
        header_bytes = header.to_bytes()
        header_hash = hashlib.sha256(header_bytes).digest()
        header_key = self._compute_block_hmac_key(hmac_key, 0xFFFFFFFFFFFFFFFF)
        header_hmac = compute_hmac_sha256(header_key, header_bytes)

        return b"".join((header_bytes, header_hash, header_hmac, block_stream))

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Run the KDF named in ``header`` over the composite key.

        Raises:
            KdfError: If required parameters are missing or the KDF type is
                not supported for writing.
        """
        kdf_type = header.kdf_type

        if kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            argon2_params = (
                header.argon2_memory_kib,
                header.argon2_iterations,
                header.argon2_parallelism,
            )
            if any(param is None for param in argon2_params):
                raise KdfError("Missing Argon2 parameters in header")

            memory_kib, iterations, parallelism = argon2_params
            config = Argon2Config(
                memory_kib=memory_kib,
                iterations=iterations,
                parallelism=parallelism,
                salt=header.kdf_salt,
                variant=kdf_type,
            )
            return derive_key_argon2(composite_key.data, config)

        if kdf_type == KdfType.AES_KDF:
            rounds = header.aes_kdf_rounds
            if rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")
            return derive_key_aes_kdf(
                composite_key.data,
                AesKdfConfig(rounds=rounds, salt=header.kdf_salt),
            )

        raise KdfError(f"Unsupported KDF for writing: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Return ``(hmac_key, cipher_key)`` derived from the transformed key.

        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)
        """
        seeded = master_seed + transformed_key
        return (
            hashlib.sha512(seeded + b"\x01").digest(),
            hashlib.sha256(seeded).digest(),
        )

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Return SHA512(block_index as little-endian u64 || hmac_key)."""
        digest = hashlib.sha512(struct.pack("<Q", block_index))
        digest.update(hmac_key)
        return digest.digest()

    def _build_inner_header(self, inner: InnerHeader) -> bytes:
        """Serialize the inner header as a sequence of TLV fields.

        Fields are written in the order: random stream ID, random stream key,
        binary attachments (ascending id), END marker.
        """
        out = BuildContext()

        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_ID,
            struct.pack("<I", inner.random_stream_id),
        )
        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY,
            inner.random_stream_key,
        )

        for binary_id in sorted(inner.binaries):
            protected, content = inner.binaries[binary_id]
            # Each attachment is prefixed with a one-byte protection flag.
            flag = b"\x01" if protected else b"\x00"
            out.write_tlv(InnerHeaderFieldType.BINARY, flag + content)

        out.write_tlv(InnerHeaderFieldType.END, b"")
        return out.build()

    def _add_pkcs7_padding(self, data: bytes) -> bytes:
        """Append PKCS7 padding so the result is a multiple of 16 bytes.

        Already-aligned input receives a full 16-byte padding block.
        """
        pad = 16 - len(data) % 16
        return data + bytes([pad]) * pad

    def _sign_block(self, hmac_key: bytes, block_index: int, block_data: bytes) -> bytes:
        """Return the HMAC-SHA256 over (index || length || data) for one block."""
        block_key = self._compute_block_hmac_key(hmac_key, block_index)
        signed = struct.pack("<Q", block_index) + struct.pack("<I", len(block_data)) + block_data
        return compute_hmac_sha256(block_key, signed)

    def _build_hmac_block_stream(self, data: bytes, hmac_key: bytes) -> bytes:
        """Split ``data`` into authenticated blocks of at most BLOCK_SIZE bytes.

        Each block is written as HMAC, little-endian u32 length, then data.
        The stream always ends with an authenticated zero-length block.
        """
        out = BuildContext()
        blocks_written = 0

        for start in range(0, len(data), self.BLOCK_SIZE):
            chunk = data[start : start + self.BLOCK_SIZE]
            out.write(self._sign_block(hmac_key, blocks_written, chunk))
            out.write_u32(len(chunk))
            out.write(chunk)
            blocks_written += 1

        # Terminator: empty block authenticated under the next index.
        out.write(self._sign_block(hmac_key, blocks_written, b""))
        out.write_u32(0)

        return out.build()
def read_kdbx4(
    data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> DecryptedPayload:
    """Read a KDBX4 file in one call.

    Shorthand for building a ``Kdbx4Reader`` over ``data`` and calling its
    ``decrypt`` method with the given credentials.

    Args:
        data: Complete file contents
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        DecryptedPayload with header, inner header, XML, and transformed_key
    """
    return Kdbx4Reader(data).decrypt(
        password=password,
        keyfile_data=keyfile_data,
        transformed_key=transformed_key,
        yubikey_response=yubikey_response,
    )
def write_kdbx4(
    header: KdbxHeader,
    inner_header: InnerHeader,
    xml_data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> bytes:
    """Write a KDBX4 file in one call.

    Shorthand for creating a ``Kdbx4Writer`` and calling its ``encrypt``
    method with the given headers, content and credentials.

    Args:
        header: Outer header configuration
        inner_header: Inner header with stream cipher and binaries
        xml_data: XML database content
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        Complete KDBX4 file as bytes
    """
    return Kdbx4Writer().encrypt(
        header=header,
        inner_header=inner_header,
        xml_data=xml_data,
        password=password,
        keyfile_data=keyfile_data,
        transformed_key=transformed_key,
        yubikey_response=yubikey_response,
    )