Coverage for src / kdbxtool / parsing / header.py: 92%
226 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
"""KDBX header parsing and structures.

This module provides typed structures for KDBX file headers:
- Magic bytes and version detection
- Outer header fields (cipher, compression, KDF parameters, etc.)
- Inner header fields (binary attachments, protected stream cipher)

KDBX format reference:
https://keepass.info/help/kb/kdbx_4.html
"""
12from __future__ import annotations
14import contextlib
15import logging
16import struct
17from dataclasses import dataclass, field
18from enum import IntEnum
19from typing import Self
21from kdbxtool.exceptions import (
22 CorruptedDataError,
23 InvalidSignatureError,
24 KdfError,
25 UnsupportedVersionError,
26)
27from kdbxtool.security import Cipher, KdfType
29from .context import BuildContext, ParseContext
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Signature bytes found at offset 0 of a KeePass 2.x (KDBX) file.
# The same 8 bytes are used by KDBX3 and KDBX4; the two formats are told
# apart by the version words that follow the signature.
KDBX_MAGIC = bytes.fromhex("03d9a29a67fb4bb5")
KDBX4_MAGIC = KDBX_MAGIC  # Same magic, version differs
class KdbxVersion(IntEnum):
    """Major versions of the KDBX file format known to this module.

    The integer value of each member equals the major version number
    stored in the file header.
    """

    KDBX3 = 3
    KDBX4 = 4
class HeaderFieldType(IntEnum):
    """Identifiers of the TLV fields in the outer (unencrypted) header.

    Each outer-header entry is a Type-Length-Value record; the member
    value is the one-byte type tag of that record.
    """

    # --- Present in both KDBX3 and KDBX4 ---
    END = 0
    COMMENT = 1  # Unused
    CIPHER_ID = 2
    COMPRESSION_FLAGS = 3
    MASTER_SEED = 4

    # --- KDBX3 only ---
    TRANSFORM_SEED = 5  # AES-KDF seed
    TRANSFORM_ROUNDS = 6  # AES-KDF rounds

    # --- Present in both KDBX3 and KDBX4 ---
    ENCRYPTION_IV = 7

    # --- KDBX3 only ---
    PROTECTED_STREAM_KEY = 8
    STREAM_START_BYTES = 9
    INNER_RANDOM_STREAM_ID = 10

    # --- KDBX4 only ---
    KDF_PARAMETERS = 11
    PUBLIC_CUSTOM_DATA = 12
class InnerHeaderFieldType(IntEnum):
    """Identifiers of the fields in the KDBX4 inner header.

    The inner header becomes readable only after decryption and sits in
    front of the XML payload.
    """

    END = 0
    INNER_RANDOM_STREAM_ID = 1
    INNER_RANDOM_STREAM_KEY = 2
    BINARY = 3  # Attachment data
class CompressionType(IntEnum):
    """Compression algorithm applied to the KDBX payload.

    The member value is the number stored in the header's compression
    flags field.
    """

    NONE = 0
    GZIP = 1
