Coverage for src / kdbxtool / parsing / kdbx4.py: 94%
232 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
1"""KDBX4 payload encryption and decryption.
3This module handles the cryptographic operations for KDBX4 files:
4- Master key derivation from credentials
5- Header integrity verification (HMAC-SHA256)
6- Payload decryption and encryption
7- Block-based HMAC verification (HmacBlockStream)
8- Inner header parsing
10KDBX4 structure:
111. Outer header (plaintext)
122. SHA-256 hash of header
133. HMAC-SHA256 of header
144. Encrypted payload (HmacBlockStream format)
15 - Inner header
16 - XML database content
17"""
19from __future__ import annotations
21import gzip
22import hashlib
23import logging
24import struct
25import warnings
26from dataclasses import dataclass
27from typing import TYPE_CHECKING
29from kdbxtool.exceptions import (
30 AuthenticationError,
31 CorruptedDataError,
32 DecryptionError,
33 KdfError,
34 UnsupportedVersionError,
35)
36from kdbxtool.security import (
37 Argon2Config,
38 CipherContext,
39 SecureBytes,
40 compute_hmac_sha256,
41 constant_time_compare,
42 derive_composite_key,
43 derive_key_aes_kdf,
44 derive_key_argon2,
45)
46from kdbxtool.security.kdf import AesKdfConfig, KdfType
48from .context import BuildContext, ParseContext
49from .header import (
50 CompressionType,
51 InnerHeaderFieldType,
52 KdbxHeader,
53 KdbxVersion,
54)
if TYPE_CHECKING:
    # No type-checking-only imports are required at present.
    pass

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound for a single binary attachment: 512 MiB.
# Guards against memory exhaustion from malicious KDBX files.
MAX_BINARY_SIZE = 512 * 1024**2
66@dataclass(slots=True)
67class InnerHeader:
68 """KDBX4 inner header data.
70 The inner header appears after decryption, before the XML payload.
71 It contains the protected stream cipher settings and binary attachments.
72 """
74 # Random stream for protected values (e.g., passwords in XML)
75 random_stream_id: int
76 random_stream_key: bytes
78 # Binary attachments (id -> data with protection flag)
79 binaries: dict[int, tuple[bool, bytes]]
82@dataclass(slots=True)
83class DecryptedPayload:
84 """Result of decrypting a KDBX4 file.
86 Contains all data needed to work with the database.
87 """
89 header: KdbxHeader
90 inner_header: InnerHeader
91 xml_data: bytes
92 transformed_key: bytes | None = None # For caching to speed up repeated opens
class Kdbx4Reader:
    """Reader for KDBX4 database files.

    Wraps the raw file bytes in a ParseContext and exposes decrypt(), which
    verifies the outer header, authenticates the HMAC block stream, decrypts
    the payload and splits it into the inner header and the XML document.
    """

    def __init__(self, data: bytes) -> None:
        """Initialize reader with file data.

        Args:
            data: Complete KDBX4 file contents
        """
        self._ctx = ParseContext(data)

    def decrypt(
        self,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> DecryptedPayload:
        """Decrypt the KDBX4 file.

        Args:
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            DecryptedPayload with header, inner header, XML, and transformed_key

        Raises:
            UnsupportedVersionError: If the outer header is not KDBX4
            CorruptedDataError: If the header hash does not match or the
                inner header is malformed
            AuthenticationError: If the header HMAC or any block HMAC does
                not verify (wrong credentials or modified file)
            DecryptionError: If the PKCS7 padding is invalid
            KdfError: If KDF parameters are missing or the KDF is unsupported
        """
        logger.debug("Starting KDBX4 decryption")

        # Parse outer header
        header, header_end = KdbxHeader.parse(self._ctx.data)

        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        self._ctx.offset = header_end

        # Read header hash and HMAC
        with self._ctx.scope("header_verification"):
            header_hash = self._ctx.read(32, "header_hash")
            header_hmac = self._ctx.read(32, "header_hmac")

        # Verify header hash
        computed_hash = hashlib.sha256(header.raw_header).digest()
        if not constant_time_compare(computed_hash, header_hash):
            raise CorruptedDataError("Header hash mismatch - file may be corrupted")
        logger.debug("Header hash verified")

        # Get transformed key - either use provided one or derive via KDF
        if transformed_key is not None:
            # Use precomputed transformed key (skips expensive KDF)
            logger.debug("Using cached transformed key")
            master_key_bytes = transformed_key
        else:
            # Derive composite key from credentials
            # KeePassXC: YubiKey response is incorporated into composite key
            logger.debug("Starting KDF derivation")
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            # Derive master key using KDF (slow)
            master_key = self._derive_master_key(header, composite_key)
            master_key_bytes = master_key.data

        # Derive keys for HMAC and encryption
        hmac_key, cipher_key = self._derive_keys(master_key_bytes, header.master_seed)

        # Verify header HMAC (the header uses the all-ones block index)
        block_key = self._compute_block_hmac_key(hmac_key, 0xFFFFFFFFFFFFFFFF)
        computed_hmac = compute_hmac_sha256(block_key, header.raw_header)
        if not constant_time_compare(computed_hmac, header_hmac):
            raise AuthenticationError()
        logger.debug("Header HMAC verified")

        # Read and verify HMAC block stream
        encrypted_payload = self._read_hmac_block_stream(hmac_key)

        # Decrypt payload
        ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        decrypted = ctx.decrypt(encrypted_payload)

        # Remove PKCS7 padding for AES-CBC
        if header.cipher.iv_size == 16:  # AES-CBC
            decrypted = self._remove_pkcs7_padding(decrypted)

        # Decompress if needed
        if header.compression == CompressionType.GZIP:
            decrypted = gzip.decompress(decrypted)

        logger.debug("Payload decrypted, %d bytes", len(decrypted))

        # Parse inner header
        inner_header, xml_start = self._parse_inner_header(decrypted)

        # Extract XML (everything after the inner header)
        xml_data = decrypted[xml_start:]

        return DecryptedPayload(
            header=header,
            inner_header=inner_header,
            xml_data=xml_data,
            transformed_key=master_key_bytes,
        )

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Derive master key using the KDF specified in header.

        Args:
            header: Parsed outer header carrying the KDF type and parameters
            composite_key: Composite key derived from the credentials

        Returns:
            The key produced by the Argon2 or AES-KDF derivation

        Raises:
            KdfError: If required KDF parameters are missing from the header
                or the KDF type is not supported
        """
        if header.kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            if (
                header.argon2_memory_kib is None
                or header.argon2_iterations is None
                or header.argon2_parallelism is None
            ):
                raise KdfError("Missing Argon2 parameters in header")

            argon2_config = Argon2Config(
                memory_kib=header.argon2_memory_kib,
                iterations=header.argon2_iterations,
                parallelism=header.argon2_parallelism,
                salt=header.kdf_salt,
                variant=header.kdf_type,
            )
            # Warn if parameters are below security minimums
            try:
                argon2_config.validate_security()
            except KdfError as e:
                warnings.warn(
                    f"Database has weak KDF parameters: {e}. "
                    "Consider re-saving with stronger settings.",
                    UserWarning,
                    stacklevel=4,
                )
            # Don't enforce minimums when reading - accept what the file has
            return derive_key_argon2(composite_key.data, argon2_config, enforce_minimums=False)
        elif header.kdf_type == KdfType.AES_KDF:
            if header.aes_kdf_rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")
            aes_config = AesKdfConfig(
                rounds=header.aes_kdf_rounds,
                salt=header.kdf_salt,
            )
            return derive_key_aes_kdf(composite_key.data, aes_config)
        else:
            raise KdfError(f"Unsupported KDF: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Derive HMAC key and cipher key from transformed key.

        KDBX4 key derivation:
        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)

        Returns:
            Tuple of (hmac_key, cipher_key)
        """
        cipher_key = hashlib.sha256(master_seed + transformed_key).digest()
        hmac_key = hashlib.sha512(master_seed + transformed_key + b"\x01").digest()

        return hmac_key, cipher_key

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Compute HMAC key for a specific block.

        Each block uses a different key derived from the master HMAC key.
        key = SHA512(block_index_le64 || hmac_key)
        """
        index_bytes = struct.pack("<Q", block_index)
        return hashlib.sha512(index_bytes + hmac_key).digest()

    def _read_hmac_block_stream(self, hmac_key: bytes) -> bytes:
        """Read and verify HMAC block stream.

        KDBX4 uses a block-based format with per-block HMAC:
        - 32 bytes: HMAC of (block_index || length || data)
        - 4 bytes: block length (little-endian)
        - N bytes: block data

        Last block has length 0.

        Returns:
            The concatenated data of all verified blocks

        Raises:
            AuthenticationError: If any block HMAC (including the one on the
                terminating empty block) does not verify
        """
        blocks = []
        block_index = 0

        with self._ctx.scope("hmac_blocks"):
            while True:
                with self._ctx.scope(f"block[{block_index}]"):
                    block_hmac = self._ctx.read(32, "hmac")
                    block_len = self._ctx.read_u32("length")

                    # The terminating block carries no data but is still
                    # authenticated over (index || length).
                    block_data = self._ctx.read(block_len, "data") if block_len else b""

                    # Verify block HMAC
                    block_key = self._compute_block_hmac_key(hmac_key, block_index)
                    hmac_data = (
                        struct.pack("<Q", block_index) + struct.pack("<I", block_len) + block_data
                    )
                    expected = compute_hmac_sha256(block_key, hmac_data)

                    if not constant_time_compare(expected, block_hmac):
                        raise AuthenticationError("Block authentication failed")

                    if block_len == 0:
                        break

                    blocks.append(block_data)
                    block_index += 1

        logger.debug("Verified %d HMAC blocks", block_index)
        return b"".join(blocks)

    def _remove_pkcs7_padding(self, data: bytes) -> bytes:
        """Remove PKCS7 padding from decrypted data.

        Note: Padding oracle attacks are not possible here because HMAC
        verification on the ciphertext occurs BEFORE decryption. Any
        ciphertext modification would fail HMAC verification first.
        We still use generic error messages for defense-in-depth.

        Raises:
            DecryptionError: If data is empty, the padding length is outside
                1..16 or longer than the data, or the padding bytes are not
                all equal to the padding length
        """
        if not data:
            raise DecryptionError()
        padding_len = data[-1]
        # Reject lengths that cannot be valid; the len(data) bound keeps a
        # short buffer from surfacing as IndexError instead of DecryptionError.
        if padding_len == 0 or padding_len > 16 or padding_len > len(data):
            raise DecryptionError()
        # Verify all padding bytes are correct
        if not data.endswith(bytes([padding_len]) * padding_len):
            raise DecryptionError()
        return data[:-padding_len]

    def _parse_inner_header(self, data: bytes) -> tuple[InnerHeader, int]:
        """Parse KDBX4 inner header.

        Fields are read as (1-byte type, 4-byte length, data) until the END
        field or until the input is exhausted. Binary attachments are keyed
        by their order of appearance, starting at 0.

        Returns inner header and offset where XML starts.

        Raises:
            CorruptedDataError: If the random stream ID field is not 4 bytes,
                a binary field lacks its protection-flag byte, or a binary
                attachment exceeds MAX_BINARY_SIZE
        """
        ctx = ParseContext(data)
        random_stream_id = 0
        random_stream_key = b""
        binaries: dict[int, tuple[bool, bytes]] = {}
        binary_index = 0

        with ctx.scope("inner_header"):
            while not ctx.exhausted:
                field_type = ctx.read_u8("type")
                field_len = ctx.read_u32("length")
                field_data = ctx.read(field_len, "data")

                if field_type == InnerHeaderFieldType.END:
                    break
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_ID:
                    # Guard the unpack so a malformed field is reported as
                    # corruption rather than leaking struct.error.
                    if len(field_data) != 4:
                        raise CorruptedDataError(
                            f"Invalid inner random stream ID length: {len(field_data)} bytes"
                        )
                    random_stream_id = struct.unpack("<I", field_data)[0]
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY:
                    random_stream_key = field_data
                elif field_type == InnerHeaderFieldType.BINARY:
                    # First byte is protection flag; a zero-length field has
                    # none and would otherwise raise IndexError below.
                    if len(field_data) < 1:
                        raise CorruptedDataError(
                            "Binary attachment is missing its protection flag"
                        )
                    binary_data = field_data[1:]
                    if len(binary_data) > MAX_BINARY_SIZE:
                        raise CorruptedDataError(
                            f"Binary attachment too large: {len(binary_data)} bytes "
                            f"(max {MAX_BINARY_SIZE} bytes)"
                        )
                    protected = field_data[0] != 0
                    binaries[binary_index] = (protected, binary_data)
                    binary_index += 1

        return (
            InnerHeader(
                random_stream_id=random_stream_id,
                random_stream_key=random_stream_key,
                binaries=binaries,
            ),
            ctx.offset,
        )
class Kdbx4Writer:
    """Serializer that produces encrypted KDBX4 files."""

    # Size of each chunk in the HMAC block stream (1 MiB)
    BLOCK_SIZE = 1024 * 1024

    def encrypt(
        self,
        header: KdbxHeader,
        inner_header: InnerHeader,
        xml_data: bytes,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> bytes:
        """Produce a complete KDBX4 file from headers and XML.

        Args:
            header: Outer header configuration
            inner_header: Inner header with stream cipher and binaries
            xml_data: XML database content
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            Complete KDBX4 file as bytes

        Raises:
            UnsupportedVersionError: If the header is not KDBX4
            KdfError: If KDF parameters are missing or the KDF is unsupported
        """
        logger.debug("Starting KDBX4 encryption")

        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        # Obtain the transformed key: run the (slow) KDF only when the caller
        # did not hand us a precomputed one.
        if transformed_key is None:
            # KeePassXC: YubiKey response is incorporated into composite key
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            master_key = self._derive_master_key(header, composite_key)
            key_material = master_key.data
        else:
            key_material = transformed_key

        hmac_key, cipher_key = self._derive_keys(key_material, header.master_seed)

        # Plaintext is the inner header immediately followed by the XML
        plaintext = self._build_inner_header(inner_header) + xml_data

        if header.compression == CompressionType.GZIP:
            plaintext = gzip.compress(plaintext, compresslevel=6)

        if header.cipher.iv_size == 16:  # AES-CBC needs PKCS7 padding
            plaintext = self._add_pkcs7_padding(plaintext)

        cipher = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        ciphertext = cipher.encrypt(plaintext)

        block_stream = self._build_hmac_block_stream(ciphertext, hmac_key)

        # Serialize the outer header, then hash and authenticate it
        outer = header.to_bytes()
        outer_digest = hashlib.sha256(outer).digest()
        header_key = self._compute_block_hmac_key(hmac_key, 0xFFFFFFFFFFFFFFFF)
        outer_tag = compute_hmac_sha256(header_key, outer)

        # File layout: header, header hash, header HMAC, block stream
        return outer + outer_digest + outer_tag + block_stream

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Run the KDF named in the header over the composite key.

        Raises:
            KdfError: If required KDF parameters are missing from the header
                or the KDF type is not supported
        """
        kdf = header.kdf_type

        if kdf in (KdfType.ARGON2ID, KdfType.ARGON2D):
            memory_kib = header.argon2_memory_kib
            if memory_kib is None:
                raise KdfError("Missing Argon2 parameters in header")
            iterations = header.argon2_iterations
            if iterations is None:
                raise KdfError("Missing Argon2 parameters in header")
            parallelism = header.argon2_parallelism
            if parallelism is None:
                raise KdfError("Missing Argon2 parameters in header")

            return derive_key_argon2(
                composite_key.data,
                Argon2Config(
                    memory_kib=memory_kib,
                    iterations=iterations,
                    parallelism=parallelism,
                    salt=header.kdf_salt,
                    variant=header.kdf_type,
                ),
            )

        if kdf == KdfType.AES_KDF:
            rounds = header.aes_kdf_rounds
            if rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")

            return derive_key_aes_kdf(
                composite_key.data,
                AesKdfConfig(rounds=rounds, salt=header.kdf_salt),
            )

        raise KdfError(f"Unsupported KDF for writing: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Return (hmac_key, cipher_key) derived from the transformed key.

        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)
        """
        seeded = master_seed + transformed_key
        return (
            hashlib.sha512(seeded + b"\x01").digest(),
            hashlib.sha256(seeded).digest(),
        )

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Return SHA512(block_index as little-endian u64 || hmac_key)."""
        return hashlib.sha512(struct.pack("<Q", block_index) + hmac_key).digest()

    def _build_inner_header(self, inner: InnerHeader) -> bytes:
        """Serialize the inner header as a sequence of TLV fields."""
        out = BuildContext()

        # Stream cipher identifier, then its key
        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_ID,
            struct.pack("<I", inner.random_stream_id),
        )
        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY,
            inner.random_stream_key,
        )

        # Attachments in ascending id order; each is prefixed by a one-byte
        # protection flag
        for binary_id in sorted(inner.binaries):
            protected, blob = inner.binaries[binary_id]
            flag = b"\x01" if protected else b"\x00"
            out.write_tlv(InnerHeaderFieldType.BINARY, flag + blob)

        # Terminator field
        out.write_tlv(InnerHeaderFieldType.END, b"")

        return out.build()

    def _add_pkcs7_padding(self, data: bytes) -> bytes:
        """Append PKCS7 padding so the length is a multiple of 16 bytes.

        A full 16-byte pad is added when the input is already aligned.
        """
        pad = 16 - len(data) % 16
        return data + bytes([pad]) * pad

    def _build_hmac_block_stream(self, data: bytes, hmac_key: bytes) -> bytes:
        """Split data into BLOCK_SIZE chunks, each preceded by HMAC and length.

        The stream always ends with an authenticated zero-length block.
        """
        out = BuildContext()
        next_index = 0

        for start in range(0, len(data), self.BLOCK_SIZE):
            chunk = data[start : start + self.BLOCK_SIZE]
            chunk_len = len(chunk)

            # HMAC covers (block index || length || data) under a per-block key
            chunk_key = self._compute_block_hmac_key(hmac_key, next_index)
            tag = compute_hmac_sha256(
                chunk_key,
                struct.pack("<QI", next_index, chunk_len) + chunk,
            )

            out.write(tag)
            out.write_u32(chunk_len)
            out.write(chunk)

            next_index += 1

        # Terminating empty block
        end_key = self._compute_block_hmac_key(hmac_key, next_index)
        end_tag = compute_hmac_sha256(end_key, struct.pack("<QI", next_index, 0))
        out.write(end_tag)
        out.write_u32(0)

        return out.build()
def read_kdbx4(
    data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> DecryptedPayload:
    """Decrypt a KDBX4 file in a single call.

    Builds a Kdbx4Reader over the data and forwards the credentials to its
    decrypt() method.

    Args:
        data: Complete file contents
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        DecryptedPayload with header, inner header, XML, and transformed_key
    """
    return Kdbx4Reader(data).decrypt(
        password=password,
        keyfile_data=keyfile_data,
        transformed_key=transformed_key,
        yubikey_response=yubikey_response,
    )
def write_kdbx4(
    header: KdbxHeader,
    inner_header: InnerHeader,
    xml_data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> bytes:
    """Encrypt a database to KDBX4 bytes in a single call.

    Creates a Kdbx4Writer and forwards every argument to its encrypt()
    method.

    Args:
        header: Outer header configuration
        inner_header: Inner header with stream cipher and binaries
        xml_data: XML database content
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        Complete KDBX4 file as bytes
    """
    return Kdbx4Writer().encrypt(
        header=header,
        inner_header=inner_header,
        xml_data=xml_data,
        password=password,
        keyfile_data=keyfile_data,
        transformed_key=transformed_key,
        yubikey_response=yubikey_response,
    )