blob: b0e5897aabd79225c87018a1e659fa7e226d31a7 [file] [log] [blame]
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001from time import time
2
Jean-Paul Calderone084b7522013-02-09 09:53:45 -08003from OpenSSL.xcrypto import *
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08004
5from tls.c import api as _api
6
# File-format selectors accepted by the load_* / dump_* functions in this
# module.  Both values are read directly from the OpenSSL binding.
FILETYPE_PEM = _api.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _api.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant, so the value
# below is defined by this module alone.  The dump_* functions use it to
# select the human-readable text rendering.
FILETYPE_TEXT = 2 ** 16 - 1

# Key-type selectors accepted by PKey.generate_key.
TYPE_RSA = _api.EVP_PKEY_RSA
TYPE_DSA = _api.EVP_PKEY_DSA
15
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080016
def _bio_to_string(bio):
    """
    Return the data currently held by an OpenSSL memory BIO as a Python
    byte string.

    :param bio: The BIO object whose contents should be read out.

    :return: The bytes obtained by slicing the buffer that
        ``BIO_get_mem_data`` reports for *bio*.
    """
    # BIO_get_mem_data writes the address of the BIO's storage through this
    # out-parameter and returns how many bytes are available there.
    data_pointer = _api.new('char**')
    size = _api.BIO_get_mem_data(bio, data_pointer)

    view = _api.buffer(data_pointer[0], size)
    return view[:]
24
25
26
def _raise_current_error():
    """
    Drain the OpenSSL error queue and raise its contents as an
    :py:class:`Error`.

    Every queued error code is turned into a three-tuple of strings
    ``(library, function, reason)``.  The tuples are collected in the order
    the codes are returned by ``ERR_get_error``.

    :raise Error: Always.  The single argument is the list of tuples, which
        is empty if the queue held no errors.
    """
    collected = []
    # ERR_get_error returns 0 once the queue is exhausted, which serves as
    # the sentinel that ends the iteration.
    for code in iter(_api.ERR_get_error, 0):
        library = _api.string(_api.ERR_lib_error_string(code))
        function = _api.string(_api.ERR_func_error_string(code))
        reason = _api.string(_api.ERR_reason_error_string(code))
        collected.append((library, function, reason))

    raise Error(collected)
39
# Second name bound to the very same function object as
# _raise_current_error; calling either one behaves identically.
_exception_from_error_queue = _raise_current_error
41
42
class Error(Exception):
    """
    Exception type raised by this module.

    When raised from the OpenSSL error queue its single argument is a list
    of ``(library, function, reason)`` string tuples; other call sites pass
    a plain message string.
    """
