Coverage for src / kdbxtool / parsing / kdbx3.py: 87%

89 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-19 21:22 +0000

1"""KDBX3 payload encryption and decryption. 

2 

3This module handles the cryptographic operations for KDBX3 files: 

4- Master key derivation from credentials (AES-KDF) 

5- Payload decryption and encryption 

6- Content hashed block verification 

7- Synthetic inner header creation from outer header 

8 

9KDBX3 structure: 

101. Outer header (plaintext, with 2-byte length fields) 

112. Encrypted payload (content hashed blocks format) 

12 - Stream start bytes (32 bytes, for verification) 

13 - Compressed/uncompressed XML database content 

14 

15Key differences from KDBX4: 

16- No header hash or HMAC verification 

17- Protected stream key is in outer header (not inner) 

18- Uses content hashed blocks instead of HMAC block stream 

19- No inner header inside the encrypted payload 

20""" 

21 

22from __future__ import annotations 

23 

24import gzip 

25import hashlib 

26 

27from kdbxtool.exceptions import ( 

28 AuthenticationError, 

29 CorruptedDataError, 

30 KdfError, 

31 UnsupportedVersionError, 

32) 

33from kdbxtool.security import ( 

34 CipherContext, 

35 SecureBytes, 

36 constant_time_compare, 

37 derive_composite_key, 

38 derive_key_aes_kdf, 

39) 

40from kdbxtool.security.kdf import AesKdfConfig 

41 

42from .context import ParseContext 

43from .header import CompressionType, KdbxHeader, KdbxVersion 

44from .kdbx4 import DecryptedPayload, InnerHeader 

45 

46 

