Coverage for src / kdbxtool / database.py: 93%
956 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-19 21:22 +0000
1"""High-level Database API for KDBX files.
3This module provides the main interface for working with KeePass databases:
4- Opening and decrypting KDBX files
5- Creating new databases
6- Searching for entries and groups
7- Saving databases
8"""
10from __future__ import annotations
12import base64
13import binascii
14import contextlib
15import getpass
16import hashlib
17import os
18import uuid as uuid_module
19from collections.abc import Iterator
20from dataclasses import dataclass, field
21from datetime import UTC, datetime, timedelta
22from pathlib import Path
23from types import TracebackType
24from typing import TYPE_CHECKING, Protocol, cast
25from xml.etree.ElementTree import Element, SubElement, tostring
27if TYPE_CHECKING:
28 from .merge import DeletedObject, MergeMode, MergeResult
30from Cryptodome.Cipher import ChaCha20, Salsa20
31from defusedxml import ElementTree as DefusedET
33from .exceptions import (
34 AuthenticationError,
35 DatabaseError,
36 InvalidXmlError,
37 Kdbx3UpgradeRequired,
38 MissingCredentialsError,
39 UnknownCipherError,
40)
41from .models import Attachment, Entry, Group, HistoryEntry, Times
42from .models.entry import AutoType, BinaryRef, StringField
43from .parsing import CompressionType, KdbxHeader, KdbxVersion
44from .parsing.kdbx3 import read_kdbx3
45from .parsing.kdbx4 import InnerHeader, read_kdbx4, write_kdbx4
46from .security import AesKdfConfig, Argon2Config, Cipher, KdfType
47from .security import yubikey as yubikey_module
48from .security.yubikey import YubiKeyConfig, compute_challenge_response
50# Union type for KDF configurations
51KdfConfig = Argon2Config | AesKdfConfig
class _StreamCipher(Protocol):
    """Structural interface for the inner stream ciphers.

    Any object exposing byte-oriented ``encrypt``/``decrypt`` methods
    satisfies this protocol; it exists purely so protected-value
    encryption can be typed without naming a concrete cipher class.
    """

    def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt raw bytes with the keystream."""
        ...

    def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt raw bytes with the keystream."""
        ...
# KDBX4 time format (ISO 8601, compatible with KeePassXC)
KDBX4_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Protected stream cipher IDs (match InnerHeader.random_stream_id)
PROTECTED_STREAM_SALSA20 = 2
PROTECTED_STREAM_CHACHA20 = 3
class ProtectedStreamCipher:
    """Stream cipher for encrypting/decrypting protected values in XML.

    KDBX protects sensitive values (e.g. passwords) in the XML payload by
    XOR-ing them with a ChaCha20 or Salsa20 keystream. Values are processed
    in document order, so one cipher instance is threaded through the whole
    document.
    """

    def __init__(self, stream_id: int, stream_key: bytes) -> None:
        """Initialize the stream cipher.

        Args:
            stream_id: Cipher type (2=Salsa20, 3=ChaCha20)
            stream_key: Key material from inner header (typically 64 bytes)
        """
        self._stream_id = stream_id
        self._stream_key = stream_key
        self._cipher = self._create_cipher()

    def _create_cipher(self) -> _StreamCipher:
        """Derive key/nonce from the stream key and build the cipher."""
        if self._stream_id == PROTECTED_STREAM_SALSA20:
            # Salsa20: SHA-256 of the stream key, fixed 8-byte nonce
            return Salsa20.new(
                key=hashlib.sha256(self._stream_key).digest(),
                nonce=b"\xe8\x30\x09\x4b\x97\x20\x5d\x2a",
            )
        if self._stream_id == PROTECTED_STREAM_CHACHA20:
            # ChaCha20: SHA-512 of the stream key; bytes 0-31 = key, 32-43 = nonce
            digest = hashlib.sha512(self._stream_key).digest()
            return ChaCha20.new(key=digest[:32], nonce=digest[32:44])
        raise UnknownCipherError(self._stream_id.to_bytes(4, "little"))

    def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt protected value (XOR with stream)."""
        return self._cipher.decrypt(ciphertext)

    def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt protected value (XOR with stream)."""
        return self._cipher.encrypt(plaintext)
@dataclass
class CustomIcon:
    """A custom icon in a KDBX database.

    Custom icons are PNG images that can be assigned to entries and groups
    for visual customization beyond the standard icon set.

    Attributes:
        uuid: Unique identifier for the icon
        data: PNG image data
        name: Optional display name for the icon
        last_modification_time: When the icon was last modified
    """

    uuid: uuid_module.UUID  # identifier referenced by entries/groups
    data: bytes  # raw PNG image bytes
    name: str | None = None  # optional display name
    last_modification_time: datetime | None = None  # optional timestamp
@dataclass
class DatabaseSettings:
    """Settings for a KDBX database.

    Attributes:
        generator: Generator application name
        database_name: Name of the database
        database_description: Description of the database
        default_username: Default username for new entries
        maintenance_history_days: Days to keep deleted items
        color: Database color (hex)
        master_key_change_rec: Days until master key change recommended
        master_key_change_force: Days until master key change forced
        memory_protection: Which fields to protect in memory
        recycle_bin_enabled: Whether recycle bin is enabled
        recycle_bin_uuid: UUID of recycle bin group
        history_max_items: Max history entries per entry
        history_max_size: Max history size in bytes
        custom_icons: Dictionary of custom icons (UUID -> CustomIcon)
        deleted_objects: Deletion records used for merge/synchronize
    """

    generator: str = "kdbxtool"
    database_name: str = "Database"
    database_description: str = ""
    default_username: str = ""
    maintenance_history_days: int = 365
    color: str | None = None
    # -1 means "never" for both master-key-change intervals
    master_key_change_rec: int = -1
    master_key_change_force: int = -1
    # Per-field in-memory protection flags; only Password protected by default
    memory_protection: dict[str, bool] = field(
        default_factory=lambda: {
            "Title": False,
            "UserName": False,
            "Password": True,
            "URL": False,
            "Notes": False,
        }
    )
    recycle_bin_enabled: bool = True
    recycle_bin_uuid: uuid_module.UUID | None = None
    history_max_items: int = 10
    history_max_size: int = 6 * 1024 * 1024  # 6 MiB
    custom_icons: dict[uuid_module.UUID, CustomIcon] = field(default_factory=dict)
    deleted_objects: list[DeletedObject] = field(default_factory=list)
