Coverage for src / kdbxtool / parsing / header.py: 92%

226 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-20 19:19 +0000

"""KDBX header parsing and structures.

This module provides typed structures for KDBX file headers:
- Magic bytes and version detection
- Outer header fields (cipher, compression, KDF parameters, etc.)
- Inner header fields (binary attachments, protected stream cipher)

KDBX format reference:
https://keepass.info/help/kb/kdbx_4.html
"""

11 

12from __future__ import annotations 

13 

14import contextlib 

15import logging 

16import struct 

17from dataclasses import dataclass, field 

18from enum import IntEnum 

19from typing import Self 

20 

21from kdbxtool.exceptions import ( 

22 CorruptedDataError, 

23 InvalidSignatureError, 

24 KdfError, 

25 UnsupportedVersionError, 

26) 

27from kdbxtool.security import Cipher, KdfType 

28 

29from .context import BuildContext, ParseContext 

30 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# KDBX signature bytes
KDBX_MAGIC = bytes.fromhex("03d9a29a67fb4bb5")  # KeePass 2.x signature
# Same magic as KDBX_MAGIC; KDBX3 and KDBX4 are told apart by the version
# field that follows the signature, so this is an alias rather than a copy.
KDBX4_MAGIC = KDBX_MAGIC

36 

37 

class KdbxVersion(IntEnum):
    """KDBX file format versions.

    Each member's value equals the major version number of the format.
    """

    KDBX3 = 3
    KDBX4 = 4

43 

44 

class HeaderFieldType(IntEnum):
    """Outer header field types for KDBX format.

    These are the TLV (Type-Length-Value) field identifiers
    in the outer (unencrypted) header.
    """

    END = 0
    COMMENT = 1  # Unused
    CIPHER_ID = 2
    COMPRESSION_FLAGS = 3
    MASTER_SEED = 4
    # KDBX3 only:
    TRANSFORM_SEED = 5  # AES-KDF seed
    TRANSFORM_ROUNDS = 6  # AES-KDF rounds
    # Both:
    ENCRYPTION_IV = 7
    # KDBX3 only:
    PROTECTED_STREAM_KEY = 8
    STREAM_START_BYTES = 9
    INNER_RANDOM_STREAM_ID = 10
    # KDBX4 only:
    KDF_PARAMETERS = 11
    PUBLIC_CUSTOM_DATA = 12

69 

70 

class InnerHeaderFieldType(IntEnum):
    """Inner header field types for KDBX4 format.

    These appear after decryption, before the XML payload.
    """

    END = 0
    INNER_RANDOM_STREAM_ID = 1
    INNER_RANDOM_STREAM_KEY = 2
    BINARY = 3  # Attachment data

81 

82 

class CompressionType(IntEnum):
    """Compression algorithms for KDBX payload."""

    NONE = 0
    GZIP = 1

88 

89 