45
46
47
class PKey(object):
    """
    Wrapper around an OpenSSL ``EVP_PKEY`` structure, stored in ``_pkey``.
    """

    # True when the object is known to hold only the public half of a key
    # (X509.get_pubkey sets this on the objects it returns).
    _only_public = False

    # The class-level default is True so that objects created with
    # ``PKey.__new__(PKey)`` around an already existing EVP_PKEY count as
    # initialized.  ``__init__`` resets it to False until ``generate_key``
    # has run.
    _initialized = True

    def __init__(self):
        # TODO: EVP_PKEY_new reports allocation failure by returning NULL;
        # that case is not handled here.
        self._pkey = _api.EVP_PKEY_new()
        self._initialized = False


    def generate_key(self, type, bits):
        """
        Generate a key of a given type, with a given number of bits

        :param type: The key type (TYPE_RSA or TYPE_DSA)
        :param bits: The number of bits

        :return: None
        :raise TypeError: If *type* or *bits* is not an integer.
        :raise ValueError: If an RSA key with a non-positive number of bits
            is requested.
        :raise Error: If *type* is not a known key type, or if OpenSSL
            reports a failure while generating or assigning the key.
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            # Public exponent 0x10001 (65537).  BN_hex2bn allocates the
            # BIGNUM and returns 0 on failure, in which case there is
            # nothing to free.
            exponent = _api.new("BIGNUM**")
            if not _api.BN_hex2bn(exponent, "10001"):
                _raise_current_error()

            try:
                rsa = _api.RSA_new()

                # TODO Release GIL?
                result = _api.RSA_generate_key_ex(
                    rsa, bits, exponent[0], _api.NULL)
            finally:
                # The exponent is only needed for the duration of the call.
                _api.BN_free(exponent[0])

            # RSA_generate_key_ex returns 1 on success and 0 on error.
            if result != 1:
                # TODO: rsa is not released on this path; RSA_free is not
                # known to be exposed by the binding - confirm and free it.
                _raise_current_error()

            result = _api.EVP_PKEY_assign_RSA(self._pkey, rsa)
            if not result:
                _raise_current_error()

        elif type == TYPE_DSA:
            # TODO: DSA key generation is not implemented.  No key material
            # is created here, yet the object is still marked initialized
            # below.
            pass
        else:
            raise Error("No such key type")

        self._initialized = True


    def check(self):
        """
        Check the consistency of an RSA private key.

        :return: True if key is consistent.
        :raise Error: if the key is inconsistent.
        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked, and a key that only has
            a public part cannot be checked either.
        """
        # A consistency check needs the private components; refuse early
        # instead of handing a public-only key to RSA_check_key.
        if self._only_public:
            raise TypeError("public key only")

        if _api.EVP_PKEY_type(self._pkey.type) != _api.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        # TODO: EVP_PKEY_get1_RSA hands back a new reference which is not
        # released here; RSA_free is not known to be exposed by the binding.
        rsa = _api.EVP_PKEY_get1_RSA(self._pkey)
        result = _api.RSA_check_key(rsa)

        # RSA_check_key returns 1 for a valid key, 0 for an invalid one and
        # -1 when the check itself failed, so only an exact 1 is success.
        if result == 1:
            return True
        _raise_current_error()
117
118
119
class X509(object):
    """
    Wrapper around an OpenSSL ``X509`` certificate structure, stored in
    ``_x509``.
    """

    def __init__(self):
        # TODO Allocation failure?  And why not __new__ instead of __init__?
        self._x509 = _api.X509_new()


    def set_version(self, version):
        """
        Set version number of the certificate

        :param version: The version number
        :type version: :py:class:`int`

        :return: None
        :raise TypeError: If *version* is not an integer.
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        _api.X509_set_version(self._x509, version)


    def get_version(self):
        """
        Return version number of the certificate

        :return: Version number as a Python integer
        """
        return _api.X509_get_version(self._x509)


    def get_pubkey(self):
        """
        Get the public key of the certificate

        :return: The public key, as a :py:class:`PKey` flagged as holding
            only the public part.
        :raise Error: If OpenSSL cannot provide the public key.
        """
        # Bypass PKey.__init__ so no fresh EVP_PKEY is allocated; the one
        # obtained from the certificate is used instead.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _api.X509_get_pubkey(self._x509)
        if pkey._pkey == _api.NULL:
            _raise_current_error()
        pkey._only_public = True
        return pkey


    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate

        :param pkey: The public key

        :return: None
        :raise TypeError: If *pkey* is not a :py:class:`PKey`.
        :raise Error: If OpenSSL fails to set the key.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _api.X509_set_pubkey(self._x509, pkey._pkey)
        if not set_result:
            _raise_current_error()


    def sign(self, pkey, digest):
        """
        Sign the certificate using the supplied key and digest

        :param pkey: The key to sign with
        :param digest: The message digest to use
        :return: None
        :raise TypeError: If *pkey* is not a :py:class:`PKey`.
        :raise ValueError: If *pkey* only has a public part, has not been
            initialized, or if *digest* does not name a known digest.
        :raise Error: If OpenSSL fails to sign the certificate.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _api.EVP_get_digestbyname(digest)
        if evp_md == _api.NULL:
            raise ValueError("No such digest method")

        sign_result = _api.X509_sign(self._x509, pkey._pkey, evp_md)
        if not sign_result:
            _raise_current_error()


    def get_signature_algorithm(self):
        """
        Retrieve the signature algorithm used in the certificate

        :return: A byte string giving the name of the signature algorithm
            used in the certificate.
        :raise ValueError: If the signature algorithm is undefined.
        """
        alg = self._x509.cert_info.signature.algorithm
        nid = _api.OBJ_obj2nid(alg)
        if nid == _api.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _api.string(_api.OBJ_nid2ln(nid))


    def digest(self, digest_name):
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`bytes`

        :return: The digest of the object, as upper-case hexadecimal byte
            values separated by colons.
        :raise ValueError: If *digest_name* does not name a known digest.
        :raise Error: If OpenSSL fails to compute the digest.
        """
        digest = _api.EVP_get_digestbyname(digest_name)
        if digest == _api.NULL:
            raise ValueError("No such digest method")

        # The buffer is sized for the largest digest; X509_digest reports
        # the number of bytes actually written through result_length.
        result_buffer = _api.new("char[]", _api.EVP_MAX_MD_SIZE)
        result_length = _api.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _api.X509_digest(
            self._x509, digest, result_buffer, result_length)

        if not digest_result:
            _raise_current_error()

        return ':'.join([
                ch.encode('hex').upper() for ch
                in _api.buffer(result_buffer, result_length[0])])


    def subject_name_hash(self):
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject.
        """
        return _api.X509_subject_name_hash(self._x509)


    def set_serial_number(self, serial):
        """
        Set serial number of the certificate

        :param serial: The serial number
        :type serial: :py:class:`int`

        :return: None
        :raise TypeError: If *serial* is not an integer.
        :raise Error: If OpenSSL fails to convert or store the serial number.
        """
        if not isinstance(serial, (int, long)):
            raise TypeError("serial must be an integer")

        # "%x" yields bare hexadecimal digits with an optional leading "-",
        # which is the form BN_hex2bn parses.  Slicing the output of hex()
        # instead mangles negative numbers and keeps the "L" suffix of
        # Python 2 longs.
        hex_serial = "%x" % (serial,)
        if not isinstance(hex_serial, bytes):
            hex_serial = hex_serial.encode('ascii')

        bignum_serial = _api.new("BIGNUM**")

        # BN_hex2bn allocates a BIGNUM and stores it in bignum_serial[0].
        # Its return value is the number of digits parsed (0 on error), not
        # the parsed number, so it cannot be used as the serial itself.
        _api.BN_hex2bn(bignum_serial, hex_serial)

        if bignum_serial[0] == _api.NULL:
            # Nothing was allocated, so the conversion failed.
            _raise_current_error()

        asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
        _api.BN_free(bignum_serial[0])
        if asn1_serial == _api.NULL:
            # TODO Not tested
            _raise_current_error()

        # TODO: asn1_serial is not released afterwards.  Presumably
        # X509_set_serialNumber stores its own copy - confirm, then free it.
        set_result = _api.X509_set_serialNumber(self._x509, asn1_serial)
        if not set_result:
            # TODO Not tested
            _raise_current_error()


    def get_serial_number(self):
        """
        Return serial number of the certificate

        :return: Serial number as a Python integer
        """
        asn1_serial = _api.X509_get_serialNumber(self._x509)
        bignum_serial = _api.ASN1_INTEGER_to_BN(asn1_serial, _api.NULL)
        try:
            hex_serial = _api.BN_bn2hex(bignum_serial)
            try:
                hexstring_serial = _api.string(hex_serial)
                serial = int(hexstring_serial, 16)
                return serial
            finally:
                # The hex string was allocated by OpenSSL for this call.
                _api.OPENSSL_free(hex_serial)
        finally:
            _api.BN_free(bignum_serial)


    def gmtime_adj_notAfter(self, amount):
        """
        Adjust the time stamp for when the certificate stops being valid

        :param amount: The number of seconds by which to adjust the ending
                       validity time.
        :type amount: :py:class:`int`

        :return: None
        :raise TypeError: If *amount* is not an integer.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _api.X509_get_notAfter(self._x509)
        _api.X509_gmtime_adj(notAfter, amount)


    def has_expired(self):
        """
        Check whether the certificate has expired.

        :return: True if the certificate has expired, false otherwise
        """
        now = int(time())
        notAfter = _api.X509_get_notAfter(self._x509)
        return _api.ASN1_UTCTIME_cmp_time_t(notAfter, now) < 0


    def get_notBefore(self):
        """
        Retrieve the time stamp for when the certificate starts being valid

        :return: A string giving the timestamp, in the format::

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm

                 or None if there is no value set.
        """
        # TODO: Not implemented yet.  The certificate is never consulted, so
        # None is returned whether or not a value is set.
        return None


    @staticmethod
    def _set_asn1_time(boundary, when):
        """
        Store the timestamp string *when* into the ASN.1 time *boundary*.

        :raise ValueError: If OpenSSL rejects the string.
        """
        set_result = _api.ASN1_GENERALIZEDTIME_set_string(boundary, when)
        # A zero return means the value was not stored; report it rather
        # than silently leaving the certificate unchanged.
        if set_result == 0:
            raise ValueError("Invalid string")


    def set_notBefore(self, when):
        """
        Set the time stamp for when the certificate starts being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        :raise ValueError: If *when* is not accepted as a timestamp.
        """
        notBefore = _api.X509_get_notBefore(self._x509)
        self._set_asn1_time(notBefore, when)


    def set_notAfter(self, when):
        """
        Set the time stamp for when the certificate stops being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        :raise ValueError: If *when* is not accepted as a timestamp.
        """
        notAfter = _api.X509_get_notAfter(self._x509)
        self._set_asn1_time(notAfter, when)

