Coverage for src / kdbxtool / database.py: 93%
967 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-20 19:19 +0000
1"""High-level Database API for KDBX files.
3This module provides the main interface for working with KeePass databases:
4- Opening and decrypting KDBX files
5- Creating new databases
6- Searching for entries and groups
7- Saving databases
8"""
10from __future__ import annotations
12import base64
13import binascii
14import contextlib
15import getpass
16import hashlib
17import logging
18import os
19import uuid as uuid_module
20from collections.abc import Iterator
21from dataclasses import dataclass, field
22from datetime import UTC, datetime, timedelta
23from pathlib import Path
24from types import TracebackType
25from typing import TYPE_CHECKING, Protocol, cast
26from xml.etree.ElementTree import Element, SubElement, tostring
28if TYPE_CHECKING:
29 from .merge import DeletedObject, MergeMode, MergeResult
31from Cryptodome.Cipher import ChaCha20, Salsa20
32from defusedxml import ElementTree as DefusedET
34from .exceptions import (
35 AuthenticationError,
36 DatabaseError,
37 InvalidXmlError,
38 Kdbx3UpgradeRequired,
39 MissingCredentialsError,
40 UnknownCipherError,
41)
42from .models import Attachment, Entry, Group, HistoryEntry, Times
43from .models.entry import AutoType, BinaryRef, StringField
44from .parsing import CompressionType, KdbxHeader, KdbxVersion
45from .parsing.kdbx3 import read_kdbx3
46from .parsing.kdbx4 import InnerHeader, read_kdbx4, write_kdbx4
47from .security import AesKdfConfig, Argon2Config, Cipher, KdfType
48from .security import yubikey as yubikey_module
49from .security.yubikey import YubiKeyConfig, compute_challenge_response
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Union type for KDF configurations: either Argon2 parameters or AES-KDF
# parameters. Used for the `kdf_config` arguments throughout this module.
KdfConfig = Argon2Config | AesKdfConfig
class _StreamCipher(Protocol):
    """Protocol for stream ciphers used for protected value encryption.

    Structural type for the objects returned by
    ``ProtectedStreamCipher._create_cipher``: anything exposing byte-oriented
    ``encrypt`` and ``decrypt`` methods satisfies it.
    """

    # Turn plaintext bytes into ciphertext bytes.
    def encrypt(self, plaintext: bytes) -> bytes: ...
    # Turn ciphertext bytes back into plaintext bytes.
    def decrypt(self, ciphertext: bytes) -> bytes: ...
# KDBX4 time format (ISO 8601, compatible with KeePassXC).
# strftime/strptime pattern; the trailing "Z" is a literal character.
KDBX4_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Protected stream cipher IDs, compared against the stream_id handed to
# ProtectedStreamCipher to select Salsa20 or ChaCha20.
PROTECTED_STREAM_SALSA20 = 2
PROTECTED_STREAM_CHACHA20 = 3
class ProtectedStreamCipher:
    """Keystream wrapper for protected values stored in the KDBX XML payload.

    Sensitive values such as passwords are XOR'd with the output of a stream
    cipher (ChaCha20 or Salsa20). The underlying cipher object is stateful,
    so values must be processed in document order.
    """

    def __init__(self, stream_id: int, stream_key: bytes) -> None:
        """Build the cipher selected by ``stream_id``.

        Args:
            stream_id: Cipher type (2=Salsa20, 3=ChaCha20)
            stream_key: Key material from inner header (typically 64 bytes)

        Raises:
            UnknownCipherError: If ``stream_id`` is neither known cipher ID
        """
        self._stream_id = stream_id
        self._stream_key = stream_key
        self._cipher = self._create_cipher()

    def _create_cipher(self) -> _StreamCipher:
        """Derive key and nonce from the stream key and return the cipher."""
        cipher_id = self._stream_id

        if cipher_id == PROTECTED_STREAM_CHACHA20:
            # ChaCha20: SHA-512 of the stream key; bytes 0-31 are the key,
            # bytes 32-43 are the nonce.
            digest = hashlib.sha512(self._stream_key).digest()
            return ChaCha20.new(key=digest[:32], nonce=digest[32:44])

        if cipher_id == PROTECTED_STREAM_SALSA20:
            # Salsa20: SHA-256 of the stream key, combined with a fixed nonce.
            return Salsa20.new(
                key=hashlib.sha256(self._stream_key).digest(),
                nonce=b"\xe8\x30\x09\x4b\x97\x20\x5d\x2a",
            )

        # Report the unrecognised ID as 4 little-endian bytes.
        raise UnknownCipherError(cipher_id.to_bytes(4, "little"))

    def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt a protected value by delegating to the wrapped cipher."""
        return self._cipher.decrypt(ciphertext)

    def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt a protected value by delegating to the wrapped cipher."""
        return self._cipher.encrypt(plaintext)
@dataclass
class CustomIcon:
    """A custom icon in a KDBX database.

    Custom icons are PNG images that can be assigned to entries and groups
    for visual customization beyond the standard icon set.

    Attributes:
        uuid: Unique identifier for the icon
        data: PNG image data
        name: Optional display name for the icon (None when unset)
        last_modification_time: When the icon was last modified (None when unset)
    """

    # Required fields: identity and raw image bytes.
    uuid: uuid_module.UUID
    data: bytes
    # Optional metadata; both default to None.
    name: str | None = None
    last_modification_time: datetime | None = None
@dataclass
class DatabaseSettings:
    """Metadata and policy settings carried by a KDBX database.

    Every field has a default, so ``DatabaseSettings()`` yields a usable
    configuration. Mutable defaults are built per instance.

    Attributes:
        generator: Generator application name
        database_name: Name of the database
        database_description: Description of the database
        default_username: Default username for new entries
        maintenance_history_days: Days to keep deleted items
        color: Database color (hex), or None
        master_key_change_rec: Days until master key change recommended
        master_key_change_force: Days until master key change forced
        memory_protection: Which fields to protect in memory
        recycle_bin_enabled: Whether recycle bin is enabled
        recycle_bin_uuid: UUID of recycle bin group
        history_max_items: Max history entries per entry
        history_max_size: Max history size in bytes
        custom_icons: Dictionary of custom icons (UUID -> CustomIcon)
        deleted_objects: List of DeletedObject records; empty by default
    """

    generator: str = "kdbxtool"
    database_name: str = "Database"
    database_description: str = ""
    default_username: str = ""
    maintenance_history_days: int = 365
    color: str | None = None
    master_key_change_rec: int = -1
    master_key_change_force: int = -1
    # Only "Password" is protected by default; key order is kept stable.
    memory_protection: dict[str, bool] = field(
        default_factory=lambda: {
            field_name: field_name == "Password"
            for field_name in ("Title", "UserName", "Password", "URL", "Notes")
        }
    )
    recycle_bin_enabled: bool = True
    recycle_bin_uuid: uuid_module.UUID | None = None
    history_max_items: int = 10
    history_max_size: int = 6 * 2**20  # 6 MiB
    custom_icons: dict[uuid_module.UUID, CustomIcon] = field(default_factory=dict)
    deleted_objects: list[DeletedObject] = field(default_factory=list)
182class Database:
183 """High-level interface for KDBX databases.
185 This class provides the main API for working with KeePass databases.
186 It handles encryption/decryption, XML parsing, and model management.
188 Example usage:
189 # Open existing database
190 db = Database.open("passwords.kdbx", password="secret")
192 # Find entries
193 entries = db.find_entries(title="GitHub")
195 # Create entry
196 entry = db.root_group.create_entry(
197 title="New Site",
198 username="user",
199 password="pass123",
200 )
202 # Save changes
203 db.save()
204 """
def __init__(
    self,
    root_group: Group,
    settings: DatabaseSettings | None = None,
    header: KdbxHeader | None = None,
    inner_header: InnerHeader | None = None,
    binaries: dict[int, bytes] | None = None,
) -> None:
    """Initialize database.

    Usually you should use Database.open() or Database.create() instead.

    Args:
        root_group: Root group containing all entries/groups
        settings: Database settings (a default DatabaseSettings when None)
        header: KDBX header (for existing databases)
        inner_header: Inner header (for existing databases)
        binaries: Binary attachments. The mapping is stored as given, even
            when empty; a new dict is created only when None is passed.
    """
    self._root_group = root_group
    # Compare against None, not truthiness, so an empty mapping supplied by
    # the caller is kept rather than replaced by a fresh object.
    self._settings = settings if settings is not None else DatabaseSettings()
    self._header = header
    self._inner_header = inner_header
    self._binaries = binaries if binaries is not None else {}
    self._password: str | None = None
    self._keyfile_data: bytes | None = None
    self._transformed_key: bytes | None = None
    self._filepath: Path | None = None
    self._yubikey_slot: int | None = None
    self._yubikey_serial: int | None = None
    # Set to True by open_bytes() for KDBX3 files; declared here so the
    # attribute exists on every instance, including ones from create().
    self._opened_as_kdbx3: bool = False
    # Set database reference on all entries and groups
    self._set_database_references(root_group)
def _set_database_references(self, group: Group) -> None:
    """Point ``_database`` at this database for a whole group subtree.

    Visits ``group``, its entries, and every nested subgroup (with their
    entries), assigning ``self`` to each object's ``_database`` attribute.
    """
    pending = [group]
    while pending:
        current = pending.pop()
        current._database = self
        for member in current.entries:
            member._database = self
        # Push children reversed so they are popped in their original order.
        pending.extend(reversed(list(current.subgroups)))
