Coverage for src / kdbxtool / database.py: 93%

967 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-20 19:19 +0000

1"""High-level Database API for KDBX files. 

2 

3This module provides the main interface for working with KeePass databases: 

4- Opening and decrypting KDBX files 

5- Creating new databases 

6- Searching for entries and groups 

7- Saving databases 

8""" 

9 

10from __future__ import annotations 

11 

12import base64 

13import binascii 

14import contextlib 

15import getpass 

16import hashlib 

17import logging 

18import os 

19import uuid as uuid_module 

20from collections.abc import Iterator 

21from dataclasses import dataclass, field 

22from datetime import UTC, datetime, timedelta 

23from pathlib import Path 

24from types import TracebackType 

25from typing import TYPE_CHECKING, Protocol, cast 

26from xml.etree.ElementTree import Element, SubElement, tostring 

27 

28if TYPE_CHECKING: 

29 from .merge import DeletedObject, MergeMode, MergeResult 

30 

31from Cryptodome.Cipher import ChaCha20, Salsa20 

32from defusedxml import ElementTree as DefusedET 

33 

34from .exceptions import ( 

35 AuthenticationError, 

36 DatabaseError, 

37 InvalidXmlError, 

38 Kdbx3UpgradeRequired, 

39 MissingCredentialsError, 

40 UnknownCipherError, 

41) 

42from .models import Attachment, Entry, Group, HistoryEntry, Times 

43from .models.entry import AutoType, BinaryRef, StringField 

44from .parsing import CompressionType, KdbxHeader, KdbxVersion 

45from .parsing.kdbx3 import read_kdbx3 

46from .parsing.kdbx4 import InnerHeader, read_kdbx4, write_kdbx4 

47from .security import AesKdfConfig, Argon2Config, Cipher, KdfType 

48from .security import yubikey as yubikey_module 

49from .security.yubikey import YubiKeyConfig, compute_challenge_response 

50 

51logger = logging.getLogger(__name__) 

52 

53# Union type for KDF configurations 

54KdfConfig = Argon2Config | AesKdfConfig 

55 

56 

class _StreamCipher(Protocol):
    """Structural type for the stream ciphers used for protected value encryption.

    Any object exposing bytes-to-bytes ``encrypt``/``decrypt`` methods
    (e.g. the Cryptodome ChaCha20/Salsa20 cipher objects) satisfies this
    protocol; it exists so ProtectedStreamCipher can be typed without
    depending on Cryptodome's concrete classes.
    """

    def encrypt(self, plaintext: bytes) -> bytes: ...
    def decrypt(self, ciphertext: bytes) -> bytes: ...

62 

63 

64# KDBX4 time format (ISO 8601, compatible with KeePassXC) 

65KDBX4_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" 

66 

67# Protected stream cipher IDs 

68PROTECTED_STREAM_SALSA20 = 2 

69PROTECTED_STREAM_CHACHA20 = 3 

70 

71 

class ProtectedStreamCipher:
    """Stream cipher for encrypting/decrypting protected values in XML.

    KDBX shields sensitive values (passwords, etc.) inside the XML
    payload by XOR-ing them with a ChaCha20 or Salsa20 keystream. The
    keystream is consumed in document order, so values must be processed
    in the same order they appear.
    """

    def __init__(self, stream_id: int, stream_key: bytes) -> None:
        """Initialize the stream cipher.

        Args:
            stream_id: Cipher type (2=Salsa20, 3=ChaCha20)
            stream_key: Key material from inner header (typically 64 bytes)
        """
        self._stream_id = stream_id
        self._stream_key = stream_key
        self._cipher = self._create_cipher()

    def _create_cipher(self) -> _StreamCipher:
        """Instantiate the keystream generator selected by stream_id."""
        if self._stream_id == PROTECTED_STREAM_CHACHA20:
            # ChaCha20: SHA-512 of the key material; first 32 bytes become
            # the key, bytes 32-44 the nonce.
            digest = hashlib.sha512(self._stream_key).digest()
            return ChaCha20.new(key=digest[:32], nonce=digest[32:44])
        if self._stream_id == PROTECTED_STREAM_SALSA20:
            # Salsa20: SHA-256 of the key material with a fixed nonce.
            return Salsa20.new(
                key=hashlib.sha256(self._stream_key).digest(),
                nonce=b"\xe8\x30\x09\x4b\x97\x20\x5d\x2a",
            )
        raise UnknownCipherError(self._stream_id.to_bytes(4, "little"))

    def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt a protected value (XOR with the keystream)."""
        return self._cipher.decrypt(ciphertext)

    def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt a protected value (XOR with the keystream)."""
        return self._cipher.encrypt(plaintext)

114 

115 

@dataclass
class CustomIcon:
    """A custom icon stored in a KDBX database.

    Custom icons are PNG images that entries and groups can reference
    for visual customization beyond the standard built-in icon set.

    Attributes:
        uuid: Unique identifier for the icon
        data: PNG image data (raw bytes)
        name: Optional display name for the icon
        last_modification_time: When the icon was last modified, if known
    """

    uuid: uuid_module.UUID
    data: bytes
    name: str | None = None
    last_modification_time: datetime | None = None

134 

135 

@dataclass
class DatabaseSettings:
    """Settings and metadata for a KDBX database.

    Attributes:
        generator: Generator application name
        database_name: Name of the database
        database_description: Description of the database
        default_username: Default username for new entries
        maintenance_history_days: Days to keep deleted items
        color: Database color (hex)
        master_key_change_rec: Days until master key change recommended
        master_key_change_force: Days until master key change forced
        memory_protection: Which standard fields to protect in memory
        recycle_bin_enabled: Whether recycle bin is enabled
        recycle_bin_uuid: UUID of recycle bin group
        history_max_items: Max history entries per entry
        history_max_size: Max history size in bytes
        custom_icons: Dictionary of custom icons (UUID -> CustomIcon)
        deleted_objects: Deletion records retained for merge support
    """

    generator: str = "kdbxtool"
    database_name: str = "Database"
    database_description: str = ""
    default_username: str = ""
    maintenance_history_days: int = 365
    color: str | None = None
    master_key_change_rec: int = -1
    master_key_change_force: int = -1
    # Only the Password field is memory-protected by default.
    memory_protection: dict[str, bool] = field(
        default_factory=lambda: {
            "Title": False,
            "UserName": False,
            "Password": True,
            "URL": False,
            "Notes": False,
        }
    )
    recycle_bin_enabled: bool = True
    recycle_bin_uuid: uuid_module.UUID | None = None
    history_max_items: int = 10
    history_max_size: int = 6 * 1024 * 1024  # 6 MiB
    custom_icons: dict[uuid_module.UUID, CustomIcon] = field(default_factory=dict)
    deleted_objects: list[DeletedObject] = field(default_factory=list)

180 

181 

182class Database: 

183 """High-level interface for KDBX databases. 

184 

185 This class provides the main API for working with KeePass databases. 

186 It handles encryption/decryption, XML parsing, and model management. 

187 

188 Example usage: 

189 # Open existing database 

190 db = Database.open("passwords.kdbx", password="secret") 

191 

192 # Find entries 

193 entries = db.find_entries(title="GitHub") 

194 

195 # Create entry 

196 entry = db.root_group.create_entry( 

197 title="New Site", 

198 username="user", 

199 password="pass123", 

200 ) 

201 

202 # Save changes 

203 db.save() 

204 """ 

205 

206 def __init__( 

207 self, 

208 root_group: Group, 

209 settings: DatabaseSettings | None = None, 

210 header: KdbxHeader | None = None, 

211 inner_header: InnerHeader | None = None, 

212 binaries: dict[int, bytes] | None = None, 

213 ) -> None: 

214 """Initialize database. 

215 

216 Usually you should use Database.open() or Database.create() instead. 

217 

218 Args: 

219 root_group: Root group containing all entries/groups 

220 settings: Database settings 

221 header: KDBX header (for existing databases) 

222 inner_header: Inner header (for existing databases) 

223 binaries: Binary attachments 

224 """ 

225 self._root_group = root_group 

226 self._settings = settings or DatabaseSettings() 

227 self._header = header 

228 self._inner_header = inner_header 

229 self._binaries = binaries or {} 

230 self._password: str | None = None 

231 self._keyfile_data: bytes | None = None 

232 self._transformed_key: bytes | None = None 

233 self._filepath: Path | None = None 

234 self._yubikey_slot: int | None = None 

235 self._yubikey_serial: int | None = None 

236 # Set database reference on all entries and groups 

237 self._set_database_references(root_group) 

238 

239 def _set_database_references(self, group: Group) -> None: 

240 """Recursively set _database reference on a group and all its contents.""" 

241 group._database = self 

242 for entry in group.entries: 

243 entry._database = self 

244 for subgroup in group.subgroups: 

245 self._set_database_references(subgroup) 

246 

247 def __enter__(self) -> Database: 

248 """Enter context manager.""" 

249 return self 

250 

251 def __exit__( 

252 self, 

253 exc_type: type[BaseException] | None, 

254 exc_val: BaseException | None, 

255 exc_tb: TracebackType | None, 

256 ) -> None: 

257 """Exit context manager, zeroizing credentials.""" 

258 self.zeroize_credentials() 

259 

260 def zeroize_credentials(self) -> None: 

261 """Explicitly zeroize stored credentials from memory. 

262 

263 Call this when done with the database to minimize the time 

264 credentials remain in memory. Note that Python's string 

265 interning may retain copies; for maximum security, use 

266 SecureBytes for credential input. 

267 """ 

268 # Clear password (Python GC will eventually free memory) 

269 self._password = None 

270 # Clear keyfile data (convert to bytearray and zeroize if possible) 

271 if self._keyfile_data is not None: 

272 try: 

273 # Attempt to overwrite the memory 

274 data = bytearray(self._keyfile_data) 

275 for i in range(len(data)): 

276 data[i] = 0 

277 except TypeError: 

278 pass # bytes is immutable, just dereference 

279 self._keyfile_data = None 

280 # Clear transformed key 

281 if self._transformed_key is not None: 

282 try: 

283 data = bytearray(self._transformed_key) 

284 for i in range(len(data)): 

285 data[i] = 0 

286 except TypeError: 

287 pass 

288 self._transformed_key = None 

289 

290 def dump(self) -> str: 

291 """Return a human-readable summary of the database for debugging. 

292 

293 Returns: 

294 Multi-line string with database metadata and statistics. 

295 """ 

296 lines = [f'Database: "{self._settings.database_name or "(unnamed)"}"'] 

297 if self._header is not None: 

298 lines.append(f" Format: KDBX{self._header.version.value}") 

299 lines.append(f" Cipher: {self._header.cipher.name}") 

300 lines.append(f" KDF: {self._header.kdf_type.name}") 

301 

302 # Count entries and groups 

303 entry_count = sum(1 for _ in self._root_group.iter_entries(recursive=True)) 

304 group_count = sum(1 for _ in self._root_group.iter_groups(recursive=True)) 

305 lines.append(f" Total entries: {entry_count}") 

306 lines.append(f" Total groups: {group_count}") 

307 

308 # Custom icons 

309 if self.custom_icons: 

310 lines.append(f" Custom icons: {len(self.custom_icons)}") 

311 

312 # Recycle bin 

313 if self._settings.recycle_bin_enabled: 

314 lines.append(" Recycle bin: enabled") 

315 

316 return "\n".join(lines) 

317 

318 def merge( 

319 self, 

320 source: Database, 

321 *, 

322 mode: MergeMode | None = None, 

323 ) -> MergeResult: 