# Alternate public name for the X509 class.
X509Type = X509
394
395
396
def load_certificate(type, buffer):
    """
    Parse a certificate out of a byte string.

    :param type: The encoding of *buffer* (one of FILETYPE_PEM,
        FILETYPE_ASN1)

    :param buffer: The bytes holding the encoded certificate
    :type buffer: :py:class:`bytes`

    :return: The X509 object wrapping the parsed certificate
    :raise ValueError: If *type* is neither FILETYPE_PEM nor FILETYPE_ASN1.
    :raise Error: If OpenSSL cannot parse a certificate from *buffer*.
    """
    source = _api.BIO_new_mem_buf(buffer, len(buffer))

    # The BIO is only needed while parsing; release it on every path,
    # including the unsupported-type error.
    try:
        if type == FILETYPE_PEM:
            parsed = _api.PEM_read_bio_X509(
                source, _api.NULL, _api.NULL, _api.NULL)
        elif type == FILETYPE_ASN1:
            parsed = _api.d2i_X509_bio(source, _api.NULL)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
    finally:
        _api.BIO_free(source)

    if parsed == _api.NULL:
        _raise_current_error()

    # Skip X509.__init__ so that no second, empty certificate is allocated.
    certificate = X509.__new__(X509)
    certificate._x509 = parsed
    return certificate
