Coverage for src / kdbxtool / database.py: 93%

956 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-19 21:22 +0000

1"""High-level Database API for KDBX files. 

2 

3This module provides the main interface for working with KeePass databases: 

4- Opening and decrypting KDBX files 

5- Creating new databases 

6- Searching for entries and groups 

7- Saving databases 

8""" 

9 

10from __future__ import annotations 

11 

12import base64 

13import binascii 

14import contextlib 

15import getpass 

16import hashlib 

17import os 

18import uuid as uuid_module 

19from collections.abc import Iterator 

20from dataclasses import dataclass, field 

21from datetime import UTC, datetime, timedelta 

22from pathlib import Path 

23from types import TracebackType 

24from typing import TYPE_CHECKING, Protocol, cast 

25from xml.etree.ElementTree import Element, SubElement, tostring 

26 

27if TYPE_CHECKING: 

28 from .merge import DeletedObject, MergeMode, MergeResult 

29 

30from Cryptodome.Cipher import ChaCha20, Salsa20 

31from defusedxml import ElementTree as DefusedET 

32 

33from .exceptions import ( 

34 AuthenticationError, 

35 DatabaseError, 

36 InvalidXmlError, 

37 Kdbx3UpgradeRequired, 

38 MissingCredentialsError, 

39 UnknownCipherError, 

40) 

41from .models import Attachment, Entry, Group, HistoryEntry, Times 

42from .models.entry import AutoType, BinaryRef, StringField 

43from .parsing import CompressionType, KdbxHeader, KdbxVersion 

44from .parsing.kdbx3 import read_kdbx3 

45from .parsing.kdbx4 import InnerHeader, read_kdbx4, write_kdbx4 

46from .security import AesKdfConfig, Argon2Config, Cipher, KdfType 

47from .security import yubikey as yubikey_module 

48from .security.yubikey import YubiKeyConfig, compute_challenge_response 

49 

# Union type for KDF configurations; the concrete config classes live in
# .security (Argon2Config also carries the variant, e.g. Argon2d).
KdfConfig = Argon2Config | AesKdfConfig

52 

53 

class _StreamCipher(Protocol):
    """Structural type for stream ciphers used for protected value encryption.

    Satisfied by PyCryptodome's ChaCha20 and Salsa20 cipher objects; for a
    stream cipher both operations are the same XOR-with-keystream step.
    """

    def encrypt(self, plaintext: bytes) -> bytes: ...
    def decrypt(self, ciphertext: bytes) -> bytes: ...

59 

60 

# KDBX4 time format (ISO 8601, compatible with KeePassXC)
KDBX4_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Protected stream cipher IDs, as stored in the inner header's
# random_stream_id field.
PROTECTED_STREAM_SALSA20 = 2
PROTECTED_STREAM_CHACHA20 = 3

67 

68 

class ProtectedStreamCipher:
    """Stream cipher for encrypting/decrypting protected values in XML.

    KDBX protects sensitive values (passwords and similar) in the XML
    payload by XOR-ing them with the keystream of a stream cipher
    (ChaCha20 or Salsa20). Values consume keystream bytes, so they must
    be processed in document order.
    """

    def __init__(self, stream_id: int, stream_key: bytes) -> None:
        """Initialize the stream cipher.

        Args:
            stream_id: Cipher type (2=Salsa20, 3=ChaCha20)
            stream_key: Key material from inner header (typically 64 bytes)
        """
        self._stream_id = stream_id
        self._stream_key = stream_key
        self._cipher = self._create_cipher()

    def _create_cipher(self) -> _StreamCipher:
        """Instantiate the underlying cipher for this stream id."""
        if self._stream_id == PROTECTED_STREAM_SALSA20:
            # Salsa20: key is SHA-256 of the stream key; nonce is fixed.
            return Salsa20.new(
                key=hashlib.sha256(self._stream_key).digest(),
                nonce=b"\xe8\x30\x09\x4b\x97\x20\x5d\x2a",
            )
        if self._stream_id == PROTECTED_STREAM_CHACHA20:
            # ChaCha20: SHA-512 of the stream key; first 32 bytes become
            # the key, the next 12 bytes (32:44) the nonce.
            digest = hashlib.sha512(self._stream_key).digest()
            return ChaCha20.new(key=digest[:32], nonce=digest[32:44])
        raise UnknownCipherError(self._stream_id.to_bytes(4, "little"))

    def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt a protected value (XOR with keystream)."""
        return self._cipher.decrypt(ciphertext)

    def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt a protected value (XOR with keystream)."""
        return self._cipher.encrypt(plaintext)

111 

112 

@dataclass
class CustomIcon:
    """A custom icon in a KDBX database.

    Custom icons are PNG images that can be assigned to entries and groups
    for visual customization beyond the standard icon set.

    Attributes:
        uuid: Unique identifier for the icon
        data: PNG image data (raw bytes)
        name: Optional display name for the icon
        last_modification_time: When the icon was last modified (optional)
    """

    uuid: uuid_module.UUID
    data: bytes
    name: str | None = None
    last_modification_time: datetime | None = None

131 

132 

@dataclass
class DatabaseSettings:
    """Settings for a KDBX database.

    Attributes:
        generator: Generator application name
        database_name: Name of the database
        database_description: Description of the database
        default_username: Default username for new entries
        maintenance_history_days: Days to keep deleted items
        color: Database color (hex)
        master_key_change_rec: Days until master key change recommended
        master_key_change_force: Days until master key change forced
        memory_protection: Which fields to protect in memory
        recycle_bin_enabled: Whether recycle bin is enabled
        recycle_bin_uuid: UUID of recycle bin group
        history_max_items: Max history entries per entry
        history_max_size: Max history size in bytes
        custom_icons: Dictionary of custom icons (UUID -> CustomIcon)
        deleted_objects: Deletion records (see .merge.DeletedObject), used
            when merging/synchronizing databases
    """

    generator: str = "kdbxtool"
    database_name: str = "Database"
    database_description: str = ""
    default_username: str = ""
    maintenance_history_days: int = 365
    color: str | None = None
    master_key_change_rec: int = -1
    master_key_change_force: int = -1
    # By default only Password is memory-protected, matching the
    # standard KeePass field set.
    memory_protection: dict[str, bool] = field(
        default_factory=lambda: {
            "Title": False,
            "UserName": False,
            "Password": True,
            "URL": False,
            "Notes": False,
        }
    )
    recycle_bin_enabled: bool = True
    recycle_bin_uuid: uuid_module.UUID | None = None
    history_max_items: int = 10
    history_max_size: int = 6 * 1024 * 1024  # 6 MiB
    custom_icons: dict[uuid_module.UUID, CustomIcon] = field(default_factory=dict)
    deleted_objects: list[DeletedObject] = field(default_factory=list)

177 

178 

179class Database: 

180 """High-level interface for KDBX databases. 

181 

182 This class provides the main API for working with KeePass databases. 

183 It handles encryption/decryption, XML parsing, and model management. 

184 

185 Example usage: 

186 # Open existing database 

187 db = Database.open("passwords.kdbx", password="secret") 

188 

189 # Find entries 

190 entries = db.find_entries(title="GitHub") 

191 

192 # Create entry 

193 entry = db.root_group.create_entry( 

194 title="New Site", 

195 username="user", 

196 password="pass123", 

197 ) 

198 

199 # Save changes 

200 db.save() 

201 """ 

202 

    def __init__(
        self,
        root_group: Group,
        settings: DatabaseSettings | None = None,
        header: KdbxHeader | None = None,
        inner_header: InnerHeader | None = None,
        binaries: dict[int, bytes] | None = None,
    ) -> None:
        """Initialize database.

        Usually you should use Database.open() or Database.create() instead.

        Args:
            root_group: Root group containing all entries/groups
            settings: Database settings (a default DatabaseSettings if None)
            header: KDBX header (for existing databases)
            inner_header: Inner header (for existing databases)
            binaries: Binary attachments (index -> data)
        """
        self._root_group = root_group
        self._settings = settings or DatabaseSettings()
        self._header = header
        self._inner_header = inner_header
        self._binaries = binaries or {}
        # Credentials and the derived key are populated later by
        # open()/create(), never passed to the constructor.
        self._password: str | None = None
        self._keyfile_data: bytes | None = None
        self._transformed_key: bytes | None = None
        self._filepath: Path | None = None
        self._yubikey_slot: int | None = None
        self._yubikey_serial: int | None = None
        # Set database reference on all entries and groups
        self._set_database_references(root_group)

235 

236 def _set_database_references(self, group: Group) -> None: 

237 """Recursively set _database reference on a group and all its contents.""" 

238 group._database = self 

239 for entry in group.entries: 

240 entry._database = self 

241 for subgroup in group.subgroups: 

242 self._set_database_references(subgroup) 

243 

    def __enter__(self) -> Database:
        """Enter context manager; returns the database itself."""
        return self

247 

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager, zeroizing credentials.

        Returning None means exceptions raised inside the `with` block are
        not suppressed.
        """
        self.zeroize_credentials()

256 

257 def zeroize_credentials(self) -> None: 

258 """Explicitly zeroize stored credentials from memory. 

259 

260 Call this when done with the database to minimize the time 

261 credentials remain in memory. Note that Python's string 

262 interning may retain copies; for maximum security, use 

263 SecureBytes for credential input. 

264 """ 

265 # Clear password (Python GC will eventually free memory) 

266 self._password = None 

267 # Clear keyfile data (convert to bytearray and zeroize if possible) 

268 if self._keyfile_data is not None: 

269 try: 

270 # Attempt to overwrite the memory 

271 data = bytearray(self._keyfile_data) 

272 for i in range(len(data)): 

273 data[i] = 0 

274 except TypeError: 

275 pass # bytes is immutable, just dereference 

276 self._keyfile_data = None 

277 # Clear transformed key 

278 if self._transformed_key is not None: 

279 try: 

280 data = bytearray(self._transformed_key) 

281 for i in range(len(data)): 

282 data[i] = 0 

283 except TypeError: 

284 pass 

285 self._transformed_key = None 

286 

    def dump(self) -> str:
        """Return a human-readable summary of the database for debugging.

        Only metadata is included (format, cipher, KDF names and counts);
        no entry contents or secrets appear in the output.

        Returns:
            Multi-line string with database metadata and statistics.
        """
        lines = [f'Database: "{self._settings.database_name or "(unnamed)"}"']
        # Header details exist only for databases opened from (or saved as) a file
        if self._header is not None:
            lines.append(f" Format: KDBX{self._header.version.value}")
            lines.append(f" Cipher: {self._header.cipher.name}")
            lines.append(f" KDF: {self._header.kdf_type.name}")

        # Count entries and groups
        entry_count = sum(1 for _ in self._root_group.iter_entries(recursive=True))
        group_count = sum(1 for _ in self._root_group.iter_groups(recursive=True))
        lines.append(f" Total entries: {entry_count}")
        lines.append(f" Total groups: {group_count}")

        # Custom icons
        if self.custom_icons:
            lines.append(f" Custom icons: {len(self.custom_icons)}")

        # Recycle bin
        if self._settings.recycle_bin_enabled:
            lines.append(" Recycle bin: enabled")

        return "\n".join(lines)

314 

315 def merge( 

316 self, 

317 source: Database, 

318 *, 

319 mode: MergeMode | None = None, 

320 ) -> MergeResult: 

321 """Merge another database into this one. 

