Coverage for src / kdbxtool / parsing / kdbx3.py: 87%

95 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-20 19:19 +0000

1"""KDBX3 payload encryption and decryption. 

2 

3This module handles the cryptographic operations for KDBX3 files: 

4- Master key derivation from credentials (AES-KDF) 

5- Payload decryption and encryption 

6- Content hashed block verification 

7- Synthetic inner header creation from outer header 

8 

9KDBX3 structure: 

101. Outer header (plaintext, with 2-byte length fields) 

112. Encrypted payload (content hashed blocks format) 

12 - Stream start bytes (32 bytes, for verification) 

13 - Compressed/uncompressed XML database content 

14 

15Key differences from KDBX4: 

16- No header hash or HMAC verification 

17- Protected stream key is in outer header (not inner) 

18- Uses content hashed blocks instead of HMAC block stream 

19- No inner header inside the encrypted payload 

20""" 

21 

22from __future__ import annotations 

23 

24import gzip 

25import hashlib 

26import logging 

27 

28from kdbxtool.exceptions import ( 

29 AuthenticationError, 

30 CorruptedDataError, 

31 KdfError, 

32 UnsupportedVersionError, 

33) 

34from kdbxtool.security import ( 

35 CipherContext, 

36 SecureBytes, 

37 constant_time_compare, 

38 derive_composite_key, 

39 derive_key_aes_kdf, 

40) 

41from kdbxtool.security.kdf import AesKdfConfig 

42 

43from .context import ParseContext 

44from .header import CompressionType, KdbxHeader, KdbxVersion 

45from .kdbx4 import DecryptedPayload, InnerHeader 

46 

47logger = logging.getLogger(__name__) 

48 

49 

50class Kdbx3Reader: 

51 """Reader for KDBX3 format databases. 

52 

53 KDBX3 uses AES-KDF for key derivation and content hashed blocks 

54 for payload integrity verification. 

55 """ 

56 

57 def __init__(self, data: bytes) -> None: 

58 """Initialize reader with KDBX3 file data. 

59 

60 Args: 

61 data: Complete KDBX3 file contents 

62 """ 

63 self._data = data 

64 self._ctx = ParseContext(data) 

65 

66 def decrypt( 

67 self, 

68 password: str | None = None, 

69 keyfile_data: bytes | None = None, 

70 transformed_key: bytes | None = None, 

71 ) -> DecryptedPayload: 

72 """Decrypt the KDBX3 file. 

73 

74 Args: 

75 password: Optional password 

76 keyfile_data: Optional keyfile contents 

77 transformed_key: Optional pre-computed transformed key (skips KDF) 

78 

79 Returns: 

80 DecryptedPayload with header, synthetic inner header, and XML 

81 

82 Raises: 

83 AuthenticationError: If credentials are wrong 

84 CorruptedDataError: If file is corrupted 

85 """ 

86 logger.debug("Starting KDBX3 decryption") 

87 

88 # Parse outer header 

89 header, header_end = KdbxHeader.parse(self._ctx.data) 

90 

91 if header.version != KdbxVersion.KDBX3: 

92 raise UnsupportedVersionError(header.version.value, 0) 

93 

94 self._ctx.offset = header_end 

95 

96 # Use pre-computed transformed key if provided, otherwise derive it 

97 if transformed_key is not None: 

98 master_key = SecureBytes(transformed_key) 

99 else: 

100 # Derive composite key from credentials 

101 composite_key = derive_composite_key( 

102 password=password, 

103 keyfile_data=keyfile_data, 

104 ) 

105 

106 # Derive master key using AES-KDF 

107 master_key = self._derive_master_key(header, composite_key) 

108 

109 # Derive cipher key 

110 cipher_key = self._derive_cipher_key(master_key.data, header.master_seed) 

111 

112 # Read encrypted payload (everything after header) 

113 encrypted_payload = self._ctx.data[self._ctx.offset :] 

114 

115 # Decrypt payload 

116 ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv) 

117 decrypted = ctx.decrypt(encrypted_payload) 

118 

119 # Remove PKCS7 padding for block ciphers 

120 if header.cipher.iv_size == 16: # AES-CBC or Twofish-CBC 

121 decrypted = self._remove_pkcs7_padding(decrypted) 

122 

123 # Verify stream start bytes (first 32 bytes) 

124 if len(decrypted) < 32: 

125 raise CorruptedDataError("Decrypted payload too short") 

126 

127 stream_start = decrypted[:32] 

128 if header.stream_start_bytes is None: 

129 raise CorruptedDataError("Missing stream start bytes in header") 

130 

131 if not constant_time_compare(stream_start, header.stream_start_bytes): 

132 raise AuthenticationError() 

133 logger.debug("Stream start bytes verified") 

134 

135 # Read content hashed blocks (after stream start bytes) 

136 payload_data = self._read_hashed_blocks(decrypted[32:]) 

137 

138 # Decompress if needed 

139 if header.compression == CompressionType.GZIP: 

140 payload_data = gzip.decompress(payload_data) 

141 logger.debug("Payload decompressed, %d bytes", len(payload_data)) 

142 

143 # Create synthetic inner header from outer header fields 

144 inner_header = self._create_synthetic_inner_header(header) 

145 

