Coverage for src/kdbxtool/merge.py: 84% (293 statements)

coverage.py v7.12.0, created at 2025-12-19 21:22 +0000

"""Database merge functionality for combining KDBX databases.

This module provides the Merger class for merging two KeePass databases
following a UUID-based matching and timestamp-based conflict resolution
algorithm similar to KeePassXC.

Example:
    >>> from kdbxtool import Database
    >>> target = Database.open("main.kdbx", password="secret")
    >>> source = Database.open("branch.kdbx", password="secret")
    >>> result = target.merge(source)
    >>> print(f"Added {result.entries_added} entries")
    >>> target.save()
"""

from __future__ import annotations

import copy
import hashlib
import uuid as uuid_module
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
from typing import TYPE_CHECKING

from .models.entry import BinaryRef, Entry, HistoryEntry, StringField
from .models.group import Group
from .models.times import Times

if TYPE_CHECKING:
    from .database import Database


class MergeMode(Enum):
    """Merge mode determining how conflicts and deletions are handled.

    Attributes:
        STANDARD: Add and update entries/groups from source. Does not delete
            anything from target. This is the default and safest mode.
        SYNCHRONIZE: Full bidirectional sync including deletions. Items deleted
            in source (tracked in DeletedObjects) will be deleted from target
            if they haven't been modified after the deletion time.
    """

    STANDARD = auto()
    SYNCHRONIZE = auto()
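
# A minimal mode-selection sketch (target_db and source_db stand in for
# Database objects opened as in the module docstring):
#
#     merger = Merger(target_db, source_db, mode=MergeMode.SYNCHRONIZE)
#     result = merger.merge()  # deletions in source now propagate to target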

@dataclass
class DeletedObject:
    """Record of a deleted entry or group.

    Used in SYNCHRONIZE mode to track deletions that should propagate
    to other databases during merge.

    Attributes:
        uuid: UUID of the deleted entry or group
        deletion_time: When the deletion occurred
    """

    uuid: uuid_module.UUID
    deletion_time: datetime


@dataclass
class MergeResult:
    """Result of a database merge operation.

    Contains detailed statistics about what was changed during the merge.

    Attributes:
        entries_added: Number of new entries added from source
        entries_updated: Number of existing entries updated (source was newer)
        entries_relocated: Number of entries moved to different groups
        entries_deleted: Number of entries deleted (SYNCHRONIZE mode only)
        groups_added: Number of new groups added from source
        groups_updated: Number of existing groups updated (source was newer)
        groups_relocated: Number of groups moved to different parents
        groups_deleted: Number of groups deleted (SYNCHRONIZE mode only)
        history_entries_merged: Number of history entries added
        binaries_added: Number of new binary attachments added
        custom_icons_added: Number of new custom icons added
    """

    entries_added: int = 0
    entries_updated: int = 0
    entries_relocated: int = 0
    entries_deleted: int = 0
    groups_added: int = 0
    groups_updated: int = 0
    groups_relocated: int = 0
    groups_deleted: int = 0
    history_entries_merged: int = 0
    binaries_added: int = 0
    custom_icons_added: int = 0

    @property
    def has_changes(self) -> bool:
        """Check if any changes were made during the merge."""
        return any(
            [
                self.entries_added,
                self.entries_updated,
                self.entries_relocated,
                self.entries_deleted,
                self.groups_added,
                self.groups_updated,
                self.groups_relocated,
                self.groups_deleted,
                self.history_entries_merged,
                self.binaries_added,
                self.custom_icons_added,
            ]
        )

    @property
    def total_changes(self) -> int:
        """Total number of entry and group changes.

        History, binary, and custom-icon merges are not counted here;
        check the individual counters or ``has_changes`` for those.
        """
        return (
            self.entries_added
            + self.entries_updated
            + self.entries_relocated
            + self.entries_deleted
            + self.groups_added
            + self.groups_updated
            + self.groups_relocated
            + self.groups_deleted
        )

    def summary(self) -> str:
        """Get a human-readable summary of the merge result."""
        lines = []
        if self.entries_added:
            lines.append(f"Added {self.entries_added} entries")
        if self.entries_updated:
            lines.append(f"Updated {self.entries_updated} entries")
        if self.entries_relocated:
            lines.append(f"Relocated {self.entries_relocated} entries")
        if self.entries_deleted:
            lines.append(f"Deleted {self.entries_deleted} entries")
        if self.groups_added:
            lines.append(f"Added {self.groups_added} groups")
        if self.groups_updated:
            lines.append(f"Updated {self.groups_updated} groups")
        if self.groups_relocated:
            lines.append(f"Relocated {self.groups_relocated} groups")
        if self.groups_deleted:
            lines.append(f"Deleted {self.groups_deleted} groups")
        if self.history_entries_merged:
            lines.append(f"Merged {self.history_entries_merged} history entries")
        if self.binaries_added:
            lines.append(f"Added {self.binaries_added} attachments")
        if self.custom_icons_added:
            lines.append(f"Added {self.custom_icons_added} custom icons")
        return "\n".join(lines) if lines else "No changes"
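
# Illustrative output only: a merge that added two entries and one group
# would print via summary() as
#
#     >>> print(result.summary())
#     Added 2 entries
#     Added 1 groups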


class Merger:
    """Merges two KDBX databases with configurable conflict resolution.

    The merge algorithm follows these principles:

    1. UUID-based matching: Entries and groups are matched by UUID
    2. Timestamp-based resolution: Newer last_modification_time wins
    3. History preservation: Losing versions are preserved in history
    4. Location tracking: Entry moves are tracked via location_changed

    Example:
        >>> merger = Merger(target_db, source_db)
        >>> result = merger.merge()
        >>> print(result.summary())
    """

    def __init__(
        self,
        target: Database,
        source: Database,
        *,
        mode: MergeMode = MergeMode.STANDARD,
    ) -> None:
        """Initialize the merger.

        Args:
            target: Database to merge into (will be modified)
            source: Database to merge from (read-only)
            mode: Merge mode (STANDARD or SYNCHRONIZE)
        """
        self._target = target
        self._source = source
        self._mode = mode
        self._result = MergeResult()
        self._binary_remap: dict[int, int] = {}

    def merge(self) -> MergeResult:
        """Execute the merge operation.

        Returns:
            MergeResult with statistics about the merge

        Raises:
            MergeError: If merge fails
        """
        # Phase 1: Merge custom icons
        self._merge_custom_icons()

        # Phase 2: Merge binaries (must be done before entries to get remapping)
        self._merge_binaries()

        # Phase 3: Merge groups (structure first)
        self._merge_groups_recursive(
            self._target.root_group,
            self._source.root_group,
        )

        # Phase 4: Merge entries
        self._merge_entries()

        # Phase 5: Handle location changes
        self._merge_locations()

        # Phase 6: Apply deletions (SYNCHRONIZE mode only)
        if self._mode == MergeMode.SYNCHRONIZE:
            self._apply_deletions()

        return self._result
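
    # A two-way synchronization can be composed from this one-way merge by
    # running it in both directions (sketch; db_a and db_b are assumed to be
    # two open Database objects):
    #
    #     Merger(db_a, db_b, mode=MergeMode.SYNCHRONIZE).merge()
    #     Merger(db_b, db_a, mode=MergeMode.SYNCHRONIZE).merge()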

    # --- Custom Icons ---

    def _merge_custom_icons(self) -> None:
        """Merge custom icons from source to target."""
        for source_uuid, source_icon in self._source.custom_icons.items():
            if source_uuid in self._target.custom_icons:
                # Icon exists - check if source is newer
                target_icon = self._target.custom_icons[source_uuid]
                source_mtime = source_icon.last_modification_time
                target_mtime = target_icon.last_modification_time
                if source_mtime and target_mtime and source_mtime > target_mtime:
                    # Source is newer, update target
                    self._target._settings.custom_icons[source_uuid] = copy.deepcopy(source_icon)
            else:
                # New icon, add to target
                self._target._settings.custom_icons[source_uuid] = copy.deepcopy(source_icon)
                self._result.custom_icons_added += 1

    # --- Binaries ---

    def _merge_binaries(self) -> None:
        """Merge binary attachments, deduplicating by content hash."""
        # Build hash -> ref map for target binaries
        target_hashes: dict[bytes, int] = {}
        for ref, data in self._target._binaries.items():
            content_hash = hashlib.sha256(data).digest()
            target_hashes[content_hash] = ref

        # Process source binaries
        for source_ref, source_data in self._source._binaries.items():
            content_hash = hashlib.sha256(source_data).digest()
            if content_hash in target_hashes:
                # Duplicate content, reuse existing ref
                self._binary_remap[source_ref] = target_hashes[content_hash]
            else:
                # New binary, add to target
                new_ref = self._target.add_binary(source_data)
                self._binary_remap[source_ref] = new_ref
                target_hashes[content_hash] = new_ref
                self._result.binaries_added += 1
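
    # Illustration with hypothetical refs: if source ref 0 holds bytes that
    # target already stores under ref 3, and source ref 1 is new content that
    # add_binary() places at ref 7, then _binary_remap == {0: 3, 1: 7}, and
    # every copied BinaryRef is rewritten through that map.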

    def _remap_binary_refs(self, entry: Entry) -> None:
        """Remap binary references in an entry after merge."""
        for binary_ref in entry.binaries:
            if binary_ref.ref in self._binary_remap:
                binary_ref.ref = self._binary_remap[binary_ref.ref]

    # --- Groups ---

    def _merge_groups_recursive(
        self,
        target_group: Group,
        source_group: Group,
    ) -> None:
        """Recursively merge source group tree into target."""
        for source_subgroup in source_group.subgroups:
            # Skip recycle bin
            if self._is_recycle_bin(source_subgroup, self._source):
                continue

            # Find matching group in target
            target_subgroup = self._find_group_in_children(target_group, source_subgroup.uuid)

            if target_subgroup is None:
                # New group - clone and add
                new_group = self._clone_group(source_subgroup, recursive=True)
                target_group.add_subgroup(new_group)
                # Count all groups and entries in new group tree
                self._result.groups_added += 1
                for _ in new_group.iter_groups(recursive=True):
                    self._result.groups_added += 1
                for _ in new_group.iter_entries(recursive=True):
                    self._result.entries_added += 1
            else:
                # Existing group - update if source is newer
                if self._is_source_newer(target_subgroup.times, source_subgroup.times):
                    self._update_group_metadata(target_subgroup, source_subgroup)
                    self._result.groups_updated += 1

                # Recurse into subgroups
                self._merge_groups_recursive(target_subgroup, source_subgroup)

    def _find_group_in_children(self, parent: Group, uuid: uuid_module.UUID) -> Group | None:
        """Find a group by UUID, checking parent's children first.

        Falls back to a recursive search of the whole target tree so that
        groups relocated during restructuring are still matched.
        """
        for subgroup in parent.subgroups:
            if subgroup.uuid == uuid:
                return subgroup
        # Also check globally in case of restructuring
        return self._target.root_group.find_group_by_uuid(uuid, recursive=True)

    def _clone_group(self, group: Group, recursive: bool = False) -> Group:
        """Create a deep copy of a group."""
        new_group = Group(
            uuid=group.uuid,
            name=group.name,
            notes=group.notes,
            times=copy.deepcopy(group.times),
            icon_id=group.icon_id,
            custom_icon_uuid=group.custom_icon_uuid,
            is_expanded=group.is_expanded,
            default_autotype_sequence=group.default_autotype_sequence,
            enable_autotype=group.enable_autotype,
            enable_searching=group.enable_searching,
            last_top_visible_entry=group.last_top_visible_entry,
        )

        if recursive:
            for entry in group.entries:
                cloned_entry = self._clone_entry(entry)
                new_group.add_entry(cloned_entry)
            for subgroup in group.subgroups:
                if not self._is_recycle_bin(subgroup, self._source):
                    cloned_subgroup = self._clone_group(subgroup, recursive=True)
                    new_group.add_subgroup(cloned_subgroup)

        return new_group

    def _update_group_metadata(self, target: Group, source: Group) -> None:
        """Update target group metadata from source."""
        target.name = source.name
        target.notes = source.notes
        target.icon_id = source.icon_id
        target.custom_icon_uuid = source.custom_icon_uuid
        target.is_expanded = source.is_expanded
        target.default_autotype_sequence = source.default_autotype_sequence
        target.enable_autotype = source.enable_autotype
        target.enable_searching = source.enable_searching
        target.times = copy.deepcopy(source.times)

    # --- Entries ---

    def _merge_entries(self) -> None:
        """Merge all entries from source to target."""
        for source_entry in self._source.root_group.iter_entries(recursive=True):
            # Skip entries in recycle bin
            if self._is_in_recycle_bin(source_entry, self._source):
                continue

            target_entry = self._target.root_group.find_entry_by_uuid(source_entry.uuid)

            if target_entry is None:
                # New entry - add to target
                self._add_new_entry(source_entry)
            else:
                # Existing entry - merge
                self._merge_entry(target_entry, source_entry)

    def _add_new_entry(self, source_entry: Entry) -> None:
        """Add a new entry from source to target."""
        # Find or create parent group
        source_parent = source_entry.parent
        target_parent: Group
        if source_parent is None:
            target_parent = self._target.root_group
        else:
            found_parent = self._target.root_group.find_group_by_uuid(source_parent.uuid)
            if found_parent is None:
                # Parent group doesn't exist, create group path
                target_parent = self._ensure_group_path(source_parent)
            else:
                target_parent = found_parent

        # Clone entry and add
        new_entry = self._clone_entry(source_entry)
        target_parent.add_entry(new_entry)
        self._result.entries_added += 1

    def _merge_entry(self, target: Entry, source: Entry) -> None:
        """Merge source entry into target entry."""
        source_mtime = source.times.last_modification_time
        target_mtime = target.times.last_modification_time

        if source_mtime > target_mtime:
            # Source is newer - update target, preserve old in history
            target.save_history()
            self._copy_entry_fields(source, target)
            self._merge_entry_history(target, source)
            self._result.entries_updated += 1
        elif source_mtime < target_mtime:
            # Target is newer - add source to target's history, remapping
            # its attachment refs into the target's binary pool
            history_entry = HistoryEntry.from_entry(source)
            for binary_ref in history_entry.binaries:
                if binary_ref.ref in self._binary_remap:
                    binary_ref.ref = self._binary_remap[binary_ref.ref]
            target.history.append(history_entry)
            self._result.history_entries_merged += 1
        else:
            # Same timestamp - merge history only
            self._merge_entry_history(target, source)
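
    # Illustration with hypothetical timestamps: if target's copy of an entry
    # was last modified at 10:00 and source's at 11:00, the 10:00 version is
    # pushed into history and the 11:00 fields win; with the times reversed,
    # the source copy is merely appended to target's history.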

    def _clone_entry(self, entry: Entry) -> Entry:
        """Create a deep copy of an entry with UUID preserved."""
        new_entry = Entry(
            uuid=entry.uuid,
            times=copy.deepcopy(entry.times),
            icon_id=entry.icon_id,
            custom_icon_uuid=entry.custom_icon_uuid,
            tags=list(entry.tags),
            strings={k: StringField(k, v.value, v.protected) for k, v in entry.strings.items()},
            binaries=[
                BinaryRef(b.key, self._binary_remap.get(b.ref, b.ref)) for b in entry.binaries
            ],
            autotype=copy.deepcopy(entry.autotype),
            history=[HistoryEntry.from_entry(h) for h in entry.history],
            foreground_color=entry.foreground_color,
            background_color=entry.background_color,
            override_url=entry.override_url,
            quality_check=entry.quality_check,
        )
        return new_entry

    def _copy_entry_fields(self, source: Entry, target: Entry) -> None:
        """Copy all fields from source to target, preserving target's UUID."""
        target.times = copy.deepcopy(source.times)
        target.icon_id = source.icon_id
        target.custom_icon_uuid = source.custom_icon_uuid
        target.tags = list(source.tags)
        target.strings = {
            k: StringField(k, v.value, v.protected) for k, v in source.strings.items()
        }
        target.binaries = [
            BinaryRef(b.key, self._binary_remap.get(b.ref, b.ref)) for b in source.binaries
        ]
        target.autotype = copy.deepcopy(source.autotype)
        target.foreground_color = source.foreground_color
        target.background_color = source.background_color
        target.override_url = source.override_url
        target.quality_check = source.quality_check

    # --- History ---

    def _merge_entry_history(self, target: Entry, source: Entry) -> None:
        """Merge history entries, deduplicating by modification time."""
        # Build set of existing history timestamps
        existing_times: set[datetime] = {h.times.last_modification_time for h in target.history}

        # Add source history entries not already present
        for hist in source.history:
            if hist.times.last_modification_time not in existing_times:
                cloned_hist = HistoryEntry.from_entry(hist)
                # Remap binary refs in history
                for binary_ref in cloned_hist.binaries:
                    if binary_ref.ref in self._binary_remap:
                        binary_ref.ref = self._binary_remap[binary_ref.ref]
                target.history.append(cloned_hist)
                existing_times.add(hist.times.last_modification_time)
                self._result.history_entries_merged += 1

        # Sort history by modification time
        target.history.sort(key=lambda h: h.times.last_modification_time)

        # Respect history_max_items if set (negative means unlimited);
        # keep only the newest max_items revisions
        max_items = self._target._settings.history_max_items
        if max_items >= 0 and len(target.history) > max_items:
            target.history = target.history[-max_items:]
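
    # Illustration: with history_max_items == 2 and merged history timestamps
    # [09:00, 10:00, 11:00], the sort-then-slice above keeps only the two
    # newest revisions (10:00 and 11:00).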

    # --- Location Changes ---

    def _merge_locations(self) -> None:
        """Handle entry location changes based on location_changed timestamps."""
        for source_entry in self._source.root_group.iter_entries(recursive=True):
            if self._is_in_recycle_bin(source_entry, self._source):
                continue

            target_entry = self._target.root_group.find_entry_by_uuid(source_entry.uuid)
            if target_entry is None:
                continue

            source_loc_time = source_entry.times.location_changed
            target_loc_time = target_entry.times.location_changed

            if source_loc_time and target_loc_time and source_loc_time > target_loc_time:
                # Source location is newer - move entry
                source_parent = source_entry.parent
                if source_parent is not None:
                    target_parent = self._target.root_group.find_group_by_uuid(source_parent.uuid)
                    if target_parent is not None and target_entry.parent is not target_parent:
                        try:
                            target_entry.move_to(target_parent)
                            # Restore source location_changed time
                            target_entry.times.location_changed = source_loc_time
                            self._result.entries_relocated += 1
                        except ValueError:
                            # Move failed (e.g., already in destination)
                            pass

        # Also handle group location changes
        for source_group in self._source.root_group.iter_groups(recursive=True):
            if self._is_recycle_bin(source_group, self._source):
                continue

            target_group = self._target.root_group.find_group_by_uuid(source_group.uuid)
            if target_group is None or target_group.is_root_group:
                continue

            source_loc_time = source_group.times.location_changed
            target_loc_time = target_group.times.location_changed

            if source_loc_time and target_loc_time and source_loc_time > target_loc_time:
                source_parent = source_group.parent
                if source_parent is not None:
                    target_parent = self._target.root_group.find_group_by_uuid(source_parent.uuid)
                    if target_parent is not None and target_group.parent is not target_parent:
                        try:
                            target_group.move_to(target_parent)
                            target_group.times.location_changed = source_loc_time
                            self._result.groups_relocated += 1
                        except ValueError:
                            pass
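
    # Illustration with hypothetical moves: if source moved an entry into
    # "Work" at 12:00 while target moved the same entry into "Personal" at
    # 11:00, the 12:00 move wins; the entry lands in target's "Work" group
    # and its location_changed is restored to 12:00.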

    # --- Deletions (SYNCHRONIZE mode) ---

    def _apply_deletions(self) -> None:
        """Apply deletions from source in SYNCHRONIZE mode."""
        for deleted in self._source._settings.deleted_objects:
            # Try as entry first
            target_entry = self._target.root_group.find_entry_by_uuid(deleted.uuid)
            if (
                target_entry is not None
                and target_entry.times.last_modification_time <= deleted.deletion_time
                and target_entry.parent is not None
            ):
                target_entry.parent.remove_entry(target_entry)
                self._result.entries_deleted += 1
                continue

            # Try as group
            target_group = self._target.root_group.find_group_by_uuid(deleted.uuid)
            if (
                target_group is not None
                and not target_group.is_root_group
                and target_group.times.last_modification_time <= deleted.deletion_time
                and target_group.parent is not None
            ):
                target_group.parent.remove_subgroup(target_group)
                self._result.groups_deleted += 1

        # Merge deleted objects lists
        target_deleted_uuids = {d.uuid for d in self._target._settings.deleted_objects}
        for deleted in self._source._settings.deleted_objects:
            if deleted.uuid not in target_deleted_uuids:
                self._target._settings.deleted_objects.append(
                    DeletedObject(uuid=deleted.uuid, deletion_time=deleted.deletion_time)
                )
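
    # Illustration: an entry deleted in source at 12:00 is removed from
    # target only if target's copy was last modified at or before 12:00;
    # a 13:00 edit in target outranks the deletion and the entry survives
    # the sync.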

    # --- Helpers ---

    def _is_recycle_bin(self, group: Group, db: Database) -> bool:
        """Check if group is the recycle bin."""
        recycle_bin = db.recyclebin_group
        return recycle_bin is not None and group.uuid == recycle_bin.uuid

    def _is_in_recycle_bin(self, entry: Entry, db: Database) -> bool:
        """Check if entry is in the recycle bin."""
        recycle_bin = db.recyclebin_group
        if recycle_bin is None:
            return False
        current = entry.parent
        while current is not None:
            if current.uuid == recycle_bin.uuid:
                return True
            current = current.parent
        return False

    def _is_source_newer(self, target_times: Times, source_times: Times) -> bool:
        """Check if source has a newer modification time."""
        return source_times.last_modification_time > target_times.last_modification_time

    def _ensure_group_path(self, source_group: Group) -> Group:
        """Ensure the group path exists in target, creating groups as needed.

        Existing groups are matched by name at each level of the path.
        """
        path = source_group.path
        current = self._target.root_group

        for name in path:
            found = None
            for subgroup in current.subgroups:
                if subgroup.name == name:
                    found = subgroup
                    break
            if found is None:
                found = current.create_subgroup(name)
                self._result.groups_added += 1
            current = found

        return current
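
    # Illustration: for a source entry under Root/Work/Servers, a target
    # that only has Root/Work gains a newly created "Servers" subgroup
    # (matched by name, counted in groups_added), and the entry is added
    # there.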