322 

323 Combines entries, groups, history, attachments, and custom icons 

324 from the source database into this database using UUID-based 

325 matching and timestamp-based conflict resolution. 

326 

327 Args: 

328 source: Database to merge from (read-only) 

329 mode: Merge mode (STANDARD or SYNCHRONIZE). Defaults to STANDARD. 

330 - STANDARD: Add and update only, never deletes 

331 - SYNCHRONIZE: Full sync including deletions 

332 

333 Returns: 

334 MergeResult with counts and statistics about the merge 

335 

336 Raises: 

337 MergeError: If merge cannot be completed 

338 

339 Example: 

340 >>> target_db = Database.open("main.kdbx", password="secret") 

341 >>> source_db = Database.open("branch.kdbx", password="secret") 

342 >>> result = target_db.merge(source_db) 

343 >>> print(f"Added {result.entries_added} entries") 

344 >>> target_db.save() 

345 """ 

346 from .merge import MergeMode, Merger 

347 

348 if mode is None: 

349 mode = MergeMode.STANDARD 

350 

351 merger = Merger(self, source, mode=mode) 

352 return merger.merge() 

353 

    @property
    def transformed_key(self) -> bytes | None:
        """Get the transformed key for caching.

        The transformed key is the result of applying the KDF to the
        credentials. Caching it allows fast repeated database opens without
        re-running the expensive KDF (pass it to open_bytes()).

        Security note: The transformed key is as sensitive as the password.
        Anyone with this key can decrypt the database. Store securely and
        zeroize when done (see zeroize_credentials()).

        Returns:
            The transformed key, or None if not available (e.g., newly created DB)
        """
        return self._transformed_key

370 

371 @property 

372 def kdf_salt(self) -> bytes | None: 

373 """Get the KDF salt used for key derivation. 

374 

375 The salt is used together with credentials to derive the transformed key. 

376 If the salt changes (e.g., after save with regenerate_seeds=True), 

377 any cached transformed_key becomes invalid. 

378 

379 Returns: 

380 The KDF salt, or None if no header is set 

381 """ 

382 if self._header is None: 

383 return None 

384 return self._header.kdf_salt 

385 

    @property
    def root_group(self) -> Group:
        """The root group of the database (top of the group tree)."""
        return self._root_group

    @property
    def settings(self) -> DatabaseSettings:
        """Database-wide settings and metadata."""
        return self._settings

    @property
    def filepath(self) -> Path | None:
        """Path the database was opened from or last saved to, if any."""
        return self._filepath

400 

401 # --- Opening databases --- 

402 

    @classmethod
    def open(
        cls,
        filepath: str | Path,
        password: str | None = None,
        keyfile: str | Path | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> Database:
        """Open an existing KDBX database.

        Args:
            filepath: Path to the .kdbx file
            password: Database password
            keyfile: Path to keyfile (optional)
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided, the database's KDF salt is used as challenge and
                the 20-byte HMAC-SHA1 response is incorporated into key derivation.
                Requires yubikey-manager package: pip install kdbxtool[yubikey]
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Returns:
            Database instance

        Raises:
            FileNotFoundError: If the database file or keyfile doesn't exist
            ValueError: If credentials are wrong or file is corrupted
            YubiKeyError: If YubiKey operation fails
        """
        filepath = Path(filepath)
        if not filepath.exists():
            raise FileNotFoundError(f"Database file not found: {filepath}")

        # Read the whole file into memory; decryption operates on bytes.
        data = filepath.read_bytes()

        keyfile_data = None
        if keyfile:
            keyfile_path = Path(keyfile)
            if not keyfile_path.exists():
                raise FileNotFoundError(f"Keyfile not found: {keyfile}")
            keyfile_data = keyfile_path.read_bytes()

        # Delegate to open_bytes(); filepath is passed so save() can reuse it.
        return cls.open_bytes(
            data,
            password=password,
            keyfile_data=keyfile_data,
            filepath=filepath,
            yubikey_slot=yubikey_slot,
            yubikey_serial=yubikey_serial,
        )

454 

    @classmethod
    def open_interactive(
        cls,
        filepath: str | Path,
        keyfile: str | Path | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
        prompt: str = "Password: ",
        max_attempts: int = 3,
    ) -> Database:
        """Open a KDBX database with interactive password prompt.

        Prompts the user for a password using secure input (no echo). If the
        password is incorrect, allows retrying up to max_attempts times.

        This is a convenience method for CLI applications that need to securely
        prompt for database credentials.

        Args:
            filepath: Path to the .kdbx file
            keyfile: Path to keyfile (optional)
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional)
            yubikey_serial: Serial number of specific YubiKey to use
            prompt: Custom prompt string (default: "Password: ")
            max_attempts: Maximum password attempts before raising (default: 3)

        Returns:
            Database instance

        Raises:
            FileNotFoundError: If file or keyfile doesn't exist
            AuthenticationError: If max_attempts exceeded with wrong password
            YubiKeyError: If YubiKey operation fails

        Example:
            >>> db = Database.open_interactive("vault.kdbx")
            Password:
            >>> db = Database.open_interactive("vault.kdbx", keyfile="vault.key")
            Password:
        """
        import sys

        for attempt in range(max_attempts):
            password = getpass.getpass(prompt)
            try:
                return cls.open(
                    filepath,
                    password=password,
                    keyfile=keyfile,
                    yubikey_slot=yubikey_slot,
                    yubikey_serial=yubikey_serial,
                )
            except AuthenticationError:
                if attempt < max_attempts - 1:
                    # Retry notice goes to stderr so stdout stays clean for
                    # scripted consumers.
                    print("Invalid password, try again.", file=sys.stderr)
                else:
                    # from None: the per-attempt failure is not useful context
                    raise AuthenticationError(
                        f"Authentication failed after {max_attempts} attempts"
                    ) from None
        # Reached only when max_attempts <= 0 (the loop body never runs);
        # also satisfies the type checker that every path returns or raises.
        raise AuthenticationError(f"Authentication failed after {max_attempts} attempts")

516 

    @classmethod
    def open_bytes(
        cls,
        data: bytes,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        filepath: Path | None = None,
        transformed_key: bytes | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> Database:
        """Open a KDBX database from bytes.

        Supports both KDBX3 and KDBX4 formats. KDBX3 databases are opened
        read-only and will be automatically upgraded to KDBX4 on save.

        Args:
            data: KDBX file contents
            password: Database password
            keyfile_data: Keyfile contents (optional)
            filepath: Original file path (for save)
            transformed_key: Precomputed transformed key (skips KDF, faster opens)
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided, the database's KDF salt is used as challenge and
                the 20-byte HMAC-SHA1 response is incorporated into key derivation.
                Requires yubikey-manager package: pip install kdbxtool[yubikey]
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Returns:
            Database instance

        Raises:
            YubiKeyError: If YubiKey operation fails
            DatabaseError: If yubikey_slot is given for a KDBX3 database
        """
        # Detect version from header (parse just enough to get version)
        header, _ = KdbxHeader.parse(data)

        # Get YubiKey response if slot specified
        # KeePassXC uses the KDF salt as the challenge, not master_seed
        yubikey_response: bytes | None = None
        if yubikey_slot is not None:
            if not yubikey_module.YUBIKEY_AVAILABLE:
                from .exceptions import YubiKeyNotAvailableError

                raise YubiKeyNotAvailableError()
            config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
            response = compute_challenge_response(header.kdf_salt, config)
            yubikey_response = response.data

        # Decrypt the file using appropriate reader
        is_kdbx3 = header.version == KdbxVersion.KDBX3
        if is_kdbx3:
            import warnings

            # stacklevel=3 points the warning at the caller of open()/open_bytes()
            warnings.warn(
                "Opening KDBX3 database. Saving will automatically upgrade to KDBX4 "
                "with modern security settings (Argon2d, ChaCha20). "
                "Use save(allow_upgrade=True) to confirm.",
                UserWarning,
                stacklevel=3,
            )
            # KDBX3 doesn't support YubiKey CR in the same way
            # (KeeChallenge used a sidecar XML file, not integrated)
            if yubikey_slot is not None:
                raise DatabaseError(
                    "YubiKey challenge-response is not supported for KDBX3 databases. "
                    "Upgrade to KDBX4 first."
                )
            payload = read_kdbx3(
                data,
                password=password,
                keyfile_data=keyfile_data,
                transformed_key=transformed_key,
            )
        else:
            payload = read_kdbx4(
                data,
                password=password,
                keyfile_data=keyfile_data,
                transformed_key=transformed_key,
                yubikey_response=yubikey_response,
            )

        # Parse XML into models (with protected value decryption)
        root_group, settings, binaries = cls._parse_xml(payload.xml_data, payload.inner_header)

        db = cls(
            root_group=root_group,
            settings=settings,
            header=payload.header,
            inner_header=payload.inner_header,
            binaries=binaries,
        )
        # Retain credentials so save()/reload() can re-derive keys.
        db._password = password
        db._keyfile_data = keyfile_data
        db._transformed_key = payload.transformed_key
        db._filepath = filepath
        # Dynamic attribute: save() checks this to require upgrade confirmation.
        db._opened_as_kdbx3 = is_kdbx3
        db._yubikey_slot = yubikey_slot
        db._yubikey_serial = yubikey_serial

        return db

620 

621 # --- Creating databases --- 

622 

    @classmethod
    def create(
        cls,
        filepath: str | Path | None = None,
        password: str | None = None,
        keyfile: str | Path | None = None,
        database_name: str = "Database",
        cipher: Cipher = Cipher.AES256_CBC,
        kdf_config: KdfConfig | None = None,
    ) -> Database:
        """Create a new KDBX database.

        Args:
            filepath: Path to save the database (optional; nothing is written
                until save() is called)
            password: Database password
            keyfile: Path to keyfile (optional)
            database_name: Name for the database
            cipher: Encryption cipher to use
            kdf_config: KDF configuration (Argon2Config or AesKdfConfig).
                Defaults to Argon2Config.standard() with Argon2d variant.

        Returns:
            New Database instance

        Raises:
            MissingCredentialsError: If neither password nor keyfile is given
            FileNotFoundError: If the keyfile doesn't exist
            DatabaseError: If kdf_config is not a supported config type
        """
        if password is None and keyfile is None:
            raise MissingCredentialsError()

        keyfile_data = None
        if keyfile:
            keyfile_path = Path(keyfile)
            if not keyfile_path.exists():
                raise FileNotFoundError(f"Keyfile not found: {keyfile}")
            keyfile_data = keyfile_path.read_bytes()

        # Use provided config or standard Argon2d defaults
        if kdf_config is None:
            kdf_config = Argon2Config.standard()

        # Create root group
        root_group = Group.create_root(database_name)

        # Create recycle bin group (icon 43 is the standard recycle-bin icon)
        recycle_bin = Group(name="Recycle Bin", icon_id="43")
        root_group.add_subgroup(recycle_bin)

        # Create header based on KDF config type; all seeds/IVs are freshly
        # generated from os.urandom.
        if isinstance(kdf_config, Argon2Config):
            header = KdbxHeader(
                version=KdbxVersion.KDBX4,
                cipher=cipher,
                compression=CompressionType.GZIP,
                master_seed=os.urandom(32),
                encryption_iv=os.urandom(cipher.iv_size),
                kdf_type=kdf_config.variant,
                kdf_salt=kdf_config.salt,
                argon2_memory_kib=kdf_config.memory_kib,
                argon2_iterations=kdf_config.iterations,
                argon2_parallelism=kdf_config.parallelism,
            )
        elif isinstance(kdf_config, AesKdfConfig):
            header = KdbxHeader(
                version=KdbxVersion.KDBX4,
                cipher=cipher,
                compression=CompressionType.GZIP,
                master_seed=os.urandom(32),
                encryption_iv=os.urandom(cipher.iv_size),
                kdf_type=KdfType.AES_KDF,
                kdf_salt=kdf_config.salt,
                aes_kdf_rounds=kdf_config.rounds,
            )
        else:
            raise DatabaseError(f"Unsupported KDF config type: {type(kdf_config)}")

        # Create inner header
        inner_header = InnerHeader(
            random_stream_id=3,  # ChaCha20
            random_stream_key=os.urandom(64),
            binaries={},
        )

        settings = DatabaseSettings(
            database_name=database_name,
            recycle_bin_enabled=True,
            recycle_bin_uuid=recycle_bin.uuid,
        )

        db = cls(
            root_group=root_group,
            settings=settings,
            header=header,
            inner_header=inner_header,
        )
        db._password = password
        db._keyfile_data = keyfile_data
        if filepath:
            db._filepath = Path(filepath)

        return db

721 

722 # --- Saving databases --- 

723 

724 def _upgrade_to_kdbx4( 

725 self, 

726 kdf_config: KdfConfig | None = None, 

727 cipher: Cipher | None = None, 

728 ) -> None: 

729 """Upgrade KDBX3 database to KDBX4 format. 