324 """Merge another database into this one. 

325 

326 Combines entries, groups, history, attachments, and custom icons 

327 from the source database into this database using UUID-based 

328 matching and timestamp-based conflict resolution. 

329 

330 Args: 

331 source: Database to merge from (read-only) 

332 mode: Merge mode (STANDARD or SYNCHRONIZE). Defaults to STANDARD. 

333 - STANDARD: Add and update only, never deletes 

334 - SYNCHRONIZE: Full sync including deletions 

335 

336 Returns: 

337 MergeResult with counts and statistics about the merge 

338 

339 Raises: 

340 MergeError: If merge cannot be completed 

341 

342 Example: 

343 >>> target_db = Database.open("main.kdbx", password="secret") 

344 >>> source_db = Database.open("branch.kdbx", password="secret") 

345 >>> result = target_db.merge(source_db) 

346 >>> print(f"Added {result.entries_added} entries") 

347 >>> target_db.save() 

348 """ 

349 from .merge import MergeMode, Merger 

350 

351 if mode is None: 

352 mode = MergeMode.STANDARD 

353 

354 merger = Merger(self, source, mode=mode) 

355 return merger.merge() 

356 

357 @property 

358 def transformed_key(self) -> bytes | None: 

359 """Get the transformed key for caching. 

360 

361 The transformed key is the result of applying the KDF (Argon2) to 

362 the credentials. Caching this allows fast repeated database opens 

363 without re-running the expensive KDF. 

364 

365 Security note: The transformed key is as sensitive as the password. 

366 Anyone with this key can decrypt the database. Store securely and 

367 zeroize when done. 

368 

369 Returns: 

370 The transformed key, or None if not available (e.g., newly created DB) 

371 """ 

372 return self._transformed_key 

373 

374 @property 

375 def kdf_salt(self) -> bytes | None: 

376 """Get the KDF salt used for key derivation. 

377 

378 The salt is used together with credentials to derive the transformed key. 

379 If the salt changes (e.g., after save with regenerate_seeds=True), 

380 any cached transformed_key becomes invalid. 

381 

382 Returns: 

383 The KDF salt, or None if no header is set 

384 """ 

385 if self._header is None: 

386 return None 

387 return self._header.kdf_salt 

388 

389 @property 

390 def root_group(self) -> Group: 

391 """Get the root group of the database.""" 

392 return self._root_group 

393 

394 @property 

395 def settings(self) -> DatabaseSettings: 

396 """Get database settings.""" 

397 return self._settings 

398 

399 @property 

400 def filepath(self) -> Path | None: 

401 """Get the file path (if opened from file).""" 

402 return self._filepath 

403 

404 # --- Opening databases --- 

405 

406 @classmethod 

407 def open( 

408 cls, 

409 filepath: str | Path, 

410 password: str | None = None, 

411 keyfile: str | Path | None = None, 

412 yubikey_slot: int | None = None, 

413 yubikey_serial: int | None = None, 

414 ) -> Database: 

415 """Open an existing KDBX database. 

416 

417 Args: 

418 filepath: Path to the .kdbx file 

419 password: Database password 

420 keyfile: Path to keyfile (optional) 

421 yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional). 

422 If provided, the database's KDF salt is used as challenge and 

423 the 20-byte HMAC-SHA1 response is incorporated into key derivation. 

424 Requires yubikey-manager package: pip install kdbxtool[yubikey] 

425 yubikey_serial: Serial number of specific YubiKey to use when multiple 

426 devices are connected. Use list_yubikeys() to discover serials. 

427 

428 Returns: 

429 Database instance 

430 

431 Raises: 

432 FileNotFoundError: If file doesn't exist 

433 ValueError: If credentials are wrong or file is corrupted 

434 YubiKeyError: If YubiKey operation fails 

435 """ 

436 logger.info("Opening database: %s", filepath) 

437 filepath = Path(filepath) 

438 if not filepath.exists(): 

439 raise FileNotFoundError(f"Database file not found: {filepath}") 

440 

441 data = filepath.read_bytes() 

442 

443 keyfile_data = None 

444 if keyfile: 

445 keyfile_path = Path(keyfile) 

446 if not keyfile_path.exists(): 

447 raise FileNotFoundError(f"Keyfile not found: {keyfile}") 

448 keyfile_data = keyfile_path.read_bytes() 

449 

450 return cls.open_bytes( 

451 data, 

452 password=password, 

453 keyfile_data=keyfile_data, 

454 filepath=filepath, 

455 yubikey_slot=yubikey_slot, 

456 yubikey_serial=yubikey_serial, 

457 ) 

458 

459 @classmethod 

460 def open_interactive( 

461 cls, 

462 filepath: str | Path, 

463 keyfile: str | Path | None = None, 

464 yubikey_slot: int | None = None, 

465 yubikey_serial: int | None = None, 

466 prompt: str = "Password: ", 

467 max_attempts: int = 3, 

468 ) -> Database: 

469 """Open a KDBX database with interactive password prompt. 

470 

471 Prompts the user for a password using secure input (no echo). If the 

472 password is incorrect, allows retrying up to max_attempts times. 

473 

474 This is a convenience method for CLI applications that need to securely 

475 prompt for database credentials. 

476 

477 Args: 

478 filepath: Path to the .kdbx file 

479 keyfile: Path to keyfile (optional) 

480 yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional) 

481 yubikey_serial: Serial number of specific YubiKey to use 

482 prompt: Custom prompt string (default: "Password: ") 

483 max_attempts: Maximum password attempts before raising (default: 3) 

484 

485 Returns: 

486 Database instance 

487 

488 Raises: 

489 FileNotFoundError: If file or keyfile doesn't exist 

490 AuthenticationError: If max_attempts exceeded with wrong password 

491 YubiKeyError: If YubiKey operation fails 

492 

493 Example: 

494 >>> db = Database.open_interactive("vault.kdbx") 

495 Password: 

496 >>> db = Database.open_interactive("vault.kdbx", keyfile="vault.key") 

497 Password: 

498 """ 

499 import sys 

500 

501 for attempt in range(max_attempts): 

502 password = getpass.getpass(prompt) 

503 try: 

504 return cls.open( 

505 filepath, 

506 password=password, 

507 keyfile=keyfile, 

508 yubikey_slot=yubikey_slot, 

509 yubikey_serial=yubikey_serial, 

510 ) 

511 except AuthenticationError: 

512 if attempt < max_attempts - 1: 

513 print("Invalid password, try again.", file=sys.stderr) 

514 else: 

515 raise AuthenticationError( 

516 f"Authentication failed after {max_attempts} attempts" 

517 ) from None 

518 # This should never be reached due to the raise in the loop 

519 raise AuthenticationError(f"Authentication failed after {max_attempts} attempts") 