class Kdbx3Reader:
    """Reader for KDBX3 format databases.

    KDBX3 uses AES-KDF for key derivation and content hashed blocks
    for payload integrity verification.
    """

    def __init__(self, data: bytes) -> None:
        """Initialize reader with KDBX3 file data.

        Args:
            data: Complete KDBX3 file contents
        """
        self._data = data
        self._ctx = ParseContext(data)

    def decrypt(
        self,
        password: str | None = None,
        keyfile_data: bytes | None = None,
        transformed_key: bytes | None = None,
    ) -> DecryptedPayload:
        """Decrypt the KDBX3 file.

        Args:
            password: Optional password
            keyfile_data: Optional keyfile contents
            transformed_key: Optional pre-computed transformed key (skips KDF)

        Returns:
            DecryptedPayload with header, synthetic inner header, and XML

        Raises:
            AuthenticationError: If the decrypted stream start bytes do not
                match the header (i.e. the credentials are wrong)
            CorruptedDataError: If the payload is too short, the padding or a
                hashed block is invalid, or a required header field is missing
            KdfError: If the header has no AES-KDF round count
            UnsupportedVersionError: If the file is not KDBX3
        """
        # Parse outer header
        header, header_end = KdbxHeader.parse(self._ctx.data)

        if header.version != KdbxVersion.KDBX3:
            raise UnsupportedVersionError(header.version.value, 0)

        self._ctx.offset = header_end

        # Use pre-computed transformed key if provided, otherwise derive it
        if transformed_key is not None:
            master_key = SecureBytes(transformed_key)
        else:
            # Derive composite key from credentials
            composite_key = derive_composite_key(
                password=password,
                keyfile_data=keyfile_data,
            )

            # Derive master key using AES-KDF
            master_key = self._derive_master_key(header, composite_key)

        # Derive cipher key
        cipher_key = self._derive_cipher_key(master_key.data, header.master_seed)

        # Read encrypted payload (everything after header)
        encrypted_payload = self._ctx.data[self._ctx.offset :]

        # Decrypt payload
        ctx = CipherContext(header.cipher, cipher_key, header.encryption_iv)
        decrypted = ctx.decrypt(encrypted_payload)

        # Verify stream start bytes (first 32 bytes) BEFORE removing padding.
        # A wrong key produces garbage whose trailing padding is almost never
        # valid; checking the padding first would report wrong credentials as
        # CorruptedDataError instead of AuthenticationError. The first 32
        # bytes are not affected by the padding at the end of the payload.
        if len(decrypted) < 32:
            raise CorruptedDataError("Decrypted payload too short")

        if header.stream_start_bytes is None:
            raise CorruptedDataError("Missing stream start bytes in header")

        if not constant_time_compare(decrypted[:32], header.stream_start_bytes):
            raise AuthenticationError()

        # Remove PKCS7 padding for block ciphers
        if header.cipher.iv_size == 16:  # AES-CBC or Twofish-CBC
            decrypted = self._remove_pkcs7_padding(decrypted)
            # The padding must not reach into the stream start bytes.
            if len(decrypted) < 32:
                raise CorruptedDataError("Decrypted payload too short")

        # Read content hashed blocks (after stream start bytes)
        payload_data = self._read_hashed_blocks(decrypted[32:])

        # Decompress if needed
        if header.compression == CompressionType.GZIP:
            payload_data = gzip.decompress(payload_data)

        # Create synthetic inner header from outer header fields
        inner_header = self._create_synthetic_inner_header(header)

        return DecryptedPayload(
            header=header,
            inner_header=inner_header,
            xml_data=payload_data,
            transformed_key=master_key.data,
        )

    def _derive_master_key(self, header: KdbxHeader, composite_key: SecureBytes) -> SecureBytes:
        """Derive master key using AES-KDF.

        Args:
            header: Parsed outer header supplying the KDF salt and round count
            composite_key: Composite key derived from the credentials

        Returns:
            The result of ``derive_key_aes_kdf`` for the composite key

        Raises:
            KdfError: If the header has no AES-KDF round count
        """
        if header.aes_kdf_rounds is None:
            raise KdfError("Missing AES-KDF rounds in header")

        aes_config = AesKdfConfig(
            salt=header.kdf_salt,
            rounds=header.aes_kdf_rounds,
        )

        return derive_key_aes_kdf(composite_key.data, aes_config)

    def _derive_cipher_key(self, master_key: bytes, master_seed: bytes) -> bytes:
        """Derive the cipher key from master key and seed.

        KDBX3 key derivation:
        - cipher_key = SHA256(master_seed || transformed_key)

        Args:
            master_key: Transformed (master) key bytes
            master_seed: Master seed from the outer header

        Returns:
            32-byte SHA-256 digest used as the payload cipher key
        """
        return hashlib.sha256(master_seed + master_key).digest()

    def _read_hashed_blocks(self, data: bytes) -> bytes:
        """Read and verify content hashed blocks.

        KDBX3 uses content hashed blocks for integrity:
        - 4 bytes: block index (sequential, starting at 0)
        - 32 bytes: SHA-256 hash of block data
        - 4 bytes: block data length
        - N bytes: block data

        Blocks continue until block data length is 0.

        Args:
            data: Decrypted payload following the stream start bytes

        Returns:
            Concatenation of the data of all verified blocks

        Raises:
            CorruptedDataError: If a block index is out of sequence, a block
                hash does not match, or the end block has a non-zero hash
        """
        result = bytearray()
        ctx = ParseContext(data)
        expected_index = 0

        with ctx.scope("hashed_blocks"):
            while not ctx.exhausted:
                with ctx.scope(f"block[{expected_index}]"):
                    # Read block index
                    block_index = ctx.read_u32("index")

                    if block_index != expected_index:
                        raise CorruptedDataError(
                            f"Block index mismatch: expected {expected_index}, got {block_index}"
                        )

                    # Read block hash
                    block_hash = ctx.read(32, "hash")

                    # Read block data length
                    block_len = ctx.read_u32("length")

                    # Check for end block
                    if block_len == 0:
                        # Verify empty block has zero hash
                        if block_hash != b"\x00" * 32:
                            raise CorruptedDataError("Invalid end block hash")
                        break

                    # Read block data
                    block_data = ctx.read(block_len, "data")

                    # Verify block hash
                    computed_hash = hashlib.sha256(block_data).digest()
                    if not constant_time_compare(computed_hash, block_hash):
                        raise CorruptedDataError(f"Block {block_index} hash mismatch")

                    result.extend(block_data)
                    expected_index += 1

        return bytes(result)

    def _create_synthetic_inner_header(self, header: KdbxHeader) -> InnerHeader:
        """Create a synthetic inner header from KDBX3 outer header fields.

        KDBX3 stores the protected stream configuration in the outer header,
        while KDBX4 has an inner header. We create a synthetic inner header
        so the rest of the code can work uniformly.

        Args:
            header: Parsed KDBX3 outer header

        Returns:
            InnerHeader carrying the stream id and protected stream key, with
            an empty binaries mapping

        Raises:
            CorruptedDataError: If the header has no protected stream key
        """
        # Default to Salsa20 (stream_id=2) if not specified
        # NOTE(review): `or` also maps any falsy id (e.g. 0) to 2 - confirm
        # that this is intended and not only meant for a missing field.
        stream_id = header.inner_random_stream_id or 2

        # Protected stream key from outer header
        if header.protected_stream_key is None:
            raise CorruptedDataError("Missing protected stream key in KDBX3 header")

        return InnerHeader(
            random_stream_id=stream_id,
            random_stream_key=header.protected_stream_key,
            binaries={},  # KDBX3 stores binaries in XML, not inner header
        )

    @staticmethod
    def _remove_pkcs7_padding(data: bytes) -> bytes:
        """Remove PKCS7 padding from decrypted data.

        Args:
            data: Decrypted bytes ending in PKCS7 padding (block size 16)

        Returns:
            ``data`` without its trailing padding bytes

        Raises:
            CorruptedDataError: If ``data`` is empty, the padding length is 0,
                greater than 16 or greater than ``len(data)``, or the padding
                bytes are not all equal to the padding length
        """
        if not data:
            raise CorruptedDataError("Empty decrypted data")

        pad_len = data[-1]
        # Bound by len(data) as well, so short input is reported as corrupt
        # data rather than surfacing as an IndexError.
        if pad_len == 0 or pad_len > 16 or pad_len > len(data):
            raise CorruptedDataError(f"Invalid PKCS7 padding length: {pad_len}")

        # Verify all padding bytes are correct
        if data[-pad_len:] != bytes([pad_len]) * pad_len:
            raise CorruptedDataError("Invalid PKCS7 padding bytes")

        return data[:-pad_len]

254 

255 

def read_kdbx3(
    data: bytes,
    password: str | None = None,
    keyfile_data: bytes | None = None,
    transformed_key: bytes | None = None,
) -> DecryptedPayload:
    """Decrypt a KDBX3 database held in memory.

    Convenience wrapper that builds a ``Kdbx3Reader`` over ``data`` and
    forwards the credentials to its ``decrypt`` method.

    Args:
        data: Complete KDBX3 file contents
        password: Optional password
        keyfile_data: Optional keyfile contents
        transformed_key: Optional pre-computed transformed key (skips KDF)

    Returns:
        DecryptedPayload containing header, inner header, and XML data

    Raises:
        AuthenticationError: If credentials are wrong
        CorruptedDataError: If file is corrupted
        UnsupportedVersionError: If file is not KDBX3
    """
    return Kdbx3Reader(data).decrypt(
        password=password,
        keyfile_data=keyfile_data,
        transformed_key=transformed_key,
    )