730 

731 This converts: 

732 - KDF to specified config (defaults to Argon2d with standard parameters) 

733 - Salsa20 protected stream to ChaCha20 

734 - Generates new cryptographic material (seeds, IVs) 

735 

736 Args: 

737 kdf_config: Optional KDF configuration (Argon2Config or AesKdfConfig). 

738 If not provided, uses Argon2Config.standard() with Argon2d. 

739 cipher: Optional encryption cipher. If not provided, preserves existing. 

740 """ 

741 if self._header is None: 

742 return 

743 

744 # Use provided config or standard Argon2d defaults 

745 if kdf_config is None: 

746 kdf_config = Argon2Config.standard() 

747 

748 # Use provided cipher or preserve existing 

749 target_cipher = cipher if cipher is not None else self._header.cipher 

750 

751 # Build header based on KDF config type 

752 if isinstance(kdf_config, Argon2Config): 

753 self._header = KdbxHeader( 

754 version=KdbxVersion.KDBX4, 

755 cipher=target_cipher, 

756 compression=self._header.compression, 

757 master_seed=os.urandom(32), 

758 encryption_iv=os.urandom(target_cipher.iv_size), 

759 kdf_type=kdf_config.variant, 

760 kdf_salt=kdf_config.salt, 

761 argon2_memory_kib=kdf_config.memory_kib, 

762 argon2_iterations=kdf_config.iterations, 

763 argon2_parallelism=kdf_config.parallelism, 

764 ) 

765 elif isinstance(kdf_config, AesKdfConfig): 

766 self._header = KdbxHeader( 

767 version=KdbxVersion.KDBX4, 

768 cipher=target_cipher, 

769 compression=self._header.compression, 

770 master_seed=os.urandom(32), 

771 encryption_iv=os.urandom(target_cipher.iv_size), 

772 kdf_type=KdfType.AES_KDF, 

773 kdf_salt=kdf_config.salt, 

774 aes_kdf_rounds=kdf_config.rounds, 

775 ) 

776 

777 # Upgrade inner header to use ChaCha20 (more secure than Salsa20) 

778 if self._inner_header is not None: 

779 self._inner_header.random_stream_id = PROTECTED_STREAM_CHACHA20 

780 self._inner_header.random_stream_key = os.urandom(64) 

781 

    def save(
        self,
        filepath: str | Path | None = None,
        *,
        allow_upgrade: bool = False,
        regenerate_seeds: bool = True,
        kdf_config: KdfConfig | None = None,
        cipher: Cipher | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> None:
        """Save the database to a file.

        KDBX3 databases are automatically upgraded to KDBX4 on save. When saving
        a KDBX3 database to its original file, explicit confirmation is required
        via the allow_upgrade parameter.

        Args:
            filepath: Path to save to (uses original path if not specified)
            allow_upgrade: Must be True to confirm KDBX3 to KDBX4 upgrade when
                saving to the original file. Not required when saving to a new file.
            regenerate_seeds: If True (default), regenerate all cryptographic seeds
                (master_seed, encryption_iv, kdf_salt, random_stream_key) on save.
                Set to False only for testing or when using pre-computed transformed keys.
            kdf_config: Optional KDF configuration for KDBX3 upgrade. Use presets like:
                - Argon2Config.standard() / high_security() / fast()
                - AesKdfConfig.standard() / high_security() / fast()
                Defaults to Argon2Config.standard() with Argon2d variant.
            cipher: Optional encryption cipher. Use one of:
                - Cipher.AES256_CBC (default, widely compatible)
                - Cipher.CHACHA20 (modern, faster in software)
                - Cipher.TWOFISH256_CBC (requires oxifish package)
                If not specified, preserves existing cipher.
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided (or if database was opened with yubikey_slot), the new
                KDF salt is used as challenge and the response is incorporated
                into key derivation. Requires yubikey-manager package.
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Raises:
            DatabaseError: If no filepath specified and database wasn't opened from file
            Kdbx3UpgradeRequired: If saving KDBX3 to original file without allow_upgrade=True
            YubiKeyError: If YubiKey operation fails
        """
        # Remember whether an explicit target was given BEFORE updating
        # self._filepath: saving KDBX3 to a *new* file needs no confirmation.
        save_to_new_file = filepath is not None
        if filepath:
            self._filepath = Path(filepath)
        elif self._filepath is None:
            raise DatabaseError("No filepath specified and database wasn't opened from file")

        # Require explicit confirmation when saving KDBX3 to original file
        was_kdbx3 = getattr(self, "_opened_as_kdbx3", False)
        if was_kdbx3 and not save_to_new_file and not allow_upgrade:
            raise Kdbx3UpgradeRequired()

        # Use provided yubikey params, or fall back to stored ones
        effective_yubikey_slot = yubikey_slot if yubikey_slot is not None else self._yubikey_slot
        effective_yubikey_serial = (
            yubikey_serial if yubikey_serial is not None else self._yubikey_serial
        )

        # Serialize fully in memory before touching the file.
        data = self.to_bytes(
            regenerate_seeds=regenerate_seeds,
            kdf_config=kdf_config,
            cipher=cipher,
            yubikey_slot=effective_yubikey_slot,
            yubikey_serial=effective_yubikey_serial,
        )
        self._filepath.write_bytes(data)

        # Update stored yubikey params if changed
        if yubikey_slot is not None:
            self._yubikey_slot = yubikey_slot
        if yubikey_serial is not None:
            self._yubikey_serial = yubikey_serial

        # After KDBX3 upgrade, reload to get proper KDBX4 state (including transformed_key)
        if was_kdbx3:
            self.reload()
            self._opened_as_kdbx3 = False

863 

    def reload(self) -> None:
        """Reload the database from disk using stored credentials.

        Re-reads the database file and replaces all in-memory state with
        the current file contents. Useful for discarding unsaved changes
        or syncing with external modifications.

        Raises:
            DatabaseError: If database wasn't opened from a file
            MissingCredentialsError: If no credentials are stored
        """
        if self._filepath is None:
            raise DatabaseError("Cannot reload: database wasn't opened from a file")

        if self._password is None and self._keyfile_data is None:
            raise MissingCredentialsError()

        # Re-read and parse the file into a fresh Database instance
        data = self._filepath.read_bytes()
        reloaded = self.open_bytes(
            data,
            password=self._password,
            keyfile_data=self._keyfile_data,
            filepath=self._filepath,
        )

        # Copy all state from reloaded database; assigning the reloaded
        # root group leaves its _database references pointing at `reloaded`,
        # which has identical state.
        self._root_group = reloaded._root_group
        self._settings = reloaded._settings
        self._header = reloaded._header
        self._inner_header = reloaded._inner_header
        self._binaries = reloaded._binaries
        self._transformed_key = reloaded._transformed_key
        self._opened_as_kdbx3 = reloaded._opened_as_kdbx3

898 

    def xml(self, *, pretty_print: bool = False) -> bytes:
        """Export database XML payload.

        Returns the decrypted, decompressed XML payload of the database.
        Protected values (passwords, etc.) are shown in plaintext.
        Useful for debugging and migration.

        Args:
            pretty_print: If True, format XML with indentation for readability

        Returns:
            XML payload as bytes (UTF-8 encoded, with XML declaration)
        """
        root = Element("KeePassFile")

        # Meta section
        meta = SubElement(root, "Meta")
        self._build_meta(meta)

        # Root section
        root_elem = SubElement(root, "Root")
        self._build_group(root_elem, self._root_group)

        # Note: We do NOT encrypt protected values here - the point is to
        # export the decrypted XML for debugging/inspection

        if pretty_print:
            self._indent_xml(root)

        # cast() narrows tostring()'s return for the type checker
        return cast(bytes, tostring(root, encoding="utf-8", xml_declaration=True))

929 

930 def dump_xml(self, filepath: str | Path, *, pretty_print: bool = True) -> None: 

931 """Write database XML payload to a file. 

932 

933 Writes the decrypted, decompressed XML payload to a file. 

934 Protected values (passwords, etc.) are shown in plaintext. 

935 Useful for debugging and migration. 

936 

937 Args: 

938 filepath: Path to write the XML file 

939 pretty_print: If True (default), format XML with indentation 