def __enter__(self) -> Database:
    """Start a ``with`` block; the database itself is the context value."""
    return self
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Leave the ``with`` block and drop stored credentials.

    The exception arguments are ignored; returning None means any
    in-flight exception keeps propagating.
    """
    self.zeroize_credentials()
    return None
def zeroize_credentials(self) -> None:
    """Explicitly clear stored credentials from this object.

    Drops the references to the password, keyfile data, and transformed
    key. When keyfile data or the transformed key is held in a mutable
    ``bytearray``, the buffer is overwritten with zeros in place first, so
    other references to that same buffer see the wiped contents.

    Immutable ``bytes`` and ``str`` values cannot be overwritten from
    Python; for those the reference is simply released and the memory is
    reclaimed whenever the interpreter frees it.
    """
    # str is immutable: releasing the reference is all that can be done.
    self._password = None

    for attr_name in ("_keyfile_data", "_transformed_key"):
        secret = getattr(self, attr_name)
        if isinstance(secret, bytearray):
            # Overwrite the stored buffer itself. Copying into a new
            # bytearray and zeroing the copy would leave the original intact.
            secret[:] = bytes(len(secret))
        setattr(self, attr_name, None)
def dump(self) -> str:
    """Build a short, human-readable description of the database.

    Lists the database name, the header's format/cipher/KDF when a header
    is present, total entry and group counts, the custom icon count when
    there are any, and whether the recycle bin is enabled.

    Returns:
        Multi-line string with database metadata and statistics.
    """
    display_name = self._settings.database_name or "(unnamed)"
    summary = [f'Database: "{display_name}"']

    header = self._header
    if header is not None:
        summary.extend(
            (
                f" Format: KDBX{header.version.value}",
                f" Cipher: {header.cipher.name}",
                f" KDF: {header.kdf_type.name}",
            )
        )

    # Walk the whole tree once for entries and once for groups.
    total_entries = len(list(self._root_group.iter_entries(recursive=True)))
    total_groups = len(list(self._root_group.iter_groups(recursive=True)))
    summary.append(f" Total entries: {total_entries}")
    summary.append(f" Total groups: {total_groups}")

    # Custom icons are mentioned only when at least one exists.
    if self.custom_icons:
        summary.append(f" Custom icons: {len(self.custom_icons)}")

    if self._settings.recycle_bin_enabled:
        summary.append(" Recycle bin: enabled")

    return "\n".join(summary)
def merge(
    self,
    source: Database,
    *,
    mode: MergeMode | None = None,
) -> MergeResult:
    """Merge the contents of ``source`` into this database.

    The work is delegated to ``Merger`` from the ``merge`` module, which
    combines entries, groups, history, attachments, and custom icons using
    UUID-based matching and timestamp-based conflict resolution.

    Args:
        source: Database to merge from (read-only)
        mode: Merge mode (STANDARD or SYNCHRONIZE). Defaults to STANDARD.
            - STANDARD: Add and update only, never deletes
            - SYNCHRONIZE: Full sync including deletions

    Returns:
        MergeResult with counts and statistics about the merge

    Raises:
        MergeError: If merge cannot be completed

    Example:
        >>> target_db = Database.open("main.kdbx", password="secret")
        >>> source_db = Database.open("branch.kdbx", password="secret")
        >>> result = target_db.merge(source_db)
        >>> print(f"Added {result.entries_added} entries")
        >>> target_db.save()
    """
    # Imported here because the module-level import is type-checking only.
    from .merge import MergeMode, Merger

    effective_mode = MergeMode.STANDARD if mode is None else mode
    return Merger(self, source, mode=effective_mode).merge()
@property
def transformed_key(self) -> bytes | None:
    """The cached KDF output for this database, if any.

    The transformed key is the result of running the KDF over the
    credentials. Passing it back to ``open_bytes(transformed_key=...)``
    lets a later open skip the expensive KDF.

    Security note: the transformed key is as sensitive as the password.
    Anyone holding it can decrypt the database. Store it securely and
    zeroize it when done.

    Returns:
        The transformed key, or None if not available (e.g., newly created DB)
    """
    cached_key = self._transformed_key
    return cached_key
@property
def kdf_salt(self) -> bytes | None:
    """The salt recorded in the header for key derivation.

    The salt is combined with the credentials to derive the transformed
    key. If it changes (e.g., after save with regenerate_seeds=True), any
    previously cached transformed_key no longer matches.

    Returns:
        The KDF salt, or None if no header is set
    """
    header = self._header
    return None if header is None else header.kdf_salt
@property
def root_group(self) -> Group:
    """The top-level group that holds every other group and entry."""
    top = self._root_group
    return top
@property
def settings(self) -> DatabaseSettings:
    """The DatabaseSettings object attached to this database."""
    current = self._settings
    return current
@property
def filepath(self) -> Path | None:
    """The path this database is associated with, or None if there is none."""
    location = self._filepath
    return location
404 # --- Opening databases ---
@classmethod
def open(
    cls,
    filepath: str | Path,
    password: str | None = None,
    keyfile: str | Path | None = None,
    yubikey_slot: int | None = None,
    yubikey_serial: int | None = None,
) -> Database:
    """Open an existing KDBX database from disk.

    Reads the database file (and the keyfile, when one is given) into
    memory and hands everything to ``open_bytes``.

    Args:
        filepath: Path to the .kdbx file
        password: Database password
        keyfile: Path to keyfile (optional)
        yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
            If provided, the database's KDF salt is used as challenge and
            the 20-byte HMAC-SHA1 response is incorporated into key derivation.
            Requires yubikey-manager package: pip install kdbxtool[yubikey]
        yubikey_serial: Serial number of specific YubiKey to use when multiple
            devices are connected. Use list_yubikeys() to discover serials.

    Returns:
        Database instance

    Raises:
        FileNotFoundError: If the database file or the keyfile doesn't exist
        YubiKeyError: If YubiKey operation fails

    Errors raised by ``open_bytes`` propagate unchanged; ``open_interactive``
    relies on ``AuthenticationError`` to detect a rejected password.
    """
    logger.info("Opening database: %s", filepath)

    db_path = Path(filepath)
    if not db_path.exists():
        raise FileNotFoundError(f"Database file not found: {db_path}")
    raw_database = db_path.read_bytes()

    # An empty or None keyfile argument means "no keyfile".
    key_material = None
    if keyfile:
        key_path = Path(keyfile)
        if not key_path.exists():
            raise FileNotFoundError(f"Keyfile not found: {keyfile}")
        key_material = key_path.read_bytes()

    return cls.open_bytes(
        raw_database,
        password=password,
        keyfile_data=key_material,
        filepath=db_path,
        yubikey_slot=yubikey_slot,
        yubikey_serial=yubikey_serial,
    )
@classmethod
def open_interactive(
    cls,
    filepath: str | Path,
    keyfile: str | Path | None = None,
    yubikey_slot: int | None = None,
    yubikey_serial: int | None = None,
    prompt: str = "Password: ",
    max_attempts: int = 3,
) -> Database:
    """Open a KDBX database, asking for the password on the terminal.

    The password is read with ``getpass`` (no echo). After a rejected
    password the user is asked again, up to ``max_attempts`` prompts in
    total. Intended as a convenience for CLI applications.

    Args:
        filepath: Path to the .kdbx file
        keyfile: Path to keyfile (optional)
        yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional)
        yubikey_serial: Serial number of specific YubiKey to use
        prompt: Custom prompt string (default: "Password: ")
        max_attempts: Maximum password attempts before raising (default: 3)

    Returns:
        Database instance

    Raises:
        FileNotFoundError: If file or keyfile doesn't exist
        AuthenticationError: If max_attempts exceeded with wrong password
        YubiKeyError: If YubiKey operation fails

    Example:
        >>> db = Database.open_interactive("vault.kdbx")
        Password:
        >>> db = Database.open_interactive("vault.kdbx", keyfile="vault.key")
        Password:
    """
    import sys

    final_attempt = max_attempts - 1
    for attempt in range(max_attempts):
        password = getpass.getpass(prompt)
        try:
            return cls.open(
                filepath,
                password=password,
                keyfile=keyfile,
                yubikey_slot=yubikey_slot,
                yubikey_serial=yubikey_serial,
            )
        except AuthenticationError:
            if attempt >= final_attempt:
                # Out of attempts: raise a fresh error without chaining.
                raise AuthenticationError(
                    f"Authentication failed after {max_attempts} attempts"
                ) from None
            print("Invalid password, try again.", file=sys.stderr)

    # Reached only when max_attempts <= 0, i.e. the loop never prompted.
    raise AuthenticationError(f"Authentication failed after {max_attempts} attempts")
@classmethod
def open_bytes(
    cls,
    data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    filepath: Path | None = None,
    transformed_key: bytes | None = None,
    yubikey_slot: int | None = None,
    yubikey_serial: int | None = None,
) -> Database:
    """Open a KDBX database from bytes.

    Supports both KDBX3 and KDBX4 formats. Opening a KDBX3 database emits
    a UserWarning, because saving it will upgrade it to KDBX4.

    Args:
        data: KDBX file contents
        password: Database password
        keyfile_data: Keyfile contents (optional)
        filepath: Original file path (for save)
        transformed_key: Precomputed transformed key (skips KDF, faster opens)
        yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
            If provided, the database's KDF salt is used as challenge and
            the 20-byte HMAC-SHA1 response is incorporated into key derivation.
            Requires yubikey-manager package: pip install kdbxtool[yubikey]
        yubikey_serial: Serial number of specific YubiKey to use when multiple
            devices are connected. Use list_yubikeys() to discover serials.

    Returns:
        Database instance

    Raises:
        DatabaseError: If yubikey_slot is given for a KDBX3 database
        YubiKeyNotAvailableError: If yubikey_slot is given for a KDBX4
            database but YubiKey support is not available
        YubiKeyError: If YubiKey operation fails
    """
    # Detect version from header (parse just enough to get version)
    header, _ = KdbxHeader.parse(data)
    logger.debug("KDBX version: %s", header.version.name)

    is_kdbx3 = header.version == KdbxVersion.KDBX3
    if is_kdbx3:
        import warnings

        warnings.warn(
            "Opening KDBX3 database. Saving will automatically upgrade to KDBX4 "
            "with modern security settings (Argon2d, ChaCha20). "
            "Use save(allow_upgrade=True) to confirm.",
            UserWarning,
            stacklevel=3,
        )
        # KDBX3 doesn't support YubiKey CR in the same way
        # (KeeChallenge used a sidecar XML file, not integrated).
        # Reject before touching any hardware: the request cannot succeed.
        if yubikey_slot is not None:
            raise DatabaseError(
                "YubiKey challenge-response is not supported for KDBX3 databases. "
                "Upgrade to KDBX4 first."
            )
        payload = read_kdbx3(
            data,
            password=password,
            keyfile_data=keyfile_data,
            transformed_key=transformed_key,
        )
    else:
        # Get YubiKey response if slot specified.
        # KeePassXC uses the KDF salt as the challenge, not master_seed.
        yubikey_response: bytes | None = None
        if yubikey_slot is not None:
            if not yubikey_module.YUBIKEY_AVAILABLE:
                from .exceptions import YubiKeyNotAvailableError

                raise YubiKeyNotAvailableError()
            config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
            yubikey_response = compute_challenge_response(header.kdf_salt, config).data

        payload = read_kdbx4(
            data,
            password=password,
            keyfile_data=keyfile_data,
            transformed_key=transformed_key,
            yubikey_response=yubikey_response,
        )

    # Parse XML into models (with protected value decryption)
    root_group, settings, binaries = cls._parse_xml(payload.xml_data, payload.inner_header)

    db = cls(
        root_group=root_group,
        settings=settings,
        header=payload.header,
        inner_header=payload.inner_header,
        binaries=binaries,
    )
    db._password = password
    db._keyfile_data = keyfile_data
    db._transformed_key = payload.transformed_key
    db._filepath = filepath
    db._opened_as_kdbx3 = is_kdbx3
    db._yubikey_slot = yubikey_slot
    db._yubikey_serial = yubikey_serial

    logger.info("Database opened successfully")
    # Counting requires walking the whole tree twice; only do it when the
    # message will actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            "Loaded %d entries, %d groups",
            len(cast(list[Entry], db.find_entries())),
            len(cast(list[Group], db.find_groups())),
        )

    return db
633 # --- Creating databases ---
@classmethod
def create(
    cls,
    filepath: str | Path | None = None,
    password: str | None = None,
    keyfile: str | Path | None = None,
    database_name: str = "Database",
    cipher: Cipher = Cipher.AES256_CBC,
    kdf_config: KdfConfig | None = None,
) -> Database:
    """Create a new, in-memory KDBX4 database.

    The new database has a root group named ``database_name`` containing a
    "Recycle Bin" subgroup, a freshly generated header, and a ChaCha20
    inner stream key. Nothing is written to disk by this method.

    Args:
        filepath: Path to save the database (optional)
        password: Database password
        keyfile: Path to keyfile (optional)
        database_name: Name for the database
        cipher: Encryption cipher to use
        kdf_config: KDF configuration (Argon2Config or AesKdfConfig).
            Defaults to Argon2Config.standard().

    Returns:
        New Database instance

    Raises:
        MissingCredentialsError: If both password and keyfile are None
        FileNotFoundError: If the keyfile doesn't exist
        DatabaseError: If kdf_config is not a supported configuration type
    """
    logger.info("Creating new database")

    if password is None and keyfile is None:
        raise MissingCredentialsError()

    key_material = None
    if keyfile:
        key_path = Path(keyfile)
        if not key_path.exists():
            raise FileNotFoundError(f"Keyfile not found: {keyfile}")
        key_material = key_path.read_bytes()

    effective_kdf = Argon2Config.standard() if kdf_config is None else kdf_config

    # Root group plus the recycle bin that the settings will point at.
    root_group = Group.create_root(database_name)
    recycle_bin = Group(name="Recycle Bin", icon_id="43")
    root_group.add_subgroup(recycle_bin)

    # Only the KDF-related header fields differ between the two KDF families.
    if isinstance(effective_kdf, Argon2Config):
        kdf_fields = {
            "kdf_type": effective_kdf.variant,
            "kdf_salt": effective_kdf.salt,
            "argon2_memory_kib": effective_kdf.memory_kib,
            "argon2_iterations": effective_kdf.iterations,
            "argon2_parallelism": effective_kdf.parallelism,
        }
    elif isinstance(effective_kdf, AesKdfConfig):
        kdf_fields = {
            "kdf_type": KdfType.AES_KDF,
            "kdf_salt": effective_kdf.salt,
            "aes_kdf_rounds": effective_kdf.rounds,
        }
    else:
        raise DatabaseError(f"Unsupported KDF config type: {type(effective_kdf)}")

    header = KdbxHeader(
        version=KdbxVersion.KDBX4,
        cipher=cipher,
        compression=CompressionType.GZIP,
        master_seed=os.urandom(32),
        encryption_iv=os.urandom(cipher.iv_size),
        **kdf_fields,
    )

    inner_header = InnerHeader(
        random_stream_id=PROTECTED_STREAM_CHACHA20,
        random_stream_key=os.urandom(64),
        binaries={},
    )

    settings = DatabaseSettings(
        database_name=database_name,
        recycle_bin_enabled=True,
        recycle_bin_uuid=recycle_bin.uuid,
    )

    db = cls(
        root_group=root_group,
        settings=settings,
        header=header,
        inner_header=inner_header,
    )
    db._password = password
    db._keyfile_data = key_material
    if filepath:
        db._filepath = Path(filepath)

    return db
736 # --- Saving databases ---
def _apply_encryption_config(
    self,
    kdf_config: KdfConfig | None = None,
    cipher: Cipher | None = None,
) -> None:
    """Apply encryption configuration to the database header.

    Updates KDF and/or cipher settings. When ``kdf_config`` is given, the
    header is rebuilt as a KDBX4 header with the new KDF parameters; when
    only ``cipher`` changes, the existing header is updated in place.
    Whenever the cipher changes, a fresh IV of the new cipher's size is
    generated. Any change clears the cached transformed key.

    Args:
        kdf_config: KDF configuration (Argon2Config or AesKdfConfig).
            If None, preserves existing KDF settings.
        cipher: Encryption cipher. If None, preserves existing cipher.

    Raises:
        DatabaseError: If the database has no header, or if kdf_config is
            not a supported configuration type.
    """
    header = self._header
    if header is None:
        raise DatabaseError("No header - database not properly initialized")

    target_cipher = cipher if cipher is not None else header.cipher
    cipher_changed = target_cipher != header.cipher

    if kdf_config is None:
        if cipher_changed:
            # Cipher-only change
            header.cipher = target_cipher
            header.encryption_iv = os.urandom(target_cipher.iv_size)
            self._transformed_key = None
        return

    if isinstance(kdf_config, Argon2Config):
        kdf_fields = {
            "kdf_type": kdf_config.variant,
            "kdf_salt": kdf_config.salt,
            "argon2_memory_kib": kdf_config.memory_kib,
            "argon2_iterations": kdf_config.iterations,
            "argon2_parallelism": kdf_config.parallelism,
        }
    elif isinstance(kdf_config, AesKdfConfig):
        kdf_fields = {
            "kdf_type": KdfType.AES_KDF,
            "kdf_salt": kdf_config.salt,
            "aes_kdf_rounds": kdf_config.rounds,
        }
    else:
        # Same policy as create(): reject rather than silently ignore.
        raise DatabaseError(f"Unsupported KDF config type: {type(kdf_config)}")

    # The old IV was sized for the old cipher; regenerate it when the cipher
    # changes, matching what the cipher-only path does.
    if cipher_changed:
        encryption_iv = os.urandom(target_cipher.iv_size)
    else:
        encryption_iv = header.encryption_iv

    self._header = KdbxHeader(
        version=KdbxVersion.KDBX4,
        cipher=target_cipher,
        compression=header.compression,
        master_seed=header.master_seed,
        encryption_iv=encryption_iv,
        **kdf_fields,
    )
    # KDF change invalidates cached transformed key
    self._transformed_key = None
def save(
    self,
    filepath: str | Path | None = None,
    *,
    allow_upgrade: bool = False,
    regenerate_seeds: bool = True,
    kdf_config: KdfConfig | None = None,
    cipher: Cipher | None = None,
    yubikey_slot: int | None = None,
    yubikey_serial: int | None = None,
) -> None:
    """Save the database to a file.

    KDBX3 databases are automatically upgraded to KDBX4 on save. When saving
    a KDBX3 database to its original file, explicit confirmation is required
    via the allow_upgrade parameter.

    The stored file path and YubiKey parameters are updated only after the
    file has been written, so a failed save leaves them unchanged.

    Args:
        filepath: Path to save to (uses original path if not specified;
            an empty string is treated as not specified)
        allow_upgrade: Must be True to confirm KDBX3 to KDBX4 upgrade when
            saving to the original file. Not required when saving to a new file.
        regenerate_seeds: If True (default), regenerate all cryptographic seeds
            (master_seed, encryption_iv, kdf_salt, random_stream_key) on save.
            Set to False only for testing or when using pre-computed transformed keys.
        kdf_config: Optional KDF configuration. Use presets like:
            - Argon2Config.standard() / high_security() / fast()
            - AesKdfConfig.standard() / high_security() / fast()
            For KDBX3 databases, defaults to Argon2Config.standard() for upgrade.
            For KDBX4 databases, allows changing KDF settings.
        cipher: Optional encryption cipher. Use one of:
            - Cipher.AES256_CBC (default, widely compatible)
            - Cipher.CHACHA20 (modern, faster in software)
            - Cipher.TWOFISH256_CBC (requires oxifish package)
            If not specified, preserves existing cipher.
        yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
            If provided (or if database was opened with yubikey_slot), the new
            KDF salt is used as challenge and the response is incorporated
            into key derivation. Requires yubikey-manager package.
        yubikey_serial: Serial number of specific YubiKey to use when multiple
            devices are connected. Use list_yubikeys() to discover serials.

    Raises:
        DatabaseError: If no filepath specified and database wasn't opened from file
        Kdbx3UpgradeRequired: If saving KDBX3 to original file without allow_upgrade=True
        YubiKeyError: If YubiKey operation fails
    """
    logger.info("Saving database to: %s", filepath or self._filepath)

    # One test decides both the target and whether this is "a new file", so
    # an empty filepath cannot count as new while still writing the original.
    save_to_new_file = bool(filepath)
    target = Path(filepath) if save_to_new_file else self._filepath
    if target is None:
        raise DatabaseError("No filepath specified and database wasn't opened from file")

    # Require explicit confirmation when saving KDBX3 to original file
    was_kdbx3 = getattr(self, "_opened_as_kdbx3", False)
    if was_kdbx3 and not save_to_new_file and not allow_upgrade:
        raise Kdbx3UpgradeRequired()

    # Use provided yubikey params, or fall back to stored ones
    effective_yubikey_slot = yubikey_slot if yubikey_slot is not None else self._yubikey_slot
    effective_yubikey_serial = (
        yubikey_serial if yubikey_serial is not None else self._yubikey_serial
    )

    data = self.to_bytes(
        regenerate_seeds=regenerate_seeds,
        kdf_config=kdf_config,
        cipher=cipher,
        yubikey_slot=effective_yubikey_slot,
        yubikey_serial=effective_yubikey_serial,
    )
    target.write_bytes(data)
    # Commit the new location only once the bytes are on disk.
    self._filepath = target
    logger.debug("Database saved successfully")

    # Update stored yubikey params if changed
    if yubikey_slot is not None:
        self._yubikey_slot = yubikey_slot
    if yubikey_serial is not None:
        self._yubikey_serial = yubikey_serial

    # After KDBX3 upgrade, reload to get proper KDBX4 state (including transformed_key)
    if was_kdbx3:
        self.reload()
        self._opened_as_kdbx3 = False
def reload(self) -> None:
    """Reload the database from disk using stored credentials.

    Re-reads the database file and replaces all in-memory state with
    the current file contents. Useful for discarding unsaved changes
    or syncing with external modifications.

    The stored YubiKey slot and serial (if any) are passed along when
    re-opening, and the reloaded groups and entries are re-pointed at this
    Database instance.

    Raises:
        DatabaseError: If database wasn't opened from a file
        MissingCredentialsError: If no password, keyfile data, or YubiKey
            slot is stored
    """
    logger.debug("Reloading database from disk")

    if self._filepath is None:
        raise DatabaseError("Cannot reload: database wasn't opened from a file")

    has_stored_credentials = (
        self._password is not None
        or self._keyfile_data is not None
        or self._yubikey_slot is not None
    )
    if not has_stored_credentials:
        raise MissingCredentialsError()

    # Re-read and parse the file with the same credentials, including the
    # YubiKey parameters the database was opened or last saved with.
    data = self._filepath.read_bytes()
    reloaded = self.open_bytes(
        data,
        password=self._password,
        keyfile_data=self._keyfile_data,
        filepath=self._filepath,
        yubikey_slot=self._yubikey_slot,
        yubikey_serial=self._yubikey_serial,
    )

    # Copy all state from reloaded database
    self._root_group = reloaded._root_group
    self._settings = reloaded._settings
    self._header = reloaded._header
    self._inner_header = reloaded._inner_header
    self._binaries = reloaded._binaries
    self._transformed_key = reloaded._transformed_key
    self._opened_as_kdbx3 = reloaded._opened_as_kdbx3

    # The adopted tree was wired to the temporary `reloaded` object by its
    # constructor; point every group and entry back at this instance.
    self._set_database_references(self._root_group)
def xml(self, *, pretty_print: bool = False) -> bytes:
    """Serialize the database to its XML payload.

    Builds a ``KeePassFile`` document with a ``Meta`` section and a
    ``Root`` section. Protected values (passwords, etc.) are left in
    plaintext, since the output is meant for debugging and migration.

    Args:
        pretty_print: If True, format XML with indentation for readability

    Returns:
        XML payload as bytes (UTF-8 encoded)
    """
    document = Element("KeePassFile")

    # Meta section first, then the group tree under Root.
    self._build_meta(SubElement(document, "Meta"))
    self._build_group(SubElement(document, "Root"), self._root_group)

    # Protected values are deliberately NOT encrypted here.
    if pretty_print:
        self._indent_xml(document)

    serialized = tostring(document, encoding="utf-8", xml_declaration=True)
    return cast(bytes, serialized)
def dump_xml(self, filepath: str | Path, *, pretty_print: bool = True) -> None:
    """Write the database's XML payload to ``filepath``.

    The content is whatever ``xml()`` returns, so protected values
    (passwords, etc.) appear in plaintext. Useful for debugging and
    migration.

    Args:
        filepath: Path to write the XML file
        pretty_print: If True (default), format XML with indentation
    """
    destination = Path(filepath)
    destination.write_bytes(self.xml(pretty_print=pretty_print))
@staticmethod
def _indent_xml(elem: Element, level: int = 0) -> None:
    """Add indentation to XML element tree for pretty printing.

    Rewrites ``text`` and ``tail`` in place wherever they are missing or
    whitespace-only, recursing into children with ``level + 1``. Text or
    tail that contains non-whitespace characters is left untouched.

    Args:
        elem: Element to indent (modified in place)
        level: Nesting depth of ``elem``; 0 for the document root
    """
    # NOTE(review): this listing was recovered from a whitespace-collapsed
    # export, so the width of the two " " literals below and the nesting of
    # the `child.tail` check are reproduced as best understood -- confirm
    # against the real file.
    indent = "\n" + " " * level
    if len(elem):
        # Element has children: first child starts one level deeper.
        if not elem.text or not elem.text.strip():
            elem.text = indent + " "
        if not elem.tail or not elem.tail.strip():
            elem.tail = indent
        for child in elem:
            Database._indent_xml(child, level + 1)
        # `child` is the last child here; its tail precedes the parent's
        # closing tag, so it gets the parent's indentation.
        if not child.tail or not child.tail.strip():
            child.tail = indent
    else:
        # Leaf element: only a tail is needed, and never for the root.
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = indent
def to_bytes(
    self,
    *,
    regenerate_seeds: bool = True,
    kdf_config: KdfConfig | None = None,
    cipher: Cipher | None = None,
    yubikey_slot: int | None = None,
    yubikey_serial: int | None = None,
) -> bytes:
    """Serialize the database into KDBX4 file bytes.

    A database whose header reports KDBX3 is upgraded while saving: the
    encryption configuration is re-applied and the protected stream is
    switched to ChaCha20 with a fresh 64-byte key.

    Args:
        regenerate_seeds: If True (default), replace master_seed,
            encryption_iv, kdf_salt and random_stream_key with fresh random
            bytes and drop the cached transformed key (the salt changed).
            This prevents precomputation attacks. Set to False only for
            testing or when using pre-computed transformed keys.
        kdf_config: Optional KDF configuration, e.g. the presets
            Argon2Config.standard() / high_security() / fast() or
            AesKdfConfig.standard() / high_security() / fast().
            For KDBX3 input a falsy value falls back to
            Argon2Config.standard(); for KDBX4 input it changes the KDF
            settings.
        cipher: Optional encryption cipher (Cipher.AES256_CBC,
            Cipher.CHACHA20 or Cipher.TWOFISH256_CBC, the latter requiring
            the oxifish package). Passed through to the encryption
            configuration together with ``kdf_config``.
        yubikey_slot: YubiKey slot for challenge-response (1 or 2,
            optional). When given, the KDF salt in effect after any
            regeneration is used as the challenge and the response is
            handed to the writer. Requires the yubikey-manager package:
            pip install kdbxtool[yubikey]
        yubikey_serial: Serial number of the YubiKey to use when several
            devices are connected.

    Returns:
        KDBX4 file contents as bytes.

    Raises:
        MissingCredentialsError: If there is no password, keyfile, cached
            transformed key or YubiKey slot to derive a key from.
        DatabaseError: If the header or inner header is missing.
        YubiKeyNotAvailableError: If a slot is given but YubiKey support is
            unavailable.
    """
    # At least one source of key material is required.
    key_material_available = (
        self._password is not None
        or self._keyfile_data is not None
        or self._transformed_key is not None
        or yubikey_slot is not None
    )
    if not key_material_available:
        raise MissingCredentialsError()

    if self._header is None:
        raise DatabaseError("No header - database not properly initialized")
    if self._inner_header is None:
        raise DatabaseError("No inner header - database not properly initialized")

    if self._header.version == KdbxVersion.KDBX3:
        # KDBX3 -> KDBX4 upgrade: new KDF/cipher settings and a ChaCha20
        # protected stream (KDBX3 uses Salsa20).
        upgrade_kdf = kdf_config or Argon2Config.standard()
        self._apply_encryption_config(upgrade_kdf, cipher=cipher)
        self._inner_header.random_stream_id = PROTECTED_STREAM_CHACHA20
        self._inner_header.random_stream_key = os.urandom(64)
    elif not (kdf_config is None and cipher is None):
        self._apply_encryption_config(kdf_config, cipher=cipher)

    # Fresh randomness on every save prevents precomputation attacks.
    # See: https://github.com/libkeepass/pykeepass/issues/219
    # NOTE(review): if the cached transformed key is the only key material,
    # clearing it here leaves the writer without any key source - confirm
    # how write_kdbx4 reports that case.
    if regenerate_seeds:
        self._header.master_seed = os.urandom(32)
        self._header.encryption_iv = os.urandom(self._header.cipher.iv_size)
        self._header.kdf_salt = os.urandom(32)
        self._inner_header.random_stream_key = os.urandom(64)
        self._transformed_key = None

    # The challenge is the KDF salt (as KeePassXC does), not master_seed,
    # so this must run after the salt has been finalized above.
    yubikey_response: bytes | None = None
    if yubikey_slot is not None:
        if not yubikey_module.YUBIKEY_AVAILABLE:
            from .exceptions import YubiKeyNotAvailableError

            raise YubiKeyNotAvailableError()
        challenge_config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
        yubikey_response = compute_challenge_response(
            self._header.kdf_salt, challenge_config
        ).data

    # Mirror the binary pool into the inner header. Known references keep
    # their protection flag; new ones default to protected.
    known_binaries = self._inner_header.binaries
    self._inner_header.binaries = {
        ref: (known_binaries[ref][0] if ref in known_binaries else True, data)
        for ref, data in self._binaries.items()
    }

    # The writer receives both the credentials and any cached transformed key.
    return write_kdbx4(
        header=self._header,
        inner_header=self._inner_header,
        xml_data=self._build_xml(),
        password=self._password,
        keyfile_data=self._keyfile_data,
        transformed_key=self._transformed_key,
        yubikey_response=yubikey_response,
    )
def set_credentials(
    self,
    password: str | None = None,
    keyfile_data: bytes | None = None,
) -> None:
    """Set or update database credentials.

    Both values are replaced together: passing None for one of them removes
    that credential. Any cached transformed key is discarded, because it
    was derived from the previous credentials and would otherwise still be
    used by a later save that does not regenerate seeds.

    Args:
        password: New password (None to remove).
        keyfile_data: New keyfile data (None to remove).

    Raises:
        MissingCredentialsError: If both password and keyfile are None.
    """
    if password is None and keyfile_data is None:
        raise MissingCredentialsError()
    self._password = password
    self._keyfile_data = keyfile_data
    # Force the next save to derive the key from the new credentials.
    self._transformed_key = None
1112 # --- Search operations ---
def find_entries(
    self,
    title: str | None = None,
    username: str | None = None,
    password: str | None = None,
    url: str | None = None,
    notes: str | None = None,
    otp: str | None = None,
    tags: list[str] | None = None,
    string: dict[str, str] | None = None,
    autotype_enabled: bool | None = None,
    autotype_sequence: str | None = None,
    autotype_window: str | None = None,
    uuid: uuid_module.UUID | None = None,
    path: list[str] | str | None = None,
    recursive: bool = True,
    history: bool = False,
    first: bool = False,
) -> list[Entry] | Entry | None:
    """Look up entries by path, by UUID, or by field criteria.

    Exactly one lookup strategy runs, chosen in this order: ``path``, then
    ``uuid``, then the remaining field criteria.

    Args:
        title: Match entries with this title
        username: Match entries with this username
        password: Match entries with this password
        url: Match entries with this URL
        notes: Match entries with these notes
        otp: Match entries with this OTP
        tags: Match entries with all these tags
        string: Match entries with custom properties (dict of key:value)
        autotype_enabled: Filter by AutoType enabled state
        autotype_sequence: Match entries with this AutoType sequence
        autotype_window: Match entries with this AutoType window
        uuid: Match the entry with this UUID. Only ``recursive`` is
            honoured alongside it; field criteria are not consulted.
        path: Path to the entry as a list of group names ending with the
            entry title, or as a '/'-separated string. When given, all
            other criteria are ignored.
        recursive: Search in subgroups
        history: Include history entries in search
        first: If True, return the first match or None instead of a list.

    Returns:
        If first=True: Entry or None
        If first=False: List of matching entries
    """
    if path is not None:
        matches = self._find_entry_by_path(path)
    elif uuid is not None:
        hit = self._root_group.find_entry_by_uuid(uuid, recursive=recursive)
        if first:
            return hit
        return [hit] if hit else []
    else:
        matches = self._root_group.find_entries(
            title=title,
            username=username,
            password=password,
            url=url,
            notes=notes,
            otp=otp,
            tags=tags,
            string=string,
            autotype_enabled=autotype_enabled,
            autotype_sequence=autotype_sequence,
            autotype_window=autotype_window,
            recursive=recursive,
            history=history,
        )

    if not first:
        return matches
    return matches[0] if matches else None
def _find_entry_by_path(self, path: list[str] | str) -> list[Entry]:
    """Resolve a group path ending in an entry title.

    Args:
        path: Either a list such as ['group1', 'group2', 'entry_title'] or
            the string 'group1/group2/entry_title'. Empty segments of a
            string path (leading, trailing or doubled '/') are dropped.

    Returns:
        A one-element list holding the first entry in the target group
        whose title equals the last path segment, or an empty list when
        the path is empty, a group is missing, or no entry matches.
    """
    segments = [part for part in path.split("/") if part] if isinstance(path, str) else path
    if not segments:
        return []

    # Everything before the last segment names groups; the last is the title.
    group_names, wanted_title = segments[:-1], segments[-1]

    node = self._root_group
    for group_name in group_names:
        # Descend into the first direct subgroup carrying this name.
        node = next((sub for sub in node.subgroups if sub.name == group_name), None)
        if node is None:
            return []

    # Direct children of the target group only; the first match wins.
    return [entry for entry in node.entries if entry.title == wanted_title][:1]
def find_groups(
    self,
    name: str | None = None,
    uuid: uuid_module.UUID | None = None,
    path: list[str] | str | None = None,
    recursive: bool = True,
    first: bool = False,
) -> list[Group] | Group | None:
    """Look up groups by path, by UUID, or by name.

    Exactly one lookup strategy runs, chosen in this order: ``path``, then
    ``uuid``, then ``name``.

    Args:
        name: Match groups with this name
        uuid: Match the group with this UUID (``name`` is not consulted)
        path: Path to the group as a list of group names or as a
            '/'-separated string. When given, other criteria are ignored.
        recursive: Search in nested subgroups
        first: If True, return the first matching group or None instead of
            a list

    Returns:
        List of matching groups, or a single Group/None if first=True
    """
    if path is not None:
        matches = self._find_group_by_path(path)
    elif uuid is not None:
        hit = self._root_group.find_group_by_uuid(uuid, recursive=recursive)
        if first:
            return hit
        return [hit] if hit else []
    else:
        # Called without ``first``, so the group-level search yields a list;
        # the cast only informs the type checker.
        matches = cast(
            "list[Group]",
            self._root_group.find_groups(name=name, recursive=recursive),
        )

    if not first:
        return matches
    return matches[0] if matches else None
def _find_group_by_path(self, path: list[str] | str) -> list[Group]:
    """Resolve a path of group names starting below the root group.

    Args:
        path: Either a list such as ['group1', 'group2'] or the string
            'group1/group2'. Empty segments of a string path are dropped.

    Returns:
        A one-element list with the group the path leads to (the root
        group itself for an empty path), or an empty list if any segment
        has no matching subgroup.
    """
    segments = [part for part in path.split("/") if part] if isinstance(path, str) else path

    node = self._root_group
    # An empty path never enters the loop and therefore yields the root.
    for group_name in segments or ():
        # Descend into the first direct subgroup carrying this name.
        node = next((sub for sub in node.subgroups if sub.name == group_name), None)
        if node is None:
            return []
    return [node]
def find_entries_contains(
    self,
    title: str | None = None,
    username: str | None = None,
    password: str | None = None,
    url: str | None = None,
    notes: str | None = None,
    otp: str | None = None,
    recursive: bool = True,
    case_sensitive: bool = False,
    history: bool = False,
) -> list[Entry]:
    """Find entries whose fields contain the given substrings.

    Delegates to the root group's search of the same name. All criteria
    are combined with AND logic; None means "any value".

    Args:
        title: Match entries whose title contains this substring
        username: Match entries whose username contains this substring
        password: Match entries whose password contains this substring
        url: Match entries whose URL contains this substring
        notes: Match entries whose notes contain this substring
        otp: Match entries whose OTP contains this substring
        recursive: Search in subgroups
        case_sensitive: If False (default), matching is case-insensitive
        history: Include history entries in search

    Returns:
        List of matching entries
    """
    substrings = {
        "title": title,
        "username": username,
        "password": password,
        "url": url,
        "notes": notes,
        "otp": otp,
    }
    return self._root_group.find_entries_contains(
        **substrings,
        recursive=recursive,
        case_sensitive=case_sensitive,
        history=history,
    )
def find_entries_regex(
    self,
    title: str | None = None,
    username: str | None = None,
    password: str | None = None,
    url: str | None = None,
    notes: str | None = None,
    otp: str | None = None,
    recursive: bool = True,
    case_sensitive: bool = False,
    history: bool = False,
) -> list[Entry]:
    """Find entries whose fields match the given regex patterns.

    Delegates to the root group's search of the same name. All criteria
    are combined with AND logic; None means "any value".

    Args:
        title: Regex pattern to match against title
        username: Regex pattern to match against username
        password: Regex pattern to match against password
        url: Regex pattern to match against URL
        notes: Regex pattern to match against notes
        otp: Regex pattern to match against OTP
        recursive: Search in subgroups
        case_sensitive: If False (default), matching is case-insensitive
        history: Include history entries in search

    Returns:
        List of matching entries

    Raises:
        re.error: If any pattern is not a valid regex
    """
    patterns = {
        "title": title,
        "username": username,
        "password": password,
        "url": url,
        "notes": notes,
        "otp": otp,
    }
    return self._root_group.find_entries_regex(
        **patterns,
        recursive=recursive,
        case_sensitive=case_sensitive,
        history=history,
    )
def find_attachments(
    self,
    id: int | None = None,
    filename: str | None = None,
    regex: bool = False,
    recursive: bool = True,
    history: bool = False,
    first: bool = False,
) -> list[Attachment] | Attachment | None:
    """Find attachments in the database.

    Every binary reference of every visited entry is tested against the
    ``id`` and ``filename`` filters; a filter left as None accepts
    everything.

    Args:
        id: Match attachments with this binary reference ID
        filename: Match attachments with this filename (exact or regex)
        regex: If True, treat filename as a regex pattern that may match
            anywhere in the name
        recursive: Search in subgroups
        history: Include history entries in search
        first: If True, return first match or None. If False, return list.

    Returns:
        If first=True: Attachment or None
        If first=False: List of matching attachments
    """
    import re as re_module

    # Compile once up front so an invalid pattern fails before iterating.
    name_pattern: re_module.Pattern[str] | None = (
        re_module.compile(filename) if regex and filename is not None else None
    )

    def name_matches(key: str) -> bool:
        if filename is None:
            return True
        if name_pattern is not None:
            return name_pattern.search(key) is not None
        return key == filename

    found: list[Attachment] = []
    for entry in self._root_group.iter_entries(recursive=recursive, history=history):
        for binary_ref in entry.binaries:
            if id is not None and binary_ref.ref != id:
                continue
            if not name_matches(binary_ref.key):
                continue

            attachment = Attachment(
                filename=binary_ref.key,
                id=binary_ref.ref,
                entry=entry,
            )
            if first:
                return attachment
            found.append(attachment)

    return None if first else found
@property
def attachments(self) -> list[Attachment]:
    """Get all attachments in the database.

    Uses the unfiltered form of :meth:`find_attachments`, so no filename
    pattern is compiled or matched just to select everything.

    Returns:
        The list produced by :meth:`find_attachments` with its defaults,
        or an empty list if that call does not return a list.
    """
    result = self.find_attachments()
    # With first=False (the default) find_attachments returns a list; the
    # isinstance check narrows its wider declared return type.
    return result if isinstance(result, list) else []
def iter_entries(self, recursive: bool = True) -> Iterator[Entry]:
    """Lazily walk the entries of the database, starting at the root group.

    Args:
        recursive: Include entries from all subgroups

    Yields:
        Entry objects
    """
    for entry in self._root_group.iter_entries(recursive=recursive):
        yield entry
def iter_groups(self, recursive: bool = True) -> Iterator[Group]:
    """Lazily walk the groups of the database, starting at the root group.

    Args:
        recursive: Include nested subgroups

    Yields:
        Group objects
    """
    for group in self._root_group.iter_groups(recursive=recursive):
        yield group
1478 # --- Field References ---
def deref(self, value: str | None) -> str | uuid_module.UUID | None:
    """Resolve KeePass field references in a value.

    Parses field references in the format {REF:X@Y:Z} and replaces them
    with the actual values from the referenced entries:
    - X = Field to retrieve (T=Title, U=Username, P=Password, A=URL, N=Notes, I=UUID)
    - Y = Field to search by (T, U, P, A, N, I)
    - Z = Search value

    Resolution repeats on the substituted text, so a reference that
    resolves to another reference keeps resolving. The number of passes is
    bounded: circular references (for example an entry whose field refers
    to itself) are reported as unresolvable instead of recursing without
    limit.

    Args:
        value: String potentially containing field references

    Returns:
        - The resolved string with all references replaced; a referenced
          UUID is inserted in its ``str()`` form and an unset field as ''
        - None if a referenced entry cannot be found, a UUID search value
          is malformed, or references remain after the pass limit
        - The original value if it contains no references or is empty/None

    Example:
        >>> # Entry with password = '{REF:P@I:ABCD1234...}'
        >>> db.deref(entry.password)  # Returns the referenced password
        >>>
        >>> # With prefix/suffix: 'prefix{REF:U@I:...}suffix'
        >>> db.deref(value)  # Returns 'prefix<username>suffix'
    """
    import re

    if not value:
        return value

    # Matches {REF:X@Y:Z} where X and Y are field codes, Z is the search value.
    reference_re = re.compile(r"(\{REF:([TUPANI])@([TUPANI]):([^}]+)\})")

    field_to_attr = {
        "T": "title",
        "U": "username",
        "P": "password",
        "A": "url",
        "N": "notes",
        "I": "uuid",
    }

    # Real reference chains are shallow. The cap bounds the work for cyclic
    # references, which can never be fully resolved (and may grow the text
    # on every pass).
    max_passes = 16
    passes_done = 0

    while True:
        references = set(reference_re.findall(value))
        if not references:
            return value
        if passes_done == max_passes:
            return None
        passes_done += 1

        for ref_str, wanted_field, search_field, search_value in references:
            wanted_attr = field_to_attr[wanted_field]
            search_attr = field_to_attr[search_field]

            # UUID lookups need a real UUID object, not its text form.
            if search_attr == "uuid":
                try:
                    search_value = uuid_module.UUID(search_value)
                except ValueError:
                    return None

            ref_entry = self.find_entries(first=True, **{search_attr: search_value})
            if ref_entry is None:
                return None

            resolved_value = getattr(ref_entry, wanted_attr)
            if resolved_value is None:
                resolved_value = ""
            # str.replace needs text, so a UUID is inserted in string form.
            if isinstance(resolved_value, uuid_module.UUID):
                resolved_value = str(resolved_value)

            value = value.replace(ref_str, resolved_value)
1559 # --- Move operations ---
def move_entry(self, entry: Entry, destination: Group) -> None:
    """Move an entry to a different group of this database.

    Convenience wrapper around ``entry.move_to()`` that first checks, by
    UUID lookup from the root group, that both the entry and the
    destination belong to this database.

    Args:
        entry: Entry to move
        destination: Target group to move the entry to

    Raises:
        ValueError: If entry has no parent
        ValueError: If entry or destination is not in this database
    """
    if entry.parent is None:
        raise ValueError("Entry has no parent group")

    root = self._root_group
    if root.find_entry_by_uuid(entry.uuid) is None:
        raise ValueError("Entry is not in this database")

    # The root group is accepted by identity; anything else must be found.
    destination_known = (
        destination is root or root.find_group_by_uuid(destination.uuid) is not None
    )
    if not destination_known:
        raise ValueError("Destination group is not in this database")

    entry.move_to(destination)
def move_group(self, group: Group, destination: Group) -> None:
    """Move a group under a different parent group of this database.

    Convenience wrapper around ``group.move_to()`` that first checks, by
    UUID lookup from the root group, that both the group and the
    destination belong to this database.

    Args:
        group: Group to move
        destination: Target parent group to move the group to

    Raises:
        ValueError: If group is the root group
        ValueError: If group has no parent
        ValueError: If group or destination is not in this database
    """
    if group.is_root_group:
        raise ValueError("Cannot move the root group")
    if group.parent is None:
        raise ValueError("Group has no parent")

    root = self._root_group
    if root.find_group_by_uuid(group.uuid) is None:
        raise ValueError("Group is not in this database")

    # The root group is accepted by identity; anything else must be found.
    destination_known = (
        destination is root or root.find_group_by_uuid(destination.uuid) is not None
    )
    if not destination_known:
        raise ValueError("Destination group is not in this database")

    group.move_to(destination)
1625 # --- Recycle bin operations ---
@property
def recyclebin_group(self) -> Group | None:
    """Return the recycle bin group, creating it on demand.

    When the recycle bin is enabled but the configured UUID is unset or no
    longer resolves to a group, a new "Recycle Bin" group is added under
    the root group and recorded in the settings. Reading this property can
    therefore modify the database.

    Returns:
        Recycle bin Group, or None if the recycle bin is disabled
    """
    settings = self._settings
    if not settings.recycle_bin_enabled:
        return None

    bin_uuid = settings.recycle_bin_uuid
    if bin_uuid is not None:
        existing = self._root_group.find_group_by_uuid(bin_uuid)
        if existing is not None:
            return existing

    # No usable recycle bin yet: create one and remember its UUID.
    created = Group(name="Recycle Bin", icon_id="43")
    self._root_group.add_subgroup(created)
    settings.recycle_bin_uuid = created.uuid
    return created
def trash_entry(self, entry: Entry) -> None:
    """Move an entry to the recycle bin.

    An entry whose parent already is the recycle bin is removed from it
    instead, i.e. permanently deleted.

    Args:
        entry: Entry to trash

    Raises:
        ValueError: If entry has no parent group
        ValueError: If entry is not in this database
        ValueError: If recycle bin is disabled
    """
    if entry.parent is None:
        raise ValueError("Entry has no parent group")
    if self._root_group.find_entry_by_uuid(entry.uuid) is None:
        raise ValueError("Entry is not in this database")

    bin_group = self.recyclebin_group
    if bin_group is None:
        raise ValueError("Recycle bin is disabled")

    if entry.parent is bin_group:
        # Second trip to the bin means permanent deletion.
        bin_group.remove_entry(entry)
    else:
        entry.move_to(bin_group)
def trash_group(self, group: Group) -> None:
    """Move a group to the recycle bin.

    A group whose parent already is the recycle bin is removed from it
    instead, i.e. permanently deleted. The root group and the recycle bin
    itself cannot be trashed.

    Args:
        group: Group to trash

    Raises:
        ValueError: If group is the root group
        ValueError: If group has no parent
        ValueError: If group is not in this database
        ValueError: If recycle bin is disabled
        ValueError: If group is the recycle bin
    """
    if group.is_root_group:
        raise ValueError("Cannot trash the root group")
    if group.parent is None:
        raise ValueError("Group has no parent")
    if self._root_group.find_group_by_uuid(group.uuid) is None:
        raise ValueError("Group is not in this database")

    bin_group = self.recyclebin_group
    if bin_group is None:
        raise ValueError("Recycle bin is disabled")
    if group is bin_group:
        raise ValueError("Cannot trash the recycle bin")

    if group.parent is bin_group:
        # Second trip to the bin means permanent deletion.
        bin_group.remove_subgroup(group)
    else:
        group.move_to(bin_group)
def empty_group(self, group: Group) -> None:
    """Remove every subgroup and entry directly contained in a group.

    Contents are removed from the group outright (the recycle bin is not
    involved); the group itself stays in place.

    Args:
        group: Group to empty

    Raises:
        ValueError: If group is not in this database
    """
    root = self._root_group
    if group is not root and root.find_group_by_uuid(group.uuid) is None:
        raise ValueError("Group is not in this database")

    # Snapshot each collection first: removal mutates it while we iterate.
    for child_group in tuple(group.subgroups):
        group.remove_subgroup(child_group)
    for child_entry in tuple(group.entries):
        group.remove_entry(child_entry)
1749 # --- Memory protection ---
def apply_protection_policy(self, entry: Entry) -> None:
    """Apply the database's memory protection policy to one entry.

    For every string field of the entry whose key appears in the
    database's ``memory_protection`` settings, the field's ``protected``
    flag is overwritten with the configured value. Fields without a policy
    entry are left untouched.

    Args:
        entry: Entry to apply policy to
    """
    policy = self._settings.memory_protection
    for key, string_field in entry.strings.items():
        if key not in policy:
            continue
        string_field.protected = policy[key]
def apply_protection_policy_all(self) -> None:
    """Apply the memory protection policy to every entry in the database.

    Runs :meth:`apply_protection_policy` on each entry produced by
    :meth:`iter_entries` with its default arguments.
    """
    apply_policy = self.apply_protection_policy
    for entry in self.iter_entries():
        apply_policy(entry)
1777 # --- Binary attachments ---
def get_binary(self, ref: int) -> bytes | None:
    """Look up the data of a binary attachment by its reference ID.

    Args:
        ref: Binary reference ID

    Returns:
        Binary data or None if not found
    """
    binary_pool = self._binaries
    return binary_pool.get(ref)
def add_binary(self, data: bytes, protected: bool = True) -> int:
    """Store new binary attachment data in the database.

    The new reference ID is one above the highest ID currently in use
    (0 for an empty pool). When an inner header is present it receives the
    same data together with the protection flag.

    Args:
        data: Binary data
        protected: Whether the binary should be memory-protected

    Returns:
        Reference ID for the new binary
    """
    next_ref = max(self._binaries, default=-1) + 1
    self._binaries[next_ref] = data

    inner = self._inner_header
    if inner is not None:
        inner.binaries[next_ref] = (protected, data)
    return next_ref
def remove_binary(self, ref: int) -> bool:
    """Delete binary attachment data from the database.

    The data is dropped from the binary pool and, if present there, from
    the inner header as well.

    Args:
        ref: Binary reference ID

    Returns:
        True if removed, False if not found
    """
    if ref not in self._binaries:
        return False

    del self._binaries[ref]
    inner = self._inner_header
    if inner is not None and ref in inner.binaries:
        del inner.binaries[ref]
    return True
def get_attachment(self, entry: Entry, name: str) -> bytes | None:
    """Fetch the data of an entry's attachment by filename.

    Only the first binary reference of the entry with the given filename
    is considered.

    Args:
        entry: Entry to get attachment from
        name: Filename of the attachment

    Returns:
        Attachment data, or None if the entry has no such attachment or
        the referenced data is not in the binary pool
    """
    named_refs = [binary_ref for binary_ref in entry.binaries if binary_ref.key == name]
    if not named_refs:
        return None
    return self._binaries.get(named_refs[0].ref)
def add_attachment(self, entry: Entry, name: str, data: bytes, protected: bool = True) -> None:
    """Attach binary data to an entry under a filename.

    The data is stored via :meth:`add_binary` and a reference carrying the
    filename is appended to the entry's binaries.

    Args:
        entry: Entry to add attachment to
        name: Filename for the attachment
        data: Attachment data
        protected: Whether the attachment should be memory-protected
    """
    stored_ref = self.add_binary(data, protected=protected)
    attachment_ref = BinaryRef(key=name, ref=stored_ref)
    entry.binaries.append(attachment_ref)
def remove_attachment(self, entry: Entry, name: str) -> bool:
    """Detach the first attachment with the given filename from an entry.

    Only the entry's reference is dropped. The underlying data stays in
    the binary pool, since other entries may still reference it.

    Args:
        entry: Entry to remove attachment from
        name: Filename of the attachment

    Returns:
        True if removed, False if not found
    """
    position = next(
        (index for index, binary_ref in enumerate(entry.binaries) if binary_ref.key == name),
        None,
    )
    if position is None:
        return False
    del entry.binaries[position]
    return True
def list_attachments(self, entry: Entry) -> list[str]:
    """Return the filenames of an entry's attachments, in stored order.

    Args:
        entry: Entry to list attachments for

    Returns:
        List of attachment filenames
    """
    filenames = [ref.key for ref in entry.binaries]
    return filenames
1880 # --- Custom icons ---
@property
def custom_icons(self) -> dict[uuid_module.UUID, CustomIcon]:
    """Custom icons of the database, keyed by UUID.

    This is the settings' own dictionary, not a copy.
    """
    settings = self._settings
    return settings.custom_icons
def get_custom_icon(self, uuid: uuid_module.UUID) -> bytes | None:
    """Return the image data of a custom icon.

    Args:
        uuid: UUID of the custom icon

    Returns:
        PNG image data, or None if not found
    """
    icon = self._settings.custom_icons.get(uuid)
    if not icon:
        return None
    return icon.data
def add_custom_icon(self, data: bytes, name: str | None = None) -> uuid_module.UUID:
    """Register a new custom icon in the database.

    The icon receives a freshly generated random UUID and the current UTC
    time as its last modification time.

    Args:
        data: PNG image data
        name: Optional display name for the icon

    Returns:
        UUID of the new custom icon
    """
    new_uuid = uuid_module.uuid4()
    self._settings.custom_icons[new_uuid] = CustomIcon(
        uuid=new_uuid,
        data=data,
        name=name,
        last_modification_time=datetime.now(UTC),
    )
    return new_uuid
def remove_custom_icon(self, uuid: uuid_module.UUID) -> bool:
    """Delete a custom icon from the database.

    Note: Entries and groups that reference this icon are not updated;
    they keep pointing at the now-missing UUID.

    Args:
        uuid: UUID of the custom icon to remove

    Returns:
        True if removed, False if not found
    """
    icons = self._settings.custom_icons
    if uuid not in icons:
        return False
    del icons[uuid]
    return True
def find_custom_icon_by_name(self, name: str) -> uuid_module.UUID | None:
    """Look up the UUID of the custom icon carrying a given name.

    Args:
        name: Name of the icon to find (compared for exact equality; it
            must identify exactly one icon)

    Returns:
        UUID of the matching icon, or None if not found

    Raises:
        ValueError: If multiple icons have the same name
    """
    candidates = [
        icon.uuid for icon in self._settings.custom_icons.values() if icon.name == name
    ]
    if not candidates:
        return None

    only_match, *duplicates = candidates
    if duplicates:
        raise ValueError(f"Multiple custom icons found with name: {name}")
    return only_match
1955 # --- XML parsing ---
@classmethod
def _parse_xml(
    cls, xml_data: bytes, inner_header: InnerHeader | None = None
) -> tuple[Group, DatabaseSettings, dict[int, bytes]]:
    """Turn a KDBX XML payload into model objects.

    Args:
        xml_data: XML payload bytes
        inner_header: Inner header with stream cipher info. When given,
            protected values are decrypted before the models are built and
            the header's binaries are returned.

    Returns:
        Tuple of (root_group, settings, binaries)

    Raises:
        InvalidXmlError: If the Root element or its Group child is missing
    """
    document = DefusedET.fromstring(xml_data)

    # Protected values must be decrypted in place before any model is built.
    if inner_header is not None:
        cls._decrypt_protected_values(document, inner_header)

    settings = cls._parse_meta(document.find("Meta"))

    root_elem = document.find("Root")
    if root_elem is None:
        raise InvalidXmlError("Missing Root element")
    group_elem = root_elem.find("Group")
    if group_elem is None:
        raise InvalidXmlError("Missing root Group element")

    root_group = cls._parse_group(group_elem)
    root_group._is_root = True

    # KDBX4 keeps attachment data in the inner header. Its protection flag
    # describes memory protection policy, not encryption, so only the data
    # is carried over.
    binaries: dict[int, bytes] = (
        {}
        if inner_header is None
        else {idx: data for idx, (_protected, data) in inner_header.binaries.items()}
    )
    return root_group, settings, binaries
@classmethod
def _decrypt_protected_values(cls, root: Element, inner_header: InnerHeader) -> None:
    """Decrypt all protected values in the XML tree in document order.

    Protected values are XOR'd with a stream cipher and base64 encoded.
    Every ``Value`` element with ``Protected="True"`` and non-empty text has
    its text replaced in-place by the decrypted UTF-8 string.

    Decryption is best-effort: a value that cannot be base64-decoded,
    decrypted or decoded as UTF-8 keeps its original text, and a warning is
    logged so the problem is not silently hidden.

    Args:
        root: Root of the parsed XML tree (modified in-place).
        inner_header: Inner header providing the stream id and key.
    """
    cipher = ProtectedStreamCipher(
        inner_header.random_stream_id,
        inner_header.random_stream_key,
    )

    # root.iter() walks in document order, which is the order the values
    # were encrypted in.
    for elem in root.iter("Value"):
        if elem.get("Protected") != "True" or not elem.text:
            continue
        try:
            ciphertext = base64.b64decode(elem.text)
            elem.text = cipher.decrypt(ciphertext).decode("utf-8")
        except (binascii.Error, ValueError, UnicodeDecodeError):
            # Leave the element as-is, but never log the value itself.
            # NOTE(review): presumably the cipher keeps one keystream position
            # for all values, so a failure here may also garble later values -
            # confirm against ProtectedStreamCipher.
            logger.warning("Could not decrypt a protected value; leaving it unchanged")
@classmethod
def _parse_meta(cls, meta_elem: Element | None) -> DatabaseSettings:
    """Parse the Meta element into DatabaseSettings.

    Every setting is optional: a missing, empty or malformed element leaves
    the corresponding DatabaseSettings default untouched. The numeric settings
    that ``_build_meta`` writes (history and master-key-change limits) are
    read back here so they survive a load/save round trip.

    Args:
        meta_elem: The ``Meta`` element, or None when the document has none.

    Returns:
        A DatabaseSettings populated from the element (defaults if None).
    """
    settings = DatabaseSettings()

    if meta_elem is None:
        return settings

    def get_text(tag: str) -> str | None:
        elem = meta_elem.find(tag)
        return elem.text if elem is not None else None

    if name := get_text("DatabaseName"):
        settings.database_name = name
    if desc := get_text("DatabaseDescription"):
        settings.database_description = desc
    if username := get_text("DefaultUserName"):
        settings.default_username = username
    if gen := get_text("Generator"):
        settings.generator = gen

    # Integer settings, serialized with str() by _build_meta.
    int_settings = (
        ("MaintenanceHistoryDays", "maintenance_history_days"),
        ("MasterKeyChangeRec", "master_key_change_rec"),
        ("MasterKeyChangeForce", "master_key_change_force"),
        ("HistoryMaxItems", "history_max_items"),
        ("HistoryMaxSize", "history_max_size"),
    )
    for tag, attr in int_settings:
        if raw := get_text(tag):
            # A non-numeric value keeps the default instead of aborting the load.
            with contextlib.suppress(ValueError):
                setattr(settings, attr, int(raw))

    # Parse memory protection
    mp_elem = meta_elem.find("MemoryProtection")
    if mp_elem is not None:
        for field_name in ("Title", "UserName", "Password", "URL", "Notes"):
            elem = mp_elem.find(f"Protect{field_name}")
            if elem is not None:
                settings.memory_protection[field_name] = elem.text == "True"

    # Parse recycle bin
    if rb := get_text("RecycleBinEnabled"):
        settings.recycle_bin_enabled = rb == "True"
    if rb_uuid := get_text("RecycleBinUUID"):
        # Invalid base64 or a payload that is not 16 bytes is ignored.
        with contextlib.suppress(binascii.Error, ValueError):
            settings.recycle_bin_uuid = uuid_module.UUID(bytes=base64.b64decode(rb_uuid))

    # Parse custom icons
    custom_icons_elem = meta_elem.find("CustomIcons")
    if custom_icons_elem is not None:
        for icon_elem in custom_icons_elem.findall("Icon"):
            icon_uuid_elem = icon_elem.find("UUID")
            icon_data_elem = icon_elem.find("Data")
            # UUID and Data are both required; Name and mtime are optional.
            if (
                icon_uuid_elem is None
                or not icon_uuid_elem.text
                or icon_data_elem is None
                or not icon_data_elem.text
            ):
                continue
            try:
                icon_uuid = uuid_module.UUID(bytes=base64.b64decode(icon_uuid_elem.text))
                icon_data = base64.b64decode(icon_data_elem.text)
                icon_name = None
                name_elem = icon_elem.find("Name")
                if name_elem is not None:
                    icon_name = name_elem.text
                icon_mtime = None
                mtime_elem = icon_elem.find("LastModificationTime")
                if mtime_elem is not None and mtime_elem.text:
                    icon_mtime = cls._decode_time(mtime_elem.text)
                settings.custom_icons[icon_uuid] = CustomIcon(
                    uuid=icon_uuid,
                    data=icon_data,
                    name=icon_name,
                    last_modification_time=icon_mtime,
                )
            except (binascii.Error, ValueError):
                pass  # Skip invalid icon

    return settings
@classmethod
def _parse_group(cls, elem: Element) -> Group:
    """Parse a Group element (and everything below it) into a Group model.

    Besides identity, icon and time data, this reads back the per-group
    options that ``_build_group`` writes (IsExpanded, DefaultAutoTypeSequence,
    EnableAutoType, EnableSearching, LastTopVisibleEntry) so they are not
    reset to defaults by a load/save round trip. Child entries and subgroups
    are parsed recursively.

    Args:
        elem: The ``Group`` XML element.

    Returns:
        The populated Group, with entries and subgroups attached.
    """
    group = Group()

    # UUID
    uuid_elem = elem.find("UUID")
    if uuid_elem is not None and uuid_elem.text:
        group.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_elem.text))

    # Name
    name_elem = elem.find("Name")
    if name_elem is not None:
        group.name = name_elem.text

    # Notes
    notes_elem = elem.find("Notes")
    if notes_elem is not None:
        group.notes = notes_elem.text

    # Icon
    icon_elem = elem.find("IconID")
    if icon_elem is not None and icon_elem.text:
        group.icon_id = icon_elem.text

    # Custom icon UUID (a malformed value is ignored)
    custom_icon_elem = elem.find("CustomIconUUID")
    if custom_icon_elem is not None and custom_icon_elem.text:
        with contextlib.suppress(binascii.Error, ValueError):
            group.custom_icon_uuid = uuid_module.UUID(
                bytes=base64.b64decode(custom_icon_elem.text)
            )

    # Times
    group.times = cls._parse_times(elem.find("Times"))

    # Boolean options. Only recognized spellings are applied; anything else
    # (e.g. "null" for the inherit state) keeps the model default.
    bool_words = {"true": True, "false": False}
    bool_options = (
        ("IsExpanded", "is_expanded"),
        ("EnableAutoType", "enable_autotype"),
        ("EnableSearching", "enable_searching"),
    )
    for tag, attr in bool_options:
        flag_elem = elem.find(tag)
        if flag_elem is not None and flag_elem.text:
            flag = bool_words.get(flag_elem.text.strip().lower())
            if flag is not None:
                setattr(group, attr, flag)

    # Default auto-type sequence
    sequence_elem = elem.find("DefaultAutoTypeSequence")
    if sequence_elem is not None and sequence_elem.text:
        group.default_autotype_sequence = sequence_elem.text

    # Last top visible entry; _build_group writes the all-zero UUID for "none"
    top_elem = elem.find("LastTopVisibleEntry")
    if top_elem is not None and top_elem.text:
        with contextlib.suppress(binascii.Error, ValueError):
            top_uuid = uuid_module.UUID(bytes=base64.b64decode(top_elem.text))
            if top_uuid.int != 0:
                group.last_top_visible_entry = top_uuid

    # Entries
    for entry_elem in elem.findall("Entry"):
        group.add_entry(cls._parse_entry(entry_elem))

    # Subgroups (recursive)
    for subgroup_elem in elem.findall("Group"):
        group.add_subgroup(cls._parse_group(subgroup_elem))

    return group
@classmethod
def _parse_entry(cls, elem: Element) -> Entry:
    """Parse an Entry element into an Entry model.

    Reads UUID, icon data, colors, override URL, tags, times, string fields,
    binary references, auto-type settings and history. ForegroundColor,
    BackgroundColor and OverrideURL are written by ``_build_entry`` and are
    read back here so they survive a load/save round trip. History entries
    are parsed recursively with this same method and wrapped via
    ``HistoryEntry.from_entry``.

    Args:
        elem: The ``Entry`` XML element.

    Returns:
        The populated Entry.
    """
    entry = Entry()

    # UUID
    uuid_elem = elem.find("UUID")
    if uuid_elem is not None and uuid_elem.text:
        entry.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_elem.text))

    # Icon
    icon_elem = elem.find("IconID")
    if icon_elem is not None and icon_elem.text:
        entry.icon_id = icon_elem.text

    # Custom icon UUID (a malformed value is ignored)
    custom_icon_elem = elem.find("CustomIconUUID")
    if custom_icon_elem is not None and custom_icon_elem.text:
        with contextlib.suppress(binascii.Error, ValueError):
            entry.custom_icon_uuid = uuid_module.UUID(
                bytes=base64.b64decode(custom_icon_elem.text)
            )

    # Optional plain-text attributes; empty elements keep the model default
    text_attributes = (
        ("ForegroundColor", "foreground_color"),
        ("BackgroundColor", "background_color"),
        ("OverrideURL", "override_url"),
    )
    for tag, attr in text_attributes:
        text_elem = elem.find(tag)
        if text_elem is not None and text_elem.text:
            setattr(entry, attr, text_elem.text)

    # Tags: both "," and ";" are accepted as separators
    tags_elem = elem.find("Tags")
    if tags_elem is not None and tags_elem.text:
        tag_text = tags_elem.text.replace(",", ";")
        entry.tags = [t.strip() for t in tag_text.split(";") if t.strip()]

    # Times
    entry.times = cls._parse_times(elem.find("Times"))

    # String fields
    for string_elem in elem.findall("String"):
        key_elem = string_elem.find("Key")
        value_elem = string_elem.find("Value")
        if key_elem is not None and key_elem.text:
            key = key_elem.text
            value = value_elem.text if value_elem is not None else None
            protected = value_elem is not None and value_elem.get("Protected") == "True"
            entry.strings[key] = StringField(key=key, value=value, protected=protected)

    # Binary references
    for binary_elem in elem.findall("Binary"):
        key_elem = binary_elem.find("Key")
        value_elem = binary_elem.find("Value")
        if key_elem is not None and key_elem.text and value_elem is not None:
            ref = value_elem.get("Ref")
            if ref is not None:
                entry.binaries.append(BinaryRef(key=key_elem.text, ref=int(ref)))

    # AutoType
    at_elem = elem.find("AutoType")
    if at_elem is not None:
        enabled_elem = at_elem.find("Enabled")
        seq_elem = at_elem.find("DefaultSequence")
        obf_elem = at_elem.find("DataTransferObfuscation")

        entry.autotype = AutoType(
            enabled=enabled_elem is not None and enabled_elem.text == "True",
            sequence=seq_elem.text if seq_elem is not None else None,
            obfuscation=int(obf_elem.text) if obf_elem is not None and obf_elem.text else 0,
        )

        # Window from Association (only the first Association is read)
        assoc_elem = at_elem.find("Association")
        if assoc_elem is not None:
            window_elem = assoc_elem.find("Window")
            if window_elem is not None:
                entry.autotype.window = window_elem.text

    # History
    history_elem = elem.find("History")
    if history_elem is not None:
        for hist_entry_elem in history_elem.findall("Entry"):
            hist_entry = cls._parse_entry(hist_entry_elem)
            entry.history.append(HistoryEntry.from_entry(hist_entry))

    return entry
@classmethod
def _parse_times(cls, times_elem: Element | None) -> Times:
    """Build a Times model from a Times element.

    Starts from ``Times.create_new()`` and overwrites only the values that
    are present (and non-empty) in the XML.

    Args:
        times_elem: The ``Times`` element, or None when it is absent.

    Returns:
        The populated Times (fresh defaults when ``times_elem`` is None).
    """
    times = Times.create_new()
    if times_elem is None:
        return times

    # XML tag -> Times attribute, handled in this fixed order.
    timestamp_fields = (
        ("CreationTime", "creation_time"),
        ("LastModificationTime", "last_modification_time"),
        ("LastAccessTime", "last_access_time"),
        ("ExpiryTime", "expiry_time"),
        ("LocationChanged", "location_changed"),
    )
    for tag, attr in timestamp_fields:
        node = times_elem.find(tag)
        if node is None or not node.text:
            continue
        decoded = cls._decode_time(node.text)
        if decoded:
            setattr(times, attr, decoded)

    expires_node = times_elem.find("Expires")
    if expires_node is not None:
        times.expires = expires_node.text == "True"

    usage_node = times_elem.find("UsageCount")
    if usage_node is not None and usage_node.text:
        times.usage_count = int(usage_node.text)

    return times
@classmethod
def _decode_time(cls, time_str: str) -> datetime:
    """Decode a KDBX time string to a datetime.

    Three formats are attempted in order:

    1. Base64 of an 8-byte little-endian int64 holding seconds since
       0001-01-01 (KDBX4). Only tried when the string contains neither
       "-" nor ":", which are present in ISO dates.
    2. ``KDBX4_TIME_FORMAT`` via ``strptime``, tagged as UTC.
    3. ``datetime.fromisoformat`` with "Z" replaced by "+00:00".

    Args:
        time_str: Raw text of a time element.

    Returns:
        A timezone-aware datetime. Naive ISO input is assumed to be UTC, the
        same assumption ``_encode_time`` makes. If nothing parses, the
        current UTC time is returned.
    """
    # Imported before the try block: the except clause below evaluates
    # struct.error, which must already be bound when b64decode fails.
    import struct

    if "-" not in time_str and ":" not in time_str:
        try:
            binary = base64.b64decode(time_str)
            if len(binary) == 8:  # int64 = 8 bytes
                (seconds,) = struct.unpack("<q", binary)
                # Epoch is 0001-01-01; out-of-range counts raise OverflowError.
                return datetime(1, 1, 1, tzinfo=UTC) + timedelta(seconds=seconds)
        except (ValueError, OverflowError, struct.error):
            pass  # Not valid base64, or a value outside the datetime range

    # Try the KDBX4 ISO format
    try:
        return datetime.strptime(time_str, KDBX4_TIME_FORMAT).replace(tzinfo=UTC)
    except ValueError:
        pass

    # Fallback: generic ISO 8601
    try:
        parsed = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
    except ValueError:
        return datetime.now(UTC)
    # Keep results aware so they compare cleanly with the other paths.
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)
@classmethod
def _encode_time(cls, dt: datetime) -> str:
    """Encode datetime to ISO 8601 format for KDBX4.

    Uses ISO 8601 format (e.g., 2025-01-15T10:30:45Z) which is
    human-readable and compatible with KeePassXC.

    Args:
        dt: Datetime to encode. A naive value is taken to be UTC already;
            an aware value in another zone is converted to UTC first, so the
            written wall-clock time matches the UTC designator.

    Returns:
        ``dt`` formatted with ``KDBX4_TIME_FORMAT``.
    """
    if dt.utcoffset() is None:
        # Naive datetime (no usable offset): label it as UTC unchanged.
        dt = dt.replace(tzinfo=UTC)
    else:
        # NOTE(review): presumably KDBX4_TIME_FORMAT ends in a literal "Z"
        # (see the example above), so the value must really be UTC - confirm.
        dt = dt.astimezone(UTC)
    return dt.strftime(KDBX4_TIME_FORMAT)
2305 # --- XML building ---
2307 def _build_xml(self) -> bytes:
2308 """Build KDBX XML from models."""
2309 root = Element("KeePassFile")
2311 # Meta section
2312 meta = SubElement(root, "Meta")
2313 self._build_meta(meta)
2315 # Root section
2316 root_elem = SubElement(root, "Root")
2317 self._build_group(root_elem, self._root_group)
2319 # Encrypt protected values before serializing
2320 if self._inner_header is not None:
2321 self._encrypt_protected_values(root, self._inner_header)
2323 # Serialize to bytes (tostring returns bytes when encoding is specified)
2324 return cast(bytes, tostring(root, encoding="utf-8", xml_declaration=True))
def _encrypt_protected_values(self, root: Element, inner_header: InnerHeader) -> None:
    """Encrypt every protected value of the XML tree in-place.

    Each ``Value`` element marked ``Protected="True"`` has its text (empty
    string when absent) UTF-8 encoded, run through the stream cipher and
    replaced by the base64 text of the result. Elements are processed in
    document order.

    Args:
        root: Root of the XML tree to modify.
        inner_header: Inner header providing the stream id and key.
    """
    stream = ProtectedStreamCipher(
        inner_header.random_stream_id,
        inner_header.random_stream_key,
    )

    protected_nodes = (
        node for node in root.iter("Value") if node.get("Protected") == "True"
    )
    for node in protected_nodes:
        raw = (node.text or "").encode("utf-8")
        node.text = base64.b64encode(stream.encrypt(raw)).decode("ascii")
2344 def _build_meta(self, meta: Element) -> None:
2345 """Build Meta element from settings."""
2346 s = self._settings
2348 SubElement(meta, "Generator").text = s.generator
2349 SubElement(meta, "DatabaseName").text = s.database_name
2350 if s.database_description:
2351 SubElement(meta, "DatabaseDescription").text = s.database_description
2352 if s.default_username:
2353 SubElement(meta, "DefaultUserName").text = s.default_username
2355 SubElement(meta, "MaintenanceHistoryDays").text = str(s.maintenance_history_days)
2356 SubElement(meta, "MasterKeyChangeRec").text = str(s.master_key_change_rec)
2357 SubElement(meta, "MasterKeyChangeForce").text = str(s.master_key_change_force)
2359 # Memory protection
2360 mp = SubElement(meta, "MemoryProtection")
2361 for field_name, is_protected in s.memory_protection.items():
2362 SubElement(mp, f"Protect{field_name}").text = str(is_protected)
2364 SubElement(meta, "RecycleBinEnabled").text = str(s.recycle_bin_enabled)
2365 if s.recycle_bin_uuid:
2366 SubElement(meta, "RecycleBinUUID").text = base64.b64encode(
2367 s.recycle_bin_uuid.bytes
2368 ).decode("ascii")
2369 else:
2370 # Empty UUID
2371 SubElement(meta, "RecycleBinUUID").text = base64.b64encode(b"\x00" * 16).decode("ascii")
2373 SubElement(meta, "HistoryMaxItems").text = str(s.history_max_items)
2374 SubElement(meta, "HistoryMaxSize").text = str(s.history_max_size)
2376 # Custom icons
2377 if s.custom_icons:
2378 custom_icons_elem = SubElement(meta, "CustomIcons")
2379 for icon in s.custom_icons.values():
2380 icon_elem = SubElement(custom_icons_elem, "Icon")
2381 SubElement(icon_elem, "UUID").text = base64.b64encode(icon.uuid.bytes).decode(
2382 "ascii"
2383 )
2384 SubElement(icon_elem, "Data").text = base64.b64encode(icon.data).decode("ascii")
2385 if icon.name:
2386 SubElement(icon_elem, "Name").text = icon.name
2387 if icon.last_modification_time:
2388 SubElement(icon_elem, "LastModificationTime").text = self._encode_time(
2389 icon.last_modification_time
2390 )
2392 def _build_group(self, parent: Element, group: Group) -> None:
2393 """Build Group element from Group model."""
2394 elem = SubElement(parent, "Group")
2396 SubElement(elem, "UUID").text = base64.b64encode(group.uuid.bytes).decode("ascii")
2397 SubElement(elem, "Name").text = group.name or ""
2398 if group.notes:
2399 SubElement(elem, "Notes").text = group.notes
2400 SubElement(elem, "IconID").text = group.icon_id
2401 if group.custom_icon_uuid:
2402 SubElement(elem, "CustomIconUUID").text = base64.b64encode(
2403 group.custom_icon_uuid.bytes
2404 ).decode("ascii")
2406 self._build_times(elem, group.times)
2408 SubElement(elem, "IsExpanded").text = str(group.is_expanded)
2410 if group.default_autotype_sequence:
2411 SubElement(elem, "DefaultAutoTypeSequence").text = group.default_autotype_sequence
2412 if group.enable_autotype is not None:
2413 SubElement(elem, "EnableAutoType").text = str(group.enable_autotype)
2414 if group.enable_searching is not None:
2415 SubElement(elem, "EnableSearching").text = str(group.enable_searching)
2417 SubElement(elem, "LastTopVisibleEntry").text = base64.b64encode(
2418 (group.last_top_visible_entry or uuid_module.UUID(int=0)).bytes
2419 ).decode("ascii")
2421 # Entries
2422 for entry in group.entries:
2423 self._build_entry(elem, entry)
2425 # Subgroups (recursive)
2426 for subgroup in group.subgroups:
2427 self._build_group(elem, subgroup)
2429 def _build_entry(self, parent: Element, entry: Entry) -> None:
2430 """Build Entry element from Entry model."""
2431 elem = SubElement(parent, "Entry")
2433 SubElement(elem, "UUID").text = base64.b64encode(entry.uuid.bytes).decode("ascii")
2434 SubElement(elem, "IconID").text = entry.icon_id
2435 if entry.custom_icon_uuid:
2436 SubElement(elem, "CustomIconUUID").text = base64.b64encode(
2437 entry.custom_icon_uuid.bytes
2438 ).decode("ascii")
2440 if entry.foreground_color:
2441 SubElement(elem, "ForegroundColor").text = entry.foreground_color
2442 if entry.background_color:
2443 SubElement(elem, "BackgroundColor").text = entry.background_color
2444 if entry.override_url:
2445 SubElement(elem, "OverrideURL").text = entry.override_url
2447 if entry.tags:
2448 SubElement(elem, "Tags").text = ";".join(entry.tags)
2450 self._build_times(elem, entry.times)
2452 # String fields - apply memory protection policy from database settings
2453 for key, string_field in entry.strings.items():
2454 string_elem = SubElement(elem, "String")
2455 SubElement(string_elem, "Key").text = key
2456 value_elem = SubElement(string_elem, "Value")
2457 value_elem.text = string_field.value or ""
2458 # Use database memory_protection policy for standard fields,
2459 # fall back to string_field.protected for custom fields
2460 if key in self._settings.memory_protection:
2461 should_protect = self._settings.memory_protection[key]
2462 else:
2463 should_protect = string_field.protected
2464 if should_protect:
2465 value_elem.set("Protected", "True")
2467 # Binary references
2468 for binary_ref in entry.binaries:
2469 binary_elem = SubElement(elem, "Binary")
2470 SubElement(binary_elem, "Key").text = binary_ref.key
2471 value_elem = SubElement(binary_elem, "Value")
2472 value_elem.set("Ref", str(binary_ref.ref))
2474 # AutoType
2475 at = entry.autotype
2476 at_elem = SubElement(elem, "AutoType")
2477 SubElement(at_elem, "Enabled").text = str(at.enabled)
2478 SubElement(at_elem, "DataTransferObfuscation").text = str(at.obfuscation)
2479 SubElement(at_elem, "DefaultSequence").text = at.sequence or ""
2481 if at.window:
2482 assoc = SubElement(at_elem, "Association")
2483 SubElement(assoc, "Window").text = at.window
2484 SubElement(assoc, "KeystrokeSequence").text = ""
2486 # History
2487 if entry.history:
2488 history_elem = SubElement(elem, "History")
2489 for hist_entry in entry.history:
2490 self._build_entry(history_elem, hist_entry)
2492 def _build_times(self, parent: Element, times: Times) -> None:
2493 """Build Times element from Times model."""
2494 elem = SubElement(parent, "Times")
2496 SubElement(elem, "CreationTime").text = self._encode_time(times.creation_time)
2497 SubElement(elem, "LastModificationTime").text = self._encode_time(
2498 times.last_modification_time
2499 )
2500 SubElement(elem, "LastAccessTime").text = self._encode_time(times.last_access_time)
2501 if times.expiry_time:
2502 SubElement(elem, "ExpiryTime").text = self._encode_time(times.expiry_time)
2503 else:
2504 SubElement(elem, "ExpiryTime").text = self._encode_time(times.creation_time)
2505 SubElement(elem, "Expires").text = str(times.expires)
2506 SubElement(elem, "UsageCount").text = str(times.usage_count)
2507 if times.location_changed:
2508 SubElement(elem, "LocationChanged").text = self._encode_time(times.location_changed)
2510 def __str__(self) -> str:
2511 entry_count = sum(1 for _ in self.iter_entries())
2512 group_count = sum(1 for _ in self.iter_groups())
2513 name = self._settings.database_name
2514 return f'Database: "{name}" ({entry_count} entries, {group_count} groups)'