Coverage for src / kdbxtool / parsing / kdbx4.py: 94%

222 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-19 21:22 +0000

1"""KDBX4 payload encryption and decryption. 

2 

3This module handles the cryptographic operations for KDBX4 files: 

4- Master key derivation from credentials 

5- Header integrity verification (HMAC-SHA256) 

6- Payload decryption and encryption 

7- Block-based HMAC verification (HmacBlockStream) 

8- Inner header parsing 

9 

10KDBX4 structure: 

111. Outer header (plaintext) 

122. SHA-256 hash of header 

133. HMAC-SHA256 of header 

144. Encrypted payload (HmacBlockStream format) 

15 - Inner header 

16 - XML database content 

17""" 

18 

19from __future__ import annotations 

20 

21import gzip 

22import hashlib 

23import struct 

24import warnings 

25from dataclasses import dataclass 

26from typing import TYPE_CHECKING 

27 

28from kdbxtool.exceptions import ( 

29 AuthenticationError, 

30 CorruptedDataError, 

31 DecryptionError, 

32 KdfError, 

33 UnsupportedVersionError, 

34) 

35from kdbxtool.security import ( 

36 Argon2Config, 

37 CipherContext, 

38 SecureBytes, 

39 compute_hmac_sha256, 

40 constant_time_compare, 

41 derive_composite_key, 

42 derive_key_aes_kdf, 

43 derive_key_argon2, 

44) 

45from kdbxtool.security.kdf import AesKdfConfig, KdfType 

46 

47from .context import BuildContext, ParseContext 

48from .header import ( 

49 CompressionType, 

50 InnerHeaderFieldType, 

51 KdbxHeader, 

52 KdbxVersion, 

53) 

54 

55if TYPE_CHECKING: 

56 pass 

57 

# Maximum size, in bytes, for a single binary attachment (512 MiB).
# Prevents memory exhaustion from malicious KDBX files: attachments whose
# payload exceeds this limit are rejected while parsing the inner header.
MAX_BINARY_SIZE = 512 * 1024 * 1024

61 

62 

63@dataclass(slots=True) 

64class InnerHeader: 

65 """KDBX4 inner header data. 

66 

67 The inner header appears after decryption, before the XML payload. 

68 It contains the protected stream cipher settings and binary attachments. 

69 """ 

70 

71 # Random stream for protected values (e.g., passwords in XML) 

72 random_stream_id: int 

73 random_stream_key: bytes 

74 

75 # Binary attachments (id -> data with protection flag) 

76 binaries: dict[int, tuple[bool, bytes]] 

77 

78 

79@dataclass(slots=True) 

80class DecryptedPayload: 

81 """Result of decrypting a KDBX4 file. 

82 

83 Contains all data needed to work with the database. 

84 """ 

85 

86 header: KdbxHeader 

87 inner_header: InnerHeader 

88 xml_data: bytes 

89 transformed_key: bytes | None = None # For caching to speed up repeated opens 

90 

91 