940 """ 

941 xml_data = self.xml(pretty_print=pretty_print) 

942 Path(filepath).write_bytes(xml_data) 

943 

944 @staticmethod 

945 def _indent_xml(elem: Element, level: int = 0) -> None: 

946 """Add indentation to XML element tree for pretty printing.""" 

947 indent = "\n" + " " * level 

948 if len(elem): 

949 if not elem.text or not elem.text.strip(): 

950 elem.text = indent + " " 

951 if not elem.tail or not elem.tail.strip(): 

952 elem.tail = indent 

953 for child in elem: 

954 Database._indent_xml(child, level + 1) 

955 if not child.tail or not child.tail.strip(): 

956 child.tail = indent 

957 else: 

958 if level and (not elem.tail or not elem.tail.strip()): 

959 elem.tail = indent 

960 

    def to_bytes(
        self,
        *,
        regenerate_seeds: bool = True,
        kdf_config: KdfConfig | None = None,
        cipher: Cipher | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> bytes:
        """Serialize the database to KDBX4 format.

        KDBX3 databases are automatically upgraded to KDBX4 on save.
        This includes converting to the specified KDF and ChaCha20 protected stream.

        Args:
            regenerate_seeds: If True (default), regenerate all cryptographic seeds
                (master_seed, encryption_iv, kdf_salt, random_stream_key) on save.
                This prevents precomputation attacks where an attacker can derive
                the encryption key in advance. Set to False only for testing or
                when using pre-computed transformed keys.
            kdf_config: Optional KDF configuration for KDBX3 upgrade. Use presets like:
                - Argon2Config.standard() / high_security() / fast()
                - AesKdfConfig.standard() / high_security() / fast()
                Defaults to Argon2Config.standard() with Argon2d variant.
            cipher: Optional encryption cipher. Use one of:
                - Cipher.AES256_CBC (default, widely compatible)
                - Cipher.CHACHA20 (modern, faster in software)
                - Cipher.TWOFISH256_CBC (requires oxifish package)
                If not specified, preserves existing cipher.
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided, the (new) KDF salt is used as challenge and the
                20-byte HMAC-SHA1 response is incorporated into key derivation.
                Requires yubikey-manager package: pip install kdbxtool[yubikey]
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Returns:
            KDBX4 file contents as bytes

        Raises:
            MissingCredentialsError: If no credentials are set
            YubiKeyError: If YubiKey operation fails
        """
        # Need either credentials, a transformed key, or YubiKey
        has_credentials = self._password is not None or self._keyfile_data is not None
        has_transformed_key = self._transformed_key is not None
        has_yubikey = yubikey_slot is not None
        if not has_credentials and not has_transformed_key and not has_yubikey:
            raise MissingCredentialsError()

        if self._header is None:
            raise DatabaseError("No header - database not properly initialized")

        if self._inner_header is None:
            raise DatabaseError("No inner header - database not properly initialized")

        # Auto-upgrade KDBX3 to KDBX4
        if self._header.version == KdbxVersion.KDBX3:
            self._upgrade_to_kdbx4(kdf_config=kdf_config, cipher=cipher)
        elif cipher is not None and cipher != self._header.cipher:
            # Change cipher for KDBX4 database. The IV is regenerated here in
            # case regenerate_seeds=False; with the default it is refreshed
            # again below, which is harmless.
            self._header.cipher = cipher
            self._header.encryption_iv = os.urandom(cipher.iv_size)
            # Cipher change invalidates transformed key
            self._transformed_key = None

        # Regenerate all cryptographic seeds to prevent precomputation attacks.
        # This ensures each save produces a file encrypted with fresh randomness.
        # See: https://github.com/libkeepass/pykeepass/issues/219
        if regenerate_seeds:
            self._header.master_seed = os.urandom(32)
            self._header.encryption_iv = os.urandom(self._header.cipher.iv_size)
            self._header.kdf_salt = os.urandom(32)
            self._inner_header.random_stream_key = os.urandom(64)
            # Cached transformed_key is now invalid (salt changed)
            self._transformed_key = None

        # Get YubiKey response if slot specified.
        # KeePassXC uses the KDF salt as the challenge, not master_seed.
        # This must happen AFTER any salt regeneration above so the response
        # matches the salt that is actually written out.
        yubikey_response: bytes | None = None
        if yubikey_slot is not None:
            if not yubikey_module.YUBIKEY_AVAILABLE:
                from .exceptions import YubiKeyNotAvailableError

                raise YubiKeyNotAvailableError()
            config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
            response = compute_challenge_response(self._header.kdf_salt, config)
            yubikey_response = response.data

        # Sync binaries to inner header (preserve protection flags where possible)
        existing_binaries = self._inner_header.binaries
        new_binaries: dict[int, tuple[bool, bytes]] = {}
        for ref, data in self._binaries.items():
            if ref in existing_binaries:
                # Preserve existing protection flag
                protected, _ = existing_binaries[ref]
                new_binaries[ref] = (protected, data)
            else:
                # New binary, default to protected
                new_binaries[ref] = (True, data)
        self._inner_header.binaries = new_binaries

        # Build XML
        xml_data = self._build_xml()

        # Encrypt and return
        # Use cached transformed_key if available (faster), otherwise use credentials
        return write_kdbx4(
            header=self._header,
            inner_header=self._inner_header,
            xml_data=xml_data,
            password=self._password,
            keyfile_data=self._keyfile_data,
            transformed_key=self._transformed_key,
            yubikey_response=yubikey_response,
        )

1077 

1078 def set_credentials( 

1079 self, 

1080 password: str | None = None, 

1081 keyfile_data: bytes | None = None, 

1082 ) -> None: 

1083 """Set or update database credentials. 

1084 

1085 Args: 

1086 password: New password (None to remove) 

1087 keyfile_data: New keyfile data (None to remove) 

1088 

1089 Raises: 

1090 ValueError: If both password and keyfile are None 

1091 """ 

1092 if password is None and keyfile_data is None: 

1093 raise MissingCredentialsError() 

1094 self._password = password 

1095 self._keyfile_data = keyfile_data 

1096 

1097 # --- Search operations --- 

1098 

1099 def find_entries( 

1100 self, 

1101 title: str | None = None, 

1102 username: str | None = None, 

1103 password: str | None = None, 

1104 url: str | None = None, 

1105 notes: str | None = None, 

1106 otp: str | None = None, 

1107 tags: list[str] | None = None, 

1108 string: dict[str, str] | None = None, 

1109 autotype_enabled: bool | None = None, 

1110 autotype_sequence: str | None = None, 

1111 autotype_window: str | None = None, 

1112 uuid: uuid_module.UUID | None = None, 

1113 path: list[str] | str | None = None, 

1114 recursive: bool = True, 

1115 history: bool = False, 

1116 first: bool = False, 

1117 ) -> list[Entry] | Entry | None: 

1118 """Find entries matching criteria. 

1119 

1120 Args: 

1121 title: Match entries with this title 

1122 username: Match entries with this username 

1123 password: Match entries with this password 

1124 url: Match entries with this URL 

1125 notes: Match entries with these notes 

1126 otp: Match entries with this OTP 

1127 tags: Match entries with all these tags 

1128 string: Match entries with custom properties (dict of key:value) 

1129 autotype_enabled: Filter by AutoType enabled state 

1130 autotype_sequence: Match entries with this AutoType sequence 

1131 autotype_window: Match entries with this AutoType window 

1132 uuid: Match entry with this UUID 

1133 path: Path to entry as list of group names ending with entry title, 

1134 or as a '/'-separated string. When specified, other criteria 

1135 are ignored. 

1136 recursive: Search in subgroups 

1137 history: Include history entries in search 

1138 first: If True, return first match or None. If False, return list. 

1139 

1140 Returns: 

1141 If first=True: Entry or None 

1142 If first=False: List of matching entries 

1143 """ 

1144 # Path-based search 

1145 if path is not None: 

1146 results = self._find_entry_by_path(path) 

1147 if first: 

1148 return results[0] if results else None 

1149 return results 

1150 

1151 if uuid is not None: 

1152 entry = self._root_group.find_entry_by_uuid(uuid, recursive=recursive) 

1153 if first: 

1154 return entry 

1155 return [entry] if entry else [] 

1156 

1157 results = self._root_group.find_entries( 

1158 title=title, 

1159 username=username, 

1160 password=password, 

1161 url=url, 

1162 notes=notes, 

1163 otp=otp, 

1164 tags=tags, 

1165 string=string, 

1166 autotype_enabled=autotype_enabled, 

1167 autotype_sequence=autotype_sequence, 

1168 autotype_window=autotype_window, 

1169 recursive=recursive, 

1170 history=history, 

1171 ) 

1172 

1173 if first: 

1174 return results[0] if results else None 

1175 return results 

1176 

1177 def _find_entry_by_path(self, path: list[str] | str) -> list[Entry]: 

1178 """Find entry by path. 

1179 

1180 Args: 

1181 path: Path as list ['group1', 'group2', 'entry_title'] or 

1182 string 'group1/group2/entry_title' 

1183 

1184 Returns: 

1185 List containing matching entry, or empty list if not found 

1186 """ 

1187 if isinstance(path, str): 

1188 path = [p for p in path.split("/") if p] 

1189 

1190 if not path: 

1191 return [] 

1192 

1193 # Last element is entry title, rest are group names 

1194 entry_title = path[-1] 

1195 group_path = path[:-1] 

1196 

1197 # Navigate to target group 

1198 current = self._root_group 

1199 for group_name in group_path: 

1200 found = None 

1201 for subgroup in current.subgroups: 

1202 if subgroup.name == group_name: 

1203 found = subgroup 

1204 break 

1205 if found is None: 

1206 return [] 

1207 current = found 

1208 

1209 # Find entry in target group (non-recursive) 

1210 for entry in current.entries: 

1211 if entry.title == entry_title: 

1212 return [entry] 

1213 

1214 return [] 

1215 

1216 def find_groups( 

1217 self, 

1218 name: str | None = None, 

1219 uuid: uuid_module.UUID | None = None, 

1220 path: list[str] | str | None = None, 

1221 recursive: bool = True, 

1222 first: bool = False, 

1223 ) -> list[Group] | Group | None: 

1224 """Find groups matching criteria. 

1225 

1226 Args: 

1227 name: Match groups with this name 

1228 uuid: Match group with this UUID 

1229 path: Path to group as list of group names or as a '/'-separated 

1230 string. When specified, other criteria are ignored. 

1231 recursive: Search in nested subgroups 

1232 first: If True, return first matching group or None instead of list 

1233 

1234 Returns: 

1235 List of matching groups, or single Group/None if first=True 

1236 """ 

1237 # Path-based search 

1238 if path is not None: 

1239 results = self._find_group_by_path(path) 

1240 if first: 

1241 return results[0] if results else None 

1242 return results 

1243 

1244 if uuid is not None: 

1245 group = self._root_group.find_group_by_uuid(uuid, recursive=recursive) 

1246 if first: 

1247 return group 

1248 return [group] if group else [] 

1249 

1250 # find_groups with first=False always returns list 

1251 group_results = cast( 

1252 list[Group], 

1253 self._root_group.find_groups(name=name, recursive=recursive), 

1254 ) 

1255 if first: 

1256 return group_results[0] if group_results else None 

1257 return group_results 

1258 

1259 def _find_group_by_path(self, path: list[str] | str) -> list[Group]: 

1260 """Find group by path. 

1261 

1262 Args: 

1263 path: Path as list ['group1', 'group2'] or string 'group1/group2' 

1264 

1265 Returns: 

1266 List containing matching group, or empty list if not found 