427
428
def dump_certificate(type, cert):
    """
    Dump a certificate to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in
    :raise ValueError: If *type* is not one of the supported file types.
    :raise Error: If OpenSSL fails to allocate the output buffer or to
        serialize the certificate.
    """
    bio = _api.BIO_new(_api.BIO_s_mem())
    if bio == _api.NULL:
        _raise_current_error()

    try:
        if type == FILETYPE_PEM:
            result_code = _api.PEM_write_bio_X509(bio, cert._x509)
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_X509_bio(bio, cert._x509)
        elif type == FILETYPE_TEXT:
            result_code = _api.X509_print_ex(bio, cert._x509, 0, 0)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        # Same failure check as dump_privatekey: do not hand back a partial
        # or empty buffer when serialization failed.
        if result_code == 0:
            _raise_current_error()

        # _bio_to_string copies the data, so the BIO can be freed afterwards.
        return _bio_to_string(bio)
    finally:
        _api.BIO_free(bio)
450
451
452
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
    """
    Dump a private key to a buffer

    WARNING: encryption is not implemented yet.  *cipher* and *passphrase*
    are accepted but ignored, so the key is always written unencrypted.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param pkey: The PKey to dump
    :param cipher: (optional) if encrypted PEM format, the cipher to
                   use (currently ignored)
    :param passphrase: (optional) if encrypted PEM format, this can be either
                       the passphrase to use, or a callback for providing the
                       passphrase (currently ignored).
    :return: The buffer with the dumped key in
    :rtype: :py:data:`str`
    :raise ValueError: If *type* is not one of the supported file types.
    :raise Error: If OpenSSL fails to allocate the output buffer or to
        serialize the key.
    """
    # TODO incomplete: cipher and passphrase are not honoured.
    bio = _api.BIO_new(_api.BIO_s_mem())
    if bio == _api.NULL:
        _raise_current_error()

    try:
        if type == FILETYPE_PEM:
            result_code = _api.PEM_write_bio_PrivateKey(
                bio, pkey._pkey, _api.NULL, _api.NULL, 0, _api.NULL,
                _api.NULL)
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_PrivateKey_bio(bio, pkey._pkey)
        elif type == FILETYPE_TEXT:
            rsa = _api.EVP_PKEY_get1_RSA(pkey._pkey)
            # A NULL result means no RSA key could be obtained; it must not
            # be passed on to RSA_print.
            if rsa == _api.NULL:
                _raise_current_error()
            result_code = _api.RSA_print(bio, rsa, 0)
            # TODO RSA_free(rsa)?
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        if result_code == 0:
            _raise_current_error()

        # _bio_to_string copies the data, so the BIO can be freed afterwards.
        return _bio_to_string(bio)
    finally:
        _api.BIO_free(bio)