@dataclass(slots=True)
class KdbxHeader:
    """Parsed KDBX header data.

    Holds the fields of the outer header in typed form. Both KDBX3 and
    KDBX4 are represented by this one class; fields that exist in only one
    of the formats are optional and default to ``None``.

    Use :meth:`parse` to build an instance from file bytes and
    :meth:`to_bytes` to serialize a KDBX4 header back to binary.
    """

    # Format version
    version: KdbxVersion

    # Cipher for payload encryption
    cipher: Cipher

    # Compression for XML payload
    compression: CompressionType

    # Random seed for master key derivation (32 bytes, enforced by parse())
    master_seed: bytes

    # IV for payload encryption (16 bytes for AES, 12 for ChaCha20)
    encryption_iv: bytes

    # KDF parameters (KDBX4: Argon2 config, KDBX3: AES-KDF config)
    kdf_type: KdfType

    # Argon2/AES-KDF salt (32 bytes, enforced by the parsers)
    kdf_salt: bytes

    # For Argon2 (KDBX4)
    argon2_memory_kib: int | None = None
    argon2_iterations: int | None = None
    argon2_parallelism: int | None = None

    # For AES-KDF (KDBX3, or KDBX4 configured with AES-KDF)
    aes_kdf_rounds: int | None = None

    # KDBX4 inner header fields (populated after decryption); for KDBX3 the
    # stream id comes from the outer header and is set by parse().
    inner_random_stream_id: int | None = None
    inner_random_stream_key: bytes | None = None

    # KDBX3 fields
    stream_start_bytes: bytes | None = None
    protected_stream_key: bytes | None = None

    # Raw header bytes for HMAC verification (excluded from repr)
    raw_header: bytes = field(default=b"", repr=False)

    @staticmethod
    def _unpack_exact(fmt: str, payload: bytes, what: str) -> int:
        """Unpack one fixed-width integer, reporting a size mismatch as corruption.

        Args:
            fmt: ``struct`` format describing exactly one integer.
            payload: Raw bytes taken from the file.
            what: Human-readable field name used in the error message.

        Returns:
            The decoded integer.

        Raises:
            CorruptedDataError: If ``payload`` does not have the size ``fmt`` requires.
        """
        try:
            value: int = struct.unpack(fmt, payload)[0]
        except struct.error as err:
            raise CorruptedDataError(
                f"Invalid {what} length: {len(payload)} (expected {struct.calcsize(fmt)})"
            ) from err
        return value

    @classmethod
    def parse(cls, data: bytes) -> tuple[Self, int]:
        """Parse KDBX header from raw bytes.

        Args:
            data: Raw file data starting from beginning

        Returns:
            Tuple of (parsed header, number of bytes consumed)

        Raises:
            InvalidSignatureError: If magic bytes don't match
            UnsupportedVersionError: If KDBX version is not supported
            CorruptedDataError: If header is malformed or truncated
        """
        ctx = ParseContext(data)

        with ctx.scope("signature"):
            magic = ctx.read(8, "magic")
            if magic != KDBX_MAGIC:
                raise InvalidSignatureError(
                    f"Invalid KDBX signature: {magic.hex()} (expected {KDBX_MAGIC.hex()})"
                )

        # The minor version word precedes the major version word in the file.
        with ctx.scope("version"):
            version_minor = ctx.read_u16("minor")
            version_major = ctx.read_u16("major")

        if version_major == 4:
            version = KdbxVersion.KDBX4
        elif version_major == 3:
            version = KdbxVersion.KDBX3
        else:
            raise UnsupportedVersionError(version_major, version_minor)

        logger.info("Detected KDBX version %d.%d", version_major, version_minor)

        # Parse header fields
        header_fields: dict[HeaderFieldType, bytes] = {}

        with ctx.scope("fields"):
            # NOTE(review): if the data runs out exactly on a field boundary
            # without an END field, the loop ends and the header is accepted.
            while not ctx.exhausted:
                field_type = ctx.read_u8("type")

                if version == KdbxVersion.KDBX4:
                    # KDBX4: 4-byte length
                    field_len = ctx.read_u32("length")
                else:
                    # KDBX3: 2-byte length
                    field_len = ctx.read_u16("length")

                field_data = ctx.read(field_len, "data")

                # Field ids that are not HeaderFieldType members are skipped;
                # a repeated id overwrites the earlier value.
                with contextlib.suppress(ValueError):
                    header_fields[HeaderFieldType(field_type)] = field_data

                if field_type == HeaderFieldType.END:
                    break

        # Everything consumed so far is the header, kept verbatim for HMAC checks.
        raw_header = data[: ctx.offset]

        # Cipher ID (required)
        if HeaderFieldType.CIPHER_ID not in header_fields:
            raise CorruptedDataError("Missing cipher ID in header")
        cipher = Cipher.from_uuid(header_fields[HeaderFieldType.CIPHER_ID])
        logger.debug("Cipher: %s", cipher.display_name)

        # Compression (required). A wrong-sized field or an unknown algorithm
        # id is reported as corruption instead of leaking struct.error or
        # ValueError to the caller.
        if HeaderFieldType.COMPRESSION_FLAGS not in header_fields:
            raise CorruptedDataError("Missing compression flags in header")
        compression_val = cls._unpack_exact(
            "<I", header_fields[HeaderFieldType.COMPRESSION_FLAGS], "compression flags"
        )
        try:
            compression = CompressionType(compression_val)
        except ValueError as err:
            raise CorruptedDataError(f"Unsupported compression type: {compression_val}") from err
        logger.debug("Compression: %s", compression.name)

        # Master seed (required, 32 bytes)
        if HeaderFieldType.MASTER_SEED not in header_fields:
            raise CorruptedDataError("Missing master seed in header")
        master_seed = header_fields[HeaderFieldType.MASTER_SEED]
        if len(master_seed) != 32:
            raise CorruptedDataError(f"Invalid master seed length: {len(master_seed)}")

        # Encryption IV (required)
        if HeaderFieldType.ENCRYPTION_IV not in header_fields:
            raise CorruptedDataError("Missing encryption IV in header")
        encryption_iv = header_fields[HeaderFieldType.ENCRYPTION_IV]

        # KDF parameters are stored differently per format version.
        if version == KdbxVersion.KDBX4:
            return cls._parse_kdbx4_kdf(
                header_fields,
                version,
                cipher,
                compression,
                master_seed,
                encryption_iv,
                raw_header,
                ctx.offset,
            )
        else:
            return cls._parse_kdbx3_kdf(
                header_fields,
                version,
                cipher,
                compression,
                master_seed,
                encryption_iv,
                raw_header,
                ctx.offset,
            )

    @classmethod
    def _parse_kdbx4_kdf(
        cls,
        fields: dict[HeaderFieldType, bytes],
        version: KdbxVersion,
        cipher: Cipher,
        compression: CompressionType,
        master_seed: bytes,
        encryption_iv: bytes,
        raw_header: bytes,
        offset: int,
    ) -> tuple[Self, int]:
        """Parse KDBX4-specific KDF parameters.

        Reads the KDF_PARAMETERS VariantDictionary and builds the header
        from it together with the already-validated common fields.

        Args:
            fields: Outer header fields keyed by type.
            version, cipher, compression, master_seed, encryption_iv,
            raw_header: Values already extracted by :meth:`parse`, passed
                through to the constructor unchanged.
            offset: Number of bytes consumed by the header; returned as-is.

        Returns:
            Tuple of (parsed header, ``offset``).

        Raises:
            CorruptedDataError: If the KDF parameters field is missing or
                its VariantDictionary is malformed.
            KdfError: If the UUID, salt or algorithm parameters are missing
                or of the wrong type.
        """
        if HeaderFieldType.KDF_PARAMETERS not in fields:
            raise CorruptedDataError("Missing KDF parameters in KDBX4 header")

        kdf_data = fields[HeaderFieldType.KDF_PARAMETERS]
        kdf_params = cls._parse_variant_dict(kdf_data)

        # Get KDF UUID (must be bytes)
        kdf_uuid = kdf_params.get("$UUID")
        if not isinstance(kdf_uuid, bytes):
            raise KdfError("Missing or invalid KDF UUID in parameters")
        kdf_type = KdfType.from_uuid(kdf_uuid)

        # Get salt (must be 32 bytes)
        kdf_salt = kdf_params.get("S")
        if not isinstance(kdf_salt, bytes) or len(kdf_salt) != 32:
            raise KdfError("Invalid or missing KDF salt")

        argon2_memory: int | None = None
        argon2_iterations: int | None = None
        argon2_parallelism: int | None = None
        aes_kdf_rounds: int | None = None

        if kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            # Argon2 parameters (must be ints)
            memory = kdf_params.get("M")
            iterations = kdf_params.get("I")
            parallelism = kdf_params.get("P")

            if (
                not isinstance(memory, int)
                or not isinstance(iterations, int)
                or not isinstance(parallelism, int)
            ):
                raise KdfError("Missing or invalid Argon2 parameters")

            # The file stores memory in bytes; the header keeps KiB
            # (floor division, so a non-multiple of 1024 is rounded down).
            argon2_memory = memory // 1024
            argon2_iterations = iterations
            argon2_parallelism = parallelism
            logger.debug(
                "Argon2: memory=%d KiB, iterations=%d, parallelism=%d",
                argon2_memory,
                argon2_iterations,
                argon2_parallelism,
            )
        elif kdf_type == KdfType.AES_KDF:
            # AES-KDF parameters
            rounds = kdf_params.get("R")

            if not isinstance(rounds, int):
                raise KdfError("Missing or invalid AES-KDF rounds parameter")

            aes_kdf_rounds = rounds
            logger.debug("AES-KDF rounds: %d", aes_kdf_rounds)

        return (
            cls(
                version=version,
                cipher=cipher,
                compression=compression,
                master_seed=master_seed,
                encryption_iv=encryption_iv,
                kdf_type=kdf_type,
                kdf_salt=kdf_salt,
                argon2_memory_kib=argon2_memory,
                argon2_iterations=argon2_iterations,
                argon2_parallelism=argon2_parallelism,
                aes_kdf_rounds=aes_kdf_rounds,
                raw_header=raw_header,
            ),
            offset,
        )

    @classmethod
    def _parse_kdbx3_kdf(
        cls,
        fields: dict[HeaderFieldType, bytes],
        version: KdbxVersion,
        cipher: Cipher,
        compression: CompressionType,
        master_seed: bytes,
        encryption_iv: bytes,
        raw_header: bytes,
        offset: int,
    ) -> tuple[Self, int]:
        """Parse KDBX3-specific KDF parameters (AES-KDF).

        KDBX3 stores the AES-KDF seed and round count, and the protected
        stream settings, as individual outer header fields.

        Args:
            fields: Outer header fields keyed by type.
            version, cipher, compression, master_seed, encryption_iv,
            raw_header: Values already extracted by :meth:`parse`, passed
                through to the constructor unchanged.
            offset: Number of bytes consumed by the header; returned as-is.

        Returns:
            Tuple of (parsed header, ``offset``).

        Raises:
            CorruptedDataError: If the transform seed or rounds are missing,
                or if a fixed-width field has the wrong size.
        """
        # Transform seed (AES-KDF key)
        if HeaderFieldType.TRANSFORM_SEED not in fields:
            raise CorruptedDataError("Missing transform seed in KDBX3 header")
        kdf_salt = fields[HeaderFieldType.TRANSFORM_SEED]
        if len(kdf_salt) != 32:
            raise CorruptedDataError(f"Invalid transform seed length: {len(kdf_salt)}")

        # Transform rounds (little-endian uint64); a wrong-sized field is
        # reported as corruption rather than as struct.error.
        if HeaderFieldType.TRANSFORM_ROUNDS not in fields:
            raise CorruptedDataError("Missing transform rounds in KDBX3 header")
        aes_kdf_rounds = cls._unpack_exact(
            "<Q", fields[HeaderFieldType.TRANSFORM_ROUNDS], "transform rounds"
        )
        logger.debug("AES-KDF rounds: %d", aes_kdf_rounds)

        # Stream start bytes (for verification) - optional
        stream_start = fields.get(HeaderFieldType.STREAM_START_BYTES)

        # Protected stream key (in outer header for KDBX3) - optional
        protected_key = fields.get(HeaderFieldType.PROTECTED_STREAM_KEY)

        # Protected stream ID (in outer header for KDBX3) - optional
        stream_id = None
        if HeaderFieldType.INNER_RANDOM_STREAM_ID in fields:
            stream_id = cls._unpack_exact(
                "<I", fields[HeaderFieldType.INNER_RANDOM_STREAM_ID], "inner random stream ID"
            )

        return (
            cls(
                version=version,
                cipher=cipher,
                compression=compression,
                master_seed=master_seed,
                encryption_iv=encryption_iv,
                kdf_type=KdfType.AES_KDF,
                kdf_salt=kdf_salt,
                aes_kdf_rounds=aes_kdf_rounds,
                stream_start_bytes=stream_start,
                protected_stream_key=protected_key,
                inner_random_stream_id=stream_id,
                raw_header=raw_header,
            ),
            offset,
        )

    @staticmethod
    def _parse_variant_dict(data: bytes) -> dict[str, bytes | int | bool | str]:
        """Parse KDBX4 VariantDictionary format.

        VariantDictionary is a TLV format used for KDF parameters:
        - 2 bytes: version (0x0100)
        - Entries until type 0x00:
            - 1 byte: type
            - 4 bytes: key length
            - key bytes
            - 4 bytes: value length
            - value bytes

        Types:
        - 0x00: End
        - 0x04: UInt32
        - 0x05: UInt64
        - 0x08: Bool
        - 0x0C: Int32
        - 0x0D: Int64
        - 0x18: String
        - 0x42: ByteArray

        Entries of any other type are kept as raw bytes.

        Args:
            data: Serialized VariantDictionary.

        Returns:
            Mapping from entry key to decoded value.

        Raises:
            CorruptedDataError: If the version is unsupported, a key or
                string is not valid UTF-8, or a value has the wrong size
                for its declared type.
        """
        # Fixed-width integer entry types and their little-endian formats.
        int_formats = {0x04: "<I", 0x05: "<Q", 0x0C: "<i", 0x0D: "<q"}

        ctx = ParseContext(data)
        result: dict[str, bytes | int | bool | str] = {}

        with ctx.scope("variant_dict"):
            version = ctx.read_u16("version")
            if version != 0x0100:
                raise CorruptedDataError(f"Unsupported VariantDictionary version: {version:#x}")

            while not ctx.exhausted:
                entry_type = ctx.read_u8("entry_type")

                if entry_type == 0x00:  # End
                    break

                with ctx.scope(f"entry[{entry_type:#x}]"):
                    # Read key
                    key_data = ctx.read_bytes_prefixed("key")
                    try:
                        key = key_data.decode("utf-8")
                    except UnicodeDecodeError as err:
                        raise CorruptedDataError(
                            "VariantDictionary key is not valid UTF-8"
                        ) from err

                    # Read value
                    val_data = ctx.read_bytes_prefixed("value")

                    # Parse value based on type
                    if entry_type in int_formats:
                        result[key] = KdbxHeader._unpack_exact(
                            int_formats[entry_type],
                            val_data,
                            f"VariantDictionary value for {key!r}",
                        )
                    elif entry_type == 0x08:  # Bool
                        # Only the first byte is inspected; an empty value
                        # has no byte to inspect and is rejected.
                        if not val_data:
                            raise CorruptedDataError(
                                f"Empty Bool value for VariantDictionary key {key!r}"
                            )
                        result[key] = val_data[0] != 0
                    elif entry_type == 0x18:  # String
                        try:
                            result[key] = val_data.decode("utf-8")
                        except UnicodeDecodeError as err:
                            raise CorruptedDataError(
                                f"VariantDictionary string for {key!r} is not valid UTF-8"
                            ) from err
                    else:
                        # ByteArray (0x42) and unknown types: store as bytes
                        result[key] = val_data

        return result

    def to_bytes(self) -> bytes:
        """Serialize header to KDBX4 binary format.

        Writes, in order: the signature, minor version 1 and major
        version 4, then the cipher ID, compression flags, master seed,
        encryption IV, KDF parameters and the END field.

        Returns:
            Binary header data ready to be written to file

        Raises:
            UnsupportedVersionError: If not KDBX4 format
            KdfError: If the Argon2 parameters or AES-KDF rounds required
                by ``kdf_type`` are missing
        """
        if self.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(self.version.value, 0)

        ctx = BuildContext()

        # Magic and version
        ctx.write(KDBX_MAGIC)
        ctx.write_u16(1)  # Minor version
        ctx.write_u16(4)  # Major version

        # Cipher ID
        ctx.write_tlv(HeaderFieldType.CIPHER_ID, self.cipher.value)

        # Compression
        ctx.write_tlv(
            HeaderFieldType.COMPRESSION_FLAGS,
            struct.pack("<I", self.compression.value),
        )

        # Master seed
        ctx.write_tlv(HeaderFieldType.MASTER_SEED, self.master_seed)

        # Encryption IV
        ctx.write_tlv(HeaderFieldType.ENCRYPTION_IV, self.encryption_iv)

        # KDF parameters as VariantDictionary
        kdf_dict = self._build_kdf_variant_dict()
        ctx.write_tlv(HeaderFieldType.KDF_PARAMETERS, kdf_dict)

        # End of header (END field carries a CR LF CR LF payload)
        ctx.write_tlv(HeaderFieldType.END, b"\r\n\r\n")

        return ctx.build()

    def _build_kdf_variant_dict(self) -> bytes:
        """Build VariantDictionary for KDF parameters.

        Emits the version word 0x0100, the ``$UUID`` and ``S`` entries, the
        algorithm-specific entries for ``kdf_type``, and the 0x00 end marker.

        Returns:
            Serialized VariantDictionary bytes.

        Raises:
            KdfError: If the parameters required by ``kdf_type`` are not set.
        """
        ctx = BuildContext()

        # Version
        ctx.write_u16(0x0100)

        def add_entry(entry_type: int, key: str, value: bytes) -> None:
            """Add an entry to the variant dictionary."""
            key_bytes = key.encode("utf-8")
            ctx.write_u8(entry_type)
            ctx.write_bytes_prefixed(key_bytes)
            ctx.write_bytes_prefixed(value)

        # KDF UUID (ByteArray)
        add_entry(0x42, "$UUID", self.kdf_type.value)

        # Salt (ByteArray)
        add_entry(0x42, "S", self.kdf_salt)

        if self.kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            if (
                self.argon2_memory_kib is None
                or self.argon2_iterations is None
                or self.argon2_parallelism is None
            ):
                raise KdfError("Missing Argon2 parameters")

            # Memory in bytes (UInt64); the attribute holds KiB
            add_entry(0x05, "M", struct.pack("<Q", self.argon2_memory_kib * 1024))
            # Iterations (UInt64)
            add_entry(0x05, "I", struct.pack("<Q", self.argon2_iterations))
            # Parallelism (UInt32)
            add_entry(0x04, "P", struct.pack("<I", self.argon2_parallelism))
            # Version (UInt32) - Argon2 version 0x13
            add_entry(0x04, "V", struct.pack("<I", 0x13))
        elif self.kdf_type == KdfType.AES_KDF:
            if self.aes_kdf_rounds is None:
                raise KdfError("Missing AES-KDF rounds")

            # Rounds (UInt64)
            add_entry(0x05, "R", struct.pack("<Q", self.aes_kdf_rounds))

        # End marker
        ctx.write_u8(0x00)

        return ctx.build()