1267 """ 

1268 if isinstance(path, str): 

1269 path = [p for p in path.split("/") if p] 

1270 

1271 if not path: 

1272 return [self._root_group] 

1273 

1274 # Navigate through path 

1275 current = self._root_group 

1276 for group_name in path: 

1277 found = None 

1278 for subgroup in current.subgroups: 

1279 if subgroup.name == group_name: 

1280 found = subgroup 

1281 break 

1282 if found is None: 

1283 return [] 

1284 current = found 

1285 

1286 return [current] 

1287 

1288 def find_entries_contains( 

1289 self, 

1290 title: str | None = None, 

1291 username: str | None = None, 

1292 password: str | None = None, 

1293 url: str | None = None, 

1294 notes: str | None = None, 

1295 otp: str | None = None, 

1296 recursive: bool = True, 

1297 case_sensitive: bool = False, 

1298 history: bool = False, 

1299 ) -> list[Entry]: 

1300 """Find entries where fields contain the given substrings. 

1301 

1302 All criteria are combined with AND logic. None means "any value". 

1303 

1304 Args: 

1305 title: Match entries whose title contains this substring 

1306 username: Match entries whose username contains this substring 

1307 password: Match entries whose password contains this substring 

1308 url: Match entries whose URL contains this substring 

1309 notes: Match entries whose notes contain this substring 

1310 otp: Match entries whose OTP contains this substring 

1311 recursive: Search in subgroups 

1312 case_sensitive: If False (default), matching is case-insensitive 

1313 history: Include history entries in search 

1314 

1315 Returns: 

1316 List of matching entries 

1317 """ 

1318 return self._root_group.find_entries_contains( 

1319 title=title, 

1320 username=username, 

1321 password=password, 

1322 url=url, 

1323 notes=notes, 

1324 otp=otp, 

1325 recursive=recursive, 

1326 case_sensitive=case_sensitive, 

1327 history=history, 

1328 ) 

1329 

1330 def find_entries_regex( 

1331 self, 

1332 title: str | None = None, 

1333 username: str | None = None, 

1334 password: str | None = None, 

1335 url: str | None = None, 

1336 notes: str | None = None, 

1337 otp: str | None = None, 

1338 recursive: bool = True, 

1339 case_sensitive: bool = False, 

1340 history: bool = False, 

1341 ) -> list[Entry]: 

1342 """Find entries where fields match the given regex patterns. 

1343 

1344 All criteria are combined with AND logic. None means "any value". 

1345 

1346 Args: 

1347 title: Regex pattern to match against title 

1348 username: Regex pattern to match against username 

1349 password: Regex pattern to match against password 

1350 url: Regex pattern to match against URL 

1351 notes: Regex pattern to match against notes 

1352 otp: Regex pattern to match against OTP 

1353 recursive: Search in subgroups 

1354 case_sensitive: If False (default), matching is case-insensitive 

1355 history: Include history entries in search 

1356 

1357 Returns: 

1358 List of matching entries 

1359 

1360 Raises: 

1361 re.error: If any pattern is not a valid regex 

1362 """ 

1363 return self._root_group.find_entries_regex( 

1364 title=title, 

1365 username=username, 

1366 password=password, 

1367 url=url, 

1368 notes=notes, 

1369 otp=otp, 

1370 recursive=recursive, 

1371 case_sensitive=case_sensitive, 

1372 history=history, 

1373 ) 

1374 

1375 def find_attachments( 

1376 self, 

1377 id: int | None = None, 

1378 filename: str | None = None, 

1379 regex: bool = False, 

1380 recursive: bool = True, 

1381 history: bool = False, 

1382 first: bool = False, 

1383 ) -> list[Attachment] | Attachment | None: 

1384 """Find attachments in the database. 

1385 

1386 Args: 

1387 id: Match attachments with this binary reference ID 

1388 filename: Match attachments with this filename (exact or regex) 

1389 regex: If True, treat filename as a regex pattern 

1390 recursive: Search in subgroups 

1391 history: Include history entries in search 

1392 first: If True, return first match or None. If False, return list. 

1393 

1394 Returns: 

1395 If first=True: Attachment or None 

1396 If first=False: List of matching attachments 

1397 """ 

1398 import re as re_module 

1399 

1400 results: list[Attachment] = [] 

1401 pattern: re_module.Pattern[str] | None = None 

1402 

1403 if regex and filename is not None: 

1404 pattern = re_module.compile(filename) 

1405 

1406 for entry in self._root_group.iter_entries(recursive=recursive, history=history): 

1407 for binary_ref in entry.binaries: 

1408 # Check ID filter 

1409 if id is not None and binary_ref.ref != id: 

1410 continue 

1411 

1412 # Check filename filter 

1413 if filename is not None: 

1414 if regex and pattern is not None: 

1415 if not pattern.search(binary_ref.key): 

1416 continue 

1417 elif binary_ref.key != filename: 

1418 continue 

1419 

1420 attachment = Attachment( 

1421 filename=binary_ref.key, 

1422 id=binary_ref.ref, 

1423 entry=entry, 

1424 ) 

1425 results.append(attachment) 

1426 

1427 if first: 

1428 return attachment 

1429 

1430 if first: 

1431 return None 

1432 return results 

1433 

1434 @property 

1435 def attachments(self) -> list[Attachment]: 

1436 """Get all attachments in the database.""" 

1437 result = self.find_attachments(filename=".*", regex=True) 

1438 # find_attachments returns list when first=False (default) 

1439 return result if isinstance(result, list) else [] 

1440 

1441 def iter_entries(self, recursive: bool = True) -> Iterator[Entry]: 

1442 """Iterate over all entries in the database. 

1443 

1444 Args: 

1445 recursive: Include entries from all subgroups 

1446 

1447 Yields: 

1448 Entry objects 

1449 """ 

1450 yield from self._root_group.iter_entries(recursive=recursive) 

1451 

1452 def iter_groups(self, recursive: bool = True) -> Iterator[Group]: 

1453 """Iterate over all groups in the database. 

1454 

1455 Args: 

1456 recursive: Include nested subgroups 

1457 

1458 Yields: 

1459 Group objects 

1460 """ 

1461 yield from self._root_group.iter_groups(recursive=recursive) 

1462 

1463 # --- Field References --- 

1464 

    def deref(self, value: str | None) -> str | uuid_module.UUID | None:
        """Resolve KeePass field references in a value.

        Parses field references in the format {REF:X@Y:Z} and replaces them
        with the actual values from the referenced entries:
        - X = Field to retrieve (T=Title, U=Username, P=Password, A=URL, N=Notes, I=UUID)
        - Y = Field to search by (T, U, P, A, N, I)
        - Z = Search value

        References are resolved recursively, so a reference that resolves to
        another reference will continue resolving until a final value is found.

        Args:
            value: String potentially containing field references

        Returns:
            - The resolved string with all references replaced (UUID values
              are substituted in their string form)
            - None if any referenced entry cannot be found, or a UUID search
              value is malformed
            - The original value if it contains no references or is None/empty

        Example:
            >>> # Entry with password = '{REF:P@I:ABCD1234...}'
            >>> db.deref(entry.password)  # Returns the referenced password
            >>>
            >>> # With prefix/suffix: 'prefix{REF:U@I:...}suffix'
            >>> db.deref(value)  # Returns 'prefix<username>suffix'
        """
        import re

        if not value:
            return value

        # Pattern matches {REF:X@Y:Z} where X and Y are field codes, Z is search value
        pattern = r"(\{REF:([TUPANI])@([TUPANI]):([^}]+)\})"
        # The set de-duplicates repeated references; each distinct reference
        # is replaced at every occurrence via str.replace below.
        references = set(re.findall(pattern, value))

        if not references:
            return value

        # Field code -> Entry attribute name.
        field_to_attr = {
            "T": "title",
            "U": "username",
            "P": "password",
            "A": "url",
            "N": "notes",
            "I": "uuid",
        }

        for ref_str, wanted_field, search_field, search_value in references:
            wanted_attr = field_to_attr[wanted_field]
            search_attr = field_to_attr[search_field]

            # Convert UUID search value to proper UUID object
            if search_attr == "uuid":
                try:
                    search_value = uuid_module.UUID(search_value)
                except ValueError:
                    return None

            # Find the referenced entry (first match wins)
            ref_entry = self.find_entries(first=True, **{search_attr: search_value})
            if ref_entry is None:
                return None

            # Get the wanted field value; missing fields substitute as ""
            resolved_value = getattr(ref_entry, wanted_attr)
            if resolved_value is None:
                resolved_value = ""

            # UUID needs special handling - convert to string for replacement
            if isinstance(resolved_value, uuid_module.UUID):
                resolved_value = str(resolved_value)

            value = value.replace(ref_str, resolved_value)

        # Recursively resolve any nested references.
        # NOTE(review): circular references (an entry that ultimately
        # references itself) would recurse until RecursionError — presumably
        # inputs are acyclic; confirm whether a depth guard is needed.
        return self.deref(value)

1543 

1544 # --- Move operations --- 

1545 

1546 def move_entry(self, entry: Entry, destination: Group) -> None: 

1547 """Move an entry to a different group. 

1548 

1549 This is a convenience method that calls entry.move_to(). It validates 

1550 that both the entry and destination belong to this database. 

1551 

1552 Args: 

1553 entry: Entry to move 

1554 destination: Target group to move the entry to 

1555 

1556 Raises: 

1557 ValueError: If entry or destination is not in this database 

1558 ValueError: If entry has no parent 

1559 ValueError: If destination is the current parent 

1560 """ 

1561 # Validate entry is in this database 

1562 if entry.parent is None: 

1563 raise ValueError("Entry has no parent group") 

1564 found = self._root_group.find_entry_by_uuid(entry.uuid) 

1565 if found is None: 

1566 raise ValueError("Entry is not in this database") 

1567 

1568 # Validate destination is in this database 

1569 if destination is not self._root_group: 

1570 found_group = self._root_group.find_group_by_uuid(destination.uuid) 

1571 if found_group is None: 

1572 raise ValueError("Destination group is not in this database") 

1573 

1574 entry.move_to(destination) 

1575 

1576 def move_group(self, group: Group, destination: Group) -> None: 

1577 """Move a group to a different parent group. 

1578 

1579 This is a convenience method that calls group.move_to(). It validates 

1580 that both the group and destination belong to this database. 

1581 

1582 Args: 

1583 group: Group to move 

1584 destination: Target parent group to move the group to 

1585 

1586 Raises: 

1587 ValueError: If group or destination is not in this database 

1588 ValueError: If group is the root group 

1589 ValueError: If group has no parent 

1590 ValueError: If destination is the current parent 

1591 ValueError: If destination is the group itself or a descendant 

1592 """ 

1593 # Validate group is in this database (and not root) 

1594 if group.is_root_group: 

1595 raise ValueError("Cannot move the root group") 

1596 if group.parent is None: 

1597 raise ValueError("Group has no parent") 

1598 found = self._root_group.find_group_by_uuid(group.uuid) 

1599 if found is None: 

1600 raise ValueError("Group is not in this database") 

1601 

1602 # Validate destination is in this database 

1603 if destination is not self._root_group: 

1604 found_dest = self._root_group.find_group_by_uuid(destination.uuid) 

1605 if found_dest is None: 

1606 raise ValueError("Destination group is not in this database") 

1607 

1608 group.move_to(destination) 

1609 

1610 # --- Recycle bin operations --- 

1611 

1612 @property 

1613 def recyclebin_group(self) -> Group | None: 

1614 """Get the recycle bin group, or None if disabled. 