520 

    @classmethod
    def open_bytes(
        cls,
        data: bytes,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        filepath: Path | None = None,
        transformed_key: bytes | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> Database:
        """Open a KDBX database from bytes.

        Supports both KDBX3 and KDBX4 formats. KDBX3 databases are opened
        read-only and will be automatically upgraded to KDBX4 on save.

        Args:
            data: KDBX file contents
            password: Database password
            keyfile_data: Keyfile contents (optional)
            filepath: Original file path (for save)
            transformed_key: Precomputed transformed key (skips KDF, faster opens)
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided, the database's KDF salt is used as challenge and
                the 20-byte HMAC-SHA1 response is incorporated into key derivation.
                Requires yubikey-manager package: pip install kdbxtool[yubikey]
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Returns:
            Database instance

        Raises:
            DatabaseError: If yubikey_slot is given for a KDBX3 database
            YubiKeyError: If YubiKey operation fails
        """
        # Detect version from header (parse just enough to get version)
        header, _ = KdbxHeader.parse(data)
        logger.debug("KDBX version: %s", header.version.name)

        # Get YubiKey response if slot specified
        # KeePassXC uses the KDF salt as the challenge, not master_seed
        yubikey_response: bytes | None = None
        if yubikey_slot is not None:
            if not yubikey_module.YUBIKEY_AVAILABLE:
                # Imported lazily so the YubiKey extra stays optional.
                from .exceptions import YubiKeyNotAvailableError

                raise YubiKeyNotAvailableError()
            config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
            response = compute_challenge_response(header.kdf_salt, config)
            yubikey_response = response.data

        # Decrypt the file using appropriate reader
        is_kdbx3 = header.version == KdbxVersion.KDBX3
        if is_kdbx3:
            import warnings

            warnings.warn(
                "Opening KDBX3 database. Saving will automatically upgrade to KDBX4 "
                "with modern security settings (Argon2d, ChaCha20). "
                "Use save(allow_upgrade=True) to confirm.",
                UserWarning,
                stacklevel=3,
            )
            # KDBX3 doesn't support YubiKey CR in the same way
            # (KeeChallenge used a sidecar XML file, not integrated)
            if yubikey_slot is not None:
                raise DatabaseError(
                    "YubiKey challenge-response is not supported for KDBX3 databases. "
                    "Upgrade to KDBX4 first."
                )
            payload = read_kdbx3(
                data,
                password=password,
                keyfile_data=keyfile_data,
                transformed_key=transformed_key,
            )
        else:
            payload = read_kdbx4(
                data,
                password=password,
                keyfile_data=keyfile_data,
                transformed_key=transformed_key,
                yubikey_response=yubikey_response,
            )

        # Parse XML into models (with protected value decryption)
        root_group, settings, binaries = cls._parse_xml(payload.xml_data, payload.inner_header)

        db = cls(
            root_group=root_group,
            settings=settings,
            header=payload.header,
            inner_header=payload.inner_header,
            binaries=binaries,
        )
        # Retain credentials and derived key so save()/reload() work later.
        db._password = password
        db._keyfile_data = keyfile_data
        db._transformed_key = payload.transformed_key
        db._filepath = filepath
        db._opened_as_kdbx3 = is_kdbx3
        db._yubikey_slot = yubikey_slot
        db._yubikey_serial = yubikey_serial

        logger.info("Database opened successfully")
        logger.debug(
            "Loaded %d entries, %d groups",
            len(cast(list[Entry], db.find_entries())),
            len(cast(list[Group], db.find_groups())),
        )

        return db

632 

633 # --- Creating databases --- 

634 

    @classmethod
    def create(
        cls,
        filepath: str | Path | None = None,
        password: str | None = None,
        keyfile: str | Path | None = None,
        database_name: str = "Database",
        cipher: Cipher = Cipher.AES256_CBC,
        kdf_config: KdfConfig | None = None,
    ) -> Database:
        """Create a new KDBX database.

        Args:
            filepath: Path to save the database (optional)
            password: Database password
            keyfile: Path to keyfile (optional)
            database_name: Name for the database
            cipher: Encryption cipher to use
            kdf_config: KDF configuration (Argon2Config or AesKdfConfig).
                Defaults to Argon2Config.standard() with Argon2d variant.

        Returns:
            New Database instance

        Raises:
            MissingCredentialsError: If neither password nor keyfile is given
            FileNotFoundError: If the keyfile path doesn't exist
            DatabaseError: If kdf_config is of an unsupported type
        """
        logger.info("Creating new database")

        # At least one credential source is required.
        if password is None and keyfile is None:
            raise MissingCredentialsError()

        keyfile_data = None
        if keyfile:
            keyfile_path = Path(keyfile)
            if not keyfile_path.exists():
                raise FileNotFoundError(f"Keyfile not found: {keyfile}")
            keyfile_data = keyfile_path.read_bytes()

        # Use provided config or standard Argon2d defaults
        if kdf_config is None:
            kdf_config = Argon2Config.standard()

        # Create root group
        root_group = Group.create_root(database_name)

        # Create recycle bin group
        recycle_bin = Group(name="Recycle Bin", icon_id="43")
        root_group.add_subgroup(recycle_bin)

        # Create header based on KDF config type
        if isinstance(kdf_config, Argon2Config):
            header = KdbxHeader(
                version=KdbxVersion.KDBX4,
                cipher=cipher,
                compression=CompressionType.GZIP,
                master_seed=os.urandom(32),
                encryption_iv=os.urandom(cipher.iv_size),
                kdf_type=kdf_config.variant,
                kdf_salt=kdf_config.salt,
                argon2_memory_kib=kdf_config.memory_kib,
                argon2_iterations=kdf_config.iterations,
                argon2_parallelism=kdf_config.parallelism,
            )
        elif isinstance(kdf_config, AesKdfConfig):
            header = KdbxHeader(
                version=KdbxVersion.KDBX4,
                cipher=cipher,
                compression=CompressionType.GZIP,
                master_seed=os.urandom(32),
                encryption_iv=os.urandom(cipher.iv_size),
                kdf_type=KdfType.AES_KDF,
                kdf_salt=kdf_config.salt,
                aes_kdf_rounds=kdf_config.rounds,
            )
        else:
            raise DatabaseError(f"Unsupported KDF config type: {type(kdf_config)}")

        # Create inner header
        inner_header = InnerHeader(
            random_stream_id=3,  # ChaCha20
            random_stream_key=os.urandom(64),
            binaries={},
        )

        settings = DatabaseSettings(
            database_name=database_name,
            recycle_bin_enabled=True,
            recycle_bin_uuid=recycle_bin.uuid,
        )

        db = cls(
            root_group=root_group,
            settings=settings,
            header=header,
            inner_header=inner_header,
        )
        # Retain credentials so save() can derive the encryption key.
        db._password = password
        db._keyfile_data = keyfile_data
        if filepath:
            db._filepath = Path(filepath)

        return db

735 

736 # --- Saving databases --- 

737 

738 def _apply_encryption_config( 

739 self, 

740 kdf_config: KdfConfig | None = None, 

741 cipher: Cipher | None = None, 

742 ) -> None: 

743 """Apply encryption configuration to the database header. 

744 

745 Updates KDF and/or cipher settings. Used for both KDBX3 upgrades 

746 and modifying existing KDBX4 databases. Always results in KDBX4. 

747 

748 Args: 

749 kdf_config: KDF configuration (Argon2Config or AesKdfConfig). 

750 If None, preserves existing KDF settings. 

751 cipher: Encryption cipher. If None, preserves existing cipher. 

752 """ 

753 if self._header is None: 

754 raise DatabaseError("No header - database not properly initialized") 

755 

756 target_cipher = cipher if cipher is not None else self._header.cipher 

757 

758 if kdf_config is not None: 

759 if isinstance(kdf_config, Argon2Config): 

760 self._header = KdbxHeader( 

761 version=KdbxVersion.KDBX4, 

762 cipher=target_cipher, 

763 compression=self._header.compression, 

764 master_seed=self._header.master_seed, 

765 encryption_iv=self._header.encryption_iv, 

766 kdf_type=kdf_config.variant, 

767 kdf_salt=kdf_config.salt, 

768 argon2_memory_kib=kdf_config.memory_kib, 

769 argon2_iterations=kdf_config.iterations, 

770 argon2_parallelism=kdf_config.parallelism, 

771 ) 

772 elif isinstance(kdf_config, AesKdfConfig): 

773 self._header = KdbxHeader( 

774 version=KdbxVersion.KDBX4, 

775 cipher=target_cipher, 

776 compression=self._header.compression, 

777 master_seed=self._header.master_seed, 

778 encryption_iv=self._header.encryption_iv, 

779 kdf_type=KdfType.AES_KDF, 

780 kdf_salt=kdf_config.salt, 

781 aes_kdf_rounds=kdf_config.rounds, 

782 ) 

783 # KDF change invalidates cached transformed key 

784 self._transformed_key = None 

785 elif cipher is not None and cipher != self._header.cipher: 

786 # Cipher-only change 

787 self._header.cipher = target_cipher 

788 self._header.encryption_iv = os.urandom(target_cipher.iv_size) 

789 self._transformed_key = None 

790 

    def save(
        self,
        filepath: str | Path | None = None,
        *,
        allow_upgrade: bool = False,
        regenerate_seeds: bool = True,
        kdf_config: KdfConfig | None = None,
        cipher: Cipher | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> None:
        """Save the database to a file.

        KDBX3 databases are automatically upgraded to KDBX4 on save. When saving
        a KDBX3 database to its original file, explicit confirmation is required
        via the allow_upgrade parameter.

        Args:
            filepath: Path to save to (uses original path if not specified)
            allow_upgrade: Must be True to confirm KDBX3 to KDBX4 upgrade when
                saving to the original file. Not required when saving to a new file.
            regenerate_seeds: If True (default), regenerate all cryptographic seeds
                (master_seed, encryption_iv, kdf_salt, random_stream_key) on save.
                Set to False only for testing or when using pre-computed transformed keys.
            kdf_config: Optional KDF configuration. Use presets like:
                - Argon2Config.standard() / high_security() / fast()
                - AesKdfConfig.standard() / high_security() / fast()
                For KDBX3 databases, defaults to Argon2Config.standard() for upgrade.
                For KDBX4 databases, allows changing KDF settings.
            cipher: Optional encryption cipher. Use one of:
                - Cipher.AES256_CBC (default, widely compatible)
                - Cipher.CHACHA20 (modern, faster in software)
                - Cipher.TWOFISH256_CBC (requires oxifish package)
                If not specified, preserves existing cipher.
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided (or if database was opened with yubikey_slot), the new
                KDF salt is used as challenge and the response is incorporated
                into key derivation. Requires yubikey-manager package.
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Raises:
            DatabaseError: If no filepath specified and database wasn't opened from file
            Kdbx3UpgradeRequired: If saving KDBX3 to original file without allow_upgrade=True
            YubiKeyError: If YubiKey operation fails
        """
        logger.info("Saving database to: %s", filepath or self._filepath)

        save_to_new_file = filepath is not None
        if filepath:
            self._filepath = Path(filepath)
        elif self._filepath is None:
            raise DatabaseError("No filepath specified and database wasn't opened from file")

        # Require explicit confirmation when saving KDBX3 to original file.
        # getattr default covers instances that never went through open_bytes().
        was_kdbx3 = getattr(self, "_opened_as_kdbx3", False)
        if was_kdbx3 and not save_to_new_file and not allow_upgrade:
            raise Kdbx3UpgradeRequired()

        # Use provided yubikey params, or fall back to stored ones
        effective_yubikey_slot = yubikey_slot if yubikey_slot is not None else self._yubikey_slot
        effective_yubikey_serial = (
            yubikey_serial if yubikey_serial is not None else self._yubikey_serial
        )

        # Serialize fully before touching the file, so a failure during
        # encryption leaves the on-disk database intact.
        data = self.to_bytes(
            regenerate_seeds=regenerate_seeds,
            kdf_config=kdf_config,
            cipher=cipher,
            yubikey_slot=effective_yubikey_slot,
            yubikey_serial=effective_yubikey_serial,
        )
        self._filepath.write_bytes(data)
        logger.debug("Database saved successfully")

        # Update stored yubikey params if changed
        if yubikey_slot is not None:
            self._yubikey_slot = yubikey_slot
        if yubikey_serial is not None:
            self._yubikey_serial = yubikey_serial

        # After KDBX3 upgrade, reload to get proper KDBX4 state (including transformed_key)
        if was_kdbx3:
            self.reload()
            self._opened_as_kdbx3 = False

876 

877 def reload(self) -> None: 

878 """Reload the database from disk using stored credentials. 

879 

880 Re-reads the database file and replaces all in-memory state with 

881 the current file contents. Useful for discarding unsaved changes 

882 or syncing with external modifications. 

883 

884 Raises: 

885 DatabaseError: If database wasn't opened from a file 

886 MissingCredentialsError: If no credentials are stored 

887 """ 

888 logger.debug("Reloading database from disk") 

889 

890 if self._filepath is None: 

891 raise DatabaseError("Cannot reload: database wasn't opened from a file") 

892 

893 if self._password is None and self._keyfile_data is None: 

894 raise MissingCredentialsError() 

895 

896 # Re-read and parse the file 

897 data = self._filepath.read_bytes() 

898 reloaded = self.open_bytes( 

899 data, 

900 password=self._password, 

901 keyfile_data=self._keyfile_data, 

902 filepath=self._filepath, 

903 ) 

904 

905 # Copy all state from reloaded database 

906 self._root_group = reloaded._root_group 

907 self._settings = reloaded._settings 

908 self._header = reloaded._header 

909 self._inner_header = reloaded._inner_header 

910 self._binaries = reloaded._binaries 

911 self._transformed_key = reloaded._transformed_key 

912 self._opened_as_kdbx3 = reloaded._opened_as_kdbx3 

913 

914 def xml(self, *, pretty_print: bool = False) -> bytes: 

915 """Export database XML payload. 

916 

917 Returns the decrypted, decompressed XML payload of the database. 

918 Protected values (passwords, etc.) are shown in plaintext. 

919 Useful for debugging and migration. 

920 

921 Args: 

922 pretty_print: If True, format XML with indentation for readability 

923 

924 Returns: 

925 XML payload as bytes (UTF-8 encoded) 

926 """ 

927 root = Element("KeePassFile") 

928 

929 # Meta section 

930 meta = SubElement(root, "Meta") 

931 self._build_meta(meta) 

932 

933 # Root section 

934 root_elem = SubElement(root, "Root") 

935 self._build_group(root_elem, self._root_group) 

936 

937 # Note: We do NOT encrypt protected values here - the point is to 

938 # export the decrypted XML for debugging/inspection 

939 

940 if pretty_print: 

941 self._indent_xml(root) 

942 

943 return cast(bytes, tostring(root, encoding="utf-8", xml_declaration=True)) 

944 

945 def dump_xml(self, filepath: str | Path, *, pretty_print: bool = True) -> None: 

946 """Write database XML payload to a file. 

947 

948 Writes the decrypted, decompressed XML payload to a file. 

949 Protected values (passwords, etc.) are shown in plaintext. 

950 Useful for debugging and migration. 

951 

952 Args: 

953 filepath: Path to write the XML file 

954 pretty_print: If True (default), format XML with indentation 

955 """ 

956 xml_data = self.xml(pretty_print=pretty_print) 

957 Path(filepath).write_bytes(xml_data) 

958 

959 @staticmethod 

960 def _indent_xml(elem: Element, level: int = 0) -> None: 

961 """Add indentation to XML element tree for pretty printing.""" 

962 indent = "\n" + " " * level 

963 if len(elem): 

964 if not elem.text or not elem.text.strip(): 

965 elem.text = indent + " " 

966 if not elem.tail or not elem.tail.strip(): 

967 elem.tail = indent 

968 for child in elem: 

969 Database._indent_xml(child, level + 1) 

970 if not child.tail or not child.tail.strip(): 

971 child.tail = indent 

972 else: 

973 if level and (not elem.tail or not elem.tail.strip()): 

974 elem.tail = indent 

975 

    def to_bytes(
        self,
        *,
        regenerate_seeds: bool = True,
        kdf_config: KdfConfig | None = None,
        cipher: Cipher | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> bytes:
        """Serialize the database to KDBX4 format.

        KDBX3 databases are automatically upgraded to KDBX4 on save.
        This includes converting to the specified KDF and ChaCha20 protected stream.

        Args:
            regenerate_seeds: If True (default), regenerate all cryptographic seeds
                (master_seed, encryption_iv, kdf_salt, random_stream_key) on save.
                This prevents precomputation attacks where an attacker can derive
                the encryption key in advance. Set to False only for testing or
                when using pre-computed transformed keys.
            kdf_config: Optional KDF configuration. Use presets like:
                - Argon2Config.standard() / high_security() / fast()
                - AesKdfConfig.standard() / high_security() / fast()
                For KDBX3 databases, defaults to Argon2Config.standard() for upgrade.
                For KDBX4 databases, allows changing KDF settings.
            cipher: Optional encryption cipher. Use one of:
                - Cipher.AES256_CBC (default, widely compatible)
                - Cipher.CHACHA20 (modern, faster in software)
                - Cipher.TWOFISH256_CBC (requires oxifish package)
                If not specified, preserves existing cipher.
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided, the (new) KDF salt is used as challenge and the
                20-byte HMAC-SHA1 response is incorporated into key derivation.
                Requires yubikey-manager package: pip install kdbxtool[yubikey]
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Returns:
            KDBX4 file contents as bytes

        Raises:
            MissingCredentialsError: If no credentials are set
            YubiKeyError: If YubiKey operation fails
        """
        # Need either credentials, a transformed key, or YubiKey
        has_credentials = self._password is not None or self._keyfile_data is not None
        has_transformed_key = self._transformed_key is not None
        has_yubikey = yubikey_slot is not None
        if not has_credentials and not has_transformed_key and not has_yubikey:
            raise MissingCredentialsError()

        if self._header is None:
            raise DatabaseError("No header - database not properly initialized")

        if self._inner_header is None:
            raise DatabaseError("No inner header - database not properly initialized")

        # Handle KDBX3 upgrade or KDBX4 config changes
        if self._header.version == KdbxVersion.KDBX3:
            self._apply_encryption_config(kdf_config or Argon2Config.standard(), cipher=cipher)
            # Upgrade inner header to ChaCha20 (KDBX3 uses Salsa20)
            self._inner_header.random_stream_id = PROTECTED_STREAM_CHACHA20
            self._inner_header.random_stream_key = os.urandom(64)
        elif kdf_config is not None or cipher is not None:
            self._apply_encryption_config(kdf_config, cipher=cipher)

        # Regenerate all cryptographic seeds to prevent precomputation attacks.
        # This ensures each save produces a file encrypted with fresh randomness.
        # See: https://github.com/libkeepass/pykeepass/issues/219
        # NOTE: this must happen BEFORE the YubiKey challenge below, so the
        # response is computed against the kdf_salt that actually gets written.
        if regenerate_seeds:
            self._header.master_seed = os.urandom(32)
            self._header.encryption_iv = os.urandom(self._header.cipher.iv_size)
            self._header.kdf_salt = os.urandom(32)
            self._inner_header.random_stream_key = os.urandom(64)
            # Cached transformed_key is now invalid (salt changed)
            self._transformed_key = None

        # Get YubiKey response if slot specified
        # KeePassXC uses the KDF salt as the challenge, not master_seed
        yubikey_response: bytes | None = None
        if yubikey_slot is not None:
            if not yubikey_module.YUBIKEY_AVAILABLE:
                # Imported lazily: the optional yubikey extra may be absent.
                from .exceptions import YubiKeyNotAvailableError

                raise YubiKeyNotAvailableError()
            config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
            response = compute_challenge_response(self._header.kdf_salt, config)
            yubikey_response = response.data

        # Sync binaries to inner header (preserve protection flags where possible)
        existing_binaries = self._inner_header.binaries
        new_binaries: dict[int, tuple[bool, bytes]] = {}
        for ref, data in self._binaries.items():
            if ref in existing_binaries:
                # Preserve existing protection flag
                protected, _ = existing_binaries[ref]
                new_binaries[ref] = (protected, data)
            else:
                # New binary, default to protected
                new_binaries[ref] = (True, data)
        self._inner_header.binaries = new_binaries

        # Build XML
        xml_data = self._build_xml()

        # Encrypt and return
        # Use cached transformed_key if available (faster), otherwise use credentials
        return write_kdbx4(
            header=self._header,
            inner_header=self._inner_header,
            xml_data=xml_data,
            password=self._password,
            keyfile_data=self._keyfile_data,
            transformed_key=self._transformed_key,
            yubikey_response=yubikey_response,
        )

1092 

    def set_credentials(
        self,
        password: str | None = None,
        keyfile_data: bytes | None = None,
    ) -> None:
        """Set or update database credentials.

        Both stored credentials are replaced; passing None for one of them
        removes it. At least one must be provided.

        Args:
            password: New password (None to remove)
            keyfile_data: New keyfile data (None to remove)

        Raises:
            MissingCredentialsError: If both password and keyfile_data are None
        """
        if password is None and keyfile_data is None:
            raise MissingCredentialsError()
        self._password = password
        self._keyfile_data = keyfile_data

1111 

1112 # --- Search operations --- 

1113 

1114 def find_entries( 

1115 self, 

1116 title: str | None = None, 

1117 username: str | None = None, 

1118 password: str | None = None, 

1119 url: str | None = None, 

1120 notes: str | None = None, 

1121 otp: str | None = None, 

1122 tags: list[str] | None = None, 

1123 string: dict[str, str] | None = None, 

1124 autotype_enabled: bool | None = None, 

1125 autotype_sequence: str | None = None, 

1126 autotype_window: str | None = None, 

1127 uuid: uuid_module.UUID | None = None, 

1128 path: list[str] | str | None = None, 

1129 recursive: bool = True, 

1130 history: bool = False, 

1131 first: bool = False, 

1132 ) -> list[Entry] | Entry | None: 

1133 """Find entries matching criteria. 

1134 

1135 Args: 

1136 title: Match entries with this title 

1137 username: Match entries with this username 

1138 password: Match entries with this password 

1139 url: Match entries with this URL 

1140 notes: Match entries with these notes 

1141 otp: Match entries with this OTP 

1142 tags: Match entries with all these tags 

1143 string: Match entries with custom properties (dict of key:value) 

1144 autotype_enabled: Filter by AutoType enabled state 

1145 autotype_sequence: Match entries with this AutoType sequence 

1146 autotype_window: Match entries with this AutoType window 

1147 uuid: Match entry with this UUID 

1148 path: Path to entry as list of group names ending with entry title, 

1149 or as a '/'-separated string. When specified, other criteria 

1150 are ignored. 

1151 recursive: Search in subgroups 

1152 history: Include history entries in search 

1153 first: If True, return first match or None. If False, return list. 

1154 

1155 Returns: 

1156 If first=True: Entry or None 

1157 If first=False: List of matching entries 

1158 """ 

1159 # Path-based search 

1160 if path is not None: 

1161 results = self._find_entry_by_path(path) 

1162 if first: 

1163 return results[0] if results else None 

1164 return results 

1165 

1166 if uuid is not None: 

1167 entry = self._root_group.find_entry_by_uuid(uuid, recursive=recursive) 

1168 if first: 

1169 return entry 

1170 return [entry] if entry else [] 

1171 

1172 results = self._root_group.find_entries( 

1173 title=title, 

1174 username=username, 

1175 password=password, 

1176 url=url, 

1177 notes=notes, 

1178 otp=otp, 

1179 tags=tags, 

1180 string=string, 

1181 autotype_enabled=autotype_enabled, 

1182 autotype_sequence=autotype_sequence, 

1183 autotype_window=autotype_window, 

1184 recursive=recursive, 

1185 history=history, 

1186 ) 

1187 

1188 if first: 

1189 return results[0] if results else None 

1190 return results 

1191 

1192 def _find_entry_by_path(self, path: list[str] | str) -> list[Entry]: 

1193 """Find entry by path. 

1194 

1195 Args: 

1196 path: Path as list ['group1', 'group2', 'entry_title'] or 

1197 string 'group1/group2/entry_title' 

1198 

1199 Returns: 

1200 List containing matching entry, or empty list if not found 

1201 """ 

1202 if isinstance(path, str): 

1203 path = [p for p in path.split("/") if p] 

1204 

1205 if not path: 

1206 return [] 

1207 

1208 # Last element is entry title, rest are group names 

1209 entry_title = path[-1] 

1210 group_path = path[:-1] 

1211 

1212 # Navigate to target group 

1213 current = self._root_group 

1214 for group_name in group_path: 

1215 found = None 

1216 for subgroup in current.subgroups: 

1217 if subgroup.name == group_name: 

1218 found = subgroup 

1219 break 

1220 if found is None: 

1221 return [] 

1222 current = found 

1223 

1224 # Find entry in target group (non-recursive) 

1225 for entry in current.entries: 

1226 if entry.title == entry_title: 

1227 return [entry] 

1228 

1229 return [] 

1230 

1231 def find_groups( 

1232 self, 

1233 name: str | None = None, 

1234 uuid: uuid_module.UUID | None = None, 

1235 path: list[str] | str | None = None, 

1236 recursive: bool = True, 

1237 first: bool = False, 

1238 ) -> list[Group] | Group | None: 

1239 """Find groups matching criteria. 

1240 

1241 Args: 

1242 name: Match groups with this name 

1243 uuid: Match group with this UUID 

1244 path: Path to group as list of group names or as a '/'-separated 

1245 string. When specified, other criteria are ignored. 

1246 recursive: Search in nested subgroups 

1247 first: If True, return first matching group or None instead of list 

1248 

1249 Returns: 

1250 List of matching groups, or single Group/None if first=True 

1251 """ 

1252 # Path-based search 

1253 if path is not None: 

1254 results = self._find_group_by_path(path) 

1255 if first: 

1256 return results[0] if results else None 

1257 return results 

1258 

1259 if uuid is not None: 

1260 group = self._root_group.find_group_by_uuid(uuid, recursive=recursive) 

1261 if first: 

1262 return group 

1263 return [group] if group else [] 

1264 

1265 # find_groups with first=False always returns list 

1266 group_results = cast( 

1267 list[Group], 

1268 self._root_group.find_groups(name=name, recursive=recursive), 

1269 ) 

1270 if first: 

1271 return group_results[0] if group_results else None 

1272 return group_results 

1273 

1274 def _find_group_by_path(self, path: list[str] | str) -> list[Group]: 

1275 """Find group by path. 

1276 

1277 Args: 

1278 path: Path as list ['group1', 'group2'] or string 'group1/group2' 

1279 

1280 Returns: 

1281 List containing matching group, or empty list if not found 

1282 """ 

1283 if isinstance(path, str): 

1284 path = [p for p in path.split("/") if p] 

1285 

1286 if not path: 

1287 return [self._root_group] 

1288 

1289 # Navigate through path 

1290 current = self._root_group 

1291 for group_name in path: 

1292 found = None 

1293 for subgroup in current.subgroups: 

1294 if subgroup.name == group_name: 

1295 found = subgroup 

1296 break 

1297 if found is None: 

1298 return [] 

1299 current = found 

1300 

1301 return [current] 

1302 

1303 def find_entries_contains( 

1304 self, 

1305 title: str | None = None, 

1306 username: str | None = None, 

1307 password: str | None = None, 

1308 url: str | None = None, 

1309 notes: str | None = None, 

1310 otp: str | None = None, 

1311 recursive: bool = True, 

1312 case_sensitive: bool = False, 

1313 history: bool = False, 

1314 ) -> list[Entry]: 

1315 """Find entries where fields contain the given substrings. 

1316 

1317 All criteria are combined with AND logic. None means "any value". 

1318 

1319 Args: 

1320 title: Match entries whose title contains this substring 

1321 username: Match entries whose username contains this substring 

1322 password: Match entries whose password contains this substring 

1323 url: Match entries whose URL contains this substring 

1324 notes: Match entries whose notes contain this substring 

1325 otp: Match entries whose OTP contains this substring 

1326 recursive: Search in subgroups 

1327 case_sensitive: If False (default), matching is case-insensitive 

1328 history: Include history entries in search 

1329 

1330 Returns: 

1331 List of matching entries 

1332 """ 

1333 return self._root_group.find_entries_contains( 

1334 title=title, 

1335 username=username, 

1336 password=password, 

1337 url=url, 

1338 notes=notes, 

1339 otp=otp, 

1340 recursive=recursive, 

1341 case_sensitive=case_sensitive, 

1342 history=history, 

1343 ) 

1344 

1345 def find_entries_regex( 

1346 self, 

1347 title: str | None = None, 

1348 username: str | None = None, 

1349 password: str | None = None, 

1350 url: str | None = None, 

1351 notes: str | None = None, 

1352 otp: str | None = None, 

1353 recursive: bool = True, 

1354 case_sensitive: bool = False, 

1355 history: bool = False, 

1356 ) -> list[Entry]: 

1357 """Find entries where fields match the given regex patterns. 

1358 

1359 All criteria are combined with AND logic. None means "any value". 

1360 

1361 Args: 

1362 title: Regex pattern to match against title 

1363 username: Regex pattern to match against username 

1364 password: Regex pattern to match against password 

1365 url: Regex pattern to match against URL 

1366 notes: Regex pattern to match against notes 

1367 otp: Regex pattern to match against OTP 

1368 recursive: Search in subgroups 

1369 case_sensitive: If False (default), matching is case-insensitive 

1370 history: Include history entries in search 

1371 

1372 Returns: 

1373 List of matching entries 

1374 

1375 Raises: 

1376 re.error: If any pattern is not a valid regex 

1377 """ 

1378 return self._root_group.find_entries_regex( 

1379 title=title, 

1380 username=username, 

1381 password=password, 

1382 url=url, 

1383 notes=notes, 

1384 otp=otp, 

1385 recursive=recursive, 

1386 case_sensitive=case_sensitive, 

1387 history=history, 

1388 ) 

1389 

1390 def find_attachments( 

1391 self, 

1392 id: int | None = None, 

1393 filename: str | None = None, 

1394 regex: bool = False, 

1395 recursive: bool = True, 

1396 history: bool = False, 

1397 first: bool = False, 

1398 ) -> list[Attachment] | Attachment | None: 

1399 """Find attachments in the database. 

1400 

1401 Args: 

1402 id: Match attachments with this binary reference ID 

1403 filename: Match attachments with this filename (exact or regex) 

1404 regex: If True, treat filename as a regex pattern 

1405 recursive: Search in subgroups 

1406 history: Include history entries in search 

1407 first: If True, return first match or None. If False, return list. 

1408 

1409 Returns: 

1410 If first=True: Attachment or None 

1411 If first=False: List of matching attachments 

1412 """ 

1413 import re as re_module 

1414 

1415 results: list[Attachment] = [] 

1416 pattern: re_module.Pattern[str] | None = None 

1417 

1418 if regex and filename is not None: 

1419 pattern = re_module.compile(filename) 

1420 

1421 for entry in self._root_group.iter_entries(recursive=recursive, history=history): 

1422 for binary_ref in entry.binaries: 

1423 # Check ID filter 

1424 if id is not None and binary_ref.ref != id: 

1425 continue 

1426 

1427 # Check filename filter 

1428 if filename is not None: 

1429 if regex and pattern is not None: 

1430 if not pattern.search(binary_ref.key): 

1431 continue 

1432 elif binary_ref.key != filename: 

1433 continue 

1434 

1435 attachment = Attachment( 

1436 filename=binary_ref.key, 

1437 id=binary_ref.ref, 

1438 entry=entry, 

1439 ) 

1440 results.append(attachment) 

1441 

1442 if first: 

1443 return attachment 

1444 

1445 if first: 

1446 return None 

1447 return results 

1448 

1449 @property 

1450 def attachments(self) -> list[Attachment]: 

1451 """Get all attachments in the database.""" 

1452 result = self.find_attachments(filename=".*", regex=True) 

1453 # find_attachments returns list when first=False (default) 

1454 return result if isinstance(result, list) else [] 

1455 

1456 def iter_entries(self, recursive: bool = True) -> Iterator[Entry]: 

1457 """Iterate over all entries in the database. 

1458 

1459 Args: 

1460 recursive: Include entries from all subgroups 

1461 

1462 Yields: 

1463 Entry objects 

1464 """ 

1465 yield from self._root_group.iter_entries(recursive=recursive) 

1466 

1467 def iter_groups(self, recursive: bool = True) -> Iterator[Group]: 

1468 """Iterate over all groups in the database. 

1469 

1470 Args: 

1471 recursive: Include nested subgroups 

1472 

1473 Yields: 

1474 Group objects 

1475 """ 

1476 yield from self._root_group.iter_groups(recursive=recursive) 

1477 

1478 # --- Field References --- 

1479 

    def deref(self, value: str | None) -> str | uuid_module.UUID | None:
        """Resolve KeePass field references in a value.

        Parses field references in the format {REF:X@Y:Z} and replaces them
        with the actual values from the referenced entries:
        - X = Field to retrieve (T=Title, U=Username, P=Password, A=URL, N=Notes, I=UUID)
        - Y = Field to search by (T, U, P, A, N, I)
        - Z = Search value

        References are resolved recursively, so a reference that resolves to
        another reference will continue resolving until a final value is found.

        Args:
            value: String potentially containing field references

        Returns:
            - The resolved string with all references replaced (UUID results
              are inserted in string form, so in practice a str is returned)
            - None if any referenced entry cannot be found, or if a UUID
              search value is malformed
            - The original value unchanged if it contains no references or
              is falsy (None / empty string)

        Example:
            >>> # Entry with password = '{REF:P@I:ABCD1234...}'
            >>> db.deref(entry.password)  # Returns the referenced password
            >>>
            >>> # With prefix/suffix: 'prefix{REF:U@I:...}suffix'
            >>> db.deref(value)  # Returns 'prefix<username>suffix'
        """
        import re

        if not value:
            return value

        # Pattern matches {REF:X@Y:Z} where X and Y are field codes, Z is search value.
        # The outer group captures the full reference text for literal replacement.
        pattern = r"(\{REF:([TUPANI])@([TUPANI]):([^}]+)\})"
        # A set deduplicates repeated references; str.replace below handles
        # every occurrence of each unique reference in one pass.
        references = set(re.findall(pattern, value))

        if not references:
            return value

        # Field code -> Entry attribute name.
        field_to_attr = {
            "T": "title",
            "U": "username",
            "P": "password",
            "A": "url",
            "N": "notes",
            "I": "uuid",
        }

        for ref_str, wanted_field, search_field, search_value in references:
            wanted_attr = field_to_attr[wanted_field]
            search_attr = field_to_attr[search_field]

            # Convert UUID search value to proper UUID object
            if search_attr == "uuid":
                try:
                    search_value = uuid_module.UUID(search_value)
                except ValueError:
                    # Malformed UUID in the reference: unresolvable.
                    return None

            # Find the referenced entry
            ref_entry = self.find_entries(first=True, **{search_attr: search_value})
            if ref_entry is None:
                return None

            # Get the wanted field value
            resolved_value = getattr(ref_entry, wanted_attr)
            if resolved_value is None:
                resolved_value = ""

            # UUID needs special handling - convert to string for replacement
            if isinstance(resolved_value, uuid_module.UUID):
                resolved_value = str(resolved_value)

            value = value.replace(ref_str, resolved_value)

        # Recursively resolve any nested references
        return self.deref(value)

1558 

1559 # --- Move operations --- 

1560 

1561 def move_entry(self, entry: Entry, destination: Group) -> None: 

1562 """Move an entry to a different group. 

1563 

1564 This is a convenience method that calls entry.move_to(). It validates 

1565 that both the entry and destination belong to this database. 

1566 

1567 Args: 

1568 entry: Entry to move 

1569 destination: Target group to move the entry to 

1570 

1571 Raises: 

1572 ValueError: If entry or destination is not in this database 

1573 ValueError: If entry has no parent 

1574 ValueError: If destination is the current parent 

1575 """ 

1576 # Validate entry is in this database 

1577 if entry.parent is None: 

1578 raise ValueError("Entry has no parent group") 

1579 found = self._root_group.find_entry_by_uuid(entry.uuid) 

1580 if found is None: 

1581 raise ValueError("Entry is not in this database") 

1582 

1583 # Validate destination is in this database 

1584 if destination is not self._root_group: 

1585 found_group = self._root_group.find_group_by_uuid(destination.uuid) 

1586 if found_group is None: 

1587 raise ValueError("Destination group is not in this database") 

1588 

1589 entry.move_to(destination) 

1590 

1591 def move_group(self, group: Group, destination: Group) -> None: 

1592 """Move a group to a different parent group. 

1593 

1594 This is a convenience method that calls group.move_to(). It validates 

1595 that both the group and destination belong to this database. 

1596 

1597 Args: 

1598 group: Group to move 

1599 destination: Target parent group to move the group to 

1600 

1601 Raises: 

1602 ValueError: If group or destination is not in this database 

1603 ValueError: If group is the root group 

1604 ValueError: If group has no parent 

1605 ValueError: If destination is the current parent 

1606 ValueError: If destination is the group itself or a descendant 

1607 """ 

1608 # Validate group is in this database (and not root) 

1609 if group.is_root_group: 

1610 raise ValueError("Cannot move the root group") 

1611 if group.parent is None: 

1612 raise ValueError("Group has no parent") 

1613 found = self._root_group.find_group_by_uuid(group.uuid) 

1614 if found is None: 

1615 raise ValueError("Group is not in this database") 

1616 

1617 # Validate destination is in this database 

1618 if destination is not self._root_group: 

1619 found_dest = self._root_group.find_group_by_uuid(destination.uuid) 

1620 if found_dest is None: 

1621 raise ValueError("Destination group is not in this database") 

1622 

1623 group.move_to(destination) 

1624 

1625 # --- Recycle bin operations --- 

1626 

1627 @property 

1628 def recyclebin_group(self) -> Group | None: 

1629 """Get the recycle bin group, or None if disabled. 

1630 

1631 If recycle_bin_enabled is True but no recycle bin exists yet, 

1632 this creates one automatically. 

1633 

1634 Returns: 

1635 Recycle bin Group, or None if recycle bin is disabled 

1636 """ 

1637 if not self._settings.recycle_bin_enabled: 

1638 return None 

1639 

1640 # Try to find existing recycle bin 

1641 if self._settings.recycle_bin_uuid is not None: 

1642 group = self._root_group.find_group_by_uuid(self._settings.recycle_bin_uuid) 

1643 if group is not None: 

1644 return group 

1645 

1646 # Create new recycle bin 

1647 recycle_bin = Group(name="Recycle Bin", icon_id="43") 

1648 self._root_group.add_subgroup(recycle_bin) 

1649 self._settings.recycle_bin_uuid = recycle_bin.uuid 

1650 return recycle_bin 

1651 

1652 def trash_entry(self, entry: Entry) -> None: 

1653 """Move an entry to the recycle bin. 

1654 

1655 If the entry is already in the recycle bin, it is permanently deleted. 

1656 

1657 Args: 

1658 entry: Entry to trash 

1659 

1660 Raises: 

1661 ValueError: If entry is not in this database 

1662 ValueError: If recycle bin is disabled 

1663 """ 

1664 # Validate entry is in this database 

1665 if entry.parent is None: 

1666 raise ValueError("Entry has no parent group") 

1667 found = self._root_group.find_entry_by_uuid(entry.uuid) 

1668 if found is None: 

1669 raise ValueError("Entry is not in this database") 

1670 

1671 recycle_bin = self.recyclebin_group 

1672 if recycle_bin is None: 

1673 raise ValueError("Recycle bin is disabled") 

1674 

1675 # If already in recycle bin, delete permanently 

1676 if entry.parent is recycle_bin: 

1677 recycle_bin.remove_entry(entry) 

1678 return 

1679 

1680 # Move to recycle bin 

1681 entry.move_to(recycle_bin) 

1682 

1683 def trash_group(self, group: Group) -> None: 

1684 """Move a group to the recycle bin. 

1685 

1686 If the group is already in the recycle bin, it is permanently deleted. 

1687 Cannot trash the root group or the recycle bin itself. 

1688 

1689 Args: 

1690 group: Group to trash 

1691 

1692 Raises: 

1693 ValueError: If group is not in this database 

1694 ValueError: If group is the root group 

1695 ValueError: If group is the recycle bin 

1696 ValueError: If recycle bin is disabled 

1697 """ 

1698 # Validate group 

1699 if group.is_root_group: 

1700 raise ValueError("Cannot trash the root group") 

1701 if group.parent is None: 

1702 raise ValueError("Group has no parent") 

1703 found = self._root_group.find_group_by_uuid(group.uuid) 

1704 if found is None: 

1705 raise ValueError("Group is not in this database") 

1706 

1707 recycle_bin = self.recyclebin_group 

1708 if recycle_bin is None: 

1709 raise ValueError("Recycle bin is disabled") 

1710 

1711 # Cannot trash the recycle bin itself 

1712 if group is recycle_bin: 

1713 raise ValueError("Cannot trash the recycle bin") 

1714 

1715 # If already in recycle bin, delete permanently 

1716 if group.parent is recycle_bin: 

1717 recycle_bin.remove_subgroup(group) 

1718 return 

1719 

1720 # Move to recycle bin 

1721 group.move_to(recycle_bin) 

1722 

1723 def empty_group(self, group: Group) -> None: 

1724 """Delete all entries and subgroups from a group. 

1725 

1726 This permanently deletes all contents (does not use recycle bin). 

1727 The group itself is not deleted. 

1728 

1729 Args: 

1730 group: Group to empty 

1731 

1732 Raises: 

1733 ValueError: If group is not in this database 

1734 """ 

1735 # Validate group is in this database 

1736 if group is not self._root_group: 

1737 found = self._root_group.find_group_by_uuid(group.uuid) 

1738 if found is None: 

1739 raise ValueError("Group is not in this database") 

1740 

1741 # Delete all subgroups (iterate over copy since we're modifying) 

1742 for subgroup in list(group.subgroups): 

1743 group.remove_subgroup(subgroup) 

1744 

1745 # Delete all entries 

1746 for entry in list(group.entries): 

1747 group.remove_entry(entry) 

1748 

1749 # --- Memory protection --- 

1750 

1751 def apply_protection_policy(self, entry: Entry) -> None: 

1752 """Apply the database's memory protection policy to an entry. 

1753 

1754 Updates the `protected` flag on the entry's string fields 

1755 according to the database's memory_protection settings. 

1756 

1757 This is automatically applied when saving the database, but 

1758 can be called manually if you need protection applied immediately 

1759 for in-memory operations. 

1760 

1761 Args: 

1762 entry: Entry to apply policy to 

1763 """ 

1764 for key, string_field in entry.strings.items(): 

1765 if key in self._settings.memory_protection: 

1766 string_field.protected = self._settings.memory_protection[key] 

1767 

1768 def apply_protection_policy_all(self) -> None: 

1769 """Apply memory protection policy to all entries in the database. 

1770 

1771 Updates all entries' string field protection flags according 

1772 to the database's memory_protection settings. 

1773 """ 

1774 for entry in self.iter_entries(): 

1775 self.apply_protection_policy(entry) 

1776 

1777 # --- Binary attachments --- 

1778 

1779 def get_binary(self, ref: int) -> bytes | None: 

1780 """Get binary attachment data by reference ID. 

1781 

1782 Args: 

1783 ref: Binary reference ID 

1784 

1785 Returns: 

1786 Binary data or None if not found 

1787 """ 

1788 return self._binaries.get(ref) 

1789 

1790 def add_binary(self, data: bytes, protected: bool = True) -> int: 

1791 """Add a new binary attachment to the database. 

1792 

1793 Args: 

1794 data: Binary data 

1795 protected: Whether the binary should be memory-protected 

1796 

1797 Returns: 

1798 Reference ID for the new binary 

1799 """ 

1800 # Find next available index 

1801 ref = max(self._binaries.keys(), default=-1) + 1 

1802 self._binaries[ref] = data 

1803 # Update inner header 

1804 if self._inner_header is not None: 

1805 self._inner_header.binaries[ref] = (protected, data) 

1806 return ref 

1807 

1808 def remove_binary(self, ref: int) -> bool: 

1809 """Remove a binary attachment from the database. 

1810 

1811 Args: 

1812 ref: Binary reference ID 

1813 

1814 Returns: 

1815 True if removed, False if not found 

1816 """ 

1817 if ref in self._binaries: 

1818 del self._binaries[ref] 

1819 if self._inner_header is not None and ref in self._inner_header.binaries: 

1820 del self._inner_header.binaries[ref] 

1821 return True 

1822 return False 

1823 

1824 def get_attachment(self, entry: Entry, name: str) -> bytes | None: 

1825 """Get an attachment from an entry by filename. 

1826 

1827 Args: 

1828 entry: Entry to get attachment from 

1829 name: Filename of the attachment 

1830 

1831 Returns: 

1832 Attachment data or None if not found 

1833 """ 

1834 for binary_ref in entry.binaries: 

1835 if binary_ref.key == name: 

1836 return self._binaries.get(binary_ref.ref) 

1837 return None 

1838 

1839 def add_attachment(self, entry: Entry, name: str, data: bytes, protected: bool = True) -> None: 

1840 """Add an attachment to an entry. 

1841 

1842 Args: 

1843 entry: Entry to add attachment to 

1844 name: Filename for the attachment 

1845 data: Attachment data 

1846 protected: Whether the attachment should be memory-protected 

1847 """ 

1848 ref = self.add_binary(data, protected=protected) 

1849 entry.binaries.append(BinaryRef(key=name, ref=ref)) 

1850 

1851 def remove_attachment(self, entry: Entry, name: str) -> bool: 

1852 """Remove an attachment from an entry by filename. 

1853 

1854 Args: 

1855 entry: Entry to remove attachment from 

1856 name: Filename of the attachment 

1857 

1858 Returns: 

1859 True if removed, False if not found 

1860 """ 

1861 for i, binary_ref in enumerate(entry.binaries): 

1862 if binary_ref.key == name: 

1863 # Remove from entry's list 

1864 entry.binaries.pop(i) 

1865 # Note: We don't remove from _binaries as other entries may reference it 

1866 return True 

1867 return False 

1868 

1869 def list_attachments(self, entry: Entry) -> list[str]: 

1870 """List all attachment filenames for an entry. 

1871 

1872 Args: 

1873 entry: Entry to list attachments for 

1874 

1875 Returns: 

1876 List of attachment filenames 

1877 """ 

1878 return [binary_ref.key for binary_ref in entry.binaries] 

1879 

1880 # --- Custom icons --- 

1881 

    @property
    def custom_icons(self) -> dict[uuid_module.UUID, CustomIcon]:
        """Live mapping of custom icons keyed by UUID.

        The returned dict is the database's own settings store, so
        mutations to it are reflected in the database.
        """
        return self._settings.custom_icons

1886 

1887 def get_custom_icon(self, uuid: uuid_module.UUID) -> bytes | None: 

1888 """Get custom icon data by UUID. 

1889 

1890 Args: 

1891 uuid: UUID of the custom icon 

1892 

1893 Returns: 

1894 PNG image data, or None if not found 

1895 """ 

1896 icon = self._settings.custom_icons.get(uuid) 

1897 return icon.data if icon else None 

1898 

1899 def add_custom_icon(self, data: bytes, name: str | None = None) -> uuid_module.UUID: 

1900 """Add a custom icon to the database. 

1901 

1902 Args: 

1903 data: PNG image data 

1904 name: Optional display name for the icon 

1905 

1906 Returns: 

1907 UUID of the new custom icon 

1908 """ 

1909 icon_uuid = uuid_module.uuid4() 

1910 icon = CustomIcon( 

1911 uuid=icon_uuid, 

1912 data=data, 

1913 name=name, 

1914 last_modification_time=datetime.now(UTC), 

1915 ) 

1916 self._settings.custom_icons[icon_uuid] = icon 

1917 return icon_uuid 

1918 

1919 def remove_custom_icon(self, uuid: uuid_module.UUID) -> bool: 

1920 """Remove a custom icon from the database. 

1921 

1922 Note: This does not update entries/groups that reference this icon. 

1923 They will continue to reference the now-missing UUID. 

1924 

1925 Args: 

1926 uuid: UUID of the custom icon to remove 

1927 

1928 Returns: 

1929 True if removed, False if not found 

1930 """ 

1931 if uuid in self._settings.custom_icons: 

1932 del self._settings.custom_icons[uuid] 

1933 return True 

1934 return False 

1935 

1936 def find_custom_icon_by_name(self, name: str) -> uuid_module.UUID | None: 

1937 """Find a custom icon by name. 

1938 

1939 Args: 

1940 name: Name of the icon to find (must match exactly one icon) 

1941 

1942 Returns: 

1943 UUID of the matching icon, or None if not found 

1944 

1945 Raises: 

1946 ValueError: If multiple icons have the same name 

1947 """ 

1948 matches = [icon.uuid for icon in self._settings.custom_icons.values() if icon.name == name] 

1949 if len(matches) == 0: 

1950 return None 

1951 if len(matches) > 1: 

1952 raise ValueError(f"Multiple custom icons found with name: {name}") 

1953 return matches[0] 

1954 

1955 # --- XML parsing --- 

1956 

1957 @classmethod 

1958 def _parse_xml( 

1959 cls, xml_data: bytes, inner_header: InnerHeader | None = None 

1960 ) -> tuple[Group, DatabaseSettings, dict[int, bytes]]: 

1961 """Parse KDBX XML into models. 

1962 

1963 Args: 

1964 xml_data: XML payload bytes 

1965 inner_header: Inner header with stream cipher info (for decrypting protected values) 

1966 

1967 Returns: 

1968 Tuple of (root_group, settings, binaries) 

1969 """ 

1970 root = DefusedET.fromstring(xml_data) 

1971 

1972 # Decrypt protected values in-place before parsing 

1973 if inner_header is not None: 

1974 cls._decrypt_protected_values(root, inner_header) 

1975 

1976 # Parse Meta section for settings 

1977 settings = cls._parse_meta(root.find("Meta")) 

1978 

1979 # Parse Root/Group for entries 

1980 root_elem = root.find("Root") 

1981 if root_elem is None: 

1982 raise InvalidXmlError("Missing Root element") 

1983 

1984 group_elem = root_elem.find("Group") 

1985 if group_elem is None: 

1986 raise InvalidXmlError("Missing root Group element") 

1987 

1988 root_group = cls._parse_group(group_elem) 

1989 root_group._is_root = True 

1990 

1991 # Extract binaries from inner header (KDBX4 style) 

1992 # The protection flag indicates memory protection policy, not encryption 

1993 binaries: dict[int, bytes] = {} 

1994 if inner_header is not None: 

1995 for idx, (_protected, data) in inner_header.binaries.items(): 

1996 binaries[idx] = data 

1997 

1998 return root_group, settings, binaries 

1999 

2000 @classmethod 

2001 def _decrypt_protected_values(cls, root: Element, inner_header: InnerHeader) -> None: 

2002 """Decrypt all protected values in the XML tree in document order. 

2003 

2004 Protected values are XOR'd with a stream cipher and base64 encoded. 

2005 This method decrypts them in-place. 

2006 """ 

2007 cipher = ProtectedStreamCipher( 

2008 inner_header.random_stream_id, 

2009 inner_header.random_stream_key, 

2010 ) 

2011 

2012 # Find all Value elements with Protected="True" in document order 

2013 for elem in root.iter("Value"): 

2014 if elem.get("Protected") == "True" and elem.text: 

2015 try: 

2016 ciphertext = base64.b64decode(elem.text) 

2017 plaintext = cipher.decrypt(ciphertext) 

2018 elem.text = plaintext.decode("utf-8") 

2019 except (binascii.Error, ValueError, UnicodeDecodeError): 

2020 # If decryption fails, leave as-is 

2021 pass 

2022 

2023 @classmethod 

2024 def _parse_meta(cls, meta_elem: Element | None) -> DatabaseSettings: 

2025 """Parse Meta element into DatabaseSettings.""" 

2026 settings = DatabaseSettings() 

2027 

2028 if meta_elem is None: 

2029 return settings 

2030 

2031 def get_text(tag: str) -> str | None: 

2032 elem = meta_elem.find(tag) 

2033 return elem.text if elem is not None else None 

2034 

2035 if name := get_text("DatabaseName"): 

2036 settings.database_name = name 

2037 if desc := get_text("DatabaseDescription"): 

2038 settings.database_description = desc 

2039 if username := get_text("DefaultUserName"): 

2040 settings.default_username = username 

2041 if gen := get_text("Generator"): 

2042 settings.generator = gen 

2043 

2044 # Parse memory protection 

2045 mp_elem = meta_elem.find("MemoryProtection") 

2046 if mp_elem is not None: 

2047 for field in ["Title", "UserName", "Password", "URL", "Notes"]: 

2048 elem = mp_elem.find(f"Protect{field}") 

2049 if elem is not None: 

2050 settings.memory_protection[field] = elem.text == "True" 

2051 

2052 # Parse recycle bin 

2053 if rb := get_text("RecycleBinEnabled"): 

2054 settings.recycle_bin_enabled = rb == "True" 

2055 if rb_uuid := get_text("RecycleBinUUID"): 

2056 import contextlib 

2057 

2058 with contextlib.suppress(binascii.Error, ValueError): 

2059 settings.recycle_bin_uuid = uuid_module.UUID(bytes=base64.b64decode(rb_uuid)) 

2060 

2061 # Parse custom icons 

2062 custom_icons_elem = meta_elem.find("CustomIcons") 

2063 if custom_icons_elem is not None: 

2064 for icon_elem in custom_icons_elem.findall("Icon"): 

2065 icon_uuid_elem = icon_elem.find("UUID") 

2066 icon_data_elem = icon_elem.find("Data") 

2067 if ( 

2068 icon_uuid_elem is not None 

2069 and icon_uuid_elem.text 

2070 and icon_data_elem is not None 

2071 and icon_data_elem.text 

2072 ): 

2073 try: 

2074 icon_uuid = uuid_module.UUID(bytes=base64.b64decode(icon_uuid_elem.text)) 

2075 icon_data = base64.b64decode(icon_data_elem.text) 

2076 icon_name = None 

2077 name_elem = icon_elem.find("Name") 

2078 if name_elem is not None: 

2079 icon_name = name_elem.text 

2080 icon_mtime = None 

2081 mtime_elem = icon_elem.find("LastModificationTime") 

2082 if mtime_elem is not None and mtime_elem.text: 

2083 icon_mtime = cls._decode_time(mtime_elem.text) 

2084 settings.custom_icons[icon_uuid] = CustomIcon( 

2085 uuid=icon_uuid, 

2086 data=icon_data, 

2087 name=icon_name, 

2088 last_modification_time=icon_mtime, 

2089 ) 

2090 except (binascii.Error, ValueError): 

2091 pass # Skip invalid icon 

2092 

2093 return settings 

2094 

    @classmethod
    def _parse_group(cls, elem: Element) -> Group:
        """Parse a Group element into a Group model.

        Recursively parses nested ``Entry`` and ``Group`` children.
        Missing or malformed optional fields keep the Group defaults.
        """
        group = Group()

        # UUID: base64-encoded raw 16 bytes
        uuid_elem = elem.find("UUID")
        if uuid_elem is not None and uuid_elem.text:
            group.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_elem.text))

        # Name (may legitimately be empty/None)
        name_elem = elem.find("Name")
        if name_elem is not None:
            group.name = name_elem.text

        # Notes
        notes_elem = elem.find("Notes")
        if notes_elem is not None:
            group.notes = notes_elem.text

        # Standard icon ID (kept as string, matching the model)
        icon_elem = elem.find("IconID")
        if icon_elem is not None and icon_elem.text:
            group.icon_id = icon_elem.text

        # Custom icon UUID; invalid base64/UUID values are silently ignored
        custom_icon_elem = elem.find("CustomIconUUID")
        if custom_icon_elem is not None and custom_icon_elem.text:
            with contextlib.suppress(binascii.Error, ValueError):
                group.custom_icon_uuid = uuid_module.UUID(
                    bytes=base64.b64decode(custom_icon_elem.text)
                )

        # Times (defaults are created when the element is absent)
        group.times = cls._parse_times(elem.find("Times"))

        # Entries directly under this group
        for entry_elem in elem.findall("Entry"):
            entry = cls._parse_entry(entry_elem)
            group.add_entry(entry)

        # Subgroups (recursive)
        for subgroup_elem in elem.findall("Group"):
            subgroup = cls._parse_group(subgroup_elem)
            group.add_subgroup(subgroup)

        return group

2142 

    @classmethod
    def _parse_entry(cls, elem: Element) -> Entry:
        """Parse an Entry element into an Entry model.

        Also recursively parses the entry's ``History`` children into
        HistoryEntry records. Missing optional fields keep the Entry
        defaults.
        """
        entry = Entry()

        # UUID: base64-encoded raw 16 bytes
        uuid_elem = elem.find("UUID")
        if uuid_elem is not None and uuid_elem.text:
            entry.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_elem.text))

        # Standard icon ID (kept as string, matching the model)
        icon_elem = elem.find("IconID")
        if icon_elem is not None and icon_elem.text:
            entry.icon_id = icon_elem.text

        # Custom icon UUID; invalid base64/UUID values are silently ignored
        custom_icon_elem = elem.find("CustomIconUUID")
        if custom_icon_elem is not None and custom_icon_elem.text:
            with contextlib.suppress(binascii.Error, ValueError):
                entry.custom_icon_uuid = uuid_module.UUID(
                    bytes=base64.b64decode(custom_icon_elem.text)
                )

        # Tags: both ',' and ';' separators are accepted on input
        tags_elem = elem.find("Tags")
        if tags_elem is not None and tags_elem.text:
            tag_text = tags_elem.text.replace(",", ";")
            entry.tags = [t.strip() for t in tag_text.split(";") if t.strip()]

        # Times (defaults are created when the element is absent)
        entry.times = cls._parse_times(elem.find("Times"))

        # String fields; the Protected attribute records the memory
        # protection flag (values were already decrypted upstream)
        for string_elem in elem.findall("String"):
            key_elem = string_elem.find("Key")
            value_elem = string_elem.find("Value")
            if key_elem is not None and key_elem.text:
                key = key_elem.text
                value = value_elem.text if value_elem is not None else None
                protected = value_elem is not None and value_elem.get("Protected") == "True"
                entry.strings[key] = StringField(key=key, value=value, protected=protected)

        # Binary references: name -> index into the inner-header pool
        for binary_elem in elem.findall("Binary"):
            key_elem = binary_elem.find("Key")
            value_elem = binary_elem.find("Value")
            if key_elem is not None and key_elem.text and value_elem is not None:
                ref = value_elem.get("Ref")
                if ref is not None:
                    entry.binaries.append(BinaryRef(key=key_elem.text, ref=int(ref)))

        # AutoType settings
        at_elem = elem.find("AutoType")
        if at_elem is not None:
            enabled_elem = at_elem.find("Enabled")
            seq_elem = at_elem.find("DefaultSequence")
            obf_elem = at_elem.find("DataTransferObfuscation")

            entry.autotype = AutoType(
                enabled=enabled_elem is not None and enabled_elem.text == "True",
                sequence=seq_elem.text if seq_elem is not None else None,
                obfuscation=int(obf_elem.text) if obf_elem is not None and obf_elem.text else 0,
            )

            # Window title from the first Association (only one is kept)
            assoc_elem = at_elem.find("Association")
            if assoc_elem is not None:
                window_elem = assoc_elem.find("Window")
                if window_elem is not None:
                    entry.autotype.window = window_elem.text

        # History: previous revisions of this entry, parsed recursively
        history_elem = elem.find("History")
        if history_elem is not None:
            for hist_entry_elem in history_elem.findall("Entry"):
                hist_entry = cls._parse_entry(hist_entry_elem)
                history_entry = HistoryEntry.from_entry(hist_entry)
                entry.history.append(history_entry)

        return entry