class Kdbx4Reader:
    """Reader for KDBX4 database files.

    Wraps the raw file bytes in a ParseContext and exposes ``decrypt``,
    which verifies the outer header, authenticates the HMAC block stream,
    decrypts the payload and splits it into inner header and XML.
    """

    def __init__(self, data: bytes) -> None:
        """Initialize reader with file data.

        Args:
            data: Complete KDBX4 file contents
        """
        self._ctx = ParseContext(data)

    def decrypt(
        self,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> DecryptedPayload:
        """Decrypt the KDBX4 file.

        Args:
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            DecryptedPayload with header, inner header, XML, and transformed_key

        Raises:
            UnsupportedVersionError: If the header is not a KDBX4 header.
            CorruptedDataError: If the header hash does not match or an
                inner header field is malformed.
            AuthenticationError: If the header HMAC or any block HMAC does
                not match (e.g. wrong credentials or tampered data).
            DecryptionError: If the PKCS7 padding of the decrypted payload
                is invalid.
            KdfError: If the header lacks required KDF parameters or names
                an unsupported KDF.

        Errors raised by the parsing, cipher and gzip helpers propagate
        unchanged.
        """
        # Parse outer header
        header, header_end = KdbxHeader.parse(self._ctx.data)

        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        self._ctx.offset = header_end

        # Read header hash and HMAC
        with self._ctx.scope("header_verification"):
            header_hash = self._ctx.read(32, "header_hash")
            header_hmac = self._ctx.read(32, "header_hmac")

        # Verify header hash
        computed_hash = hashlib.sha256(header.raw_header).digest()
        if not constant_time_compare(computed_hash, header_hash):
            raise CorruptedDataError("Header hash mismatch - file may be corrupted")

        # Get transformed key - either use provided one or derive via KDF
        if transformed_key is not None:
            # Use precomputed transformed key (skips expensive KDF)
            master_key_bytes = transformed_key
        else:
            # Derive composite key from credentials
            # KeePassXC: YubiKey response is incorporated into composite key
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            # Derive master key using KDF (slow)
            master_key = self._derive_master_key(header, composite_key)
            master_key_bytes = master_key.data

        # Derive keys for HMAC and encryption
        hmac_key, cipher_key = self._derive_keys(master_key_bytes, header.master_seed)

        # Verify header HMAC (the header uses the all-ones block index)
        block_key = self._compute_block_hmac_key(hmac_key, 0xFFFFFFFFFFFFFFFF)
        computed_hmac = compute_hmac_sha256(block_key, header.raw_header)
        if not constant_time_compare(computed_hmac, header_hmac):
            raise AuthenticationError()

        # Read and verify HMAC block stream
        encrypted_payload = self._read_hmac_block_stream(hmac_key)

        # Decrypt payload
        ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        decrypted = ctx.decrypt(encrypted_payload)

        # Remove PKCS7 padding for AES-CBC
        if header.cipher.iv_size == 16:  # AES-CBC
            decrypted = self._remove_pkcs7_padding(decrypted)

        # Decompress if needed
        if header.compression == CompressionType.GZIP:
            decrypted = gzip.decompress(decrypted)

        # Parse inner header
        inner_header, xml_start = self._parse_inner_header(decrypted)

        # Extract XML
        xml_data = decrypted[xml_start:]

        return DecryptedPayload(
            header=header,
            inner_header=inner_header,
            xml_data=xml_data,
            transformed_key=master_key_bytes,
        )

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Derive master key using the KDF specified in header.

        Args:
            header: Parsed outer header carrying the KDF type and parameters
            composite_key: Composite key built from the user's credentials

        Returns:
            The key produced by the selected KDF

        Raises:
            KdfError: If required KDF parameters are missing from the header
                or the KDF type is not supported.
        """
        if header.kdf_type in (KdfType.ARGON2ID, KdfType.ARGON2D):
            if (
                header.argon2_memory_kib is None
                or header.argon2_iterations is None
                or header.argon2_parallelism is None
            ):
                raise KdfError("Missing Argon2 parameters in header")

            argon2_config = Argon2Config(
                memory_kib=header.argon2_memory_kib,
                iterations=header.argon2_iterations,
                parallelism=header.argon2_parallelism,
                salt=header.kdf_salt,
                variant=header.kdf_type,
            )
            # Warn if parameters are below security minimums
            try:
                argon2_config.validate_security()
            except KdfError as e:
                warnings.warn(
                    f"Database has weak KDF parameters: {e}. "
                    "Consider re-saving with stronger settings.",
                    UserWarning,
                    stacklevel=4,
                )
            # Don't enforce minimums when reading - accept what the file has
            return derive_key_argon2(composite_key.data, argon2_config, enforce_minimums=False)
        elif header.kdf_type == KdfType.AES_KDF:
            if header.aes_kdf_rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")
            aes_config = AesKdfConfig(
                rounds=header.aes_kdf_rounds,
                salt=header.kdf_salt,
            )
            return derive_key_aes_kdf(composite_key.data, aes_config)
        else:
            raise KdfError(f"Unsupported KDF: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Derive HMAC key and cipher key from transformed key.

        KDBX4 key derivation:
        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)

        Returns:
            ``(hmac_key, cipher_key)`` - note the HMAC key comes first.
        """
        cipher_key = hashlib.sha256(master_seed + transformed_key).digest()
        hmac_key = hashlib.sha512(master_seed + transformed_key + b"\x01").digest()

        return hmac_key, cipher_key

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Compute HMAC key for a specific block.

        Each block uses a different key derived from the master HMAC key.
        key = SHA512(block_index_le64 || hmac_key)
        """
        index_bytes = struct.pack("<Q", block_index)
        return hashlib.sha512(index_bytes + hmac_key).digest()

    def _read_hmac_block_stream(self, hmac_key: bytes) -> bytes:
        """Read and verify HMAC block stream.

        KDBX4 uses a block-based format with per-block HMAC:
        - 32 bytes: HMAC of (block_index || length || data)
        - 4 bytes: block length (little-endian)
        - N bytes: block data

        Last block has length 0.

        Args:
            hmac_key: Master HMAC key from which per-block keys are derived

        Returns:
            The concatenated data of all non-empty blocks

        Raises:
            AuthenticationError: If any block, including the terminating
                empty block, fails HMAC verification.
        """
        blocks = []
        block_index = 0

        with self._ctx.scope("hmac_blocks"):
            while True:
                with self._ctx.scope(f"block[{block_index}]"):
                    block_hmac = self._ctx.read(32, "hmac")
                    block_len = self._ctx.read_u32("length")

                    # The terminating block carries no data, so nothing is
                    # read for it; it is still authenticated below.
                    block_data = self._ctx.read(block_len, "data") if block_len else b""

                    # Verify block HMAC over (index_le64 || length_le32 || data)
                    block_key = self._compute_block_hmac_key(hmac_key, block_index)
                    expected = compute_hmac_sha256(
                        block_key,
                        struct.pack("<QI", block_index, block_len) + block_data,
                    )
                    if not constant_time_compare(expected, block_hmac):
                        raise AuthenticationError("Block authentication failed")

                    if block_len == 0:
                        break

                    blocks.append(block_data)
                    block_index += 1

        return b"".join(blocks)

    def _remove_pkcs7_padding(self, data: bytes) -> bytes:
        """Remove PKCS7 padding from decrypted data.

        Note: Padding oracle attacks are not possible here because HMAC
        verification on the ciphertext occurs BEFORE decryption. Any
        ciphertext modification would fail HMAC verification first.
        We still use generic error messages for defense-in-depth.

        Args:
            data: Decrypted bytes ending in PKCS7 padding

        Returns:
            ``data`` with the trailing padding bytes removed

        Raises:
            DecryptionError: If ``data`` is empty, the padding length is
                not in 1..16, the padding is longer than ``data``, or the
                padding bytes are not all equal to the padding length.
        """
        if not data:
            raise DecryptionError()
        padding_len = data[-1]
        # The length check against len(data) keeps a short buffer from
        # escaping as IndexError instead of the generic decryption error.
        if padding_len == 0 or padding_len > 16 or padding_len > len(data):
            raise DecryptionError()
        # Verify all padding bytes are correct
        if data[-padding_len:] != bytes([padding_len]) * padding_len:
            raise DecryptionError()
        return data[:-padding_len]

    def _parse_inner_header(self, data: bytes) -> tuple[InnerHeader, int]:
        """Parse KDBX4 inner header.

        The inner header is a sequence of fields, each encoded as a 1-byte
        type, a 4-byte length and ``length`` bytes of data. Parsing stops at
        the END field or when the data is exhausted. Binary attachments are
        numbered sequentially from 0 in the order they appear.

        Args:
            data: Decrypted (and decompressed) payload

        Returns:
            The inner header and the offset where XML starts.

        Raises:
            CorruptedDataError: If the stream ID field is not 4 bytes long,
                a binary field lacks its protection flag byte, or a binary
                attachment exceeds MAX_BINARY_SIZE.
        """
        ctx = ParseContext(data)
        random_stream_id = 0
        random_stream_key = b""
        binaries: dict[int, tuple[bool, bytes]] = {}
        binary_index = 0

        with ctx.scope("inner_header"):
            while not ctx.exhausted:
                field_type = ctx.read_u8("type")
                field_len = ctx.read_u32("length")
                field_data = ctx.read(field_len, "data")

                if field_type == InnerHeaderFieldType.END:
                    break
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_ID:
                    # Reject a wrong-sized field explicitly rather than
                    # letting struct.error escape from unpack().
                    if len(field_data) != 4:
                        raise CorruptedDataError(
                            f"Invalid random stream ID length: {len(field_data)} bytes "
                            "(expected 4 bytes)"
                        )
                    random_stream_id = struct.unpack("<I", field_data)[0]
                elif field_type == InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY:
                    random_stream_key = field_data
                elif field_type == InnerHeaderFieldType.BINARY:
                    # First byte is protection flag; an empty field has none
                    if not field_data:
                        raise CorruptedDataError(
                            "Binary attachment is missing its protection flag"
                        )
                    binary_data = field_data[1:]
                    if len(binary_data) > MAX_BINARY_SIZE:
                        raise CorruptedDataError(
                            f"Binary attachment too large: {len(binary_data)} bytes "
                            f"(max {MAX_BINARY_SIZE} bytes)"
                        )
                    protected = field_data[0] != 0
                    binaries[binary_index] = (protected, binary_data)
                    binary_index += 1

        return (
            InnerHeader(
                random_stream_id=random_stream_id,
                random_stream_key=random_stream_key,
                binaries=binaries,
            ),
            ctx.offset,
        )

365 

366 

class Kdbx4Writer:
    """Writer for KDBX4 database files."""

    # Default block size for HMAC block stream (1 MiB)
    BLOCK_SIZE = 1024 * 1024

    def encrypt(
        self,
        header: KdbxHeader,
        inner_header: InnerHeader,
        xml_data: bytes,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
        yubikey_response: bytes | None = None,
    ) -> bytes:
        """Serialize and encrypt a database into KDBX4 file bytes.

        The output is laid out as: outer header, SHA-256 of the header,
        HMAC-SHA256 of the header, then the encrypted payload wrapped in an
        HMAC block stream.

        Args:
            header: Outer header configuration
            inner_header: Inner header with stream cipher and binaries
            xml_data: XML database content
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional precomputed transformed key (skips KDF)
            yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

        Returns:
            Complete KDBX4 file as bytes

        Raises:
            UnsupportedVersionError: If ``header`` is not a KDBX4 header.
            KdfError: If the KDF has to run and its parameters are missing
                or its type is unsupported.
        """
        if header.version != KdbxVersion.KDBX4:
            raise UnsupportedVersionError(header.version.value, 0)

        if transformed_key is None:
            # No cached key: build the composite key from the credentials
            # (KeePassXC: the YubiKey response is part of the composite key)
            # and run the slow KDF over it.
            composite = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
                yubikey_response=yubikey_response,
            )
            derived = self._derive_master_key(header, composite)
            key_material = derived.data
        else:
            # Precomputed transformed key supplied - skip the expensive KDF
            key_material = transformed_key

        mac_key, enc_key = self._derive_keys(key_material, header.master_seed)

        # Plaintext is the inner header immediately followed by the XML
        body = self._build_inner_header(inner_header) + xml_data

        if header.compression == CompressionType.GZIP:
            body = gzip.compress(body, compresslevel=6)

        # A 16-byte IV identifies AES-CBC, which needs PKCS7 padding
        if header.cipher.iv_size == 16:
            body = self._add_pkcs7_padding(body)

        cipher = CipherContext(header.cipher, enc_key, header.encryption_iv)
        ciphertext = cipher.encrypt(body)

        block_stream = self._build_hmac_block_stream(ciphertext, mac_key)

        # Outer header plus its integrity hash and keyed MAC; the header MAC
        # uses the key derived for the all-ones block index.
        raw_header = header.to_bytes()
        digest = hashlib.sha256(raw_header).digest()
        header_mac_key = self._compute_block_hmac_key(mac_key, 0xFFFFFFFFFFFFFFFF)
        header_mac = compute_hmac_sha256(header_mac_key, raw_header)

        return raw_header + digest + header_mac + block_stream

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Run the KDF selected by ``header`` over the composite key.

        Raises:
            KdfError: If required parameters are absent from the header or
                the KDF type is neither Argon2 (id/d) nor AES-KDF.
        """
        kdf = header.kdf_type

        if kdf in (KdfType.ARGON2ID, KdfType.ARGON2D):
            memory_kib = header.argon2_memory_kib
            iterations = header.argon2_iterations
            parallelism = header.argon2_parallelism
            if memory_kib is None or iterations is None or parallelism is None:
                raise KdfError("Missing Argon2 parameters in header")

            # derive_key_argon2 is called without overriding its
            # enforcement default when writing.
            return derive_key_argon2(
                composite_key.data,
                Argon2Config(
                    memory_kib=memory_kib,
                    iterations=iterations,
                    parallelism=parallelism,
                    salt=header.kdf_salt,
                    variant=kdf,
                ),
            )

        if kdf == KdfType.AES_KDF:
            rounds = header.aes_kdf_rounds
            if rounds is None:
                raise KdfError("Missing AES-KDF rounds in header")

            return derive_key_aes_kdf(
                composite_key.data,
                AesKdfConfig(rounds=rounds, salt=header.kdf_salt),
            )

        raise KdfError(f"Unsupported KDF for writing: {header.kdf_type}")

    def _derive_keys(self, transformed_key: bytes, master_seed: bytes) -> tuple[bytes, bytes]:
        """Split the transformed key into ``(hmac_key, cipher_key)``.

        - cipher_key = SHA256(master_seed || transformed_key)
        - hmac_key = SHA512(master_seed || transformed_key || 0x01)
        """
        seeded = master_seed + transformed_key
        return (
            hashlib.sha512(seeded + b"\x01").digest(),
            hashlib.sha256(seeded).digest(),
        )

    def _compute_block_hmac_key(self, hmac_key: bytes, block_index: int) -> bytes:
        """Return SHA512(block_index as little-endian u64 || hmac_key)."""
        hasher = hashlib.sha512(struct.pack("<Q", block_index))
        hasher.update(hmac_key)
        return hasher.digest()

    def _build_inner_header(self, inner: InnerHeader) -> bytes:
        """Serialize the inner header as a sequence of TLV fields.

        Fields are written in this order: stream ID, stream key, one BINARY
        field per attachment (ascending id), then the END marker.
        """
        out = BuildContext()

        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_ID,
            struct.pack("<I", inner.random_stream_id),
        )
        out.write_tlv(
            InnerHeaderFieldType.INNER_RANDOM_STREAM_KEY,
            inner.random_stream_key,
        )

        for attachment_id in sorted(inner.binaries):
            is_protected, content = inner.binaries[attachment_id]
            # Each attachment is prefixed with a one-byte protection flag
            flag = b"\x01" if is_protected else b"\x00"
            out.write_tlv(InnerHeaderFieldType.BINARY, flag + content)

        out.write_tlv(InnerHeaderFieldType.END, b"")

        return out.build()

    def _add_pkcs7_padding(self, data: bytes) -> bytes:
        """Append PKCS7 padding so the length is a multiple of 16 bytes.

        Between 1 and 16 bytes are always added; already aligned input
        gains a full 16-byte block.
        """
        pad = 16 - len(data) % 16
        return data + bytes([pad]) * pad

    def _build_hmac_block_stream(self, data: bytes, hmac_key: bytes) -> bytes:
        """Wrap ``data`` in the KDBX4 HMAC block stream format.

        Each block is written as HMAC (over index || length || data),
        4-byte length, then the data. A zero-length block terminates the
        stream.
        """
        out = BuildContext()
        next_index = 0

        for start in range(0, len(data), self.BLOCK_SIZE):
            chunk = data[start : start + self.BLOCK_SIZE]
            size = len(chunk)

            chunk_key = self._compute_block_hmac_key(hmac_key, next_index)
            authenticated = struct.pack("<Q", next_index) + struct.pack("<I", size) + chunk
            chunk_mac = compute_hmac_sha256(chunk_key, authenticated)

            out.write(chunk_mac)
            out.write_u32(size)
            out.write(chunk)

            next_index += 1

        # Terminator: an authenticated block of length zero
        end_key = self._compute_block_hmac_key(hmac_key, next_index)
        end_mac = compute_hmac_sha256(
            end_key,
            struct.pack("<Q", next_index) + struct.pack("<I", 0),
        )
        out.write(end_mac)
        out.write_u32(0)

        return out.build()