1615 

1616 If recycle_bin_enabled is True but no recycle bin exists yet, 

1617 this creates one automatically. 

1618 

1619 Returns: 

1620 Recycle bin Group, or None if recycle bin is disabled 

1621 """ 

1622 if not self._settings.recycle_bin_enabled: 

1623 return None 

1624 

1625 # Try to find existing recycle bin 

1626 if self._settings.recycle_bin_uuid is not None: 

1627 group = self._root_group.find_group_by_uuid(self._settings.recycle_bin_uuid) 

1628 if group is not None: 

1629 return group 

1630 

1631 # Create new recycle bin 

1632 recycle_bin = Group(name="Recycle Bin", icon_id="43") 

1633 self._root_group.add_subgroup(recycle_bin) 

1634 self._settings.recycle_bin_uuid = recycle_bin.uuid 

1635 return recycle_bin 

1636 

1637 def trash_entry(self, entry: Entry) -> None: 

1638 """Move an entry to the recycle bin. 

1639 

1640 If the entry is already in the recycle bin, it is permanently deleted. 

1641 

1642 Args: 

1643 entry: Entry to trash 

1644 

1645 Raises: 

1646 ValueError: If entry is not in this database 

1647 ValueError: If recycle bin is disabled 

1648 """ 

1649 # Validate entry is in this database 

1650 if entry.parent is None: 

1651 raise ValueError("Entry has no parent group") 

1652 found = self._root_group.find_entry_by_uuid(entry.uuid) 

1653 if found is None: 

1654 raise ValueError("Entry is not in this database") 

1655 

1656 recycle_bin = self.recyclebin_group 

1657 if recycle_bin is None: 

1658 raise ValueError("Recycle bin is disabled") 

1659 

1660 # If already in recycle bin, delete permanently 

1661 if entry.parent is recycle_bin: 

1662 recycle_bin.remove_entry(entry) 

1663 return 

1664 

1665 # Move to recycle bin 

1666 entry.move_to(recycle_bin) 

1667 

1668 def trash_group(self, group: Group) -> None: 

1669 """Move a group to the recycle bin. 

1670 

1671 If the group is already in the recycle bin, it is permanently deleted. 

1672 Cannot trash the root group or the recycle bin itself. 

1673 

1674 Args: 

1675 group: Group to trash 

1676 

1677 Raises: 

1678 ValueError: If group is not in this database 

1679 ValueError: If group is the root group 

1680 ValueError: If group is the recycle bin 

1681 ValueError: If recycle bin is disabled 

1682 """ 

1683 # Validate group 

1684 if group.is_root_group: 

1685 raise ValueError("Cannot trash the root group") 

1686 if group.parent is None: 

1687 raise ValueError("Group has no parent") 

1688 found = self._root_group.find_group_by_uuid(group.uuid) 

1689 if found is None: 

1690 raise ValueError("Group is not in this database") 

1691 

1692 recycle_bin = self.recyclebin_group 

1693 if recycle_bin is None: 

1694 raise ValueError("Recycle bin is disabled") 

1695 

1696 # Cannot trash the recycle bin itself 

1697 if group is recycle_bin: 

1698 raise ValueError("Cannot trash the recycle bin") 

1699 

1700 # If already in recycle bin, delete permanently 

1701 if group.parent is recycle_bin: 

1702 recycle_bin.remove_subgroup(group) 

1703 return 

1704 

1705 # Move to recycle bin 

1706 group.move_to(recycle_bin) 

1707 

1708 def empty_group(self, group: Group) -> None: 

1709 """Delete all entries and subgroups from a group. 

1710 

1711 This permanently deletes all contents (does not use recycle bin). 

1712 The group itself is not deleted. 

1713 

1714 Args: 

1715 group: Group to empty 

1716 

1717 Raises: 

1718 ValueError: If group is not in this database 

1719 """ 

1720 # Validate group is in this database 

1721 if group is not self._root_group: 

1722 found = self._root_group.find_group_by_uuid(group.uuid) 

1723 if found is None: 

1724 raise ValueError("Group is not in this database") 

1725 

1726 # Delete all subgroups (iterate over copy since we're modifying) 

1727 for subgroup in list(group.subgroups): 

1728 group.remove_subgroup(subgroup) 

1729 

1730 # Delete all entries 

1731 for entry in list(group.entries): 

1732 group.remove_entry(entry) 

1733 

1734 # --- Memory protection --- 

1735 

1736 def apply_protection_policy(self, entry: Entry) -> None: 

1737 """Apply the database's memory protection policy to an entry. 

1738 

1739 Updates the `protected` flag on the entry's string fields 

1740 according to the database's memory_protection settings. 

1741 

1742 This is automatically applied when saving the database, but 

1743 can be called manually if you need protection applied immediately 

1744 for in-memory operations. 

1745 

1746 Args: 

1747 entry: Entry to apply policy to 

1748 """ 

1749 for key, string_field in entry.strings.items(): 

1750 if key in self._settings.memory_protection: 

1751 string_field.protected = self._settings.memory_protection[key] 

1752 

1753 def apply_protection_policy_all(self) -> None: 

1754 """Apply memory protection policy to all entries in the database. 

1755 

1756 Updates all entries' string field protection flags according 

1757 to the database's memory_protection settings. 

1758 """ 

1759 for entry in self.iter_entries(): 

1760 self.apply_protection_policy(entry) 

1761 

1762 # --- Binary attachments --- 

1763 

1764 def get_binary(self, ref: int) -> bytes | None: 

1765 """Get binary attachment data by reference ID. 

1766 

1767 Args: 

1768 ref: Binary reference ID 

1769 

1770 Returns: 

1771 Binary data or None if not found 

1772 """ 

1773 return self._binaries.get(ref) 

1774 

1775 def add_binary(self, data: bytes, protected: bool = True) -> int: 

1776 """Add a new binary attachment to the database. 

1777 

1778 Args: 

1779 data: Binary data 

1780 protected: Whether the binary should be memory-protected 

1781 

1782 Returns: 

1783 Reference ID for the new binary 

1784 """ 

1785 # Find next available index 

1786 ref = max(self._binaries.keys(), default=-1) + 1 

1787 self._binaries[ref] = data 

1788 # Update inner header 

1789 if self._inner_header is not None: 

1790 self._inner_header.binaries[ref] = (protected, data) 

1791 return ref 

1792 

1793 def remove_binary(self, ref: int) -> bool: 

1794 """Remove a binary attachment from the database. 

1795 

1796 Args: 

1797 ref: Binary reference ID 

1798 

1799 Returns: 

1800 True if removed, False if not found 

1801 """ 

1802 if ref in self._binaries: 

1803 del self._binaries[ref] 

1804 if self._inner_header is not None and ref in self._inner_header.binaries: 

1805 del self._inner_header.binaries[ref] 

1806 return True 

1807 return False 

1808 

1809 def get_attachment(self, entry: Entry, name: str) -> bytes | None: 

1810 """Get an attachment from an entry by filename. 

1811 

1812 Args: 

1813 entry: Entry to get attachment from 

1814 name: Filename of the attachment 

1815 

1816 Returns: 

1817 Attachment data or None if not found 