2223 

2224 @classmethod 

2225 def _parse_times(cls, times_elem: Element | None) -> Times: 

2226 """Parse Times element into Times model.""" 

2227 times = Times.create_new() 

2228 

2229 if times_elem is None: 

2230 return times 

2231 

2232 def parse_time(tag: str) -> datetime | None: 

2233 elem = times_elem.find(tag) 

2234 if elem is not None and elem.text: 

2235 return cls._decode_time(elem.text) 

2236 return None 

2237 

2238 if ct := parse_time("CreationTime"): 

2239 times.creation_time = ct 

2240 if mt := parse_time("LastModificationTime"): 

2241 times.last_modification_time = mt 

2242 if at := parse_time("LastAccessTime"): 

2243 times.last_access_time = at 

2244 if et := parse_time("ExpiryTime"): 

2245 times.expiry_time = et 

2246 if lc := parse_time("LocationChanged"): 

2247 times.location_changed = lc 

2248 

2249 expires_elem = times_elem.find("Expires") 

2250 if expires_elem is not None: 

2251 times.expires = expires_elem.text == "True" 

2252 

2253 usage_elem = times_elem.find("UsageCount") 

2254 if usage_elem is not None and usage_elem.text: 

2255 times.usage_count = int(usage_elem.text) 

2256 