488
489
490
def load_privatekey(type, buffer, passphrase=None):
    """
    Load a private key from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase (currently ignored).

    :return: The PKey object
    :raise ValueError: If *type* is neither FILETYPE_PEM nor FILETYPE_ASN1.
    :raise Error: If OpenSSL cannot parse a key from *buffer*.
    """
    # TODO incomplete: passphrase is not honoured.
    bio = _api.BIO_new_mem_buf(buffer, len(buffer))

    # Release the BIO on every path, the same way load_certificate does.
    try:
        if type == FILETYPE_PEM:
            evp_pkey = _api.PEM_read_bio_PrivateKey(
                bio, _api.NULL, _api.NULL, _api.NULL)
        elif type == FILETYPE_ASN1:
            evp_pkey = _api.d2i_PrivateKey_bio(bio, _api.NULL)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
    finally:
        _api.BIO_free(bio)

    # A NULL result means parsing failed; raise instead of returning a PKey
    # that wraps a NULL pointer.
    if evp_pkey == _api.NULL:
        _raise_current_error()

    # Skip PKey.__init__ so that no second, empty EVP_PKEY is allocated.
    pkey = PKey.__new__(PKey)
    pkey._pkey = evp_pkey
    return pkey
516
517
518
def dump_certificate_request(type, req):
    """
    Serialize a certificate request into a buffer.

    Not implemented yet: both arguments are ignored and None is returned.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param req: The certificate request to dump
    :return: Intended to be the buffer holding the dumped certificate
        request; currently always None.
    """
    return None
527
528
529
def load_certificate_request(type, buffer):
    """
    Load a certificate request from a buffer

    NOTE(review): no implementation is visible after this docstring in the
    reviewed portion of the file; if none follows, the function returns
    None rather than an X509Req object.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the certificate request is stored in
    :return: The X509Req object
    """