179class Database:
180 """High-level interface for KDBX databases.
182 This class provides the main API for working with KeePass databases.
183 It handles encryption/decryption, XML parsing, and model management.
185 Example usage:
186 # Open existing database
187 db = Database.open("passwords.kdbx", password="secret")
189 # Find entries
190 entries = db.find_entries(title="GitHub")
192 # Create entry
193 entry = db.root_group.create_entry(
194 title="New Site",
195 username="user",
196 password="pass123",
197 )
199 # Save changes
200 db.save()
201 """
203 def __init__(
204 self,
205 root_group: Group,
206 settings: DatabaseSettings | None = None,
207 header: KdbxHeader | None = None,
208 inner_header: InnerHeader | None = None,
209 binaries: dict[int, bytes] | None = None,
210 ) -> None:
211 """Initialize database.
213 Usually you should use Database.open() or Database.create() instead.
215 Args:
216 root_group: Root group containing all entries/groups
217 settings: Database settings
218 header: KDBX header (for existing databases)
219 inner_header: Inner header (for existing databases)
220 binaries: Binary attachments
221 """
222 self._root_group = root_group
223 self._settings = settings or DatabaseSettings()
224 self._header = header
225 self._inner_header = inner_header
226 self._binaries = binaries or {}
227 self._password: str | None = None
228 self._keyfile_data: bytes | None = None
229 self._transformed_key: bytes | None = None
230 self._filepath: Path | None = None
231 self._yubikey_slot: int | None = None
232 self._yubikey_serial: int | None = None
233 # Set database reference on all entries and groups
234 self._set_database_references(root_group)
236 def _set_database_references(self, group: Group) -> None:
237 """Recursively set _database reference on a group and all its contents."""
238 group._database = self
239 for entry in group.entries:
240 entry._database = self
241 for subgroup in group.subgroups:
242 self._set_database_references(subgroup)
    def __enter__(self) -> Database:
        """Enter context manager.

        Returns:
            This database instance; credentials are zeroized on exit.
        """
        return self
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager, zeroizing credentials.

        Returning None means exceptions are never suppressed.
        """
        self.zeroize_credentials()
257 def zeroize_credentials(self) -> None:
258 """Explicitly zeroize stored credentials from memory.
260 Call this when done with the database to minimize the time
261 credentials remain in memory. Note that Python's string
262 interning may retain copies; for maximum security, use
263 SecureBytes for credential input.
264 """
265 # Clear password (Python GC will eventually free memory)
266 self._password = None
267 # Clear keyfile data (convert to bytearray and zeroize if possible)
268 if self._keyfile_data is not None:
269 try:
270 # Attempt to overwrite the memory
271 data = bytearray(self._keyfile_data)
272 for i in range(len(data)):
273 data[i] = 0
274 except TypeError:
275 pass # bytes is immutable, just dereference
276 self._keyfile_data = None
277 # Clear transformed key
278 if self._transformed_key is not None:
279 try:
280 data = bytearray(self._transformed_key)
281 for i in range(len(data)):
282 data[i] = 0
283 except TypeError:
284 pass
285 self._transformed_key = None
287 def dump(self) -> str:
288 """Return a human-readable summary of the database for debugging.
290 Returns:
291 Multi-line string with database metadata and statistics.
292 """
293 lines = [f'Database: "{self._settings.database_name or "(unnamed)"}"']
294 if self._header is not None:
295 lines.append(f" Format: KDBX{self._header.version.value}")
296 lines.append(f" Cipher: {self._header.cipher.name}")
297 lines.append(f" KDF: {self._header.kdf_type.name}")
299 # Count entries and groups
300 entry_count = sum(1 for _ in self._root_group.iter_entries(recursive=True))
301 group_count = sum(1 for _ in self._root_group.iter_groups(recursive=True))
302 lines.append(f" Total entries: {entry_count}")
303 lines.append(f" Total groups: {group_count}")
305 # Custom icons
306 if self.custom_icons:
307 lines.append(f" Custom icons: {len(self.custom_icons)}")
309 # Recycle bin
310 if self._settings.recycle_bin_enabled:
311 lines.append(" Recycle bin: enabled")
313 return "\n".join(lines)
315 def merge(
316 self,
317 source: Database,
318 *,
319 mode: MergeMode | None = None,
320 ) -> MergeResult:
321 """Merge another database into this one.
323 Combines entries, groups, history, attachments, and custom icons
324 from the source database into this database using UUID-based
325 matching and timestamp-based conflict resolution.
327 Args:
328 source: Database to merge from (read-only)
329 mode: Merge mode (STANDARD or SYNCHRONIZE). Defaults to STANDARD.
330 - STANDARD: Add and update only, never deletes
331 - SYNCHRONIZE: Full sync including deletions
333 Returns:
334 MergeResult with counts and statistics about the merge
336 Raises:
337 MergeError: If merge cannot be completed
339 Example:
340 >>> target_db = Database.open("main.kdbx", password="secret")
341 >>> source_db = Database.open("branch.kdbx", password="secret")
342 >>> result = target_db.merge(source_db)
343 >>> print(f"Added {result.entries_added} entries")
344 >>> target_db.save()
345 """
346 from .merge import MergeMode, Merger
348 if mode is None:
349 mode = MergeMode.STANDARD
351 merger = Merger(self, source, mode=mode)
352 return merger.merge()
    @property
    def transformed_key(self) -> bytes | None:
        """Get the transformed key for caching.

        The transformed key is the result of applying the KDF (Argon2) to
        the credentials. Caching this allows fast repeated database opens
        without re-running the expensive KDF.

        Security note: The transformed key is as sensitive as the password.
        Anyone with this key can decrypt the database. Store securely and
        zeroize when done.

        Returns:
            The transformed key, or None if not available (e.g., newly created DB)
        """
        return self._transformed_key
371 @property
372 def kdf_salt(self) -> bytes | None:
373 """Get the KDF salt used for key derivation.
375 The salt is used together with credentials to derive the transformed key.
376 If the salt changes (e.g., after save with regenerate_seeds=True),
377 any cached transformed_key becomes invalid.
379 Returns:
380 The KDF salt, or None if no header is set
381 """
382 if self._header is None:
383 return None
384 return self._header.kdf_salt
    @property
    def root_group(self) -> Group:
        """Get the root group of the database (all entries/groups live under it)."""
        return self._root_group
    @property
    def settings(self) -> DatabaseSettings:
        """Get database settings (mutable; changes take effect on save)."""
        return self._settings
    @property
    def filepath(self) -> Path | None:
        """Get the file path (if opened from file; also the default save target)."""
        return self._filepath
401 # --- Opening databases ---
403 @classmethod
404 def open(
405 cls,
406 filepath: str | Path,
407 password: str | None = None,
408 keyfile: str | Path | None = None,
409 yubikey_slot: int | None = None,
410 yubikey_serial: int | None = None,
411 ) -> Database:
412 """Open an existing KDBX database.
414 Args:
415 filepath: Path to the .kdbx file
416 password: Database password
417 keyfile: Path to keyfile (optional)
418 yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
419 If provided, the database's KDF salt is used as challenge and
420 the 20-byte HMAC-SHA1 response is incorporated into key derivation.
421 Requires yubikey-manager package: pip install kdbxtool[yubikey]
422 yubikey_serial: Serial number of specific YubiKey to use when multiple
423 devices are connected. Use list_yubikeys() to discover serials.
425 Returns:
426 Database instance
428 Raises:
429 FileNotFoundError: If file doesn't exist
430 ValueError: If credentials are wrong or file is corrupted
431 YubiKeyError: If YubiKey operation fails
432 """
433 filepath = Path(filepath)
434 if not filepath.exists():
435 raise FileNotFoundError(f"Database file not found: {filepath}")
437 data = filepath.read_bytes()
439 keyfile_data = None
440 if keyfile:
441 keyfile_path = Path(keyfile)
442 if not keyfile_path.exists():
443 raise FileNotFoundError(f"Keyfile not found: {keyfile}")
444 keyfile_data = keyfile_path.read_bytes()
446 return cls.open_bytes(
447 data,
448 password=password,
449 keyfile_data=keyfile_data,
450 filepath=filepath,
451 yubikey_slot=yubikey_slot,
452 yubikey_serial=yubikey_serial,
453 )
    @classmethod
    def open_interactive(
        cls,
        filepath: str | Path,
        keyfile: str | Path | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
        prompt: str = "Password: ",
        max_attempts: int = 3,
    ) -> Database:
        """Open a KDBX database with interactive password prompt.

        Prompts the user for a password using secure input (no echo). If the
        password is incorrect, allows retrying up to max_attempts times.

        This is a convenience method for CLI applications that need to securely
        prompt for database credentials.

        Args:
            filepath: Path to the .kdbx file
            keyfile: Path to keyfile (optional)
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional)
            yubikey_serial: Serial number of specific YubiKey to use
            prompt: Custom prompt string (default: "Password: ")
            max_attempts: Maximum password attempts before raising (default: 3)

        Returns:
            Database instance

        Raises:
            FileNotFoundError: If file or keyfile doesn't exist
            AuthenticationError: If max_attempts exceeded with wrong password
            YubiKeyError: If YubiKey operation fails

        Example:
            >>> db = Database.open_interactive("vault.kdbx")
            Password:
            >>> db = Database.open_interactive("vault.kdbx", keyfile="vault.key")
            Password:
        """
        import sys

        for attempt in range(max_attempts):
            # getpass reads without echoing the password to the terminal
            password = getpass.getpass(prompt)
            try:
                return cls.open(
                    filepath,
                    password=password,
                    keyfile=keyfile,
                    yubikey_slot=yubikey_slot,
                    yubikey_serial=yubikey_serial,
                )
            except AuthenticationError:
                if attempt < max_attempts - 1:
                    print("Invalid password, try again.", file=sys.stderr)
                else:
                    # `from None` suppresses the chained per-attempt traceback
                    raise AuthenticationError(
                        f"Authentication failed after {max_attempts} attempts"
                    ) from None
        # This should never be reached due to the raise in the loop
        raise AuthenticationError(f"Authentication failed after {max_attempts} attempts")
    @classmethod
    def open_bytes(
        cls,
        data: bytes,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        filepath: Path | None = None,
        transformed_key: bytes | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> Database:
        """Open a KDBX database from bytes.

        Supports both KDBX3 and KDBX4 formats. KDBX3 databases are opened
        read-only and will be automatically upgraded to KDBX4 on save.

        Args:
            data: KDBX file contents
            password: Database password
            keyfile_data: Keyfile contents (optional)
            filepath: Original file path (for save)
            transformed_key: Precomputed transformed key (skips KDF, faster opens)
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided, the database's KDF salt is used as challenge and
                the 20-byte HMAC-SHA1 response is incorporated into key derivation.
                Requires yubikey-manager package: pip install kdbxtool[yubikey]
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Returns:
            Database instance

        Raises:
            YubiKeyError: If YubiKey operation fails
        """
        # Detect version from header (parse just enough to get version)
        header, _ = KdbxHeader.parse(data)

        # Get YubiKey response if slot specified.
        # KeePassXC uses the KDF salt as the challenge, not master_seed.
        yubikey_response: bytes | None = None
        if yubikey_slot is not None:
            if not yubikey_module.YUBIKEY_AVAILABLE:
                from .exceptions import YubiKeyNotAvailableError

                raise YubiKeyNotAvailableError()
            config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
            response = compute_challenge_response(header.kdf_salt, config)
            yubikey_response = response.data

        # Decrypt the file using the appropriate reader for its version
        is_kdbx3 = header.version == KdbxVersion.KDBX3
        if is_kdbx3:
            import warnings

            warnings.warn(
                "Opening KDBX3 database. Saving will automatically upgrade to KDBX4 "
                "with modern security settings (Argon2d, ChaCha20). "
                "Use save(allow_upgrade=True) to confirm.",
                UserWarning,
                stacklevel=3,
            )
            # KDBX3 doesn't support YubiKey CR in the same way
            # (KeeChallenge used a sidecar XML file, not integrated)
            if yubikey_slot is not None:
                raise DatabaseError(
                    "YubiKey challenge-response is not supported for KDBX3 databases. "
                    "Upgrade to KDBX4 first."
                )
            payload = read_kdbx3(
                data,
                password=password,
                keyfile_data=keyfile_data,
                transformed_key=transformed_key,
            )
        else:
            payload = read_kdbx4(
                data,
                password=password,
                keyfile_data=keyfile_data,
                transformed_key=transformed_key,
                yubikey_response=yubikey_response,
            )

        # Parse XML into models (with protected value decryption)
        root_group, settings, binaries = cls._parse_xml(payload.xml_data, payload.inner_header)

        db = cls(
            root_group=root_group,
            settings=settings,
            header=payload.header,
            inner_header=payload.inner_header,
            binaries=binaries,
        )
        # Retain credentials/derived key so save()/reload() work later
        db._password = password
        db._keyfile_data = keyfile_data
        db._transformed_key = payload.transformed_key
        db._filepath = filepath
        # Remembered so save() can require explicit upgrade confirmation
        db._opened_as_kdbx3 = is_kdbx3
        db._yubikey_slot = yubikey_slot
        db._yubikey_serial = yubikey_serial

        return db
621 # --- Creating databases ---
    @classmethod
    def create(
        cls,
        filepath: str | Path | None = None,
        password: str | None = None,
        keyfile: str | Path | None = None,
        database_name: str = "Database",
        cipher: Cipher = Cipher.AES256_CBC,
        kdf_config: KdfConfig | None = None,
    ) -> Database:
        """Create a new KDBX database.

        Args:
            filepath: Path to save the database (optional; nothing is written
                until save() is called)
            password: Database password
            keyfile: Path to keyfile (optional)
            database_name: Name for the database
            cipher: Encryption cipher to use
            kdf_config: KDF configuration (Argon2Config or AesKdfConfig).
                Defaults to Argon2Config.standard() with Argon2d variant.

        Returns:
            New Database instance

        Raises:
            MissingCredentialsError: If neither password nor keyfile is given
            FileNotFoundError: If the keyfile path doesn't exist
            DatabaseError: If kdf_config is not a supported config type
        """
        # At least one credential source is mandatory
        if password is None and keyfile is None:
            raise MissingCredentialsError()

        keyfile_data = None
        if keyfile:
            keyfile_path = Path(keyfile)
            if not keyfile_path.exists():
                raise FileNotFoundError(f"Keyfile not found: {keyfile}")
            keyfile_data = keyfile_path.read_bytes()

        # Use provided config or standard Argon2d defaults
        if kdf_config is None:
            kdf_config = Argon2Config.standard()

        # Create root group
        root_group = Group.create_root(database_name)

        # Create recycle bin group (icon 43 is the standard recycle-bin icon)
        recycle_bin = Group(name="Recycle Bin", icon_id="43")
        root_group.add_subgroup(recycle_bin)

        # Create header based on KDF config type; fresh random seed/IV/salt
        if isinstance(kdf_config, Argon2Config):
            header = KdbxHeader(
                version=KdbxVersion.KDBX4,
                cipher=cipher,
                compression=CompressionType.GZIP,
                master_seed=os.urandom(32),
                encryption_iv=os.urandom(cipher.iv_size),
                kdf_type=kdf_config.variant,
                kdf_salt=kdf_config.salt,
                argon2_memory_kib=kdf_config.memory_kib,
                argon2_iterations=kdf_config.iterations,
                argon2_parallelism=kdf_config.parallelism,
            )
        elif isinstance(kdf_config, AesKdfConfig):
            header = KdbxHeader(
                version=KdbxVersion.KDBX4,
                cipher=cipher,
                compression=CompressionType.GZIP,
                master_seed=os.urandom(32),
                encryption_iv=os.urandom(cipher.iv_size),
                kdf_type=KdfType.AES_KDF,
                kdf_salt=kdf_config.salt,
                aes_kdf_rounds=kdf_config.rounds,
            )
        else:
            raise DatabaseError(f"Unsupported KDF config type: {type(kdf_config)}")

        # Create inner header (ChaCha20 protected stream with a fresh key)
        inner_header = InnerHeader(
            random_stream_id=3,  # ChaCha20
            random_stream_key=os.urandom(64),
            binaries={},
        )

        settings = DatabaseSettings(
            database_name=database_name,
            recycle_bin_enabled=True,
            recycle_bin_uuid=recycle_bin.uuid,
        )

        db = cls(
            root_group=root_group,
            settings=settings,
            header=header,
            inner_header=inner_header,
        )
        db._password = password
        db._keyfile_data = keyfile_data
        if filepath:
            db._filepath = Path(filepath)

        return db
722 # --- Saving databases ---
724 def _upgrade_to_kdbx4(
725 self,
726 kdf_config: KdfConfig | None = None,
727 cipher: Cipher | None = None,
728 ) -> None:
729 """Upgrade KDBX3 database to KDBX4 format.
731 This converts:
732 - KDF to specified config (defaults to Argon2d with standard parameters)
733 - Salsa20 protected stream to ChaCha20
734 - Generates new cryptographic material (seeds, IVs)
736 Args:
737 kdf_config: Optional KDF configuration (Argon2Config or AesKdfConfig).
738 If not provided, uses Argon2Config.standard() with Argon2d.
739 cipher: Optional encryption cipher. If not provided, preserves existing.
740 """
741 if self._header is None:
742 return
744 # Use provided config or standard Argon2d defaults
745 if kdf_config is None:
746 kdf_config = Argon2Config.standard()
748 # Use provided cipher or preserve existing
749 target_cipher = cipher if cipher is not None else self._header.cipher
751 # Build header based on KDF config type
752 if isinstance(kdf_config, Argon2Config):
753 self._header = KdbxHeader(
754 version=KdbxVersion.KDBX4,
755 cipher=target_cipher,
756 compression=self._header.compression,
757 master_seed=os.urandom(32),
758 encryption_iv=os.urandom(target_cipher.iv_size),
759 kdf_type=kdf_config.variant,
760 kdf_salt=kdf_config.salt,
761 argon2_memory_kib=kdf_config.memory_kib,
762 argon2_iterations=kdf_config.iterations,
763 argon2_parallelism=kdf_config.parallelism,
764 )
765 elif isinstance(kdf_config, AesKdfConfig):
766 self._header = KdbxHeader(
767 version=KdbxVersion.KDBX4,
768 cipher=target_cipher,
769 compression=self._header.compression,
770 master_seed=os.urandom(32),
771 encryption_iv=os.urandom(target_cipher.iv_size),
772 kdf_type=KdfType.AES_KDF,
773 kdf_salt=kdf_config.salt,
774 aes_kdf_rounds=kdf_config.rounds,
775 )
777 # Upgrade inner header to use ChaCha20 (more secure than Salsa20)
778 if self._inner_header is not None:
779 self._inner_header.random_stream_id = PROTECTED_STREAM_CHACHA20
780 self._inner_header.random_stream_key = os.urandom(64)
    def save(
        self,
        filepath: str | Path | None = None,
        *,
        allow_upgrade: bool = False,
        regenerate_seeds: bool = True,
        kdf_config: KdfConfig | None = None,
        cipher: Cipher | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> None:
        """Save the database to a file.

        KDBX3 databases are automatically upgraded to KDBX4 on save. When saving
        a KDBX3 database to its original file, explicit confirmation is required
        via the allow_upgrade parameter.

        Args:
            filepath: Path to save to (uses original path if not specified)
            allow_upgrade: Must be True to confirm KDBX3 to KDBX4 upgrade when
                saving to the original file. Not required when saving to a new file.
            regenerate_seeds: If True (default), regenerate all cryptographic seeds
                (master_seed, encryption_iv, kdf_salt, random_stream_key) on save.
                Set to False only for testing or when using pre-computed transformed keys.
            kdf_config: Optional KDF configuration for KDBX3 upgrade. Use presets like:
                - Argon2Config.standard() / high_security() / fast()
                - AesKdfConfig.standard() / high_security() / fast()
                Defaults to Argon2Config.standard() with Argon2d variant.
            cipher: Optional encryption cipher. Use one of:
                - Cipher.AES256_CBC (default, widely compatible)
                - Cipher.CHACHA20 (modern, faster in software)
                - Cipher.TWOFISH256_CBC (requires oxifish package)
                If not specified, preserves existing cipher.
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided (or if database was opened with yubikey_slot), the new
                KDF salt is used as challenge and the response is incorporated
                into key derivation. Requires yubikey-manager package.
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Raises:
            DatabaseError: If no filepath specified and database wasn't opened from file
            Kdbx3UpgradeRequired: If saving KDBX3 to original file without allow_upgrade=True
            YubiKeyError: If YubiKey operation fails
        """
        # Track whether this is a save-as before we overwrite self._filepath
        save_to_new_file = filepath is not None
        if filepath:
            self._filepath = Path(filepath)
        elif self._filepath is None:
            raise DatabaseError("No filepath specified and database wasn't opened from file")

        # Require explicit confirmation when saving KDBX3 to original file.
        # getattr with default: attribute only exists on databases from open_bytes().
        was_kdbx3 = getattr(self, "_opened_as_kdbx3", False)
        if was_kdbx3 and not save_to_new_file and not allow_upgrade:
            raise Kdbx3UpgradeRequired()

        # Use provided yubikey params, or fall back to stored ones
        effective_yubikey_slot = yubikey_slot if yubikey_slot is not None else self._yubikey_slot
        effective_yubikey_serial = (
            yubikey_serial if yubikey_serial is not None else self._yubikey_serial
        )

        # Serialize first, then write, so a serialization failure leaves
        # the file on disk untouched
        data = self.to_bytes(
            regenerate_seeds=regenerate_seeds,
            kdf_config=kdf_config,
            cipher=cipher,
            yubikey_slot=effective_yubikey_slot,
            yubikey_serial=effective_yubikey_serial,
        )
        self._filepath.write_bytes(data)

        # Update stored yubikey params if changed
        if yubikey_slot is not None:
            self._yubikey_slot = yubikey_slot
        if yubikey_serial is not None:
            self._yubikey_serial = yubikey_serial

        # After KDBX3 upgrade, reload to get proper KDBX4 state (including transformed_key)
        if was_kdbx3:
            self.reload()
            self._opened_as_kdbx3 = False
864 def reload(self) -> None:
865 """Reload the database from disk using stored credentials.
867 Re-reads the database file and replaces all in-memory state with
868 the current file contents. Useful for discarding unsaved changes
869 or syncing with external modifications.
871 Raises:
872 DatabaseError: If database wasn't opened from a file
873 MissingCredentialsError: If no credentials are stored
874 """
875 if self._filepath is None:
876 raise DatabaseError("Cannot reload: database wasn't opened from a file")
878 if self._password is None and self._keyfile_data is None:
879 raise MissingCredentialsError()
881 # Re-read and parse the file
882 data = self._filepath.read_bytes()
883 reloaded = self.open_bytes(
884 data,
885 password=self._password,
886 keyfile_data=self._keyfile_data,
887 filepath=self._filepath,
888 )
890 # Copy all state from reloaded database
891 self._root_group = reloaded._root_group
892 self._settings = reloaded._settings
893 self._header = reloaded._header
894 self._inner_header = reloaded._inner_header
895 self._binaries = reloaded._binaries
896 self._transformed_key = reloaded._transformed_key
897 self._opened_as_kdbx3 = reloaded._opened_as_kdbx3
899 def xml(self, *, pretty_print: bool = False) -> bytes:
900 """Export database XML payload.
902 Returns the decrypted, decompressed XML payload of the database.
903 Protected values (passwords, etc.) are shown in plaintext.
904 Useful for debugging and migration.
906 Args:
907 pretty_print: If True, format XML with indentation for readability
909 Returns:
910 XML payload as bytes (UTF-8 encoded)
911 """
912 root = Element("KeePassFile")
914 # Meta section
915 meta = SubElement(root, "Meta")
916 self._build_meta(meta)
918 # Root section
919 root_elem = SubElement(root, "Root")
920 self._build_group(root_elem, self._root_group)
922 # Note: We do NOT encrypt protected values here - the point is to
923 # export the decrypted XML for debugging/inspection
925 if pretty_print:
926 self._indent_xml(root)
928 return cast(bytes, tostring(root, encoding="utf-8", xml_declaration=True))
930 def dump_xml(self, filepath: str | Path, *, pretty_print: bool = True) -> None:
931 """Write database XML payload to a file.
933 Writes the decrypted, decompressed XML payload to a file.
934 Protected values (passwords, etc.) are shown in plaintext.
935 Useful for debugging and migration.
937 Args:
938 filepath: Path to write the XML file
939 pretty_print: If True (default), format XML with indentation
940 """
941 xml_data = self.xml(pretty_print=pretty_print)
942 Path(filepath).write_bytes(xml_data)
    @staticmethod
    def _indent_xml(elem: Element, level: int = 0) -> None:
        """Add indentation to XML element tree for pretty printing.

        Mutates the tree in place by rewriting each element's ``text``
        (whitespace before the first child) and ``tail`` (whitespace after
        the closing tag), two spaces per nesting level. Existing non-blank
        text/tail content is left untouched.
        """
        indent = "\n" + "  " * level
        if len(elem):
            # Has children: open-tag newline + deeper indent for first child
            if not elem.text or not elem.text.strip():
                elem.text = indent + "  "
            if not elem.tail or not elem.tail.strip():
                elem.tail = indent
            for child in elem:
                Database._indent_xml(child, level + 1)
            # Last child's tail closes back to this element's level
            if not child.tail or not child.tail.strip():
                child.tail = indent
        else:
            # Leaf: only the tail needs adjusting (skip the root, level 0)
            if level and (not elem.tail or not elem.tail.strip()):
                elem.tail = indent
    def to_bytes(
        self,
        *,
        regenerate_seeds: bool = True,
        kdf_config: KdfConfig | None = None,
        cipher: Cipher | None = None,
        yubikey_slot: int | None = None,
        yubikey_serial: int | None = None,
    ) -> bytes:
        """Serialize the database to KDBX4 format.

        KDBX3 databases are automatically upgraded to KDBX4 on save.
        This includes converting to the specified KDF and ChaCha20 protected stream.

        Args:
            regenerate_seeds: If True (default), regenerate all cryptographic seeds
                (master_seed, encryption_iv, kdf_salt, random_stream_key) on save.
                This prevents precomputation attacks where an attacker can derive
                the encryption key in advance. Set to False only for testing or
                when using pre-computed transformed keys.
            kdf_config: Optional KDF configuration for KDBX3 upgrade. Use presets like:
                - Argon2Config.standard() / high_security() / fast()
                - AesKdfConfig.standard() / high_security() / fast()
                Defaults to Argon2Config.standard() with Argon2d variant.
            cipher: Optional encryption cipher. Use one of:
                - Cipher.AES256_CBC (default, widely compatible)
                - Cipher.CHACHA20 (modern, faster in software)
                - Cipher.TWOFISH256_CBC (requires oxifish package)
                If not specified, preserves existing cipher.
            yubikey_slot: YubiKey slot for challenge-response (1 or 2, optional).
                If provided, the (new) KDF salt is used as challenge and the
                20-byte HMAC-SHA1 response is incorporated into key derivation.
                Requires yubikey-manager package: pip install kdbxtool[yubikey]
            yubikey_serial: Serial number of specific YubiKey to use when multiple
                devices are connected. Use list_yubikeys() to discover serials.

        Returns:
            KDBX4 file contents as bytes

        Raises:
            MissingCredentialsError: If no credentials are set
            DatabaseError: If the outer or inner header is missing (database
                was never opened or created properly)
            YubiKeyError: If YubiKey operation fails
        """
        # Need either credentials, a transformed key, or YubiKey to derive
        # the master key; otherwise the output could never be decrypted.
        has_credentials = self._password is not None or self._keyfile_data is not None
        has_transformed_key = self._transformed_key is not None
        has_yubikey = yubikey_slot is not None
        if not has_credentials and not has_transformed_key and not has_yubikey:
            raise MissingCredentialsError()

        if self._header is None:
            raise DatabaseError("No header - database not properly initialized")

        if self._inner_header is None:
            raise DatabaseError("No inner header - database not properly initialized")

        # Auto-upgrade KDBX3 to KDBX4
        if self._header.version == KdbxVersion.KDBX3:
            self._upgrade_to_kdbx4(kdf_config=kdf_config, cipher=cipher)
        elif cipher is not None and cipher != self._header.cipher:
            # Change cipher for KDBX4 database; a fresh IV sized for the
            # new cipher is required.
            self._header.cipher = cipher
            self._header.encryption_iv = os.urandom(cipher.iv_size)
            # Cipher change invalidates transformed key
            self._transformed_key = None

        # Regenerate all cryptographic seeds to prevent precomputation attacks.
        # This ensures each save produces a file encrypted with fresh randomness.
        # See: https://github.com/libkeepass/pykeepass/issues/219
        if regenerate_seeds:
            self._header.master_seed = os.urandom(32)
            self._header.encryption_iv = os.urandom(self._header.cipher.iv_size)
            self._header.kdf_salt = os.urandom(32)
            self._inner_header.random_stream_key = os.urandom(64)
            # Cached transformed_key is now invalid (salt changed)
            self._transformed_key = None

        # Get YubiKey response if slot specified
        # KeePassXC uses the KDF salt as the challenge, not master_seed
        yubikey_response: bytes | None = None
        if yubikey_slot is not None:
            if not yubikey_module.YUBIKEY_AVAILABLE:
                from .exceptions import YubiKeyNotAvailableError

                raise YubiKeyNotAvailableError()
            config = YubiKeyConfig(slot=yubikey_slot, serial=yubikey_serial)
            response = compute_challenge_response(self._header.kdf_salt, config)
            yubikey_response = response.data

        # Sync binaries to inner header (preserve protection flags where possible)
        existing_binaries = self._inner_header.binaries
        new_binaries: dict[int, tuple[bool, bytes]] = {}
        for ref, data in self._binaries.items():
            if ref in existing_binaries:
                # Preserve existing protection flag
                protected, _ = existing_binaries[ref]
                new_binaries[ref] = (protected, data)
            else:
                # New binary, default to protected
                new_binaries[ref] = (True, data)
        self._inner_header.binaries = new_binaries

        # Build XML
        xml_data = self._build_xml()

        # Encrypt and return
        # Use cached transformed_key if available (faster), otherwise use credentials
        return write_kdbx4(
            header=self._header,
            inner_header=self._inner_header,
            xml_data=xml_data,
            password=self._password,
            keyfile_data=self._keyfile_data,
            transformed_key=self._transformed_key,
            yubikey_response=yubikey_response,
        )
1078 def set_credentials(
1079 self,
1080 password: str | None = None,
1081 keyfile_data: bytes | None = None,
1082 ) -> None:
1083 """Set or update database credentials.
1085 Args:
1086 password: New password (None to remove)
1087 keyfile_data: New keyfile data (None to remove)
1089 Raises:
1090 ValueError: If both password and keyfile are None
1091 """
1092 if password is None and keyfile_data is None:
1093 raise MissingCredentialsError()
1094 self._password = password
1095 self._keyfile_data = keyfile_data
1097 # --- Search operations ---
1099 def find_entries(
1100 self,
1101 title: str | None = None,
1102 username: str | None = None,
1103 password: str | None = None,
1104 url: str | None = None,
1105 notes: str | None = None,
1106 otp: str | None = None,
1107 tags: list[str] | None = None,
1108 string: dict[str, str] | None = None,
1109 autotype_enabled: bool | None = None,
1110 autotype_sequence: str | None = None,
1111 autotype_window: str | None = None,
1112 uuid: uuid_module.UUID | None = None,
1113 path: list[str] | str | None = None,
1114 recursive: bool = True,
1115 history: bool = False,
1116 first: bool = False,
1117 ) -> list[Entry] | Entry | None:
1118 """Find entries matching criteria.
1120 Args:
1121 title: Match entries with this title
1122 username: Match entries with this username
1123 password: Match entries with this password
1124 url: Match entries with this URL
1125 notes: Match entries with these notes
1126 otp: Match entries with this OTP
1127 tags: Match entries with all these tags
1128 string: Match entries with custom properties (dict of key:value)
1129 autotype_enabled: Filter by AutoType enabled state
1130 autotype_sequence: Match entries with this AutoType sequence
1131 autotype_window: Match entries with this AutoType window
1132 uuid: Match entry with this UUID
1133 path: Path to entry as list of group names ending with entry title,
1134 or as a '/'-separated string. When specified, other criteria
1135 are ignored.
1136 recursive: Search in subgroups
1137 history: Include history entries in search
1138 first: If True, return first match or None. If False, return list.
1140 Returns:
1141 If first=True: Entry or None
1142 If first=False: List of matching entries
1143 """
1144 # Path-based search
1145 if path is not None:
1146 results = self._find_entry_by_path(path)
1147 if first:
1148 return results[0] if results else None
1149 return results
1151 if uuid is not None:
1152 entry = self._root_group.find_entry_by_uuid(uuid, recursive=recursive)
1153 if first:
1154 return entry
1155 return [entry] if entry else []
1157 results = self._root_group.find_entries(
1158 title=title,
1159 username=username,
1160 password=password,
1161 url=url,
1162 notes=notes,
1163 otp=otp,
1164 tags=tags,
1165 string=string,
1166 autotype_enabled=autotype_enabled,
1167 autotype_sequence=autotype_sequence,
1168 autotype_window=autotype_window,
1169 recursive=recursive,
1170 history=history,
1171 )
1173 if first:
1174 return results[0] if results else None
1175 return results
1177 def _find_entry_by_path(self, path: list[str] | str) -> list[Entry]:
1178 """Find entry by path.
1180 Args:
1181 path: Path as list ['group1', 'group2', 'entry_title'] or
1182 string 'group1/group2/entry_title'
1184 Returns:
1185 List containing matching entry, or empty list if not found
1186 """
1187 if isinstance(path, str):
1188 path = [p for p in path.split("/") if p]
1190 if not path:
1191 return []
1193 # Last element is entry title, rest are group names
1194 entry_title = path[-1]
1195 group_path = path[:-1]
1197 # Navigate to target group
1198 current = self._root_group
1199 for group_name in group_path:
1200 found = None
1201 for subgroup in current.subgroups:
1202 if subgroup.name == group_name:
1203 found = subgroup
1204 break
1205 if found is None:
1206 return []
1207 current = found
1209 # Find entry in target group (non-recursive)
1210 for entry in current.entries:
1211 if entry.title == entry_title:
1212 return [entry]
1214 return []
1216 def find_groups(
1217 self,
1218 name: str | None = None,
1219 uuid: uuid_module.UUID | None = None,
1220 path: list[str] | str | None = None,
1221 recursive: bool = True,
1222 first: bool = False,
1223 ) -> list[Group] | Group | None:
1224 """Find groups matching criteria.
1226 Args:
1227 name: Match groups with this name
1228 uuid: Match group with this UUID
1229 path: Path to group as list of group names or as a '/'-separated
1230 string. When specified, other criteria are ignored.
1231 recursive: Search in nested subgroups
1232 first: If True, return first matching group or None instead of list
1234 Returns:
1235 List of matching groups, or single Group/None if first=True
1236 """
1237 # Path-based search
1238 if path is not None:
1239 results = self._find_group_by_path(path)
1240 if first:
1241 return results[0] if results else None
1242 return results
1244 if uuid is not None:
1245 group = self._root_group.find_group_by_uuid(uuid, recursive=recursive)
1246 if first:
1247 return group
1248 return [group] if group else []
1250 # find_groups with first=False always returns list
1251 group_results = cast(
1252 list[Group],
1253 self._root_group.find_groups(name=name, recursive=recursive),
1254 )
1255 if first:
1256 return group_results[0] if group_results else None
1257 return group_results
1259 def _find_group_by_path(self, path: list[str] | str) -> list[Group]:
1260 """Find group by path.
1262 Args:
1263 path: Path as list ['group1', 'group2'] or string 'group1/group2'
1265 Returns:
1266 List containing matching group, or empty list if not found
1267 """
1268 if isinstance(path, str):
1269 path = [p for p in path.split("/") if p]
1271 if not path:
1272 return [self._root_group]
1274 # Navigate through path
1275 current = self._root_group
1276 for group_name in path:
1277 found = None
1278 for subgroup in current.subgroups:
1279 if subgroup.name == group_name:
1280 found = subgroup
1281 break
1282 if found is None:
1283 return []
1284 current = found
1286 return [current]
1288 def find_entries_contains(
1289 self,
1290 title: str | None = None,
1291 username: str | None = None,
1292 password: str | None = None,
1293 url: str | None = None,
1294 notes: str | None = None,
1295 otp: str | None = None,
1296 recursive: bool = True,
1297 case_sensitive: bool = False,
1298 history: bool = False,
1299 ) -> list[Entry]:
1300 """Find entries where fields contain the given substrings.
1302 All criteria are combined with AND logic. None means "any value".
1304 Args:
1305 title: Match entries whose title contains this substring
1306 username: Match entries whose username contains this substring
1307 password: Match entries whose password contains this substring
1308 url: Match entries whose URL contains this substring
1309 notes: Match entries whose notes contain this substring
1310 otp: Match entries whose OTP contains this substring
1311 recursive: Search in subgroups
1312 case_sensitive: If False (default), matching is case-insensitive
1313 history: Include history entries in search
1315 Returns:
1316 List of matching entries
1317 """
1318 return self._root_group.find_entries_contains(
1319 title=title,
1320 username=username,
1321 password=password,
1322 url=url,
1323 notes=notes,
1324 otp=otp,
1325 recursive=recursive,
1326 case_sensitive=case_sensitive,
1327 history=history,
1328 )
1330 def find_entries_regex(
1331 self,
1332 title: str | None = None,
1333 username: str | None = None,
1334 password: str | None = None,
1335 url: str | None = None,
1336 notes: str | None = None,
1337 otp: str | None = None,
1338 recursive: bool = True,
1339 case_sensitive: bool = False,
1340 history: bool = False,
1341 ) -> list[Entry]:
1342 """Find entries where fields match the given regex patterns.
1344 All criteria are combined with AND logic. None means "any value".
1346 Args:
1347 title: Regex pattern to match against title
1348 username: Regex pattern to match against username
1349 password: Regex pattern to match against password
1350 url: Regex pattern to match against URL
1351 notes: Regex pattern to match against notes
1352 otp: Regex pattern to match against OTP
1353 recursive: Search in subgroups
1354 case_sensitive: If False (default), matching is case-insensitive
1355 history: Include history entries in search
1357 Returns:
1358 List of matching entries
1360 Raises:
1361 re.error: If any pattern is not a valid regex
1362 """
1363 return self._root_group.find_entries_regex(
1364 title=title,
1365 username=username,
1366 password=password,
1367 url=url,
1368 notes=notes,
1369 otp=otp,
1370 recursive=recursive,
1371 case_sensitive=case_sensitive,
1372 history=history,
1373 )
1375 def find_attachments(
1376 self,
1377 id: int | None = None,
1378 filename: str | None = None,
1379 regex: bool = False,
1380 recursive: bool = True,
1381 history: bool = False,
1382 first: bool = False,
1383 ) -> list[Attachment] | Attachment | None:
1384 """Find attachments in the database.
1386 Args:
1387 id: Match attachments with this binary reference ID
1388 filename: Match attachments with this filename (exact or regex)
1389 regex: If True, treat filename as a regex pattern
1390 recursive: Search in subgroups
1391 history: Include history entries in search
1392 first: If True, return first match or None. If False, return list.
1394 Returns:
1395 If first=True: Attachment or None
1396 If first=False: List of matching attachments
1397 """
1398 import re as re_module
1400 results: list[Attachment] = []
1401 pattern: re_module.Pattern[str] | None = None
1403 if regex and filename is not None:
1404 pattern = re_module.compile(filename)
1406 for entry in self._root_group.iter_entries(recursive=recursive, history=history):
1407 for binary_ref in entry.binaries:
1408 # Check ID filter
1409 if id is not None and binary_ref.ref != id:
1410 continue
1412 # Check filename filter
1413 if filename is not None:
1414 if regex and pattern is not None:
1415 if not pattern.search(binary_ref.key):
1416 continue
1417 elif binary_ref.key != filename:
1418 continue
1420 attachment = Attachment(
1421 filename=binary_ref.key,
1422 id=binary_ref.ref,
1423 entry=entry,
1424 )
1425 results.append(attachment)
1427 if first:
1428 return attachment
1430 if first:
1431 return None
1432 return results
1434 @property
1435 def attachments(self) -> list[Attachment]:
1436 """Get all attachments in the database."""
1437 result = self.find_attachments(filename=".*", regex=True)
1438 # find_attachments returns list when first=False (default)
1439 return result if isinstance(result, list) else []
1441 def iter_entries(self, recursive: bool = True) -> Iterator[Entry]:
1442 """Iterate over all entries in the database.
1444 Args:
1445 recursive: Include entries from all subgroups
1447 Yields:
1448 Entry objects
1449 """
1450 yield from self._root_group.iter_entries(recursive=recursive)
1452 def iter_groups(self, recursive: bool = True) -> Iterator[Group]:
1453 """Iterate over all groups in the database.
1455 Args:
1456 recursive: Include nested subgroups
1458 Yields:
1459 Group objects
1460 """
1461 yield from self._root_group.iter_groups(recursive=recursive)
1463 # --- Field References ---
1465 def deref(self, value: str | None) -> str | uuid_module.UUID | None:
1466 """Resolve KeePass field references in a value.
1468 Parses field references in the format {REF:X@Y:Z} and replaces them
1469 with the actual values from the referenced entries:
1470 - X = Field to retrieve (T=Title, U=Username, P=Password, A=URL, N=Notes, I=UUID)
1471 - Y = Field to search by (T, U, P, A, N, I)
1472 - Z = Search value
1474 References are resolved recursively, so a reference that resolves to
1475 another reference will continue resolving until a final value is found.
1477 Args:
1478 value: String potentially containing field references
1480 Returns:
1481 - The resolved string with all references replaced
1482 - A UUID if the final result is a UUID reference
1483 - None if any referenced entry cannot be found
1484 - The original value if it contains no references or is None
1486 Example:
1487 >>> # Entry with password = '{REF:P@I:ABCD1234...}'
1488 >>> db.deref(entry.password) # Returns the referenced password
1489 >>>
1490 >>> # With prefix/suffix: 'prefix{REF:U@I:...}suffix'
1491 >>> db.deref(value) # Returns 'prefix<username>suffix'
1492 """
1493 import re
1495 if not value:
1496 return value
1498 # Pattern matches {REF:X@Y:Z} where X and Y are field codes, Z is search value
1499 pattern = r"(\{REF:([TUPANI])@([TUPANI]):([^}]+)\})"
1500 references = set(re.findall(pattern, value))
1502 if not references:
1503 return value
1505 field_to_attr = {
1506 "T": "title",
1507 "U": "username",
1508 "P": "password",
1509 "A": "url",
1510 "N": "notes",
1511 "I": "uuid",
1512 }
1514 for ref_str, wanted_field, search_field, search_value in references:
1515 wanted_attr = field_to_attr[wanted_field]
1516 search_attr = field_to_attr[search_field]
1518 # Convert UUID search value to proper UUID object
1519 if search_attr == "uuid":
1520 try:
1521 search_value = uuid_module.UUID(search_value)
1522 except ValueError:
1523 return None
1525 # Find the referenced entry
1526 ref_entry = self.find_entries(first=True, **{search_attr: search_value})
1527 if ref_entry is None:
1528 return None
1530 # Get the wanted field value
1531 resolved_value = getattr(ref_entry, wanted_attr)
1532 if resolved_value is None:
1533 resolved_value = ""
1535 # UUID needs special handling - convert to string for replacement
1536 if isinstance(resolved_value, uuid_module.UUID):
1537 resolved_value = str(resolved_value)
1539 value = value.replace(ref_str, resolved_value)
1541 # Recursively resolve any nested references
1542 return self.deref(value)
1544 # --- Move operations ---
1546 def move_entry(self, entry: Entry, destination: Group) -> None:
1547 """Move an entry to a different group.
1549 This is a convenience method that calls entry.move_to(). It validates
1550 that both the entry and destination belong to this database.
1552 Args:
1553 entry: Entry to move
1554 destination: Target group to move the entry to
1556 Raises:
1557 ValueError: If entry or destination is not in this database
1558 ValueError: If entry has no parent
1559 ValueError: If destination is the current parent
1560 """
1561 # Validate entry is in this database
1562 if entry.parent is None:
1563 raise ValueError("Entry has no parent group")
1564 found = self._root_group.find_entry_by_uuid(entry.uuid)
1565 if found is None:
1566 raise ValueError("Entry is not in this database")
1568 # Validate destination is in this database
1569 if destination is not self._root_group:
1570 found_group = self._root_group.find_group_by_uuid(destination.uuid)
1571 if found_group is None:
1572 raise ValueError("Destination group is not in this database")
1574 entry.move_to(destination)
1576 def move_group(self, group: Group, destination: Group) -> None:
1577 """Move a group to a different parent group.
1579 This is a convenience method that calls group.move_to(). It validates
1580 that both the group and destination belong to this database.
1582 Args:
1583 group: Group to move
1584 destination: Target parent group to move the group to
1586 Raises:
1587 ValueError: If group or destination is not in this database
1588 ValueError: If group is the root group
1589 ValueError: If group has no parent
1590 ValueError: If destination is the current parent
1591 ValueError: If destination is the group itself or a descendant
1592 """
1593 # Validate group is in this database (and not root)
1594 if group.is_root_group:
1595 raise ValueError("Cannot move the root group")
1596 if group.parent is None:
1597 raise ValueError("Group has no parent")
1598 found = self._root_group.find_group_by_uuid(group.uuid)
1599 if found is None:
1600 raise ValueError("Group is not in this database")
1602 # Validate destination is in this database
1603 if destination is not self._root_group:
1604 found_dest = self._root_group.find_group_by_uuid(destination.uuid)
1605 if found_dest is None:
1606 raise ValueError("Destination group is not in this database")
1608 group.move_to(destination)
1610 # --- Recycle bin operations ---
1612 @property
1613 def recyclebin_group(self) -> Group | None:
1614 """Get the recycle bin group, or None if disabled.
1616 If recycle_bin_enabled is True but no recycle bin exists yet,
1617 this creates one automatically.
1619 Returns:
1620 Recycle bin Group, or None if recycle bin is disabled
1621 """
1622 if not self._settings.recycle_bin_enabled:
1623 return None
1625 # Try to find existing recycle bin
1626 if self._settings.recycle_bin_uuid is not None:
1627 group = self._root_group.find_group_by_uuid(self._settings.recycle_bin_uuid)
1628 if group is not None:
1629 return group
1631 # Create new recycle bin
1632 recycle_bin = Group(name="Recycle Bin", icon_id="43")
1633 self._root_group.add_subgroup(recycle_bin)
1634 self._settings.recycle_bin_uuid = recycle_bin.uuid
1635 return recycle_bin
1637 def trash_entry(self, entry: Entry) -> None:
1638 """Move an entry to the recycle bin.
1640 If the entry is already in the recycle bin, it is permanently deleted.
1642 Args:
1643 entry: Entry to trash
1645 Raises:
1646 ValueError: If entry is not in this database
1647 ValueError: If recycle bin is disabled
1648 """
1649 # Validate entry is in this database
1650 if entry.parent is None:
1651 raise ValueError("Entry has no parent group")
1652 found = self._root_group.find_entry_by_uuid(entry.uuid)
1653 if found is None:
1654 raise ValueError("Entry is not in this database")
1656 recycle_bin = self.recyclebin_group
1657 if recycle_bin is None:
1658 raise ValueError("Recycle bin is disabled")
1660 # If already in recycle bin, delete permanently
1661 if entry.parent is recycle_bin:
1662 recycle_bin.remove_entry(entry)
1663 return
1665 # Move to recycle bin
1666 entry.move_to(recycle_bin)
1668 def trash_group(self, group: Group) -> None:
1669 """Move a group to the recycle bin.
1671 If the group is already in the recycle bin, it is permanently deleted.
1672 Cannot trash the root group or the recycle bin itself.
1674 Args:
1675 group: Group to trash
1677 Raises:
1678 ValueError: If group is not in this database
1679 ValueError: If group is the root group
1680 ValueError: If group is the recycle bin
1681 ValueError: If recycle bin is disabled
1682 """
1683 # Validate group
1684 if group.is_root_group:
1685 raise ValueError("Cannot trash the root group")
1686 if group.parent is None:
1687 raise ValueError("Group has no parent")
1688 found = self._root_group.find_group_by_uuid(group.uuid)
1689 if found is None:
1690 raise ValueError("Group is not in this database")
1692 recycle_bin = self.recyclebin_group
1693 if recycle_bin is None:
1694 raise ValueError("Recycle bin is disabled")
1696 # Cannot trash the recycle bin itself
1697 if group is recycle_bin:
1698 raise ValueError("Cannot trash the recycle bin")
1700 # If already in recycle bin, delete permanently
1701 if group.parent is recycle_bin:
1702 recycle_bin.remove_subgroup(group)
1703 return
1705 # Move to recycle bin
1706 group.move_to(recycle_bin)
1708 def empty_group(self, group: Group) -> None:
1709 """Delete all entries and subgroups from a group.
1711 This permanently deletes all contents (does not use recycle bin).
1712 The group itself is not deleted.
1714 Args:
1715 group: Group to empty
1717 Raises:
1718 ValueError: If group is not in this database
1719 """
1720 # Validate group is in this database
1721 if group is not self._root_group:
1722 found = self._root_group.find_group_by_uuid(group.uuid)
1723 if found is None:
1724 raise ValueError("Group is not in this database")
1726 # Delete all subgroups (iterate over copy since we're modifying)
1727 for subgroup in list(group.subgroups):
1728 group.remove_subgroup(subgroup)
1730 # Delete all entries
1731 for entry in list(group.entries):
1732 group.remove_entry(entry)
1734 # --- Memory protection ---
1736 def apply_protection_policy(self, entry: Entry) -> None:
1737 """Apply the database's memory protection policy to an entry.
1739 Updates the `protected` flag on the entry's string fields
1740 according to the database's memory_protection settings.
1742 This is automatically applied when saving the database, but
1743 can be called manually if you need protection applied immediately
1744 for in-memory operations.
1746 Args:
1747 entry: Entry to apply policy to
1748 """
1749 for key, string_field in entry.strings.items():
1750 if key in self._settings.memory_protection:
1751 string_field.protected = self._settings.memory_protection[key]
1753 def apply_protection_policy_all(self) -> None:
1754 """Apply memory protection policy to all entries in the database.
1756 Updates all entries' string field protection flags according
1757 to the database's memory_protection settings.
1758 """
1759 for entry in self.iter_entries():
1760 self.apply_protection_policy(entry)
1762 # --- Binary attachments ---
1764 def get_binary(self, ref: int) -> bytes | None:
1765 """Get binary attachment data by reference ID.
1767 Args:
1768 ref: Binary reference ID
1770 Returns:
1771 Binary data or None if not found
1772 """
1773 return self._binaries.get(ref)
1775 def add_binary(self, data: bytes, protected: bool = True) -> int:
1776 """Add a new binary attachment to the database.
1778 Args:
1779 data: Binary data
1780 protected: Whether the binary should be memory-protected
1782 Returns:
1783 Reference ID for the new binary
1784 """
1785 # Find next available index
1786 ref = max(self._binaries.keys(), default=-1) + 1
1787 self._binaries[ref] = data
1788 # Update inner header
1789 if self._inner_header is not None:
1790 self._inner_header.binaries[ref] = (protected, data)
1791 return ref
1793 def remove_binary(self, ref: int) -> bool:
1794 """Remove a binary attachment from the database.
1796 Args:
1797 ref: Binary reference ID
1799 Returns:
1800 True if removed, False if not found
1801 """
1802 if ref in self._binaries:
1803 del self._binaries[ref]
1804 if self._inner_header is not None and ref in self._inner_header.binaries:
1805 del self._inner_header.binaries[ref]
1806 return True
1807 return False
1809 def get_attachment(self, entry: Entry, name: str) -> bytes | None:
1810 """Get an attachment from an entry by filename.
1812 Args:
1813 entry: Entry to get attachment from
1814 name: Filename of the attachment
1816 Returns:
1817 Attachment data or None if not found
1818 """
1819 for binary_ref in entry.binaries:
1820 if binary_ref.key == name:
1821 return self._binaries.get(binary_ref.ref)
1822 return None
1824 def add_attachment(self, entry: Entry, name: str, data: bytes, protected: bool = True) -> None:
1825 """Add an attachment to an entry.
1827 Args:
1828 entry: Entry to add attachment to
1829 name: Filename for the attachment
1830 data: Attachment data
1831 protected: Whether the attachment should be memory-protected
1832 """
1833 ref = self.add_binary(data, protected=protected)
1834 entry.binaries.append(BinaryRef(key=name, ref=ref))
1836 def remove_attachment(self, entry: Entry, name: str) -> bool:
1837 """Remove an attachment from an entry by filename.
1839 Args:
1840 entry: Entry to remove attachment from
1841 name: Filename of the attachment
1843 Returns:
1844 True if removed, False if not found
1845 """
1846 for i, binary_ref in enumerate(entry.binaries):
1847 if binary_ref.key == name:
1848 # Remove from entry's list
1849 entry.binaries.pop(i)
1850 # Note: We don't remove from _binaries as other entries may reference it
1851 return True
1852 return False
1854 def list_attachments(self, entry: Entry) -> list[str]:
1855 """List all attachment filenames for an entry.
1857 Args:
1858 entry: Entry to list attachments for
1860 Returns:
1861 List of attachment filenames
1862 """
1863 return [binary_ref.key for binary_ref in entry.binaries]
1865 # --- Custom icons ---
1867 @property
1868 def custom_icons(self) -> dict[uuid_module.UUID, CustomIcon]:
1869 """Get dictionary of custom icons (UUID -> CustomIcon)."""
1870 return self._settings.custom_icons
1872 def get_custom_icon(self, uuid: uuid_module.UUID) -> bytes | None:
1873 """Get custom icon data by UUID.
1875 Args:
1876 uuid: UUID of the custom icon
1878 Returns:
1879 PNG image data, or None if not found
1880 """
1881 icon = self._settings.custom_icons.get(uuid)
1882 return icon.data if icon else None
1884 def add_custom_icon(self, data: bytes, name: str | None = None) -> uuid_module.UUID:
1885 """Add a custom icon to the database.
1887 Args:
1888 data: PNG image data
1889 name: Optional display name for the icon
1891 Returns:
1892 UUID of the new custom icon
1893 """
1894 icon_uuid = uuid_module.uuid4()
1895 icon = CustomIcon(
1896 uuid=icon_uuid,
1897 data=data,
1898 name=name,
1899 last_modification_time=datetime.now(UTC),
1900 )
1901 self._settings.custom_icons[icon_uuid] = icon
1902 return icon_uuid
1904 def remove_custom_icon(self, uuid: uuid_module.UUID) -> bool:
1905 """Remove a custom icon from the database.
1907 Note: This does not update entries/groups that reference this icon.
1908 They will continue to reference the now-missing UUID.
1910 Args:
1911 uuid: UUID of the custom icon to remove
1913 Returns:
1914 True if removed, False if not found
1915 """
1916 if uuid in self._settings.custom_icons:
1917 del self._settings.custom_icons[uuid]
1918 return True
1919 return False
1921 def find_custom_icon_by_name(self, name: str) -> uuid_module.UUID | None:
1922 """Find a custom icon by name.
1924 Args:
1925 name: Name of the icon to find (must match exactly one icon)
1927 Returns:
1928 UUID of the matching icon, or None if not found
1930 Raises:
1931 ValueError: If multiple icons have the same name
1932 """
1933 matches = [icon.uuid for icon in self._settings.custom_icons.values() if icon.name == name]
1934 if len(matches) == 0:
1935 return None
1936 if len(matches) > 1:
1937 raise ValueError(f"Multiple custom icons found with name: {name}")
1938 return matches[0]
1940 # --- XML parsing ---
1942 @classmethod
1943 def _parse_xml(
1944 cls, xml_data: bytes, inner_header: InnerHeader | None = None
1945 ) -> tuple[Group, DatabaseSettings, dict[int, bytes]]:
1946 """Parse KDBX XML into models.
1948 Args:
1949 xml_data: XML payload bytes
1950 inner_header: Inner header with stream cipher info (for decrypting protected values)
1952 Returns:
1953 Tuple of (root_group, settings, binaries)
1954 """
1955 root = DefusedET.fromstring(xml_data)
1957 # Decrypt protected values in-place before parsing
1958 if inner_header is not None:
1959 cls._decrypt_protected_values(root, inner_header)
1961 # Parse Meta section for settings
1962 settings = cls._parse_meta(root.find("Meta"))
1964 # Parse Root/Group for entries
1965 root_elem = root.find("Root")
1966 if root_elem is None:
1967 raise InvalidXmlError("Missing Root element")
1969 group_elem = root_elem.find("Group")
1970 if group_elem is None:
1971 raise InvalidXmlError("Missing root Group element")
1973 root_group = cls._parse_group(group_elem)
1974 root_group._is_root = True
1976 # Extract binaries from inner header (KDBX4 style)
1977 # The protection flag indicates memory protection policy, not encryption
1978 binaries: dict[int, bytes] = {}
1979 if inner_header is not None:
1980 for idx, (_protected, data) in inner_header.binaries.items():
1981 binaries[idx] = data
1983 return root_group, settings, binaries
@classmethod
def _decrypt_protected_values(cls, root: Element, inner_header: InnerHeader) -> None:
    """Decrypt every ``Protected`` Value element in-place, in document order.

    Protected values are stream-cipher encrypted and base64 encoded. The
    cipher state spans the whole document, so elements must be visited in
    document order for the keystream to line up.
    """
    stream = ProtectedStreamCipher(
        inner_header.random_stream_id,
        inner_header.random_stream_key,
    )

    for value_elem in root.iter("Value"):
        if value_elem.get("Protected") != "True" or not value_elem.text:
            continue
        try:
            decoded = base64.b64decode(value_elem.text)
            value_elem.text = stream.decrypt(decoded).decode("utf-8")
        except (binascii.Error, ValueError, UnicodeDecodeError):
            # Best effort: keep the original (still-encoded) text on failure.
            pass
@classmethod
def _parse_meta(cls, meta_elem: Element | None) -> DatabaseSettings:
    """Parse a ``Meta`` element into :class:`DatabaseSettings`.

    Args:
        meta_elem: The ``Meta`` element, or ``None`` if the document has none.

    Returns:
        Settings populated from the element; fields keep their defaults
        when the corresponding sub-element is absent or invalid.
    """
    settings = DatabaseSettings()

    if meta_elem is None:
        return settings

    def get_text(tag: str) -> str | None:
        elem = meta_elem.find(tag)
        return elem.text if elem is not None else None

    if name := get_text("DatabaseName"):
        settings.database_name = name
    if desc := get_text("DatabaseDescription"):
        settings.database_description = desc
    if username := get_text("DefaultUserName"):
        settings.default_username = username
    if gen := get_text("Generator"):
        settings.generator = gen

    # Memory protection flags (Protect<Field> sub-elements).
    # Loop variable named field_name (matching _build_meta) so it does not
    # shadow dataclasses.field imported at module level.
    mp_elem = meta_elem.find("MemoryProtection")
    if mp_elem is not None:
        for field_name in ["Title", "UserName", "Password", "URL", "Notes"]:
            elem = mp_elem.find(f"Protect{field_name}")
            if elem is not None:
                settings.memory_protection[field_name] = elem.text == "True"

    # Recycle bin settings.
    if rb := get_text("RecycleBinEnabled"):
        settings.recycle_bin_enabled = rb == "True"
    if rb_uuid := get_text("RecycleBinUUID"):
        # contextlib is imported at module level; the previous
        # function-local import was redundant.
        with contextlib.suppress(binascii.Error, ValueError):
            settings.recycle_bin_uuid = uuid_module.UUID(bytes=base64.b64decode(rb_uuid))

    # Custom icons: UUID and Data are required; Name and
    # LastModificationTime are optional. Invalid icons are skipped.
    custom_icons_elem = meta_elem.find("CustomIcons")
    if custom_icons_elem is not None:
        for icon_elem in custom_icons_elem.findall("Icon"):
            icon_uuid_elem = icon_elem.find("UUID")
            icon_data_elem = icon_elem.find("Data")
            if (
                icon_uuid_elem is not None
                and icon_uuid_elem.text
                and icon_data_elem is not None
                and icon_data_elem.text
            ):
                try:
                    icon_uuid = uuid_module.UUID(bytes=base64.b64decode(icon_uuid_elem.text))
                    icon_data = base64.b64decode(icon_data_elem.text)
                    icon_name = None
                    name_elem = icon_elem.find("Name")
                    if name_elem is not None:
                        icon_name = name_elem.text
                    icon_mtime = None
                    mtime_elem = icon_elem.find("LastModificationTime")
                    if mtime_elem is not None and mtime_elem.text:
                        icon_mtime = cls._decode_time(mtime_elem.text)
                    settings.custom_icons[icon_uuid] = CustomIcon(
                        uuid=icon_uuid,
                        data=icon_data,
                        name=icon_name,
                        last_modification_time=icon_mtime,
                    )
                except (binascii.Error, ValueError):
                    pass  # Skip invalid icon

    return settings
@classmethod
def _parse_group(cls, elem: Element) -> Group:
    """Build a :class:`Group` model from a ``Group`` XML element.

    Entries and subgroups are parsed recursively and attached to the
    returned group.
    """
    group = Group()

    def child(tag: str) -> Element | None:
        return elem.find(tag)

    uuid_node = child("UUID")
    if uuid_node is not None and uuid_node.text:
        group.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_node.text))

    # Name/Notes are taken even when empty; IconID requires non-empty text.
    name_node = child("Name")
    if name_node is not None:
        group.name = name_node.text

    notes_node = child("Notes")
    if notes_node is not None:
        group.notes = notes_node.text

    icon_node = child("IconID")
    if icon_node is not None and icon_node.text:
        group.icon_id = icon_node.text

    custom_icon_node = child("CustomIconUUID")
    if custom_icon_node is not None and custom_icon_node.text:
        with contextlib.suppress(binascii.Error, ValueError):
            group.custom_icon_uuid = uuid_module.UUID(
                bytes=base64.b64decode(custom_icon_node.text)
            )

    group.times = cls._parse_times(child("Times"))

    for entry_node in elem.findall("Entry"):
        group.add_entry(cls._parse_entry(entry_node))

    for subgroup_node in elem.findall("Group"):
        group.add_subgroup(cls._parse_group(subgroup_node))

    return group
@classmethod
def _parse_entry(cls, elem: Element) -> Entry:
    """Parse an Entry element into an Entry model.

    Called recursively for history entries; every sub-element is optional,
    so missing pieces simply leave the Entry's defaults in place.
    """
    entry = Entry()

    # UUID (base64-encoded 16 bytes)
    uuid_elem = elem.find("UUID")
    if uuid_elem is not None and uuid_elem.text:
        entry.uuid = uuid_module.UUID(bytes=base64.b64decode(uuid_elem.text))

    # Icon
    icon_elem = elem.find("IconID")
    if icon_elem is not None and icon_elem.text:
        entry.icon_id = icon_elem.text

    # Custom icon UUID; malformed base64/UUID data is silently ignored
    custom_icon_elem = elem.find("CustomIconUUID")
    if custom_icon_elem is not None and custom_icon_elem.text:
        with contextlib.suppress(binascii.Error, ValueError):
            entry.custom_icon_uuid = uuid_module.UUID(
                bytes=base64.b64decode(custom_icon_elem.text)
            )

    # Tags: normalize comma separators to semicolons, then split and strip
    tags_elem = elem.find("Tags")
    if tags_elem is not None and tags_elem.text:
        tag_text = tags_elem.text.replace(",", ";")
        entry.tags = [t.strip() for t in tag_text.split(";") if t.strip()]

    # Times
    entry.times = cls._parse_times(elem.find("Times"))

    # String fields (Title, UserName, Password, custom fields, ...).
    # The Protected attribute is recorded; decryption already happened in
    # _decrypt_protected_values, so value text is plaintext here.
    for string_elem in elem.findall("String"):
        key_elem = string_elem.find("Key")
        value_elem = string_elem.find("Value")
        if key_elem is not None and key_elem.text:
            key = key_elem.text
            value = value_elem.text if value_elem is not None else None
            protected = value_elem is not None and value_elem.get("Protected") == "True"
            entry.strings[key] = StringField(key=key, value=value, protected=protected)

    # Binary references: indexes into the inner-header attachment pool.
    # NOTE(review): int(ref) raises ValueError on a non-numeric Ref
    # attribute instead of skipping it — confirm whether that is intended.
    for binary_elem in elem.findall("Binary"):
        key_elem = binary_elem.find("Key")
        value_elem = binary_elem.find("Value")
        if key_elem is not None and key_elem.text and value_elem is not None:
            ref = value_elem.get("Ref")
            if ref is not None:
                entry.binaries.append(BinaryRef(key=key_elem.text, ref=int(ref)))

    # AutoType settings
    at_elem = elem.find("AutoType")
    if at_elem is not None:
        enabled_elem = at_elem.find("Enabled")
        seq_elem = at_elem.find("DefaultSequence")
        obf_elem = at_elem.find("DataTransferObfuscation")

        entry.autotype = AutoType(
            enabled=enabled_elem is not None and enabled_elem.text == "True",
            sequence=seq_elem.text if seq_elem is not None else None,
            obfuscation=int(obf_elem.text) if obf_elem is not None and obf_elem.text else 0,
        )

        # Window from Association (only the first Association is read)
        assoc_elem = at_elem.find("Association")
        if assoc_elem is not None:
            window_elem = assoc_elem.find("Window")
            if window_elem is not None:
                entry.autotype.window = window_elem.text

    # History: each historical revision is itself a full Entry element
    history_elem = elem.find("History")
    if history_elem is not None:
        for hist_entry_elem in history_elem.findall("Entry"):
            hist_entry = cls._parse_entry(hist_entry_elem)
            history_entry = HistoryEntry.from_entry(hist_entry)
            entry.history.append(history_entry)

    return entry
@classmethod
def _parse_times(cls, times_elem: Element | None) -> Times:
    """Parse a ``Times`` element into a :class:`Times` model.

    Fields that are absent or empty keep the freshly-created defaults
    from :meth:`Times.create_new`.
    """
    times = Times.create_new()

    if times_elem is None:
        return times

    def read_time(tag: str) -> datetime | None:
        node = times_elem.find(tag)
        if node is None or not node.text:
            return None
        return cls._decode_time(node.text)

    # Timestamp fields share one decode path; only set when present.
    for tag, attr in (
        ("CreationTime", "creation_time"),
        ("LastModificationTime", "last_modification_time"),
        ("LastAccessTime", "last_access_time"),
        ("ExpiryTime", "expiry_time"),
        ("LocationChanged", "location_changed"),
    ):
        if decoded := read_time(tag):
            setattr(times, attr, decoded)

    expires_node = times_elem.find("Expires")
    if expires_node is not None:
        times.expires = expires_node.text == "True"

    usage_node = times_elem.find("UsageCount")
    if usage_node is not None and usage_node.text:
        times.usage_count = int(usage_node.text)

    return times
@classmethod
def _decode_time(cls, time_str: str) -> datetime:
    """Decode a KDBX time string into an aware UTC datetime.

    Supports both encodings:
    - KDBX4 binary: base64 of a little-endian int64 counting seconds
      since 0001-01-01 00:00:00 UTC.
    - ISO 8601 text (KDBX4_TIME_FORMAT, with generic ISO as fallback).

    Falls back to the current UTC time when nothing parses.
    """
    # Imported before the try-block: the previous in-branch import left
    # `struct` unbound when b64decode raised, so evaluating `struct.error`
    # in the except clause crashed with UnboundLocalError instead of
    # falling through to the ISO parsers.
    import struct

    # Base64 alphabet never contains '-' or ':', both of which appear in
    # ISO dates — cheap discriminator between the two encodings.
    if "-" not in time_str and ":" not in time_str:
        try:
            binary = base64.b64decode(time_str)
            if len(binary) == 8:  # int64 = 8 bytes
                seconds = struct.unpack("<q", binary)[0]
                # KDBX4 epoch is 0001-01-01 00:00:00 UTC.
                return datetime(1, 1, 1, tzinfo=UTC) + timedelta(seconds=seconds)
        except (binascii.Error, ValueError, struct.error):
            pass  # Not valid base64 or not a binary timestamp

    # Try the canonical KDBX4 ISO format
    try:
        return datetime.strptime(time_str, KDBX4_TIME_FORMAT).replace(tzinfo=UTC)
    except ValueError:
        pass

    # Fallback: generic ISO format, treating a trailing Z as UTC
    try:
        return datetime.fromisoformat(time_str.replace("Z", "+00:00"))
    except ValueError:
        return datetime.now(UTC)
@classmethod
def _encode_time(cls, dt: datetime) -> str:
    """Format *dt* as an ISO 8601 UTC string for KDBX4 XML.

    The ISO form (e.g. 2025-01-15T10:30:45Z) is human-readable and
    accepted by KeePassXC. Naive datetimes are treated as already-UTC.
    """
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=UTC)
    return aware.strftime(KDBX4_TIME_FORMAT)
2290 # --- XML building ---
def _build_xml(self) -> bytes:
    """Serialize the in-memory model into KDBX XML bytes.

    Protected values are encrypted in-place just before serialization so
    the stream-cipher keystream lines up with document order.
    """
    document = Element("KeePassFile")

    # Meta section, then Root section containing the group tree.
    self._build_meta(SubElement(document, "Meta"))
    self._build_group(SubElement(document, "Root"), self._root_group)

    if self._inner_header is not None:
        self._encrypt_protected_values(document, self._inner_header)

    # tostring() yields bytes because an encoding is given.
    return cast(bytes, tostring(document, encoding="utf-8", xml_declaration=True))
def _encrypt_protected_values(self, root: Element, inner_header: InnerHeader) -> None:
    """Encrypt every ``Protected`` Value element in-place, in document order.

    The stream-cipher state spans the whole document, so traversal order
    matters. Each value is encrypted with the cipher and then base64
    encoded; a missing text node is treated as the empty string.
    """
    stream = ProtectedStreamCipher(
        inner_header.random_stream_id,
        inner_header.random_stream_key,
    )

    for value_elem in root.iter("Value"):
        if value_elem.get("Protected") != "True":
            continue
        raw = (value_elem.text or "").encode("utf-8")
        value_elem.text = base64.b64encode(stream.encrypt(raw)).decode("ascii")
def _build_meta(self, meta: Element) -> None:
    """Populate the ``Meta`` element from the database settings.

    Element order mirrors what KeePass-family tools emit; optional
    fields are skipped when unset.
    """
    s = self._settings

    def add(tag: str, text: str | None) -> None:
        SubElement(meta, tag).text = text

    add("Generator", s.generator)
    add("DatabaseName", s.database_name)
    if s.database_description:
        add("DatabaseDescription", s.database_description)
    if s.default_username:
        add("DefaultUserName", s.default_username)

    add("MaintenanceHistoryDays", str(s.maintenance_history_days))
    add("MasterKeyChangeRec", str(s.master_key_change_rec))
    add("MasterKeyChangeForce", str(s.master_key_change_force))

    # Memory protection flags, one Protect<Field> element per policy entry.
    mp = SubElement(meta, "MemoryProtection")
    for field_name, is_protected in s.memory_protection.items():
        SubElement(mp, f"Protect{field_name}").text = str(is_protected)

    add("RecycleBinEnabled", str(s.recycle_bin_enabled))
    # The recycle-bin UUID is always emitted; all-zero means "none".
    rb_bytes = s.recycle_bin_uuid.bytes if s.recycle_bin_uuid else b"\x00" * 16
    add("RecycleBinUUID", base64.b64encode(rb_bytes).decode("ascii"))

    add("HistoryMaxItems", str(s.history_max_items))
    add("HistoryMaxSize", str(s.history_max_size))

    # Custom icons: UUID and Data always written; Name and
    # LastModificationTime only when present.
    if s.custom_icons:
        icons_elem = SubElement(meta, "CustomIcons")
        for icon in s.custom_icons.values():
            icon_elem = SubElement(icons_elem, "Icon")
            SubElement(icon_elem, "UUID").text = base64.b64encode(icon.uuid.bytes).decode(
                "ascii"
            )
            SubElement(icon_elem, "Data").text = base64.b64encode(icon.data).decode("ascii")
            if icon.name:
                SubElement(icon_elem, "Name").text = icon.name
            if icon.last_modification_time:
                SubElement(icon_elem, "LastModificationTime").text = self._encode_time(
                    icon.last_modification_time
                )
def _build_group(self, parent: Element, group: Group) -> None:
    """Append a ``Group`` element under *parent*, recursing into entries
    and subgroups."""
    elem = SubElement(parent, "Group")

    def b64(value: uuid_module.UUID) -> str:
        return base64.b64encode(value.bytes).decode("ascii")

    SubElement(elem, "UUID").text = b64(group.uuid)
    SubElement(elem, "Name").text = group.name or ""
    if group.notes:
        SubElement(elem, "Notes").text = group.notes
    SubElement(elem, "IconID").text = group.icon_id
    if group.custom_icon_uuid:
        SubElement(elem, "CustomIconUUID").text = b64(group.custom_icon_uuid)

    self._build_times(elem, group.times)

    SubElement(elem, "IsExpanded").text = str(group.is_expanded)

    # Tri-state / optional group policies are omitted when unset.
    if group.default_autotype_sequence:
        SubElement(elem, "DefaultAutoTypeSequence").text = group.default_autotype_sequence
    if group.enable_autotype is not None:
        SubElement(elem, "EnableAutoType").text = str(group.enable_autotype)
    if group.enable_searching is not None:
        SubElement(elem, "EnableSearching").text = str(group.enable_searching)

    # An absent "last top visible entry" is serialized as the zero UUID.
    SubElement(elem, "LastTopVisibleEntry").text = b64(
        group.last_top_visible_entry or uuid_module.UUID(int=0)
    )

    for child_entry in group.entries:
        self._build_entry(elem, child_entry)

    for child_group in group.subgroups:
        self._build_group(elem, child_group)
def _build_entry(self, parent: Element, entry: Entry) -> None:
    """Build Entry element from Entry model.

    Also used for history revisions: _parse_entry/_build_entry are
    symmetric, and history entries are serialized with this same method.
    """
    elem = SubElement(parent, "Entry")

    # UUID and icon
    SubElement(elem, "UUID").text = base64.b64encode(entry.uuid.bytes).decode("ascii")
    SubElement(elem, "IconID").text = entry.icon_id
    if entry.custom_icon_uuid:
        SubElement(elem, "CustomIconUUID").text = base64.b64encode(
            entry.custom_icon_uuid.bytes
        ).decode("ascii")

    # Optional cosmetic / override fields, skipped when unset
    if entry.foreground_color:
        SubElement(elem, "ForegroundColor").text = entry.foreground_color
    if entry.background_color:
        SubElement(elem, "BackgroundColor").text = entry.background_color
    if entry.override_url:
        SubElement(elem, "OverrideURL").text = entry.override_url

    # Tags are written semicolon-separated
    if entry.tags:
        SubElement(elem, "Tags").text = ";".join(entry.tags)

    self._build_times(elem, entry.times)

    # String fields - apply memory protection policy from database settings
    for key, string_field in entry.strings.items():
        string_elem = SubElement(elem, "String")
        SubElement(string_elem, "Key").text = key
        value_elem = SubElement(string_elem, "Value")
        value_elem.text = string_field.value or ""
        # Use database memory_protection policy for standard fields,
        # fall back to string_field.protected for custom fields
        if key in self._settings.memory_protection:
            should_protect = self._settings.memory_protection[key]
        else:
            should_protect = string_field.protected
        if should_protect:
            # Actual encryption happens later in _encrypt_protected_values
            value_elem.set("Protected", "True")

    # Binary references: index into the inner-header attachment pool
    for binary_ref in entry.binaries:
        binary_elem = SubElement(elem, "Binary")
        SubElement(binary_elem, "Key").text = binary_ref.key
        value_elem = SubElement(binary_elem, "Value")
        value_elem.set("Ref", str(binary_ref.ref))

    # AutoType (always emitted, even when disabled)
    at = entry.autotype
    at_elem = SubElement(elem, "AutoType")
    SubElement(at_elem, "Enabled").text = str(at.enabled)
    SubElement(at_elem, "DataTransferObfuscation").text = str(at.obfuscation)
    SubElement(at_elem, "DefaultSequence").text = at.sequence or ""

    # A single Association is written when a window filter is set
    if at.window:
        assoc = SubElement(at_elem, "Association")
        SubElement(assoc, "Window").text = at.window
        SubElement(assoc, "KeystrokeSequence").text = ""

    # History: each revision serialized as a nested Entry element
    if entry.history:
        history_elem = SubElement(elem, "History")
        for hist_entry in entry.history:
            self._build_entry(history_elem, hist_entry)
def _build_times(self, parent: Element, times: Times) -> None:
    """Append a ``Times`` element built from *times* under *parent*."""
    elem = SubElement(parent, "Times")

    def stamp(tag: str, value: datetime) -> None:
        SubElement(elem, tag).text = self._encode_time(value)

    stamp("CreationTime", times.creation_time)
    stamp("LastModificationTime", times.last_modification_time)
    stamp("LastAccessTime", times.last_access_time)
    # An ExpiryTime element is always written; when none is set, the
    # creation time stands in as a placeholder.
    stamp("ExpiryTime", times.expiry_time or times.creation_time)
    SubElement(elem, "Expires").text = str(times.expires)
    SubElement(elem, "UsageCount").text = str(times.usage_count)
    if times.location_changed:
        stamp("LocationChanged", times.location_changed)
def __str__(self) -> str:
    """Human-readable one-line summary of the database contents."""
    n_entries = sum(1 for _e in self.iter_entries())
    n_groups = sum(1 for _g in self.iter_groups())
    db_name = self._settings.database_name
    return f'Database: "{db_name}" ({n_entries} entries, {n_groups} groups)'