2257 return times 

2258 

2259 @classmethod 

2260 def _decode_time(cls, time_str: str) -> datetime: 

2261 """Decode KDBX time string to datetime. 

2262 

2263 KDBX4 uses base64-encoded binary timestamps or ISO format. 

2264 """ 

2265 # Try base64 binary format first (KDBX4) 

2266 # Base64 strings don't contain - or : which are present in ISO dates 

2267 if "-" not in time_str and ":" not in time_str: 

2268 try: 

2269 binary = base64.b64decode(time_str) 

2270 if len(binary) == 8: # int64 = 8 bytes 

2271 # KDBX4 stores seconds since 0001-01-01 as int64 

2272 import struct 

2273 

2274 seconds = struct.unpack("<q", binary)[0] 

2275 # Convert to datetime (epoch is 0001-01-01) 

2276 base = datetime(1, 1, 1, tzinfo=UTC) 

2277 return base + timedelta(seconds=seconds) 

2278 except (ValueError, struct.error): 

2279 pass # Not valid base64 or wrong size 

2280 

2281 # Try ISO format 

2282 try: 

2283 return datetime.strptime(time_str, KDBX4_TIME_FORMAT).replace(tzinfo=UTC) 

2284 except ValueError: 

2285 pass 

2286 

2287 # Fallback: try without timezone 