146 return DecryptedPayload( 

147 header=header, 

148 inner_header=inner_header, 

149 xml_data=payload_data, 

150 transformed_key=master_key.data, 

151 ) 

152 

153 def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes: 

154 """Derive master key using AES-KDF.""" 

155 if header.aes_kdf_rounds is None: 

156 raise KdfError("Missing AES-KDF rounds in header") 

157 

158 aes_config = AesKdfConfig( 

159 salt=header.kdf_salt, 

160 rounds=header.aes_kdf_rounds, 

161 ) 

162 

163 return derive_key_aes_kdf(composite_key.data, aes_config) 

164 

165 def _derive_cipher_key(self, master_key: bytes, master_seed: bytes) -> bytes: 

166 """Derive the cipher key from master key and seed. 

167 

168 KDBX3 key derivation: 

169 - cipher_key = SHA256(master_seed || transformed_key) 

170 """ 

171 return hashlib.sha256(master_seed + master_key).digest() 

172 

173 def _read_hashed_blocks(self, data: bytes) -> bytes: 

174 """Read and verify content hashed blocks. 

175 

176 KDBX3 uses content hashed blocks for integrity: 

177 - 4 bytes: block index (sequential, starting at 0) 

178 - 32 bytes: SHA-256 hash of block data 

179 - 4 bytes: block data length 

180 - N bytes: block data 

181 

182 Blocks continue until block data length is 0. 

183 """ 

184 result = bytearray() 

185 ctx = ParseContext(data) 

186 expected_index = 0 

187 

188 with ctx.scope("hashed_blocks"): 

189 while not ctx.exhausted: 

190 with ctx.scope(f"block[{expected_index}]"): 

191 # Read block index 

192 block_index = ctx.read_u32("index") 

193 

194 if block_index != expected_index: 

195 raise CorruptedDataError( 

196 f"Block index mismatch: expected {expected_index}, got {block_index}" 

197 ) 

198 

199 # Read block hash 

200 block_hash = ctx.read(32, "hash") 

201 

202 # Read block data length 

203 block_len = ctx.read_u32("length") 

204 

205 # Check for end block 

206 if block_len == 0: 

207 # Verify empty block has zero hash 

208 if block_hash != b"\x00" * 32: 

209 raise CorruptedDataError("Invalid end block hash") 

210 break 

211 

212 # Read block data 

213 block_data = ctx.read(block_len, "data") 

214 

215 # Verify block hash 

216 computed_hash = hashlib.sha256(block_data).digest() 

217 if not constant_time_compare(computed_hash, block_hash): 

218 raise CorruptedDataError(f"Block {block_index} hash mismatch") 

219 

220 result.extend(block_data) 

221 expected_index += 1 

222 

223 logger.debug("Verified %d hashed blocks", expected_index) 

224 return bytes(result) 

225 

226 def _create_synthetic_inner_header(self, header: KdbxHeader) -> InnerHeader: 

227 """Create a synthetic inner header from KDBX3 outer header fields. 

228 

229 KDBX3 stores the protected stream configuration in the outer header, 

230 while KDBX4 has an inner header. We create a synthetic inner header 

231 so the rest of the code can work uniformly. 

232 """ 

233 # Default to Salsa20 (stream_id=2) if not specified 

234 stream_id = header.inner_random_stream_id or 2 

235 

236 # Protected stream key from outer header 

237 if header.protected_stream_key is None: 

238 raise CorruptedDataError("Missing protected stream key in KDBX3 header") 

239 

240 return InnerHeader( 

241 random_stream_id=stream_id, 

242 random_stream_key=header.protected_stream_key, 

243 binaries={}, # KDBX3 stores binaries in XML, not inner header 

244 ) 

245 

246 @staticmethod 

247 def _remove_pkcs7_padding(data: bytes) -> bytes: 

248 """Remove PKCS7 padding from decrypted data.""" 

249 if not data: 

250 raise CorruptedDataError("Empty decrypted data") 

251 

252 pad_len = data[-1] 

253 if pad_len == 0 or pad_len > 16: 

254 raise CorruptedDataError(f"Invalid PKCS7 padding length: {pad_len}") 

255 

256 # Verify all padding bytes are correct 

257 for i in range(1, pad_len + 1): 

258 if data[-i] != pad_len: 

259 raise CorruptedDataError("Invalid PKCS7 padding bytes") 

260 

261 return data[:-pad_len] 

262 

263 

264def read_kdbx3( 

265 data: bytes, 

266 password: str | None = None, 

267 keyfile_data: bytes | None = None, 

268 transformed_key: bytes | None = None, 

269) -> DecryptedPayload: 

270 """Read and decrypt a KDBX3 database. 

271 

272 Args: 

273 data: Complete KDBX3 file contents 

274 password: Optional password 

275 keyfile_data: Optional keyfile contents 

276 transformed_key: Optional pre-computed transformed key (skips KDF) 

277 

278 Returns: 

279 DecryptedPayload containing header, inner header, and XML data 

280 

281 Raises: 

282 AuthenticationError: If credentials are wrong 

283 CorruptedDataError: If file is corrupted 

284 UnsupportedVersionError: If file is not KDBX3 

285 """ 

286 reader = Kdbx3Reader(data) 

287 return reader.decrypt( 

288 password=password, 

289 keyfile_data=keyfile_data, 

290 transformed_key=transformed_key, 

291 )