556 

557 

def read_kdbx4(
    data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> DecryptedPayload:
    """Decrypt a KDBX4 file in one call.

    Builds a Kdbx4Reader over ``data`` and forwards the credentials to its
    ``decrypt`` method.

    Args:
        data: Complete file contents
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        DecryptedPayload with header, inner header, XML, and transformed_key
    """
    credentials = {
        "password": password,
        "keyfile_data": keyfile_data,
        "transformed_key": transformed_key,
        "yubikey_response": yubikey_response,
    }
    return Kdbx4Reader(data).decrypt(**credentials)

584 

585 

def write_kdbx4(
    header: KdbxHeader,
    inner_header: InnerHeader,
    xml_data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
    yubikey_response: bytes | None = None,
) -> bytes:
    """Encrypt a database to KDBX4 bytes in one call.

    Creates a Kdbx4Writer and forwards every argument to its ``encrypt``
    method.

    Args:
        header: Outer header configuration
        inner_header: Inner header with stream cipher and binaries
        xml_data: XML database content
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional precomputed transformed key (skips KDF)
        yubikey_response: Optional 20-byte YubiKey HMAC-SHA1 response

    Returns:
        Complete KDBX4 file as bytes
    """
    content = {
        "header": header,
        "inner_header": inner_header,
        "xml_data": xml_data,
    }
    credentials = {
        "password": password,
        "keyfile_data": keyfile_data,
        "transformed_key": transformed_key,
        "yubikey_response": yubikey_response,
    }
    return Kdbx4Writer().encrypt(**content, **credentials)