2288 try: 

2289 return datetime.fromisoformat(time_str.replace("Z", "+00:00")) 

2290 except ValueError: 

2291 return datetime.now(UTC) 

2292 

2293 @classmethod 

2294 def _encode_time(cls, dt: datetime) -> str: 

2295 """Encode datetime to ISO 8601 format for KDBX4. 

2296 

2297 Uses ISO 8601 format (e.g., 2025-01-15T10:30:45Z) which is 

2298 human-readable and compatible with KeePassXC. 

2299 """ 

2300 # Ensure UTC timezone 

2301 if dt.tzinfo is None: 

2302 dt = dt.replace(tzinfo=UTC) 

2303 return dt.strftime(KDBX4_TIME_FORMAT) 

2304 

2305 # --- XML building --- 

2306 

2307 def _build_xml(self) -> bytes: 

2308 """Build KDBX XML from models.""" 

2309 root = Element("KeePassFile") 

2310 

2311 # Meta section 

2312 meta = SubElement(root, "Meta") 

2313 self._build_meta(meta) 

2314 

2315 # Root section 

2316 root_elem = SubElement(root, "Root") 

2317 self._build_group(root_elem, self._root_group) 

2318 

2319 # Encrypt protected values before serializing 

2320 if self._inner_header is not None: 

