Coverage for src / kdbxtool / parsing / kdbx4.py: 94%

232 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-20 19:19 +0000

1"""KDBX4 payload encryption and decryption. 

2 

3This module handles the cryptographic operations for KDBX4 files: 

4- Master key derivation from credentials 

5- Header integrity verification (HMAC-SHA256) 

6- Payload decryption and encryption 

7- Block-based HMAC verification (HmacBlockStream) 

8- Inner header parsing 

9 

10KDBX4 structure: 

111. Outer header (plaintext) 

122. SHA-256 hash of header 

133. HMAC-SHA256 of header 

144. Encrypted payload (HmacBlockStream format) 

15 - Inner header 

16 - XML database content 

17""" 

18 

19from __future__ import annotations 

20 

21import gzip 

22import hashlib 

23import logging 

24import struct 

25import warnings 

26from dataclasses import dataclass 

27from typing import TYPE_CHECKING 

28 

29from kdbxtool.exceptions import ( 

30 AuthenticationError, 

31 CorruptedDataError, 

32 DecryptionError, 

33 KdfError, 

34 UnsupportedVersionError, 

35) 

36from kdbxtool.security import ( 

37 Argon2Config, 

38 CipherContext, 

39 SecureBytes, 

40 compute_hmac_sha256, 

41 constant_time_compare, 

42 derive_composite_key, 

43 derive_key_aes_kdf, 

44 derive_key_argon2, 

45) 

46from kdbxtool.security.kdf import AesKdfConfig, KdfType 

47 

48from .context import BuildContext, ParseContext 

49from .header import ( 

50 CompressionType, 

51 InnerHeaderFieldType, 

52 KdbxHeader, 

53 KdbxVersion, 

54) 

55 

56if TYPE_CHECKING: 

57 pass 

58 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound for one binary attachment: 512 MiB.
# Guards against memory exhaustion from malicious KDBX files.
MAX_BINARY_SIZE = 512 * 1024**2

64 

65 

66@dataclass(slots=True) 

67class InnerHeader: 

68 """KDBX4 inner header data. 

69 

70 The inner header appears after decryption, before the XML payload. 

71 It contains the protected stream cipher settings and binary attachments. 

72 """ 

73 

74 # Random stream for protected values (e.g., passwords in XML) 

75 random_stream_id: int 

76 random_stream_key: bytes 

77 

78 # Binary attachments (id -> data with protection flag) 

79 binaries: dict[int, tuple[bool, bytes]] 

80 

81 

82@dataclass(slots=True) 

83class DecryptedPayload: 

84 """Result of decrypting a KDBX4 file. 

85 

86 Contains all data needed to work with the database. 

87 """ 

88 

89 header: KdbxHeader 

90 inner_header: InnerHeader 

91 xml_data: bytes 

92 transformed_key: bytes | None = None # For caching to speed up repeated opens 

93 

94 