@dataclass(slots=True)
class KdbxHeader:
    """Parsed KDBX header data.

    This class holds all fields from the outer header in a typed format.
    Parsing supports both KDBX3 and KDBX4, with version-specific fields
    left as ``None`` when they do not apply. Serialization via
    :meth:`to_bytes` supports KDBX4 only.
    """

    # Format version
    version: KdbxVersion

    # Cipher for payload encryption
    cipher: Cipher

    # Compression for XML payload
    compression: CompressionType

    # Random seed for master key derivation (32 bytes, enforced by parse())
    master_seed: bytes

    # IV for payload encryption (16 bytes for AES, 12 for ChaCha20)
    encryption_iv: bytes

    # KDF parameters (KDBX4: Argon2 config, KDBX3: AES-KDF config)
    kdf_type: KdfType

    # Argon2/AES-KDF salt (32 bytes, enforced by the KDF parsers)
    kdf_salt: bytes

    # For Argon2 (KDBX4)
    argon2_memory_kib: int | None = None
    argon2_iterations: int | None = None
    argon2_parallelism: int | None = None

    # For AES-KDF (KDBX3, or KDBX4 configured with AES-KDF)
    aes_kdf_rounds: int | None = None

    # KDBX4 inner header fields (populated after decryption);
    # for KDBX3 the stream ID is read from the outer header instead.
    inner_random_stream_id: int | None = None
    inner_random_stream_key: bytes | None = None

    # KDBX3 fields
    stream_start_bytes: bytes | None = None
    protected_stream_key: bytes | None = None

    # Raw header bytes for HMAC verification
    raw_header: bytes = field(default=b"", repr=False)

    @staticmethod
    def _require_field(
        fields: dict[HeaderFieldType, bytes],
        field_type: HeaderFieldType,
        message: str,
    ) -> bytes:
        """Return the raw value of a mandatory header field.

        Raises:
            CorruptedDataError: With ``message`` if the field is absent.
        """
        try:
            return fields[field_type]
        except KeyError:
            raise CorruptedDataError(message) from None

    @staticmethod
    def _unpack_scalar(fmt: str, raw: bytes, what: str) -> int:
        """Decode one fixed-width integer, rejecting wrong-sized input.

        Args:
            fmt: ``struct`` format describing exactly one integer.
            raw: Bytes to decode; must be exactly ``struct.calcsize(fmt)`` long.
            what: Human-readable field name used in the error message.

        Raises:
            CorruptedDataError: If ``raw`` has the wrong length, so callers
                never see a bare ``struct.error`` for malformed files.
        """
        try:
            (value,) = struct.unpack(fmt, raw)
        except struct.error as err:
            raise CorruptedDataError(
                f"Invalid {what} length: {len(raw)} (expected {struct.calcsize(fmt)})"
            ) from err
        return value

    @classmethod
    def parse(cls, data: bytes) -> tuple[Self, int]:
        """Parse KDBX header from raw bytes.

        Args:
            data: Raw file data starting from beginning

        Returns:
            Tuple of (parsed header, number of bytes consumed)

        Raises:
            InvalidSignatureError: If magic bytes don't match
            UnsupportedVersionError: If KDBX version is not supported
            CorruptedDataError: If header is malformed or truncated
        """
        ctx = ParseContext(data)

        with ctx.scope("signature"):
            magic = ctx.read(8, "magic")
            if magic != KDBX_MAGIC:
                raise InvalidSignatureError(
                    f"Invalid KDBX signature: {magic.hex()} (expected {KDBX_MAGIC.hex()})"
                )

        with ctx.scope("version"):
            # The minor version is stored before the major version.
            version_minor = ctx.read_u16("minor")
            version_major = ctx.read_u16("major")

            if version_major == 4:
                version = KdbxVersion.KDBX4
            elif version_major == 3:
                version = KdbxVersion.KDBX3
            else:
                raise UnsupportedVersionError(version_major, version_minor)

        logger.info("Detected KDBX version %d.%d", version_major, version_minor)

        # Parse header fields
        header_fields: dict[HeaderFieldType, bytes] = {}

        with ctx.scope("fields"):
            while not ctx.exhausted:
                field_type = ctx.read_u8("type")

                if version == KdbxVersion.KDBX4:
                    # KDBX4: 4-byte length
                    field_len = ctx.read_u32("length")
                else:
                    # KDBX3: 2-byte length
                    field_len = ctx.read_u16("length")

                field_data = ctx.read(field_len, "data")

                # Field types this parser does not know are skipped, not fatal.
                with contextlib.suppress(ValueError):
                    header_fields[HeaderFieldType(field_type)] = field_data

                if field_type == HeaderFieldType.END:
                    break

        # Everything consumed so far is the header as stored in the file.
        raw_header = data[: ctx.offset]

        # Cipher ID (required)
        cipher_id = cls._require_field(
            header_fields, HeaderFieldType.CIPHER_ID, "Missing cipher ID in header"
        )
        cipher = Cipher.from_uuid(cipher_id)
        logger.debug("Cipher: %s", cipher.display_name)

        # Compression (required)
        compression_raw = cls._require_field(
            header_fields,
            HeaderFieldType.COMPRESSION_FLAGS,
            "Missing compression flags in header",
        )
        try:
            compression = CompressionType(struct.unpack("<I", compression_raw)[0])
        except (struct.error, ValueError) as err:
            # Wrong field size (struct.error) or unknown algorithm (ValueError):
            # both mean the header is malformed, so report it as such.
            raise CorruptedDataError(
                f"Invalid compression flags in header: 0x{compression_raw[:8].hex()}"
            ) from err
        logger.debug("Compression: %s", compression.name)

        # Master seed (required, 32 bytes)
        master_seed = cls._require_field(
            header_fields, HeaderFieldType.MASTER_SEED, "Missing master seed in header"
        )
        if len(master_seed) != 32:
            raise CorruptedDataError(f"Invalid master seed length: {len(master_seed)}")

        # Encryption IV (required)
        encryption_iv = cls._require_field(
            header_fields, HeaderFieldType.ENCRYPTION_IV, "Missing encryption IV in header"
        )

        # KDF parameters are stored differently per format version.
        if version == KdbxVersion.KDBX4:
            parse_kdf = cls._parse_kdbx4_kdf
        else:
            parse_kdf = cls._parse_kdbx3_kdf

        return parse_kdf(
            header_fields,
            version,
            cipher,
            compression,
            master_seed,
            encryption_iv,
            raw_header,
            ctx.offset,
        )

    @classmethod
    def _parse_kdbx4_kdf(
        cls,
        fields: dict[HeaderFieldType, bytes],
        version: KdbxVersion,
        cipher: Cipher,
        compression: CompressionType,
        master_seed: bytes,
        encryption_iv: bytes,
        raw_header: bytes,
        offset: int,
    ) -> tuple[Self, int]:
        """Parse KDBX4-specific KDF parameters.

        Reads the KDF_PARAMETERS VariantDictionary and builds the header.

        Returns:
            Tuple of (header, ``offset``); ``offset`` is passed through as-is.

        Raises:
            CorruptedDataError: If the KDF parameters field is missing or
                its VariantDictionary is malformed.
            KdfError: If the KDF UUID, salt, or the parameters required by
                the selected KDF are missing or have the wrong type.
        """
        kdf_data = cls._require_field(
            fields, HeaderFieldType.KDF_PARAMETERS, "Missing KDF parameters in KDBX4 header"
        )
        kdf_params = cls._parse_variant_dict(kdf_data)

        # Get KDF UUID (must be bytes)
        kdf_uuid = kdf_params.get("$UUID")
        if not isinstance(kdf_uuid, bytes):
            raise KdfError("Missing or invalid KDF UUID in parameters")
        kdf_type = KdfType.from_uuid(kdf_uuid)

        # Get salt (must be bytes)
        kdf_salt = kdf_params.get("S")
        if not isinstance(kdf_salt, bytes) or len(kdf_salt) != 32:
            raise KdfError("Invalid or missing KDF salt")

        argon2_memory: int | None = None
        argon2_iterations: int | None = None
        argon2_parallelism: int | None = None
        aes_kdf_rounds: int | None = None

        if kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            # Argon2 parameters (must be ints)
            memory = kdf_params.get("M")
            iterations = kdf_params.get("I")
            parallelism = kdf_params.get("P")

            if (
                not isinstance(memory, int)
                or not isinstance(iterations, int)
                or not isinstance(parallelism, int)
            ):
                raise KdfError("Missing or invalid Argon2 parameters")

            argon2_memory = memory // 1024  # Convert bytes to KiB
            argon2_iterations = iterations
            argon2_parallelism = parallelism
            logger.debug(
                "Argon2: memory=%d KiB, iterations=%d, parallelism=%d",
                argon2_memory,
                argon2_iterations,
                argon2_parallelism,
            )
        elif kdf_type == KdfType.AES_KDF:
            # AES-KDF parameters
            rounds = kdf_params.get("R")

            if not isinstance(rounds, int):
                raise KdfError("Missing or invalid AES-KDF rounds parameter")

            aes_kdf_rounds = rounds
            logger.debug("AES-KDF rounds: %d", aes_kdf_rounds)

        header = cls(
            version=version,
            cipher=cipher,
            compression=compression,
            master_seed=master_seed,
            encryption_iv=encryption_iv,
            kdf_type=kdf_type,
            kdf_salt=kdf_salt,
            argon2_memory_kib=argon2_memory,
            argon2_iterations=argon2_iterations,
            argon2_parallelism=argon2_parallelism,
            aes_kdf_rounds=aes_kdf_rounds,
            raw_header=raw_header,
        )
        return header, offset

    @classmethod
    def _parse_kdbx3_kdf(
        cls,
        fields: dict[HeaderFieldType, bytes],
        version: KdbxVersion,
        cipher: Cipher,
        compression: CompressionType,
        master_seed: bytes,
        encryption_iv: bytes,
        raw_header: bytes,
        offset: int,
    ) -> tuple[Self, int]:
        """Parse KDBX3-specific KDF parameters (AES-KDF).

        Returns:
            Tuple of (header, ``offset``); ``offset`` is passed through as-is.

        Raises:
            CorruptedDataError: If the transform seed or rounds are missing,
                or any fixed-width field has the wrong length.
        """
        # Transform seed (AES-KDF key)
        kdf_salt = cls._require_field(
            fields, HeaderFieldType.TRANSFORM_SEED, "Missing transform seed in KDBX3 header"
        )
        if len(kdf_salt) != 32:
            raise CorruptedDataError(f"Invalid transform seed length: {len(kdf_salt)}")

        # Transform rounds (unsigned 64-bit, little-endian)
        rounds_raw = cls._require_field(
            fields, HeaderFieldType.TRANSFORM_ROUNDS, "Missing transform rounds in KDBX3 header"
        )
        aes_kdf_rounds = cls._unpack_scalar("<Q", rounds_raw, "transform rounds")
        logger.debug("AES-KDF rounds: %d", aes_kdf_rounds)

        # Stream start bytes (for verification)
        stream_start = fields.get(HeaderFieldType.STREAM_START_BYTES)

        # Protected stream key (in outer header for KDBX3)
        protected_key = fields.get(HeaderFieldType.PROTECTED_STREAM_KEY)

        # Protected stream ID (in outer header for KDBX3); optional
        stream_id = None
        stream_id_raw = fields.get(HeaderFieldType.INNER_RANDOM_STREAM_ID)
        if stream_id_raw is not None:
            stream_id = cls._unpack_scalar("<I", stream_id_raw, "inner random stream ID")

        header = cls(
            version=version,
            cipher=cipher,
            compression=compression,
            master_seed=master_seed,
            encryption_iv=encryption_iv,
            kdf_type=KdfType.AES_KDF,
            kdf_salt=kdf_salt,
            aes_kdf_rounds=aes_kdf_rounds,
            stream_start_bytes=stream_start,
            protected_stream_key=protected_key,
            inner_random_stream_id=stream_id,
            raw_header=raw_header,
        )
        return header, offset

    @staticmethod
    def _parse_variant_dict(data: bytes) -> dict[str, bytes | int | bool | str]:
        """Parse KDBX4 VariantDictionary format.

        VariantDictionary is a TLV format used for KDF parameters:
        - 2 bytes: version (0x0100)
        - Entries until type 0x00:
            - 1 byte: type
            - 4 bytes: key length
            - key bytes
            - 4 bytes: value length
            - value bytes

        Types:
        - 0x00: End
        - 0x04: UInt32
        - 0x05: UInt64
        - 0x08: Bool
        - 0x0C: Int32
        - 0x0D: Int64
        - 0x18: String
        - 0x42: ByteArray

        Entries with any other type are kept as raw bytes.

        Raises:
            CorruptedDataError: If the version is unsupported, a key or
                string is not valid UTF-8, or a fixed-width value has the
                wrong length.
        """
        # struct formats for the fixed-width integer entry types.
        int_formats = {0x04: "<I", 0x05: "<Q", 0x0C: "<i", 0x0D: "<q"}

        ctx = ParseContext(data)

        with ctx.scope("variant_dict"):
            version = ctx.read_u16("version")
            if version != 0x0100:
                raise CorruptedDataError(f"Unsupported VariantDictionary version: {version:#x}")

            result: dict[str, bytes | int | bool | str] = {}

            while not ctx.exhausted:
                entry_type = ctx.read_u8("entry_type")

                if entry_type == 0x00:  # End
                    break

                with ctx.scope(f"entry[{entry_type:#x}]"):
                    # Read key
                    key_data = ctx.read_bytes_prefixed("key")
                    try:
                        key = key_data.decode("utf-8")
                    except UnicodeDecodeError as err:
                        raise CorruptedDataError(
                            "VariantDictionary key is not valid UTF-8"
                        ) from err

                    # Read value
                    val_data = ctx.read_bytes_prefixed("value")

                    # Parse value based on type
                    if entry_type in int_formats:
                        result[key] = KdbxHeader._unpack_scalar(
                            int_formats[entry_type],
                            val_data,
                            f"VariantDictionary entry {key!r}",
                        )
                    elif entry_type == 0x08:  # Bool
                        # Guard the index: an empty value would raise IndexError.
                        if not val_data:
                            raise CorruptedDataError(
                                f"Empty Bool value for VariantDictionary entry {key!r}"
                            )
                        result[key] = val_data[0] != 0
                    elif entry_type == 0x18:  # String
                        try:
                            result[key] = val_data.decode("utf-8")
                        except UnicodeDecodeError as err:
                            raise CorruptedDataError(
                                f"VariantDictionary entry {key!r} is not valid UTF-8"
                            ) from err
                    else:
                        # ByteArray (0x42) or unknown type: store as bytes
                        result[key] = val_data

        return result

    def to_bytes(self) -> bytes:
        """Serialize header to KDBX4 binary format.

        The version is always written as 4.1 (minor 1, major 4).

        Returns:
            Binary header data ready to be written to file

        Raises:
            UnsupportedVersionError: If not KDBX4 format
            KdfError: If the Argon2 parameters or the AES-KDF rounds
                required by ``kdf_type`` are missing
        """
        if self.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(self.version.value, 0)

        ctx = BuildContext()

        # Magic and version
        ctx.write(KDBX_MAGIC)
        ctx.write_u16(1)  # Minor version
        ctx.write_u16(4)  # Major version

        # Cipher ID
        ctx.write_tlv(HeaderFieldType.CIPHER_ID, self.cipher.value)

        # Compression
        ctx.write_tlv(
            HeaderFieldType.COMPRESSION_FLAGS,
            struct.pack("<I", self.compression.value),
        )

        # Master seed
        ctx.write_tlv(HeaderFieldType.MASTER_SEED, self.master_seed)

        # Encryption IV
        ctx.write_tlv(HeaderFieldType.ENCRYPTION_IV, self.encryption_iv)

        # KDF parameters as VariantDictionary
        kdf_dict = self._build_kdf_variant_dict()
        ctx.write_tlv(HeaderFieldType.KDF_PARAMETERS, kdf_dict)

        # End of header
        ctx.write_tlv(HeaderFieldType.END, b"\r\n\r\n")

        return ctx.build()

    def _build_kdf_variant_dict(self) -> bytes:
        """Build VariantDictionary for KDF parameters.

        Raises:
            KdfError: If the parameters required by ``kdf_type`` are unset.
        """
        ctx = BuildContext()

        # Version
        ctx.write_u16(0x0100)

        def add_entry(entry_type: int, key: str, value: bytes) -> None:
            """Add an entry to the variant dictionary."""
            key_bytes = key.encode("utf-8")
            ctx.write_u8(entry_type)
            ctx.write_bytes_prefixed(key_bytes)
            ctx.write_bytes_prefixed(value)

        # KDF UUID
        add_entry(0x42, "$UUID", self.kdf_type.value)

        # Salt
        add_entry(0x42, "S", self.kdf_salt)

        if self.kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            if (
                self.argon2_memory_kib is None
                or self.argon2_iterations is None
                or self.argon2_parallelism is None
            ):
                raise KdfError("Missing Argon2 parameters")

            # Memory in bytes (UInt64)
            add_entry(0x05, "M", struct.pack("<Q", self.argon2_memory_kib * 1024))
            # Iterations (UInt64)
            add_entry(0x05, "I", struct.pack("<Q", self.argon2_iterations))
            # Parallelism (UInt32)
            add_entry(0x04, "P", struct.pack("<I", self.argon2_parallelism))
            # Version (UInt32) - Argon2 version 0x13
            add_entry(0x04, "V", struct.pack("<I", 0x13))
        elif self.kdf_type == KdfType.AES_KDF:
            if self.aes_kdf_rounds is None:
                raise KdfError("Missing AES-KDF rounds")

            # Rounds (UInt64)
            add_entry(0x05, "R", struct.pack("<Q", self.aes_kdf_rounds))

        # End marker
        ctx.write_u8(0x00)

        return ctx.build()