1818 """ 

1819 for binary_ref in entry.binaries: 

1820 if binary_ref.key == name: 

1821 return self._binaries.get(binary_ref.ref) 

1822 return None 

1823 

def add_attachment(self, entry: Entry, name: str, data: bytes, protected: bool = True) -> None:
    """Add an attachment to an entry.

    Args:
        entry: Entry to add attachment to
        name: Filename for the attachment
        data: Attachment data
        protected: Whether the attachment should be memory-protected
    """
    # Store the blob once in the shared pool, then link it from the entry.
    new_ref = self.add_binary(data, protected=protected)
    entry.binaries.append(BinaryRef(key=name, ref=new_ref))

1835 

def remove_attachment(self, entry: Entry, name: str) -> bool:
    """Remove an attachment from an entry by filename.

    Args:
        entry: Entry to remove attachment from
        name: Filename of the attachment

    Returns:
        True if removed, False if not found
    """
    for index, candidate in enumerate(entry.binaries):
        if candidate.key != name:
            continue
        # Drop only the entry-side reference; the pooled binary stays,
        # since other entries may still reference the same data.
        del entry.binaries[index]
        return True
    return False

1853 

def list_attachments(self, entry: Entry) -> list[str]:
    """List all attachment filenames for an entry.

    Args:
        entry: Entry to list attachments for

    Returns:
        List of attachment filenames
    """
    # One name per binary reference, in the entry's own order.
    return [ref.key for ref in entry.binaries]

1864 

1865 # --- Custom icons --- 

1866 

@property
def custom_icons(self) -> dict[uuid_module.UUID, CustomIcon]:
    """Get dictionary of custom icons (UUID -> CustomIcon)."""
    # Exposes the settings-owned mapping directly (not a copy).
    settings = self._settings
    return settings.custom_icons

1871 

def get_custom_icon(self, uuid: uuid_module.UUID) -> bytes | None:
    """Get custom icon data by UUID.

    Args:
        uuid: UUID of the custom icon

    Returns:
        PNG image data, or None if not found
    """
    found = self._settings.custom_icons.get(uuid)
    if found is None:
        return None
    return found.data

1883 

def add_custom_icon(self, data: bytes, name: str | None = None) -> uuid_module.UUID:
    """Add a custom icon to the database.

    Args:
        data: PNG image data
        name: Optional display name for the icon

    Returns:
        UUID of the new custom icon
    """
    # A fresh random UUID keys the icon; modification time is stamped now.
    new_uuid = uuid_module.uuid4()
    self._settings.custom_icons[new_uuid] = CustomIcon(
        uuid=new_uuid,
        data=data,
        name=name,
        last_modification_time=datetime.now(UTC),
    )
    return new_uuid

1903 

def remove_custom_icon(self, uuid: uuid_module.UUID) -> bool:
    """Remove a custom icon from the database.

    Note: This does not update entries/groups that reference this icon.
    They will continue to reference the now-missing UUID.

    Args:
        uuid: UUID of the custom icon to remove

    Returns:
        True if removed, False if not found
    """
    icons = self._settings.custom_icons
    if uuid not in icons:
        return False
    del icons[uuid]
    return True

1920 

def find_custom_icon_by_name(self, name: str) -> uuid_module.UUID | None:
    """Find a custom icon by name.

    Args:
        name: Name of the icon to find (must match exactly one icon)

    Returns:
        UUID of the matching icon, or None if not found

    Raises:
        ValueError: If multiple icons have the same name
    """
    found: uuid_module.UUID | None = None
    for icon in self._settings.custom_icons.values():
        if icon.name != name:
            continue
        # A second hit means the name is ambiguous — fail loudly.
        if found is not None:
            raise ValueError(f"Multiple custom icons found with name: {name}")
        found = icon.uuid
    return found

1939 

1940 # --- XML parsing --- 

1941 

@classmethod
def _parse_xml(
    cls, xml_data: bytes, inner_header: InnerHeader | None = None
) -> tuple[Group, DatabaseSettings, dict[int, bytes]]:
    """Parse KDBX XML into models.

    Args:
        xml_data: XML payload bytes
        inner_header: Inner header with stream cipher info (for decrypting protected values)

    Returns:
        Tuple of (root_group, settings, binaries)

    Raises:
        InvalidXmlError: If the Root element or the root Group is missing.
    """
    tree = DefusedET.fromstring(xml_data)

    # Protected values must be decrypted before any model parsing, since
    # the shared stream-cipher keystream is consumed in document order.
    if inner_header is not None:
        cls._decrypt_protected_values(tree, inner_header)

    settings = cls._parse_meta(tree.find("Meta"))

    root_elem = tree.find("Root")
    if root_elem is None:
        raise InvalidXmlError("Missing Root element")
    group_elem = root_elem.find("Group")
    if group_elem is None:
        raise InvalidXmlError("Missing root Group element")

    root_group = cls._parse_group(group_elem)
    root_group._is_root = True

    # KDBX4 keeps attachment data in the inner header; the flag stored
    # there is a memory-protection hint, not encryption, so only the
    # raw bytes are retained here.
    binaries: dict[int, bytes] = {}
    if inner_header is not None:
        binaries = {idx: blob for idx, (_flag, blob) in inner_header.binaries.items()}

    return root_group, settings, binaries

1984 

@classmethod
def _decrypt_protected_values(cls, root: Element, inner_header: InnerHeader) -> None:
    """Decrypt all protected values in the XML tree in document order.

    Protected values are XOR'd with a stream cipher and base64 encoded.
    This method decrypts them in-place.
    """
    stream = ProtectedStreamCipher(
        inner_header.random_stream_id,
        inner_header.random_stream_key,
    )

    # One cipher instance is shared across all values, so they must be
    # processed strictly in document order.
    for value_elem in root.iter("Value"):
        if value_elem.get("Protected") != "True" or not value_elem.text:
            continue
        try:
            decoded = base64.b64decode(value_elem.text)
            value_elem.text = stream.decrypt(decoded).decode("utf-8")
        except (binascii.Error, ValueError, UnicodeDecodeError):
            # On failure the still-encrypted text is left untouched.
            pass

2007 

@classmethod
def _parse_meta(cls, meta_elem: Element | None) -> DatabaseSettings:
    """Parse Meta element into DatabaseSettings.

    Args:
        meta_elem: The ``Meta`` XML element, or None if absent.

    Returns:
        DatabaseSettings populated from the element. Missing or
        malformed fields keep their defaults; invalid custom icons
        are skipped silently.
    """
    settings = DatabaseSettings()

    if meta_elem is None:
        return settings

    def get_text(tag: str) -> str | None:
        elem = meta_elem.find(tag)
        return elem.text if elem is not None else None

    if name := get_text("DatabaseName"):
        settings.database_name = name
    if desc := get_text("DatabaseDescription"):
        settings.database_description = desc
    if username := get_text("DefaultUserName"):
        settings.default_username = username
    if gen := get_text("Generator"):
        settings.generator = gen

    # Parse memory protection flags for the standard fields.
    # (Loop variable renamed from `field` to avoid shadowing
    # dataclasses.field imported at module level.)
    mp_elem = meta_elem.find("MemoryProtection")
    if mp_elem is not None:
        for field_name in ["Title", "UserName", "Password", "URL", "Notes"]:
            elem = mp_elem.find(f"Protect{field_name}")
            if elem is not None:
                settings.memory_protection[field_name] = elem.text == "True"

    # Parse recycle bin settings; a malformed UUID is simply ignored.
    # (contextlib is already imported at module level — the redundant
    # function-local import was removed.)
    if rb := get_text("RecycleBinEnabled"):
        settings.recycle_bin_enabled = rb == "True"
    if rb_uuid := get_text("RecycleBinUUID"):
        with contextlib.suppress(binascii.Error, ValueError):
            settings.recycle_bin_uuid = uuid_module.UUID(bytes=base64.b64decode(rb_uuid))

    # Parse custom icons; icons with invalid base64/UUID payloads are skipped.
    custom_icons_elem = meta_elem.find("CustomIcons")
    if custom_icons_elem is not None:
        for icon_elem in custom_icons_elem.findall("Icon"):
            icon_uuid_elem = icon_elem.find("UUID")
            icon_data_elem = icon_elem.find("Data")
            if (
                icon_uuid_elem is None
                or not icon_uuid_elem.text
                or icon_data_elem is None
                or not icon_data_elem.text
            ):
                continue
            with contextlib.suppress(binascii.Error, ValueError):
                icon_uuid = uuid_module.UUID(bytes=base64.b64decode(icon_uuid_elem.text))
                icon_data = base64.b64decode(icon_data_elem.text)
                name_elem = icon_elem.find("Name")
                icon_name = name_elem.text if name_elem is not None else None
                icon_mtime = None
                mtime_elem = icon_elem.find("LastModificationTime")
                if mtime_elem is not None and mtime_elem.text:
                    icon_mtime = cls._decode_time(mtime_elem.text)
                settings.custom_icons[icon_uuid] = CustomIcon(
                    uuid=icon_uuid,
                    data=icon_data,
                    name=icon_name,
                    last_modification_time=icon_mtime,
                )

    return settings

2079 

@classmethod
def _parse_group(cls, elem: Element) -> Group:
    """Parse a Group element into a Group model."""
    group = Group()

    # Identity and display fields.
    uuid_elem = elem.find("UUID")
    if uuid_elem is not None and uuid_elem.text:
        group.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_elem.text))

    name_elem = elem.find("Name")
    if name_elem is not None:
        group.name = name_elem.text

    notes_elem = elem.find("Notes")
    if notes_elem is not None:
        group.notes = notes_elem.text

    icon_elem = elem.find("IconID")
    if icon_elem is not None and icon_elem.text:
        group.icon_id = icon_elem.text

    # A malformed custom-icon UUID is ignored rather than failing the parse.
    custom_icon_elem = elem.find("CustomIconUUID")
    if custom_icon_elem is not None and custom_icon_elem.text:
        with contextlib.suppress(binascii.Error, ValueError):
            group.custom_icon_uuid = uuid_module.UUID(
                bytes=base64.b64decode(custom_icon_elem.text)
            )

    group.times = cls._parse_times(elem.find("Times"))

    # Children: entries first, then nested groups (depth-first recursion).
    for entry_elem in elem.findall("Entry"):
        group.add_entry(cls._parse_entry(entry_elem))
    for subgroup_elem in elem.findall("Group"):
        group.add_subgroup(cls._parse_group(subgroup_elem))

    return group

2127 

@classmethod
def _parse_entry(cls, elem: Element) -> Entry:
    """Parse an Entry element into an Entry model."""
    entry = Entry()

    uuid_elem = elem.find("UUID")
    if uuid_elem is not None and uuid_elem.text:
        entry.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_elem.text))

    icon_elem = elem.find("IconID")
    if icon_elem is not None and icon_elem.text:
        entry.icon_id = icon_elem.text

    # A malformed custom-icon UUID is ignored rather than failing the parse.
    custom_icon_elem = elem.find("CustomIconUUID")
    if custom_icon_elem is not None and custom_icon_elem.text:
        with contextlib.suppress(binascii.Error, ValueError):
            entry.custom_icon_uuid = uuid_module.UUID(
                bytes=base64.b64decode(custom_icon_elem.text)
            )

    # Tags may use ";" or "," as separator; normalize to ";" then split.
    tags_elem = elem.find("Tags")
    if tags_elem is not None and tags_elem.text:
        normalized = tags_elem.text.replace(",", ";")
        entry.tags = [part.strip() for part in normalized.split(";") if part.strip()]

    entry.times = cls._parse_times(elem.find("Times"))

    # String fields (Title, UserName, Password, URL, Notes, custom...).
    for string_elem in elem.findall("String"):
        key_elem = string_elem.find("Key")
        if key_elem is None or not key_elem.text:
            continue
        value_elem = string_elem.find("Value")
        entry.strings[key_elem.text] = StringField(
            key=key_elem.text,
            value=value_elem.text if value_elem is not None else None,
            protected=value_elem is not None and value_elem.get("Protected") == "True",
        )

    # Attachment references into the shared binary pool.
    for binary_elem in elem.findall("Binary"):
        key_elem = binary_elem.find("Key")
        value_elem = binary_elem.find("Value")
        if key_elem is None or not key_elem.text or value_elem is None:
            continue
        ref = value_elem.get("Ref")
        if ref is not None:
            entry.binaries.append(BinaryRef(key=key_elem.text, ref=int(ref)))

    # AutoType configuration, including the optional window association.
    at_elem = elem.find("AutoType")
    if at_elem is not None:
        enabled_elem = at_elem.find("Enabled")
        seq_elem = at_elem.find("DefaultSequence")
        obf_elem = at_elem.find("DataTransferObfuscation")
        entry.autotype = AutoType(
            enabled=enabled_elem is not None and enabled_elem.text == "True",
            sequence=seq_elem.text if seq_elem is not None else None,
            obfuscation=int(obf_elem.text) if obf_elem is not None and obf_elem.text else 0,
        )
        assoc_elem = at_elem.find("Association")
        if assoc_elem is not None:
            window_elem = assoc_elem.find("Window")
            if window_elem is not None:
                entry.autotype.window = window_elem.text

    # History: each historical revision is itself a nested Entry element.
    history_elem = elem.find("History")
    if history_elem is not None:
        for hist_entry_elem in history_elem.findall("Entry"):
            parsed = cls._parse_entry(hist_entry_elem)
            entry.history.append(HistoryEntry.from_entry(parsed))

    return entry

2208 

@classmethod
def _parse_times(cls, times_elem: Element | None) -> Times:
    """Parse Times element into Times model."""
    times = Times.create_new()

    if times_elem is None:
        return times

    def parse_time(tag: str) -> datetime | None:
        child = times_elem.find(tag)
        if child is None or not child.text:
            return None
        return cls._decode_time(child.text)

    # Only overwrite the freshly-created defaults when a value is present.
    for tag, attr in (
        ("CreationTime", "creation_time"),
        ("LastModificationTime", "last_modification_time"),
        ("LastAccessTime", "last_access_time"),
        ("ExpiryTime", "expiry_time"),
        ("LocationChanged", "location_changed"),
    ):
        if parsed := parse_time(tag):
            setattr(times, attr, parsed)

    expires_elem = times_elem.find("Expires")
    if expires_elem is not None:
        times.expires = expires_elem.text == "True"

    usage_elem = times_elem.find("UsageCount")
    if usage_elem is not None and usage_elem.text:
        times.usage_count = int(usage_elem.text)

    return times

2243 

@classmethod
def _decode_time(cls, time_str: str) -> datetime:
    """Decode KDBX time string to datetime.

    KDBX4 uses base64-encoded binary timestamps or ISO format.

    Args:
        time_str: Either a base64-encoded little-endian int64 (seconds
            since 0001-01-01 UTC, the KDBX4 binary format) or an
            ISO-8601 style string.

    Returns:
        Parsed UTC datetime. Falls back to the current time when the
        string matches no known format.
    """
    # BUGFIX: this import previously lived *inside* the try-block below.
    # When base64.b64decode raised (binascii.Error is a ValueError
    # subclass) before the import executed, evaluating `struct.error`
    # in the except clause raised NameError instead of falling through
    # to the ISO parsers. Hoisting the import fixes that.
    import struct

    # Try base64 binary format first (KDBX4).
    # Base64 strings don't contain - or : which are present in ISO dates.
    if "-" not in time_str and ":" not in time_str:
        try:
            binary = base64.b64decode(time_str)
            if len(binary) == 8:  # int64 = 8 bytes
                # KDBX4 stores seconds since 0001-01-01 as int64
                seconds = struct.unpack("<q", binary)[0]
                base = datetime(1, 1, 1, tzinfo=UTC)
                return base + timedelta(seconds=seconds)
        except (ValueError, struct.error):
            pass  # Not valid base64 or wrong size

    # Try the canonical KDBX4 ISO format
    try:
        return datetime.strptime(time_str, KDBX4_TIME_FORMAT).replace(tzinfo=UTC)
    except ValueError:
        pass

    # Fallback: generic ISO-8601 (accepting a trailing Z), else "now"
    try:
        return datetime.fromisoformat(time_str.replace("Z", "+00:00"))
    except ValueError:
        return datetime.now(UTC)

2277 

@classmethod
def _encode_time(cls, dt: datetime) -> str:
    """Encode datetime to ISO 8601 format for KDBX4.

    Uses ISO 8601 format (e.g., 2025-01-15T10:30:45Z) which is
    human-readable and compatible with KeePassXC.
    """
    # Naive datetimes are interpreted as UTC before formatting.
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=UTC)
    return aware.strftime(KDBX4_TIME_FORMAT)

2289 

2290 # --- XML building --- 

2291 

def _build_xml(self) -> bytes:
    """Build KDBX XML from models."""
    doc = Element("KeePassFile")

    # Meta first, then Root, matching the KDBX document layout.
    self._build_meta(SubElement(doc, "Meta"))
    self._build_group(SubElement(doc, "Root"), self._root_group)

    # Protected values must be encrypted in-place before serialization.
    if self._inner_header is not None:
        self._encrypt_protected_values(doc, self._inner_header)

    # tostring returns bytes when an encoding is given; cast for typing.
    return cast(bytes, tostring(doc, encoding="utf-8", xml_declaration=True))

2310 

def _encrypt_protected_values(self, root: Element, inner_header: InnerHeader) -> None:
    """Encrypt all protected values in the XML tree in document order.

    Protected values are XOR'd with a stream cipher and base64 encoded.
    This method encrypts them in-place.
    """
    stream = ProtectedStreamCipher(
        inner_header.random_stream_id,
        inner_header.random_stream_key,
    )

    # One shared cipher instance: document order determines the keystream,
    # so every Protected value is visited in tree order.
    for value_elem in root.iter("Value"):
        if value_elem.get("Protected") != "True":
            continue
        raw = (value_elem.text or "").encode("utf-8")
        value_elem.text = base64.b64encode(stream.encrypt(raw)).decode("ascii")

2328 

def _build_meta(self, meta: Element) -> None:
    """Build Meta element from settings."""
    s = self._settings

    SubElement(meta, "Generator").text = s.generator
    SubElement(meta, "DatabaseName").text = s.database_name
    if s.database_description:
        SubElement(meta, "DatabaseDescription").text = s.database_description
    if s.default_username:
        SubElement(meta, "DefaultUserName").text = s.default_username

    # Maintenance / key-rotation policy counters.
    for tag, value in (
        ("MaintenanceHistoryDays", s.maintenance_history_days),
        ("MasterKeyChangeRec", s.master_key_change_rec),
        ("MasterKeyChangeForce", s.master_key_change_force),
    ):
        SubElement(meta, tag).text = str(value)

    # Per-field memory protection policy.
    mp = SubElement(meta, "MemoryProtection")
    for field_name, is_protected in s.memory_protection.items():
        SubElement(mp, f"Protect{field_name}").text = str(is_protected)

    SubElement(meta, "RecycleBinEnabled").text = str(s.recycle_bin_enabled)
    # An all-zero UUID stands in when no recycle bin group is assigned.
    bin_uuid = s.recycle_bin_uuid.bytes if s.recycle_bin_uuid else b"\x00" * 16
    SubElement(meta, "RecycleBinUUID").text = base64.b64encode(bin_uuid).decode("ascii")

    SubElement(meta, "HistoryMaxItems").text = str(s.history_max_items)
    SubElement(meta, "HistoryMaxSize").text = str(s.history_max_size)

    # Custom icons (the section is only emitted when icons exist).
    if s.custom_icons:
        custom_icons_elem = SubElement(meta, "CustomIcons")
        for icon in s.custom_icons.values():
            icon_elem = SubElement(custom_icons_elem, "Icon")
            SubElement(icon_elem, "UUID").text = base64.b64encode(icon.uuid.bytes).decode(
                "ascii"
            )
            SubElement(icon_elem, "Data").text = base64.b64encode(icon.data).decode("ascii")
            if icon.name:
                SubElement(icon_elem, "Name").text = icon.name
            if icon.last_modification_time:
                SubElement(icon_elem, "LastModificationTime").text = self._encode_time(
                    icon.last_modification_time
                )

2376 

def _build_group(self, parent: Element, group: Group) -> None:
    """Build Group element from Group model."""
    elem = SubElement(parent, "Group")

    SubElement(elem, "UUID").text = base64.b64encode(group.uuid.bytes).decode("ascii")
    SubElement(elem, "Name").text = group.name or ""
    if group.notes:
        SubElement(elem, "Notes").text = group.notes
    SubElement(elem, "IconID").text = group.icon_id
    if group.custom_icon_uuid:
        SubElement(elem, "CustomIconUUID").text = base64.b64encode(
            group.custom_icon_uuid.bytes
        ).decode("ascii")

    self._build_times(elem, group.times)

    SubElement(elem, "IsExpanded").text = str(group.is_expanded)

    # Optional behavior flags are only serialized when explicitly set.
    if group.default_autotype_sequence:
        SubElement(elem, "DefaultAutoTypeSequence").text = group.default_autotype_sequence
    if group.enable_autotype is not None:
        SubElement(elem, "EnableAutoType").text = str(group.enable_autotype)
    if group.enable_searching is not None:
        SubElement(elem, "EnableSearching").text = str(group.enable_searching)

    # The nil UUID stands in when no entry is marked top-visible.
    top_visible = group.last_top_visible_entry or uuid_module.UUID(int=0)
    SubElement(elem, "LastTopVisibleEntry").text = base64.b64encode(
        top_visible.bytes
    ).decode("ascii")

    # Entries precede subgroups; subgroups recurse depth-first.
    for child_entry in group.entries:
        self._build_entry(elem, child_entry)
    for child_group in group.subgroups:
        self._build_group(elem, child_group)

2413 

def _build_entry(self, parent: Element, entry: Entry) -> None:
    """Build Entry element from Entry model."""
    elem = SubElement(parent, "Entry")

    SubElement(elem, "UUID").text = base64.b64encode(entry.uuid.bytes).decode("ascii")
    SubElement(elem, "IconID").text = entry.icon_id
    if entry.custom_icon_uuid:
        SubElement(elem, "CustomIconUUID").text = base64.b64encode(
            entry.custom_icon_uuid.bytes
        ).decode("ascii")

    # Cosmetic / override attributes are optional.
    if entry.foreground_color:
        SubElement(elem, "ForegroundColor").text = entry.foreground_color
    if entry.background_color:
        SubElement(elem, "BackgroundColor").text = entry.background_color
    if entry.override_url:
        SubElement(elem, "OverrideURL").text = entry.override_url

    if entry.tags:
        SubElement(elem, "Tags").text = ";".join(entry.tags)

    self._build_times(elem, entry.times)

    # String fields. The database-level memory_protection policy decides
    # the Protected flag for standard fields; custom fields fall back to
    # their own per-field protected flag.
    for key, string_field in entry.strings.items():
        string_elem = SubElement(elem, "String")
        SubElement(string_elem, "Key").text = key
        value_elem = SubElement(string_elem, "Value")
        value_elem.text = string_field.value or ""
        policy = self._settings.memory_protection
        should_protect = policy[key] if key in policy else string_field.protected
        if should_protect:
            value_elem.set("Protected", "True")

    # Binary references (attachment filename -> pool index).
    for binary_ref in entry.binaries:
        binary_elem = SubElement(elem, "Binary")
        SubElement(binary_elem, "Key").text = binary_ref.key
        SubElement(binary_elem, "Value").set("Ref", str(binary_ref.ref))

    # The AutoType block is always emitted.
    at = entry.autotype
    at_elem = SubElement(elem, "AutoType")
    SubElement(at_elem, "Enabled").text = str(at.enabled)
    SubElement(at_elem, "DataTransferObfuscation").text = str(at.obfuscation)
    SubElement(at_elem, "DefaultSequence").text = at.sequence or ""
    if at.window:
        assoc = SubElement(at_elem, "Association")
        SubElement(assoc, "Window").text = at.window
        SubElement(assoc, "KeystrokeSequence").text = ""

    # Historical revisions are serialized as nested Entry elements.
    if entry.history:
        history_elem = SubElement(elem, "History")
        for hist_entry in entry.history:
            self._build_entry(history_elem, hist_entry)

2476 

def _build_times(self, parent: Element, times: Times) -> None:
    """Build Times element from Times model."""
    elem = SubElement(parent, "Times")

    encode = self._encode_time
    SubElement(elem, "CreationTime").text = encode(times.creation_time)
    SubElement(elem, "LastModificationTime").text = encode(times.last_modification_time)
    SubElement(elem, "LastAccessTime").text = encode(times.last_access_time)
    # A missing expiry falls back to the creation time so the element is
    # always present; the Expires flag says whether it is honored.
    expiry = times.expiry_time if times.expiry_time else times.creation_time
    SubElement(elem, "ExpiryTime").text = encode(expiry)
    SubElement(elem, "Expires").text = str(times.expires)
    SubElement(elem, "UsageCount").text = str(times.usage_count)
    if times.location_changed:
        SubElement(elem, "LocationChanged").text = encode(times.location_changed)

2494 

def __str__(self) -> str:
    """Human-readable summary: database name plus entry/group counts."""
    entry_count = sum(1 for _ in self.iter_entries())
    group_count = sum(1 for _ in self.iter_groups())
    return (
        f'Database: "{self._settings.database_name}"'
        f" ({entry_count} entries, {group_count} groups)"
    )