2321 self._encrypt_protected_values(root, self._inner_header) 

2322 

2323 # Serialize to bytes (tostring returns bytes when encoding is specified) 

2324 return cast(bytes, tostring(root, encoding="utf-8", xml_declaration=True)) 

2325 

2326 def _encrypt_protected_values(self, root: Element, inner_header: InnerHeader) -> None: 

2327 """Encrypt all protected values in the XML tree in document order. 

2328 

2329 Protected values are XOR'd with a stream cipher and base64 encoded. 

2330 This method encrypts them in-place. 

2331 """ 

2332 cipher = ProtectedStreamCipher( 

2333 inner_header.random_stream_id, 

2334 inner_header.random_stream_key, 

2335 ) 

2336 

2337 # Find all Value elements with Protected="True" in document order 

2338 for elem in root.iter("Value"): 

2339 if elem.get("Protected") == "True": 

2340 plaintext = (elem.text or "").encode("utf-8") 

2341 ciphertext = cipher.encrypt(plaintext) 

2342 elem.text = base64.b64encode(ciphertext).decode("ascii") 

2343 

    def _build_meta(self, meta: Element) -> None:
        """Build the Meta element from database settings.

        Child elements are emitted in a fixed order so saved files have a
        stable layout.
        """
        s = self._settings

        SubElement(meta, "Generator").text = s.generator
        SubElement(meta, "DatabaseName").text = s.database_name
        if s.database_description:
            SubElement(meta, "DatabaseDescription").text = s.database_description
        if s.default_username:
            SubElement(meta, "DefaultUserName").text = s.default_username

        SubElement(meta, "MaintenanceHistoryDays").text = str(s.maintenance_history_days)
        SubElement(meta, "MasterKeyChangeRec").text = str(s.master_key_change_rec)
        SubElement(meta, "MasterKeyChangeForce").text = str(s.master_key_change_force)

        # Memory protection flags per standard field ("True"/"False" text)
        mp = SubElement(meta, "MemoryProtection")
        for field_name, is_protected in s.memory_protection.items():
            SubElement(mp, f"Protect{field_name}").text = str(is_protected)

        SubElement(meta, "RecycleBinEnabled").text = str(s.recycle_bin_enabled)
        if s.recycle_bin_uuid:
            SubElement(meta, "RecycleBinUUID").text = base64.b64encode(
                s.recycle_bin_uuid.bytes
            ).decode("ascii")
        else:
            # All-zero UUID when no recycle bin is configured
            SubElement(meta, "RecycleBinUUID").text = base64.b64encode(b"\x00" * 16).decode("ascii")

        SubElement(meta, "HistoryMaxItems").text = str(s.history_max_items)
        SubElement(meta, "HistoryMaxSize").text = str(s.history_max_size)

        # Custom icons: UUID and Data are always written; Name and
        # LastModificationTime only when present
        if s.custom_icons:
            custom_icons_elem = SubElement(meta, "CustomIcons")
            for icon in s.custom_icons.values():
                icon_elem = SubElement(custom_icons_elem, "Icon")
                SubElement(icon_elem, "UUID").text = base64.b64encode(icon.uuid.bytes).decode(
                    "ascii"
                )
                SubElement(icon_elem, "Data").text = base64.b64encode(icon.data).decode("ascii")
                if icon.name:
                    SubElement(icon_elem, "Name").text = icon.name
                if icon.last_modification_time:
                    SubElement(icon_elem, "LastModificationTime").text = self._encode_time(
                        icon.last_modification_time
                    )