class Kdbx4Reader:
    """Reader for KDBX4 database files.

    Wraps the raw file bytes in a ParseContext and exposes ``decrypt``,
    which verifies the header, authenticates the HMAC block stream,
    decrypts and optionally decompresses the payload, and splits it into
    the inner header and the XML document.
    """

    def __init__(self, data: bytes) -> None:
        """Initialize reader with file data.

        Args:
            data: Complete KDBX4 file contents
        """
        self._ctx = ParseContext(data)

    def decrypt(
        self,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> DecryptedPayload:
        """Decrypt the KDBX4 file.

        Args:
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            DecryptedPayload with header, inner header, XML, and transformed_key

        Raises:
            UnsupportedVersionError: If the outer header is not KDBX4
            CorruptedDataError: If the header hash does not match or the
                inner header is malformed
            AuthenticationError: If the header HMAC or any block HMAC does
                not match (typically wrong credentials or tampering)
            DecryptionError: If the PKCS7 padding of the payload is invalid
            KdfError: If the header lacks required KDF parameters or names
                an unsupported KDF
        """
        logger.debug("Starting KDBX4 decryption")

        # Parse outer header
        header, header_end = KdbxHeader.parse(self._ctx.data)

        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        self._ctx.offset = header_end

        # Read header hash and HMAC
        with self._ctx.scope("header_verification"):
            header_hash = self._ctx.read(32, "header_hash")
            header_hmac = self._ctx.read(32, "header_hmac")

        # Verify header hash (integrity only; needs no key material)
        computed_hash = hashlib.sha256(header.raw_header).digest()
        if not constant_time_compare(computed_hash, header_hash):
            raise CorruptedDataError("Header hash mismatch - file may be corrupted")
        logger.debug("Header hash verified")

        # Get transformed key - either use provided one or derive via KDF
        if transformed_key is not None:
            # Use precomputed transformed key (skips expensive KDF)
            logger.debug("Using cached transformed key")
            master_key_bytes = transformed_key
        else:
            # Derive composite key from credentials
            # KeePassXC: YubiKey response is incorporated into composite key
            logger.debug("Starting KDF derivation")
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            # Derive master key using KDF (slow)
            master_key = self._derive_master_key(header, composite_key)
            master_key_bytes = master_key.data

        # Derive keys for HMAC and encryption
        hmac_key, cipher_key = self._derive_keys(master_key_bytes, header.master_seed)

        # Verify header HMAC; the header uses the all-ones block index
        block_key = self._compute_block_hmac_key(hmac_key, 0xFFFFFFFFFFFFFFFF)
        computed_hmac = compute_hmac_sha256(block_key, header.raw_header)
        if not constant_time_compare(computed_hmac, header_hmac):
            raise AuthenticationError()
        logger.debug("Header HMAC verified")

        # Read and verify HMAC block stream
        encrypted_payload = self._read_hmac_block_stream(hmac_key)

        # Decrypt payload
        ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        decrypted = ctx.decrypt(encrypted_payload)

        # Remove PKCS7 padding for AES-CBC
        if header.cipher.iv_size == 16:  # AES-CBC
            decrypted = self._remove_pkcs7_padding(decrypted)

        # Decompress if needed
        if header.compression == CompressionType.GZIP:
            decrypted = gzip.decompress(decrypted)

        logger.debug("Payload decrypted, %d bytes", len(decrypted))

        # Parse inner header; everything after it is the XML document
        inner_header, xml_start = self._parse_inner_header(decrypted)
        xml_data = decrypted[xml_start:]

        return DecryptedPayload(
            header=header,
            inner_header=inner_header,
            xml_data=xml_data,
            transformed_key=master_key_bytes,
        )

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Derive master key using the KDF specified in header.

        Weak Argon2 parameters only produce a UserWarning here: when
        reading, the file's own settings must be accepted as-is.

        Raises:
            KdfError: If required KDF parameters are missing from the
                header or the KDF type is not supported
        """
        if header.kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            if (
                header.argon2_memory_kib is None
                or header.argon2_iterations is None
                or header.argon2_parallelism is None
            ):
                raise KdfError("Missing Argon2 parameters in header")

            argon2_config = Argon2Config(
                memory_kib=header.argon2_memory_kib,
                iterations=header.argon2_iterations,
                parallelism=header.argon2_parallelism,
                salt=header.kdf_salt,
                variant=header.kdf_type,
            )
            # Warn if parameters are below security minimums
            try:
                argon2_config.validate_security()
            except KdfError as e:
                warnings.warn(
                    f"Database has weak KDF parameters: {e}. "
                    "Consider re-saving with stronger settings.",
                    UserWarning,
                    stacklevel=4,
                )
            # Don't enforce minimums when reading - accept what the file has
            return derive_key_argon2(composite_key.data, argon2_config, enforce_minimums=False)
        elif header.kdf_type == KdfType.AES_KDF:
            if header.aes_kdf_rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")
            aes_config = AesKdfConfig(
                rounds=header.aes_kdf_rounds,
                salt=header.kdf_salt,
            )
            return derive_key_aes_kdf(composite_key.data, aes_config)
        else:
            raise KdfError(f"Unsupported KDF: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Derive HMAC key and cipher key from transformed key.

        KDBX4 key derivation:
        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)

        Returns:
            Tuple of (hmac_key, cipher_key)
        """
        cipher_key = hashlib.sha256(master_seed + transformed_key).digest()
        hmac_key = hashlib.sha512(master_seed + transformed_key + b"\x01").digest()

        return hmac_key, cipher_key

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Compute HMAC key for a specific block.

        Each block uses a different key derived from the master HMAC key.
        key = SHA512(block_index_le64 || hmac_key)
        """
        index_bytes = struct.pack("<Q", block_index)
        return hashlib.sha512(index_bytes + hmac_key).digest()

    def _read_hmac_block_stream(self, hmac_key: bytes) -> bytes:
        """Read and verify HMAC block stream.

        KDBX4 uses a block-based format with per-block HMAC:
        - 32 bytes: HMAC of (block_index || length || data)
        - 4 bytes: block length (little-endian)
        - N bytes: block data

        Last block has length 0.

        Returns:
            The concatenated data of all verified blocks

        Raises:
            AuthenticationError: If any block, including the terminating
                empty block, fails HMAC verification
        """
        blocks: list[bytes] = []
        block_index = 0

        with self._ctx.scope("hmac_blocks"):
            while True:
                with self._ctx.scope(f"block[{block_index}]"):
                    block_hmac = self._ctx.read(32, "hmac")
                    block_len = self._ctx.read_u32("length")
                    # The terminator block has no data, but its HMAC is
                    # still checked so a truncated stream is rejected.
                    block_data = self._ctx.read(block_len, "data") if block_len else b""

                    block_key = self._compute_block_hmac_key(hmac_key, block_index)
                    expected = compute_hmac_sha256(
                        block_key,
                        struct.pack("<QI", block_index, block_len) + block_data,
                    )
                    if not constant_time_compare(expected, block_hmac):
                        raise AuthenticationError("Block authentication failed")

                if block_len == 0:
                    break

                blocks.append(block_data)
                block_index += 1

        logger.debug("Verified %d HMAC blocks", block_index)
        return b"".join(blocks)

    def _remove_pkcs7_padding(self, data: bytes) -> bytes:
        """Remove PKCS7 padding from decrypted data.

        Note: Padding oracle attacks are not possible here because HMAC
        verification on the ciphertext occurs BEFORE decryption. Any
        ciphertext modification would fail HMAC verification first.
        We still use generic error messages for defense-in-depth.

        Raises:
            DecryptionError: If the data is empty, the padding length is
                outside 1..16 or longer than the data, or the padding
                bytes are inconsistent
        """
        if not data:
            raise DecryptionError()
        padding_len = data[-1]
        # Also reject a padding length larger than the buffer, so malformed
        # input raises DecryptionError rather than an IndexError.
        if padding_len == 0 or padding_len > 16 or padding_len > len(data):
            raise DecryptionError()
        # Verify all padding bytes are correct
        if data[-padding_len:] != bytes([padding_len]) * padding_len:
            raise DecryptionError()
        return data[:-padding_len]

    def _parse_inner_header(self, data: bytes) -> tuple[InnerHeader, int]:
        """Parse KDBX4 inner header.

        The inner header is a sequence of (type: u8, length: u32, data)
        fields ending with an END field. Binary attachments are numbered
        in the order they appear, starting at 0.

        Returns inner header and offset where XML starts.

        Raises:
            CorruptedDataError: If a field has an invalid length or a
                binary attachment exceeds MAX_BINARY_SIZE
        """
        ctx = ParseContext(data)
        random_stream_id = 0
        random_stream_key = b""
        binaries: dict[int, tuple[bool, bytes]] = {}
        binary_index = 0

        with ctx.scope("inner_header"):
            while not ctx.exhausted:
                field_type = ctx.read_u8("type")
                field_len = ctx.read_u32("length")
                field_data = ctx.read(field_len, "data")

                if field_type == InnerHeaderFieldType.END:
                    break
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_ID:
                    # A u32 is exactly 4 bytes; check first so a malformed
                    # field does not surface as struct.error.
                    if len(field_data) != 4:
                        raise CorruptedDataError(
                            f"Invalid random stream ID length: {len(field_data)} bytes"
                        )
                    random_stream_id = struct.unpack("<I", field_data)[0]
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY:
                    random_stream_key = field_data
                elif field_type == InnerHeaderFieldType.BINARY:
                    # First byte is protection flag, so the field cannot be empty
                    if not field_data:
                        raise CorruptedDataError(
                            "Binary attachment is missing its protection flag"
                        )
                    binary_data = field_data[1:]
                    if len(binary_data) > MAX_BINARY_SIZE:
                        raise CorruptedDataError(
                            f"Binary attachment too large: {len(binary_data)} bytes "
                            f"(max {MAX_BINARY_SIZE} bytes)"
                        )
                    protected = field_data[0] != 0
                    binaries[binary_index] = (protected, binary_data)
                    binary_index += 1
                # Any other field type is skipped.

        return (
            InnerHeader(
                random_stream_id=random_stream_id,
                random_stream_key=random_stream_key,
                binaries=binaries,
            ),
            ctx.offset,
        )

377 

378 

class Kdbx4Writer:
    """Writer that serializes a database into the KDBX4 file format."""

    # Size of each data block in the HMAC block stream (1 MiB)
    BLOCK_SIZE = 1024 * 1024

    def encrypt(
        self,
        header: KdbxHeader,
        inner_header: InnerHeader,
        xml_data: bytes,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> bytes:
        """Produce a complete KDBX4 file from header settings and XML.

        Args:
            header: Outer header configuration
            inner_header: Inner header with stream cipher and binaries
            xml_data: XML database content
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            Complete KDBX4 file as bytes

        Raises:
            UnsupportedVersionError: If the header is not a KDBX4 header
            KdfError: If the header lacks required KDF parameters or names
                an unsupported KDF
        """
        logger.debug("Starting KDBX4 encryption")

        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        # A caller-supplied transformed key lets us skip the slow KDF.
        if transformed_key is None:
            # KeePassXC: YubiKey response is incorporated into composite key
            composite = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            key_material = self._derive_master_key(header, composite).data
        else:
            key_material = transformed_key

        hmac_key, cipher_key = self._derive_keys(key_material, header.master_seed)

        # Plaintext payload = inner header followed by the XML document
        plaintext = self._build_inner_header(inner_header) + xml_data

        if header.compression == CompressionType.GZIP:
            plaintext = gzip.compress(plaintext, compresslevel=6)

        # Only the AES-CBC case (16-byte IV) needs PKCS7 padding
        if header.cipher.iv_size == 16:
            plaintext = self._add_pkcs7_padding(plaintext)

        cipher = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        ciphertext = cipher.encrypt(plaintext)

        block_stream = self._build_hmac_block_stream(ciphertext, hmac_key)

        # Serialize the outer header, then hash and authenticate it.
        # The header HMAC uses the all-ones block index.
        outer = header.to_bytes()
        outer_hash = hashlib.sha256(outer).digest()
        outer_hmac = compute_hmac_sha256(
            self._compute_block_hmac_key(hmac_key, 0xFFFFFFFFFFFFFFFF),
            outer,
        )

        return outer + outer_hash + outer_hmac + block_stream

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Run the KDF named in the header over the composite key.

        Raises:
            KdfError: If required KDF parameters are missing from the
                header or the KDF type is not supported
        """
        kdf_type = header.kdf_type

        if kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            if (
                header.argon2_memory_kib is None
                or header.argon2_iterations is None
                or header.argon2_parallelism is None
            ):
                raise KdfError("Missing Argon2 parameters in header")

            return derive_key_argon2(
                composite_key.data,
                Argon2Config(
                    memory_kib=header.argon2_memory_kib,
                    iterations=header.argon2_iterations,
                    parallelism=header.argon2_parallelism,
                    salt=header.kdf_salt,
                    variant=header.kdf_type,
                ),
            )

        if kdf_type == KdfType.AES_KDF:
            if header.aes_kdf_rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")

            return derive_key_aes_kdf(
                composite_key.data,
                AesKdfConfig(rounds=header.aes_kdf_rounds, salt=header.kdf_salt),
            )

        raise KdfError(f"Unsupported KDF for writing: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Return (hmac_key, cipher_key) derived from the transformed key.

        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)
        """
        seeded = master_seed + transformed_key
        return (
            hashlib.sha512(seeded + b"\x01").digest(),
            hashlib.sha256(seeded).digest(),
        )

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Return the per-block key: SHA512(block_index_le64 || hmac_key)."""
        return hashlib.sha512(struct.pack("<Q", block_index) + hmac_key).digest()

    def _build_inner_header(self, inner: InnerHeader) -> bytes:
        """Serialize the inner header as a sequence of TLV fields.

        Fields are written in this order: stream ID, stream key, one
        BINARY field per attachment in ascending key order, then END.
        """
        out = BuildContext()

        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_ID,
            struct.pack("<I", inner.random_stream_id),
        )
        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY,
            inner.random_stream_key,
        )

        for key in sorted(inner.binaries):
            protected, data = inner.binaries[key]
            # Leading byte is the protection flag
            flag = b"\x01" if protected else b"\x00"
            out.write_tlv(InnerHeaderFieldType.BINARY, flag + data)

        out.write_tlv(InnerHeaderFieldType.END, b"")

        return out.build()

    def _add_pkcs7_padding(self, data: bytes) -> bytes:
        """Pad data to a multiple of 16 bytes using PKCS7.

        Already-aligned input receives a full 16-byte padding block.
        """
        pad = 16 - len(data) % 16
        return data + bytes([pad]) * pad

    def _build_hmac_block_stream(self, data: bytes, hmac_key: bytes) -> bytes:
        """Split data into HMAC-authenticated blocks plus a terminator.

        Each block is written as HMAC || length (u32) || data, where the
        HMAC covers (block_index || length || data). The stream ends with
        an authenticated zero-length block.
        """
        out = BuildContext()
        block_count = 0

        for start in range(0, len(data), self.BLOCK_SIZE):
            chunk = data[start : start + self.BLOCK_SIZE]
            chunk_len = len(chunk)

            mac = compute_hmac_sha256(
                self._compute_block_hmac_key(hmac_key, block_count),
                struct.pack("<QI", block_count, chunk_len) + chunk,
            )

            out.write(mac)
            out.write_u32(chunk_len)
            out.write(chunk)

            block_count += 1

        # Terminator: empty block whose index follows the last data block
        terminator_mac = compute_hmac_sha256(
            self._compute_block_hmac_key(hmac_key, block_count),
            struct.pack("<QI", block_count, 0),
        )
        out.write(terminator_mac)
        out.write_u32(0)

        return out.build()

570 

571 

def read_kdbx4(
    data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> DecryptedPayload:
    """Decrypt a KDBX4 file in one call.

    Builds a Kdbx4Reader over ``data`` and forwards all credentials to
    its ``decrypt`` method.

    Args:
        data: Complete file contents
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        DecryptedPayload with header, inner header, XML, and transformed_key
    """
    return Kdbx4Reader(data).decrypt(
        password=password,
        keyfile_data=keyfile_data,
        transformed_key=transformed_key,
        yubikey_response=yubikey_response,
    )

598 

599 

def write_kdbx4(
    header: KdbxHeader,
    inner_header: InnerHeader,
    xml_data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> bytes:
    """Encrypt a database to KDBX4 bytes in one call.

    Creates a Kdbx4Writer and forwards every argument to its ``encrypt``
    method.

    Args:
        header: Outer header configuration
        inner_header: Inner header with stream cipher and binaries
        xml_data: XML database content
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        Complete KDBX4 file as bytes
    """
    return Kdbx4Writer().encrypt(
        header=header,
        inner_header=inner_header,
        xml_data=xml_data,
        password=password,
        keyfile_data=keyfile_data,
        transformed_key=transformed_key,
        yubikey_response=yubikey_response,
    )