Coverage for src/kdbxtool/merge.py: 84% (293 statements)
1"""Database merge functionality for combining KDBX databases.
3This module provides the Merger class for merging two KeePass databases
4following a UUID-based matching and timestamp-based conflict resolution
5algorithm similar to KeePassXC.
7Example:
8 >>> from kdbxtool import Database
9 >>> target = Database.open("main.kdbx", password="secret")
10 >>> source = Database.open("branch.kdbx", password="secret")
11 >>> result = target.merge(source)
12 >>> print(f"Added {result.entries_added} entries")
13 >>> target.save()
14"""

from __future__ import annotations

import copy
import hashlib
import uuid as uuid_module
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
from typing import TYPE_CHECKING

from .models.entry import BinaryRef, Entry, HistoryEntry, StringField
from .models.group import Group
from .models.times import Times

if TYPE_CHECKING:
    from .database import Database


class MergeMode(Enum):
    """Merge mode determining how conflicts and deletions are handled.

    Attributes:
        STANDARD: Add and update entries/groups from source. Does not delete
            anything from target. This is the default and safest mode.
        SYNCHRONIZE: Full bidirectional sync including deletions. Items deleted
            in source (tracked in DeletedObjects) will be deleted from target
            if they haven't been modified after the deletion time.
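
    Example:
        A minimal sketch (assumes target and source are already-open
        Database objects):

        >>> merger = Merger(target, source, mode=MergeMode.SYNCHRONIZE)
        >>> result = merger.merge()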
43 """
45 STANDARD = auto()
46 SYNCHRONIZE = auto()


@dataclass
class DeletedObject:
    """Record of a deleted entry or group.

    Used in SYNCHRONIZE mode to track deletions that should propagate
    to other databases during merge.

    Attributes:
        uuid: UUID of the deleted entry or group
        deletion_time: When the deletion occurred
    """

    uuid: uuid_module.UUID
    deletion_time: datetime


@dataclass
class MergeResult:
    """Result of a database merge operation.

    Contains detailed statistics about what was changed during the merge.

    Attributes:
        entries_added: Number of new entries added from source
        entries_updated: Number of existing entries updated (source was newer)
        entries_relocated: Number of entries moved to different groups
        entries_deleted: Number of entries deleted (SYNCHRONIZE mode only)
        groups_added: Number of new groups added from source
        groups_updated: Number of existing groups updated (source was newer)
        groups_relocated: Number of groups moved to different parents
        groups_deleted: Number of groups deleted (SYNCHRONIZE mode only)
        history_entries_merged: Number of history entries added
        binaries_added: Number of new binary attachments added
        custom_icons_added: Number of new custom icons added
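
    Example:
        Illustrative values only; a real MergeResult comes from
        Merger.merge():

        >>> result = MergeResult(entries_added=2, groups_added=1)
        >>> result.has_changes
        True
        >>> result.total_changes
        3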
83 """
85 entries_added: int = 0
86 entries_updated: int = 0
87 entries_relocated: int = 0
88 entries_deleted: int = 0
89 groups_added: int = 0
90 groups_updated: int = 0
91 groups_relocated: int = 0
92 groups_deleted: int = 0
93 history_entries_merged: int = 0
94 binaries_added: int = 0
95 custom_icons_added: int = 0

    @property
    def has_changes(self) -> bool:
        """Check if any changes were made during the merge."""
        return any(
            [
                self.entries_added,
                self.entries_updated,
                self.entries_relocated,
                self.entries_deleted,
                self.groups_added,
                self.groups_updated,
                self.groups_relocated,
                self.groups_deleted,
                self.history_entries_merged,
                self.binaries_added,
                self.custom_icons_added,
            ]
        )

    @property
    def total_changes(self) -> int:
        """Total number of entry and group changes (excludes history,
        binary, and icon counts)."""
        return (
            self.entries_added
            + self.entries_updated
            + self.entries_relocated
            + self.entries_deleted
            + self.groups_added
            + self.groups_updated
            + self.groups_relocated
            + self.groups_deleted
        )

    def summary(self) -> str:
        """Get a human-readable summary of the merge result."""
        lines = []
        if self.entries_added:
            lines.append(f"Added {self.entries_added} entries")
        if self.entries_updated:
            lines.append(f"Updated {self.entries_updated} entries")
        if self.entries_relocated:
            lines.append(f"Relocated {self.entries_relocated} entries")
        if self.entries_deleted:
            lines.append(f"Deleted {self.entries_deleted} entries")
        if self.groups_added:
            lines.append(f"Added {self.groups_added} groups")
        if self.groups_updated:
            lines.append(f"Updated {self.groups_updated} groups")
        if self.groups_relocated:
            lines.append(f"Relocated {self.groups_relocated} groups")
        if self.groups_deleted:
            lines.append(f"Deleted {self.groups_deleted} groups")
        if self.history_entries_merged:
            lines.append(f"Merged {self.history_entries_merged} history entries")
        if self.binaries_added:
            lines.append(f"Added {self.binaries_added} attachments")
        if self.custom_icons_added:
            lines.append(f"Added {self.custom_icons_added} custom icons")
        return "\n".join(lines) if lines else "No changes"


class Merger:
    """Merges two KDBX databases with configurable conflict resolution.

    The merge algorithm follows these principles:
    1. UUID-based matching: Entries and groups are matched by UUID
    2. Timestamp-based resolution: Newer last_modification_time wins
    3. History preservation: Losing versions are preserved in history
    4. Location tracking: Entry moves are tracked via location_changed

    Example:
        >>> merger = Merger(target_db, source_db)
        >>> result = merger.merge()
        >>> print(result.summary())
    """

    def __init__(
        self,
        target: Database,
        source: Database,
        *,
        mode: MergeMode = MergeMode.STANDARD,
    ) -> None:
        """Initialize the merger.

        Args:
            target: Database to merge into (will be modified)
            source: Database to merge from (read-only)
            mode: Merge mode (STANDARD or SYNCHRONIZE)
        """
        self._target = target
        self._source = source
        self._mode = mode
        self._result = MergeResult()
        self._binary_remap: dict[int, int] = {}

    def merge(self) -> MergeResult:
        """Execute the merge operation.

        Returns:
            MergeResult with statistics about the merge
        """
        # Phase 1: Merge custom icons
        self._merge_custom_icons()

        # Phase 2: Merge binaries (must be done before entries to get remapping)
        self._merge_binaries()

        # Phase 3: Merge groups (structure first)
        self._merge_groups_recursive(
            self._target.root_group,
            self._source.root_group,
        )

        # Phase 4: Merge entries
        self._merge_entries()

        # Phase 5: Handle location changes
        self._merge_locations()

        # Phase 6: Apply deletions (SYNCHRONIZE mode only)
        if self._mode == MergeMode.SYNCHRONIZE:
            self._apply_deletions()

        return self._result

    # --- Custom Icons ---

    def _merge_custom_icons(self) -> None:
        """Merge custom icons from source to target."""
        for source_uuid, source_icon in self._source.custom_icons.items():
            if source_uuid in self._target.custom_icons:
                # Icon exists - check if source is newer
                target_icon = self._target.custom_icons[source_uuid]
                source_mtime = source_icon.last_modification_time
                target_mtime = target_icon.last_modification_time
                if source_mtime and target_mtime and source_mtime > target_mtime:
                    # Source is newer, update target
                    self._target._settings.custom_icons[source_uuid] = copy.deepcopy(source_icon)
            else:
                # New icon, add to target
                self._target._settings.custom_icons[source_uuid] = copy.deepcopy(source_icon)
                self._result.custom_icons_added += 1

    # --- Binaries ---

    def _merge_binaries(self) -> None:
        """Merge binary attachments, deduplicating by content hash."""
        # Build hash -> ref map for target binaries
        target_hashes: dict[bytes, int] = {}
        for ref, data in self._target._binaries.items():
            content_hash = hashlib.sha256(data).digest()
            target_hashes[content_hash] = ref

        # Process source binaries
        for source_ref, source_data in self._source._binaries.items():
            content_hash = hashlib.sha256(source_data).digest()
            if content_hash in target_hashes:
                # Duplicate content, reuse existing ref
                self._binary_remap[source_ref] = target_hashes[content_hash]
            else:
                # New binary, add to target
                new_ref = self._target.add_binary(source_data)
                self._binary_remap[source_ref] = new_ref
                target_hashes[content_hash] = new_ref
                self._result.binaries_added += 1

    def _remap_binary_refs(self, entry: Entry) -> None:
        """Remap binary references in an entry after merge."""
        for binary_ref in entry.binaries:
            if binary_ref.ref in self._binary_remap:
                binary_ref.ref = self._binary_remap[binary_ref.ref]

    # --- Groups ---

    def _merge_groups_recursive(
        self,
        target_group: Group,
        source_group: Group,
    ) -> None:
        """Recursively merge source group tree into target."""
        for source_subgroup in source_group.subgroups:
            # Skip recycle bin
            if self._is_recycle_bin(source_subgroup, self._source):
                continue

            # Find matching group in target
            target_subgroup = self._find_group_in_children(target_group, source_subgroup.uuid)

            if target_subgroup is None:
                # New group - clone and add
                new_group = self._clone_group(source_subgroup, recursive=True)
                target_group.add_subgroup(new_group)
                # Count all groups and entries in new group tree
                self._result.groups_added += 1
                for _ in new_group.iter_groups(recursive=True):
                    self._result.groups_added += 1
                for _ in new_group.iter_entries(recursive=True):
                    self._result.entries_added += 1
            else:
                # Existing group - update if source is newer
                if self._is_source_newer(target_subgroup.times, source_subgroup.times):
                    self._update_group_metadata(target_subgroup, source_subgroup)
                    self._result.groups_updated += 1

                # Recurse into subgroups
                self._merge_groups_recursive(target_subgroup, source_subgroup)

    def _find_group_in_children(self, parent: Group, uuid: uuid_module.UUID) -> Group | None:
        """Find a group by UUID among parent's direct children, falling back
        to a global search of the target tree in case of restructuring."""
        for subgroup in parent.subgroups:
            if subgroup.uuid == uuid:
                return subgroup
        # Not a direct child - check globally in case of restructuring
        return self._target.root_group.find_group_by_uuid(uuid, recursive=True)

    def _clone_group(self, group: Group, recursive: bool = False) -> Group:
        """Create a deep copy of a group."""
        new_group = Group(
            uuid=group.uuid,
            name=group.name,
            notes=group.notes,
            times=copy.deepcopy(group.times),
            icon_id=group.icon_id,
            custom_icon_uuid=group.custom_icon_uuid,
            is_expanded=group.is_expanded,
            default_autotype_sequence=group.default_autotype_sequence,
            enable_autotype=group.enable_autotype,
            enable_searching=group.enable_searching,
            last_top_visible_entry=group.last_top_visible_entry,
        )

        if recursive:
            for entry in group.entries:
                cloned_entry = self._clone_entry(entry)
                new_group.add_entry(cloned_entry)
            for subgroup in group.subgroups:
                if not self._is_recycle_bin(subgroup, self._source):
                    cloned_subgroup = self._clone_group(subgroup, recursive=True)
                    new_group.add_subgroup(cloned_subgroup)

        return new_group

    def _update_group_metadata(self, target: Group, source: Group) -> None:
        """Update target group metadata from source."""
        target.name = source.name
        target.notes = source.notes
        target.icon_id = source.icon_id
        target.custom_icon_uuid = source.custom_icon_uuid
        target.is_expanded = source.is_expanded
        target.default_autotype_sequence = source.default_autotype_sequence
        target.enable_autotype = source.enable_autotype
        target.enable_searching = source.enable_searching
        target.times = copy.deepcopy(source.times)

    # --- Entries ---

    def _merge_entries(self) -> None:
        """Merge all entries from source to target."""
        for source_entry in self._source.root_group.iter_entries(recursive=True):
            # Skip entries in recycle bin
            if self._is_in_recycle_bin(source_entry, self._source):
                continue

            target_entry = self._target.root_group.find_entry_by_uuid(source_entry.uuid)

            if target_entry is None:
                # New entry - add to target
                self._add_new_entry(source_entry)
            else:
                # Existing entry - merge
                self._merge_entry(target_entry, source_entry)

    def _add_new_entry(self, source_entry: Entry) -> None:
        """Add a new entry from source to target."""
        # Find or create parent group
        source_parent = source_entry.parent
        target_parent: Group
        if source_parent is None:
            target_parent = self._target.root_group
        else:
            found_parent = self._target.root_group.find_group_by_uuid(source_parent.uuid)
            if found_parent is None:
                # Parent group doesn't exist, create group path
                target_parent = self._ensure_group_path(source_parent)
            else:
                target_parent = found_parent

        # Clone entry and add
        new_entry = self._clone_entry(source_entry)
        target_parent.add_entry(new_entry)
        self._result.entries_added += 1

    def _merge_entry(self, target: Entry, source: Entry) -> None:
        """Merge source entry into target entry."""
        source_mtime = source.times.last_modification_time
        target_mtime = target.times.last_modification_time

        if source_mtime > target_mtime:
            # Source is newer - update target, preserve old in history
            target.save_history()
            self._copy_entry_fields(source, target)
            self._merge_entry_history(target, source)
            self._result.entries_updated += 1
        elif source_mtime < target_mtime:
            # Target is newer - add source to target's history
            history_entry = HistoryEntry.from_entry(source)
            target.history.append(history_entry)
            self._result.history_entries_merged += 1
        else:
            # Same timestamp - merge history only
            self._merge_entry_history(target, source)

    def _clone_entry(self, entry: Entry) -> Entry:
        """Create a deep copy of an entry with UUID preserved."""
        new_entry = Entry(
            uuid=entry.uuid,
            times=copy.deepcopy(entry.times),
            icon_id=entry.icon_id,
            custom_icon_uuid=entry.custom_icon_uuid,
            tags=list(entry.tags),
            strings={k: StringField(k, v.value, v.protected) for k, v in entry.strings.items()},
            binaries=[
                BinaryRef(b.key, self._binary_remap.get(b.ref, b.ref)) for b in entry.binaries
            ],
            autotype=copy.deepcopy(entry.autotype),
            history=[HistoryEntry.from_entry(h) for h in entry.history],
            foreground_color=entry.foreground_color,
            background_color=entry.background_color,
            override_url=entry.override_url,
            quality_check=entry.quality_check,
        )
        return new_entry

    def _copy_entry_fields(self, source: Entry, target: Entry) -> None:
        """Copy all fields from source to target, preserving target's UUID."""
        target.times = copy.deepcopy(source.times)
        target.icon_id = source.icon_id
        target.custom_icon_uuid = source.custom_icon_uuid
        target.tags = list(source.tags)
        target.strings = {
            k: StringField(k, v.value, v.protected) for k, v in source.strings.items()
        }
        target.binaries = [
            BinaryRef(b.key, self._binary_remap.get(b.ref, b.ref)) for b in source.binaries
        ]
        target.autotype = copy.deepcopy(source.autotype)
        target.foreground_color = source.foreground_color
        target.background_color = source.background_color
        target.override_url = source.override_url
        target.quality_check = source.quality_check

    # --- History ---

    def _merge_entry_history(self, target: Entry, source: Entry) -> None:
        """Merge history entries, deduplicating by modification time."""
        # Build set of existing history timestamps
        existing_times: set[datetime] = {h.times.last_modification_time for h in target.history}

        # Add source history entries not already present
        for hist in source.history:
            if hist.times.last_modification_time not in existing_times:
                cloned_hist = HistoryEntry.from_entry(hist)
                # Remap binary refs in history
                for binary_ref in cloned_hist.binaries:
                    if binary_ref.ref in self._binary_remap:
                        binary_ref.ref = self._binary_remap[binary_ref.ref]
                target.history.append(cloned_hist)
                existing_times.add(hist.times.last_modification_time)
                self._result.history_entries_merged += 1

        # Sort history by modification time
        target.history.sort(key=lambda h: h.times.last_modification_time)

        # Respect history_max_items if set (keep the newest items)
        max_items = self._target._settings.history_max_items
        if max_items >= 0 and len(target.history) > max_items:
            target.history = target.history[-max_items:]

    # --- Location Changes ---

    def _merge_locations(self) -> None:
        """Handle entry and group location changes based on location_changed timestamps."""
        for source_entry in self._source.root_group.iter_entries(recursive=True):
            if self._is_in_recycle_bin(source_entry, self._source):
                continue

            target_entry = self._target.root_group.find_entry_by_uuid(source_entry.uuid)
            if target_entry is None:
                continue

            source_loc_time = source_entry.times.location_changed
            target_loc_time = target_entry.times.location_changed

            if source_loc_time and target_loc_time and source_loc_time > target_loc_time:
                # Source location is newer - move entry
                source_parent = source_entry.parent
                if source_parent is not None:
                    target_parent = self._target.root_group.find_group_by_uuid(source_parent.uuid)
                    if target_parent is not None and target_entry.parent is not target_parent:
                        try:
                            target_entry.move_to(target_parent)
                            # Restore source location_changed time
                            target_entry.times.location_changed = source_loc_time
                            self._result.entries_relocated += 1
                        except ValueError:
                            # Move failed (e.g., already in destination)
                            pass

        # Also handle group location changes
        for source_group in self._source.root_group.iter_groups(recursive=True):
            if self._is_recycle_bin(source_group, self._source):
                continue

            target_group = self._target.root_group.find_group_by_uuid(source_group.uuid)
            if target_group is None or target_group.is_root_group:
                continue

            source_loc_time = source_group.times.location_changed
            target_loc_time = target_group.times.location_changed

            if source_loc_time and target_loc_time and source_loc_time > target_loc_time:
                source_parent = source_group.parent
                if source_parent is not None:
                    target_parent = self._target.root_group.find_group_by_uuid(source_parent.uuid)
                    if target_parent is not None and target_group.parent is not target_parent:
                        try:
                            target_group.move_to(target_parent)
                            target_group.times.location_changed = source_loc_time
                            self._result.groups_relocated += 1
                        except ValueError:
                            pass

    # --- Deletions (SYNCHRONIZE mode) ---

    def _apply_deletions(self) -> None:
        """Apply deletions from source in SYNCHRONIZE mode."""
        for deleted in self._source._settings.deleted_objects:
            # Try as entry first
            target_entry = self._target.root_group.find_entry_by_uuid(deleted.uuid)
            if (
                target_entry is not None
                and target_entry.times.last_modification_time <= deleted.deletion_time
                and target_entry.parent is not None
            ):
                target_entry.parent.remove_entry(target_entry)
                self._result.entries_deleted += 1
                continue

            # Try as group
            target_group = self._target.root_group.find_group_by_uuid(deleted.uuid)
            if (
                target_group is not None
                and not target_group.is_root_group
                and target_group.times.last_modification_time <= deleted.deletion_time
                and target_group.parent is not None
            ):
                target_group.parent.remove_subgroup(target_group)
                self._result.groups_deleted += 1

        # Merge deleted objects lists
        target_deleted_uuids = {d.uuid for d in self._target._settings.deleted_objects}
        for deleted in self._source._settings.deleted_objects:
            if deleted.uuid not in target_deleted_uuids:
                self._target._settings.deleted_objects.append(
                    DeletedObject(uuid=deleted.uuid, deletion_time=deleted.deletion_time)
                )

    # --- Helpers ---

    def _is_recycle_bin(self, group: Group, db: Database) -> bool:
        """Check if group is the recycle bin."""
        recycle_bin = db.recyclebin_group
        return recycle_bin is not None and group.uuid == recycle_bin.uuid

    def _is_in_recycle_bin(self, entry: Entry, db: Database) -> bool:
        """Check if entry is in the recycle bin."""
        recycle_bin = db.recyclebin_group
        if recycle_bin is None:
            return False
        current = entry.parent
        while current is not None:
            if current.uuid == recycle_bin.uuid:
                return True
            current = current.parent
        return False

    def _is_source_newer(self, target_times: Times, source_times: Times) -> bool:
        """Check if source has a newer modification time."""
        return source_times.last_modification_time > target_times.last_modification_time

    def _ensure_group_path(self, source_group: Group) -> Group:
        """Ensure the group path exists in target, creating groups as needed."""
        path = source_group.path
        current = self._target.root_group

        for name in path:
            found = None
            for subgroup in current.subgroups:
                if subgroup.name == name:
                    found = subgroup
                    break
            if found is None:
                found = current.create_subgroup(name)
                self._result.groups_added += 1
            current = found

        return current