2391 

    def _build_group(self, parent: Element, group: Group) -> None:
        """Build a Group element (recursively) from a Group model.

        Entries are written before subgroups; element order matters for
        the stateful protected-stream encryption applied later.
        """
        elem = SubElement(parent, "Group")

        SubElement(elem, "UUID").text = base64.b64encode(group.uuid.bytes).decode("ascii")
        SubElement(elem, "Name").text = group.name or ""
        if group.notes:
            SubElement(elem, "Notes").text = group.notes
        SubElement(elem, "IconID").text = group.icon_id
        if group.custom_icon_uuid:
            SubElement(elem, "CustomIconUUID").text = base64.b64encode(
                group.custom_icon_uuid.bytes
            ).decode("ascii")

        self._build_times(elem, group.times)

        SubElement(elem, "IsExpanded").text = str(group.is_expanded)

        # Tri-state/optional settings are only emitted when set
        if group.default_autotype_sequence:
            SubElement(elem, "DefaultAutoTypeSequence").text = group.default_autotype_sequence
        if group.enable_autotype is not None:
            SubElement(elem, "EnableAutoType").text = str(group.enable_autotype)
        if group.enable_searching is not None:
            SubElement(elem, "EnableSearching").text = str(group.enable_searching)

        # Always written; falls back to the all-zero UUID when unset
        SubElement(elem, "LastTopVisibleEntry").text = base64.b64encode(
            (group.last_top_visible_entry or uuid_module.UUID(int=0)).bytes
        ).decode("ascii")

        # Entries
        for entry in group.entries:
            self._build_entry(elem, entry)

        # Subgroups (recursive)
        for subgroup in group.subgroups:
            self._build_group(elem, subgroup)

2428 

    def _build_entry(self, parent: Element, entry: Entry) -> None:
        """Build an Entry element from an Entry model.

        Also used recursively for History children. String-field
        protection honors the database memory_protection policy for
        standard fields and the per-field flag for custom fields.
        """
        elem = SubElement(parent, "Entry")

        SubElement(elem, "UUID").text = base64.b64encode(entry.uuid.bytes).decode("ascii")
        SubElement(elem, "IconID").text = entry.icon_id
        if entry.custom_icon_uuid:
            SubElement(elem, "CustomIconUUID").text = base64.b64encode(
                entry.custom_icon_uuid.bytes
            ).decode("ascii")

        # Optional display attributes
        if entry.foreground_color:
            SubElement(elem, "ForegroundColor").text = entry.foreground_color
        if entry.background_color:
            SubElement(elem, "BackgroundColor").text = entry.background_color
        if entry.override_url:
            SubElement(elem, "OverrideURL").text = entry.override_url

        # Tags are written ';'-separated
        if entry.tags:
            SubElement(elem, "Tags").text = ";".join(entry.tags)

        self._build_times(elem, entry.times)

        # String fields - apply memory protection policy from database settings
        for key, string_field in entry.strings.items():
            string_elem = SubElement(elem, "String")
            SubElement(string_elem, "Key").text = key
            value_elem = SubElement(string_elem, "Value")
            value_elem.text = string_field.value or ""
            # Use database memory_protection policy for standard fields,
            # fall back to string_field.protected for custom fields
            if key in self._settings.memory_protection:
                should_protect = self._settings.memory_protection[key]
            else:
                should_protect = string_field.protected
            if should_protect:
                value_elem.set("Protected", "True")

        # Binary references (payloads live in the inner header pool)
        for binary_ref in entry.binaries:
            binary_elem = SubElement(elem, "Binary")
            SubElement(binary_elem, "Key").text = binary_ref.key
            value_elem = SubElement(binary_elem, "Value")
            value_elem.set("Ref", str(binary_ref.ref))

        # AutoType block is always written
        at = entry.autotype
        at_elem = SubElement(elem, "AutoType")
        SubElement(at_elem, "Enabled").text = str(at.enabled)
        SubElement(at_elem, "DataTransferObfuscation").text = str(at.obfuscation)
        SubElement(at_elem, "DefaultSequence").text = at.sequence or ""

        # A single Association with an empty keystroke sequence
        if at.window:
            assoc = SubElement(at_elem, "Association")
            SubElement(assoc, "Window").text = at.window
            SubElement(assoc, "KeystrokeSequence").text = ""

        # History: previous revisions serialized as nested Entry elements
        if entry.history:
            history_elem = SubElement(elem, "History")
            for hist_entry in entry.history:
                self._build_entry(history_elem, hist_entry)

2491 

2492 def _build_times(self, parent: Element, times: Times) -> None: 

2493 """Build Times element from Times model.""" 

2494 elem = SubElement(parent, "Times") 

2495 

2496 SubElement(elem, "CreationTime").text = self._encode_time(times.creation_time) 

2497 SubElement(elem, "LastModificationTime").text = self._encode_time( 

2498 times.last_modification_time 

2499 ) 

2500 SubElement(elem, "LastAccessTime").text = self._encode_time(times.last_access_time) 

2501 if times.expiry_time: 

2502 SubElement(elem, "ExpiryTime").text = self._encode_time(times.expiry_time) 

2503 else: 

2504 SubElement(elem, "ExpiryTime").text = self._encode_time(times.creation_time) 

2505 SubElement(elem, "Expires").text = str(times.expires) 

2506 SubElement(elem, "UsageCount").text = str(times.usage_count) 

2507 if times.location_changed: 

2508 SubElement(elem, "LocationChanged").text = self._encode_time(times.location_changed) 

2509 

2510 def __str__(self) -> str: 

2511 entry_count = sum(1 for _ in self.iter_entries()) 

2512 group_count = sum(1 for _ in self.iter_groups()) 

2513 name = self._settings.database_name 

2514 return f'Database: "{name}" ({entry_count} entries, {group_count} groups)'