blob: d69de8b16da27b1f4b88ade76f1e4b3b64fa81d0 [file] [log] [blame]
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001from time import time
2
Jean-Paul Calderone084b7522013-02-09 09:53:45 -08003from OpenSSL.xcrypto import *
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08004
5from tls.c import api as _api
6
# File encodings accepted by the load_* / dump_* functions in this module.
FILETYPE_PEM = _api.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _api.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 0xFFFF

# Key algorithms accepted by PKey.generate_key.
TYPE_RSA = _api.EVP_PKEY_RSA
TYPE_DSA = _api.EVP_PKEY_DSA
15
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080016
def _bio_to_string(bio):
    """
    Return the data currently held by an OpenSSL memory BIO as a Python byte
    string.

    :param bio: The BIO object to read from.
    :return: A byte string copy of the BIO's contents.
    """
    data_pointer = _api.new('char**')
    size = _api.BIO_get_mem_data(bio, data_pointer)
    view = _api.buffer(data_pointer[0], size)
    # Slice the buffer so the caller gets its own byte string rather than a
    # view onto the BIO's storage.
    return view[:]
24
25
26
def _raise_current_error():
    """
    Drain the OpenSSL error queue and raise its contents as an
    :py:class:`Error`.

    :raise Error: Always.  The exception's single argument is a list of
        ``(library, function, reason)`` tuples, one per queued error, in the
        order they were popped from the queue.  The list is empty if the
        queue was empty.
    """
    def text(charp):
        # OpenSSL hands back NULL when it has no text registered for a code,
        # and _api.string() cannot be applied to a NULL pointer.  Substitute
        # an empty byte string so the real error still gets reported.
        if charp == _api.NULL:
            return b""
        return _api.string(charp)

    errors = []
    # ERR_get_error pops one entry per call and returns 0 once the queue is
    # empty.
    for error in iter(_api.ERR_get_error, 0):
        errors.append((
            text(_api.ERR_lib_error_string(error)),
            text(_api.ERR_func_error_string(error)),
            text(_api.ERR_reason_error_string(error))))

    raise Error(errors)

_exception_from_error_queue = _raise_current_error
41
42
class Error(Exception):
    """
    The exception raised by this module when an OpenSSL operation fails.
    """
45
46
47
class PKey(object):
    """
    A public key or key pair, backed by an OpenSSL ``EVP_PKEY`` structure.
    """
    # Set to True on instances that wrap a key extracted from a certificate
    # or certificate request; check() and the sign() methods refuse such keys.
    _only_public = False

    # The class-level default is True so that instances built with __new__
    # around an existing EVP_PKEY count as initialized.  __init__ resets it
    # until generate_key succeeds.
    _initialized = True

    def __init__(self):
        self._pkey = _api.EVP_PKEY_new()
        self._initialized = False


    def generate_key(self, type, bits):
        """
        Generate a key of a given type, with a given number of a bits

        :param type: The key type (TYPE_RSA or TYPE_DSA)
        :param bits: The number of bits

        :return: None
        :raise TypeError: if type or bits is not an integer.
        :raise ValueError: if bits is not positive for an RSA key.
        :raise Error: if the key type is unknown or OpenSSL fails.
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            # The public exponent (0x10001 == 65537) is only needed for RSA,
            # so it is allocated here and released as soon as key generation
            # has finished with it.
            exponent = _api.new("BIGNUM**")
            if not _api.BN_hex2bn(exponent, "10001"):
                _raise_current_error()

            try:
                rsa = _api.RSA_new()
                if rsa == _api.NULL:
                    _raise_current_error()

                # TODO Release GIL?
                result = _api.RSA_generate_key_ex(
                    rsa, bits, exponent[0], _api.NULL)
            finally:
                _api.BN_free(exponent[0])

            # RSA_generate_key_ex reports success as 1; anything else is a
            # failure.
            if result != 1:
                _raise_current_error()

            if not _api.EVP_PKEY_assign_RSA(self._pkey, rsa):
                _raise_current_error()

        elif type == TYPE_DSA:
            dsa = _api.DSA_generate_parameters(
                bits, _api.NULL, 0, _api.NULL, _api.NULL, _api.NULL,
                _api.NULL)
            if dsa == _api.NULL:
                _raise_current_error()
            if not _api.DSA_generate_key(dsa):
                _raise_current_error()
            if not _api.EVP_PKEY_assign_DSA(self._pkey, dsa):
                _raise_current_error()
        else:
            raise Error("No such key type")

        self._initialized = True


    def check(self):
        """
        Check the consistency of an RSA private key.

        :return: True if key is consistent.
        :raise Error: if the key is inconsistent.
        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _api.EVP_PKEY_type(self._pkey.type) != _api.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        # TODO EVP_PKEY_get1_RSA hands back an owned reference per the
        # OpenSSL documentation; it should be released with RSA_free once
        # that binding is confirmed to be available on _api.
        rsa = _api.EVP_PKEY_get1_RSA(self._pkey)
        if _api.RSA_check_key(rsa):
            return True
        _raise_current_error()


    def type(self):
        """
        Returns the type of the key

        :return: The type of the key.
        """
        return self._pkey.type


    def bits(self):
        """
        Returns the number of bits of the key

        :return: The number of bits of the key.
        """
        return _api.EVP_PKEY_bits(self._pkey)
PKeyType = PKey
146
147
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800148
class X509Name(object):
    """
    An X.509 distinguished name, backed by an OpenSSL ``X509_NAME``.

    Name components are read and written as attributes (for example
    ``name.CN``); attribute names are resolved through ``OBJ_txt2nid``.
    """
    def __init__(self, name):
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: An X509Name object to copy
        """
        self._name = _api.X509_NAME_dup(name._name)


    def __setattr__(self, name, value):
        """
        Set a name component, replacing any existing entry of the same kind.

        Attributes whose name starts with an underscore are stored as
        ordinary Python attributes.

        :raise TypeError: if the attribute name is not a string.
        :raise AttributeError: if OpenSSL does not know the attribute name.
        :raise Error: if OpenSSL fails to add the entry.
        """
        if type(name) is not str:
            # Report the type of the offending *name*, which is what the
            # message talks about.
            raise TypeError("attribute name must be string, not '%.200s'" % (
                    type(name).__name__,))

        if name.startswith('_'):
            return super(X509Name, self).__setattr__(name, value)

        nid = _api.OBJ_txt2nid(name)
        if nid == _api.NID_undef:
            # The failed lookup leaves an entry on the error queue; discard
            # it so it does not confuse a later, unrelated error report.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_api.X509_NAME_entry_count(self._name)):
            ent = _api.X509_NAME_get_entry(self._name, i)
            ent_obj = _api.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _api.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _api.X509_NAME_delete_entry(self._name, i)
                _api.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, unicode):
            value = value.encode('utf-8')

        add_result = _api.X509_NAME_add_entry_by_NID(
            self._name, nid, _api.MBSTRING_UTF8, value, -1, -1, 0)
        if not add_result:
            # TODO Untested
            _raise_current_error()


    def __getattr__(self, name):
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName (alias
        CN) and more...

        :return: The value of the component as a unicode string, or None if
            the name has no such component.
        :raise AttributeError: if OpenSSL does not know the attribute name.
        """
        nid = _api.OBJ_txt2nid(name)
        if nid == _api.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("'%s' object has no attribute '%s'" % (
                    type(self).__name__, name))

        entry_index = _api.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _api.X509_NAME_get_entry(self._name, entry_index)
        data = _api.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _api.new("unsigned char**")
        data_length = _api.ASN1_STRING_to_UTF8(result_buffer, data)
        if data_length < 0:
            _raise_current_error()

        # Release the OpenSSL-allocated buffer even if decoding fails.
        try:
            return _api.buffer(
                result_buffer[0], data_length)[:].decode('utf-8')
        finally:
            _api.OPENSSL_free(result_buffer[0])


    def __cmp__(self, other):
        """
        Compare two names using ``X509_NAME_cmp``.
        """
        if not isinstance(other, X509Name):
            return NotImplemented

        result = _api.X509_NAME_cmp(self._name, other._name)
        # TODO result == -2 is an error case that maybe should be checked for
        return result


    def __repr__(self):
        """
        String representation of an X509Name
        """
        result_buffer = _api.new("char[]", 512)
        format_result = _api.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer))

        if format_result == _api.NULL:
            _raise_current_error()

        return "<X509Name object '%s'>" % (_api.string(result_buffer),)


    def hash(self):
        """
        Return the hash value of this name

        :return: The hash of this name as computed by ``X509_NAME_hash``.
        """
        return _api.X509_NAME_hash(self._name)


    def der(self):
        """
        Return the DER encoding of this name

        :return: A :py:class:`bytes` instance giving the DER encoded form of
            this name.
        """
        result_buffer = _api.new('unsigned char**')
        encode_result = _api.i2d_X509_NAME(self._name, result_buffer)
        if encode_result < 0:
            _raise_current_error()

        try:
            return _api.buffer(result_buffer[0], encode_result)[:]
        finally:
            _api.OPENSSL_free(result_buffer[0])


    def get_components(self):
        """
        Returns the split-up components of this name.

        :return: List of tuples (name, value).
        """
        result = []
        for i in range(_api.X509_NAME_entry_count(self._name)):
            ent = _api.X509_NAME_get_entry(self._name, i)

            fname = _api.X509_NAME_ENTRY_get_object(ent)
            fval = _api.X509_NAME_ENTRY_get_data(ent)

            nid = _api.OBJ_obj2nid(fname)
            name = _api.OBJ_nid2sn(nid)

            # Copy exactly ASN1_STRING_length bytes.  A NUL-terminated read
            # would silently cut the value short at an embedded NUL byte.
            value = _api.buffer(
                _api.ASN1_STRING_data(fval),
                _api.ASN1_STRING_length(fval))[:]

            result.append((_api.string(name), value))

        return result
X509NameType = X509Name
305
306
class X509Extension(object):
    """
    An X.509 v3 extension, backed by an OpenSSL ``X509_EXTENSION``.
    """
    def __init__(self, type_name, critical, value, subject=None, issuer=None):
        """
        :param type_name: The name of the extension to create.
        :type type_name: :py:data:`str`

        :param critical: A flag indicating whether this is a critical extension.

        :param value: The value of the extension.
        :type value: :py:data:`str`

        :param subject: Optional X509 cert to use as subject.
        :type subject: :py:class:`X509`

        :param issuer: Optional X509 cert to use as issuer.
        :type issuer: :py:class:`X509`

        :return: The X509Extension object
        :raise TypeError: if subject or issuer is given but is not an X509.
        :raise Error: if OpenSSL cannot build the extension from the given
            name and value.
        """
        if subject is not None and not isinstance(subject, X509):
            raise TypeError("subject must be an X509 instance")
        if issuer is not None and not isinstance(issuer, X509):
            raise TypeError("issuer must be an X509 instance")

        subject_cert = _api.NULL if subject is None else subject._x509
        issuer_cert = _api.NULL if issuer is None else issuer._x509

        # X509V3_set_ctx takes (ctx, issuer, subject, req, crl, flags).  The
        # context only borrows the certificates for the duration of the
        # X509V3_EXT_nconf call below.
        ctx = _api.new("X509V3_CTX*")
        _api.X509V3_set_ctx(
            ctx, issuer_cert, subject_cert, _api.NULL, _api.NULL, 0)
        _api.X509V3_set_ctx_nodb(ctx)

        if critical:
            # There are other OpenSSL APIs which would let us pass in critical
            # separately, but they're harder to use, and since value is already
            # a pile of crappy junk smuggling a ton of utterly important
            # structured data, what's the point of trying to avoid nasty stuff
            # with strings? (However, X509V3_EXT_i2d in particular seems like it
            # would be a better API to invoke.  I do not know where to get the
            # ext_struc it desires for its last parameter, though.)
            value = "critical," + value

        extension = _api.X509V3_EXT_nconf(_api.NULL, ctx, type_name, value)
        if extension == _api.NULL:
            # Unknown extension name or unparseable value: fail here rather
            # than leaving a NULL pointer for the accessors to trip over.
            _raise_current_error()
        self._extension = extension


    def get_critical(self):
        """
        Returns the critical field of the X509Extension

        :return: The critical field.
        """
        return _api.X509_EXTENSION_get_critical(self._extension)


    def get_short_name(self):
        """
        Returns the short version of the type name of the X509Extension

        :return: The short type name.
        """
        obj = _api.X509_EXTENSION_get_object(self._extension)
        nid = _api.OBJ_obj2nid(obj)
        return _api.string(_api.OBJ_nid2sn(nid))
362
363
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800364
class X509Req(object):
    """
    An X.509 certificate signing request, backed by an OpenSSL ``X509_REQ``.
    """
    def __init__(self):
        self._req = _api.X509_REQ_new()


    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate request

        :param pkey: The public key to use
        :return: None
        :raise Error: if OpenSSL fails to set the key.
        """
        set_result = _api.X509_REQ_set_pubkey(self._req, pkey._pkey)
        if not set_result:
            _raise_current_error()


    def get_pubkey(self):
        """
        Get the public key from the certificate request

        :return: The public key
        :raise Error: if OpenSSL cannot produce the key.
        """
        # Bypass PKey.__init__ so no fresh EVP_PKEY is allocated only to be
        # replaced by the one taken from the request.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _api.X509_REQ_get_pubkey(self._req)
        if pkey._pkey == _api.NULL:
            _raise_current_error()
        pkey._only_public = True
        return pkey


    def set_version(self, version):
        """
        Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :param version: The version number
        :return: None
        """
        set_result = _api.X509_REQ_set_version(self._req, version)
        if not set_result:
            _raise_current_error()


    def get_version(self):
        """
        Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :return: an integer giving the value of the version subfield
        """
        return _api.X509_REQ_get_version(self._req)


    def get_subject(self):
        """
        Create an X509Name object for the subject of the certificate request

        :return: An X509Name object
        :raise Error: if OpenSSL returns no subject name.
        """
        name = X509Name.__new__(X509Name)
        name._name = _api.X509_REQ_get_subject_name(self._req)
        if name._name == _api.NULL:
            _raise_current_error()
        return name


    def add_extensions(self, extensions):
        """
        Add extensions to the request.

        :param extensions: a sequence of X509Extension objects
        :return: None
        :raise ValueError: if any element is not an X509Extension.
        :raise Error: if OpenSSL fails to add the extensions.
        """
        # Validate everything before allocating the OpenSSL stack, so a bad
        # element cannot leave a half-filled stack behind.
        extensions = list(extensions)
        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

        stack = _api.sk_X509_EXTENSION_new_null()
        if stack == _api.NULL:
            _raise_current_error()

        for ext in extensions:
            _api.sk_X509_EXTENSION_push(stack, ext._extension)

        add_result = _api.X509_REQ_add_extensions(self._req, stack)
        if not add_result:
            _raise_current_error()


    def sign(self, pkey, digest):
        """
        Sign the certificate request using the supplied key and digest

        :param pkey: The key to sign with
        :param digest: The message digest to use
        :return: None
        :raise ValueError: if the key is public-only or uninitialized, or the
            digest name is unknown.
        :raise Error: if OpenSSL fails to sign the request.
        """
        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _api.EVP_get_digestbyname(digest)
        if digest_obj == _api.NULL:
            raise ValueError("No such digest method")

        sign_result = _api.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
        if not sign_result:
            _raise_current_error()


X509ReqType = X509Req
478
479
480
class X509(object):
    """
    An X.509 certificate, backed by an OpenSSL ``X509`` structure.
    """
    def __init__(self):
        # TODO Allocation failure?  And why not __new__ instead of __init__?
        self._x509 = _api.X509_new()


    def set_version(self, version):
        """
        Set version number of the certificate

        :param version: The version number
        :type version: :py:class:`int`

        :return: None
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        _api.X509_set_version(self._x509, version)


    def get_version(self):
        """
        Return version number of the certificate

        :return: Version number as a Python integer
        """
        return _api.X509_get_version(self._x509)


    def get_pubkey(self):
        """
        Get the public key of the certificate

        :return: The public key
        """
        # Bypass PKey.__init__ so no fresh EVP_PKEY is allocated only to be
        # replaced by the certificate's own key.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _api.X509_get_pubkey(self._x509)
        if pkey._pkey == _api.NULL:
            _raise_current_error()
        pkey._only_public = True
        return pkey


    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate

        :param pkey: The public key

        :return: None
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _api.X509_set_pubkey(self._x509, pkey._pkey)
        if not set_result:
            _raise_current_error()


    def sign(self, pkey, digest):
        """
        Sign the certificate using the supplied key and digest

        :param pkey: The key to sign with
        :param digest: The message digest to use
        :return: None
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _api.EVP_get_digestbyname(digest)
        if evp_md == _api.NULL:
            raise ValueError("No such digest method")

        sign_result = _api.X509_sign(self._x509, pkey._pkey, evp_md)
        if not sign_result:
            _raise_current_error()


    def get_signature_algorithm(self):
        """
        Retrieve the signature algorithm used in the certificate

        :return: A byte string giving the name of the signature algorithm used in
                 the certificate.
        :raise ValueError: If the signature algorithm is undefined.
        """
        alg = self._x509.cert_info.signature.algorithm
        nid = _api.OBJ_obj2nid(alg)
        if nid == _api.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _api.string(_api.OBJ_nid2ln(nid))


    def digest(self, digest_name):
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`bytes`

        :return: The digest of the object, as upper-case hexadecimal byte
            pairs joined by ``:``.
        :raise ValueError: if the digest name is unknown.
        :raise Error: if OpenSSL fails to compute the digest.
        """
        digest = _api.EVP_get_digestbyname(digest_name)
        if digest == _api.NULL:
            raise ValueError("No such digest method")

        result_buffer = _api.new("char[]", _api.EVP_MAX_MD_SIZE)
        result_length = _api.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _api.X509_digest(
            self._x509, digest, result_buffer, result_length)

        if not digest_result:
            _raise_current_error()

        return ':'.join([
                ch.encode('hex').upper() for ch
                in _api.buffer(result_buffer, result_length[0])])


    def subject_name_hash(self):
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject.
        """
        return _api.X509_subject_name_hash(self._x509)


    def set_serial_number(self, serial):
        """
        Set serial number of the certificate

        :param serial: The serial number
        :type serial: :py:class:`int`

        :return: None
        :raise TypeError: if serial is not an integer.
        :raise Error: if OpenSSL cannot convert or store the serial number.
        """
        if not isinstance(serial, (int, long)):
            raise TypeError("serial must be an integer")

        # "%x" formatting yields bare hex digits: no "0x" prefix to strip, no
        # trailing "L" for a Python 2 long, and a leading "-" for negative
        # values, which is the form BN_hex2bn expects.
        hex_serial = "%x" % (serial,)
        if not isinstance(hex_serial, bytes):
            hex_serial = hex_serial.encode('ascii')

        bignum_serial = _api.new("BIGNUM**")
        _api.BN_hex2bn(bignum_serial, hex_serial)

        if bignum_serial[0] == _api.NULL:
            # BN_hex2bn allocates the BIGNUM itself when handed a pointer to
            # NULL, so a pointer that is still NULL means the conversion
            # failed.
            # TODO Not tested
            _raise_current_error()

        asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
        _api.BN_free(bignum_serial[0])
        if asn1_serial == _api.NULL:
            # TODO Not tested
            _raise_current_error()
        set_result = _api.X509_set_serialNumber(self._x509, asn1_serial)
        if not set_result:
            # TODO Not tested
            _raise_current_error()


    def get_serial_number(self):
        """
        Return serial number of the certificate

        :return: Serial number as a Python integer
        """
        asn1_serial = _api.X509_get_serialNumber(self._x509)
        bignum_serial = _api.ASN1_INTEGER_to_BN(asn1_serial, _api.NULL)
        try:
            hex_serial = _api.BN_bn2hex(bignum_serial)
            try:
                return int(_api.string(hex_serial), 16)
            finally:
                _api.OPENSSL_free(hex_serial)
        finally:
            _api.BN_free(bignum_serial)


    def gmtime_adj_notAfter(self, amount):
        """
        Adjust the time stamp for when the certificate stops being valid

        :param amount: The number of seconds by which to adjust the ending
                       validity time.
        :type amount: :py:class:`int`

        :return: None
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _api.X509_get_notAfter(self._x509)
        _api.X509_gmtime_adj(notAfter, amount)


    def gmtime_adj_notBefore(self, amount):
        """
        Change the timestamp for when the certificate starts being valid to the current
        time plus an offset.

        :param amount: The number of seconds by which to adjust the starting validity
                       time.
        :return: None
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notBefore = _api.X509_get_notBefore(self._x509)
        _api.X509_gmtime_adj(notBefore, amount)


    def has_expired(self):
        """
        Check whether the certificate has expired.

        :return: True if the certificate has expired, false otherwise
        """
        now = int(time())
        notAfter = _api.X509_get_notAfter(self._x509)
        return _api.ASN1_UTCTIME_cmp_time_t(
            _api.cast('ASN1_UTCTIME*', notAfter), now) < 0


    def _get_boundary_time(self, which):
        """
        Read one of the validity timestamps as a GENERALIZEDTIME string.

        :param which: The ``_api`` accessor to call with the certificate
            (``X509_get_notBefore`` or ``X509_get_notAfter``).
        :return: The timestamp string, or None if the field is empty.
        """
        timestamp = which(self._x509)
        string_timestamp = _api.cast('ASN1_STRING*', timestamp)
        if _api.ASN1_STRING_length(string_timestamp) == 0:
            return None

        if (_api.ASN1_STRING_type(string_timestamp) ==
            _api.V_ASN1_GENERALIZEDTIME):
            return _api.string(_api.ASN1_STRING_data(string_timestamp))

        # Any other time type is converted to GENERALIZEDTIME first, so the
        # caller always sees the same format.
        generalized_timestamp = _api.new("ASN1_GENERALIZEDTIME**")
        _api.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
        if generalized_timestamp[0] == _api.NULL:
            _raise_current_error()

        # Free the converted copy even if reading it out fails.
        try:
            string_timestamp = _api.cast(
                "ASN1_STRING*", generalized_timestamp[0])
            return _api.string(_api.ASN1_STRING_data(string_timestamp))
        finally:
            _api.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])


    def get_notBefore(self):
        """
        Retrieve the time stamp for when the certificate starts being valid

        :return: A string giving the timestamp, in the format::

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm

                 or None if there is no value set.
        """
        return self._get_boundary_time(_api.X509_get_notBefore)


    def _set_boundary_time(self, which, when):
        """
        Store a GENERALIZEDTIME string into one of the validity timestamps.

        :param which: The ``_api`` accessor to call with the certificate
            (``X509_get_notBefore`` or ``X509_get_notAfter``).
        :param when: The timestamp, as a byte string.
        :raise TypeError: if when is not a byte string.
        :raise ValueError: if when is not a valid GENERALIZEDTIME string.
        :raise RuntimeError: if the string looks valid but OpenSSL still
            refused to store it.
        """
        if not isinstance(when, bytes):
            raise TypeError("when must be a byte string")

        boundary = which(self._x509)
        set_result = _api.ASN1_GENERALIZEDTIME_set_string(
            _api.cast('ASN1_GENERALIZEDTIME*', boundary), when)
        if set_result == 0:
            # Find out whether the failure was caused by a malformed string
            # by validating a scratch copy of it.
            # TODO The scratch ASN1_STRING is never released.
            dummy = _api.ASN1_STRING_new()
            _api.ASN1_STRING_set(dummy, when, len(when))
            check_result = _api.ASN1_GENERALIZEDTIME_check(
                _api.cast('ASN1_GENERALIZEDTIME*', dummy))
            if not check_result:
                raise ValueError("Invalid string")
            else:
                # TODO No tests for this case
                raise RuntimeError("Unknown ASN1_GENERALIZEDTIME_set_string failure")


    def set_notBefore(self, when):
        """
        Set the time stamp for when the certificate starts being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        """
        return self._set_boundary_time(_api.X509_get_notBefore, when)


    def get_notAfter(self):
        """
        Retrieve the time stamp for when the certificate stops being valid

        :return: A string giving the timestamp, in the format::

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm

                 or None if there is no value set.
        """
        return self._get_boundary_time(_api.X509_get_notAfter)


    def set_notAfter(self, when):
        """
        Set the time stamp for when the certificate stops being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        """
        return self._set_boundary_time(_api.X509_get_notAfter, when)


    def _get_name(self, which):
        """
        Wrap the ``X509_NAME`` returned by the accessor ``which`` in an
        X509Name without copying it.
        """
        name = X509Name.__new__(X509Name)
        name._name = which(self._x509)
        if name._name == _api.NULL:
            _raise_current_error()
        return name


    def _set_name(self, which, name):
        """
        Pass the ``X509_NAME`` held by ``name`` to the setter ``which``.

        :raise TypeError: if name is not an X509Name.
        :raise Error: if the setter reports failure.
        """
        if not isinstance(name, X509Name):
            raise TypeError("name must be an X509Name")
        set_result = which(self._x509, name._name)
        if not set_result:
            _raise_current_error()


    def get_issuer(self):
        """
        Create an X509Name object for the issuer of the certificate

        :return: An X509Name object
        """
        return self._get_name(_api.X509_get_issuer_name)


    def set_issuer(self, issuer):
        """
        Set the issuer of the certificate

        :param issuer: The issuer name
        :type issuer: :py:class:`X509Name`

        :return: None
        """
        return self._set_name(_api.X509_set_issuer_name, issuer)


    def get_subject(self):
        """
        Create an X509Name object for the subject of the certificate

        :return: An X509Name object
        """
        return self._get_name(_api.X509_get_subject_name)


    def set_subject(self, subject):
        """
        Set the subject of the certificate

        :param subject: The subject name
        :type subject: :py:class:`X509Name`
        :return: None
        """
        return self._set_name(_api.X509_set_subject_name, subject)


    def get_extension_count(self):
        """
        Get the number of extensions on the certificate.

        :return: The number of extensions as an integer.
        """
        return _api.X509_get_ext_count(self._x509)


    def add_extensions(self, extensions):
        """
        Add extensions to the certificate.

        :param extensions: a sequence of X509Extension objects
        :return: None
        """
        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            add_result = _api.X509_add_ext(self._x509, ext._extension, -1)
            if not add_result:
                _raise_current_error()


    def get_extension(self, index):
        """
        Get a specific extension of the certificate by index.

        :param index: The index of the extension to retrieve.
        :return: The X509Extension object at the specified index.
        :raise IndexError: if there is no extension at that index.
        """
        ext = X509Extension.__new__(X509Extension)
        ext._extension = _api.X509_get_ext(self._x509, index)
        if ext._extension == _api.NULL:
            raise IndexError("extension index out of bounds")

        # Hand out a duplicate rather than a pointer into the certificate.
        ext._extension = _api.X509_EXTENSION_dup(ext._extension)
        return ext

X509Type = X509
925
926
927
def load_certificate(type, buffer):
    """
    Load a certificate from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)

    :param buffer: The buffer the certificate is stored in
    :type buffer: :py:class:`bytes`

    :return: The X509 object
    :raise ValueError: if type is not one of the supported file types.
    :raise Error: if OpenSSL cannot allocate the BIO or parse the buffer.
    """
    bio = _api.BIO_new_mem_buf(buffer, len(buffer))
    if bio == _api.NULL:
        _raise_current_error()

    # The BIO is only needed while parsing; release it on every path.
    try:
        if type == FILETYPE_PEM:
            x509 = _api.PEM_read_bio_X509(bio, _api.NULL, _api.NULL, _api.NULL)
        elif type == FILETYPE_ASN1:
            x509 = _api.d2i_X509_bio(bio, _api.NULL)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
    finally:
        _api.BIO_free(bio)

    if x509 == _api.NULL:
        _raise_current_error()

    # Bypass X509.__init__ so no empty certificate is allocated only to be
    # replaced by the one just parsed.
    cert = X509.__new__(X509)
    cert._x509 = x509
    return cert
960
961
def dump_certificate(type, cert):
    """
    Dump a certificate to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in
    :raise ValueError: if type is not one of the supported file types.
    :raise Error: if OpenSSL cannot allocate the BIO or write the
        certificate.
    """
    bio = _api.BIO_new(_api.BIO_s_mem())
    if bio == _api.NULL:
        _raise_current_error()

    # Release the BIO on every path, including the ValueError below.
    try:
        if type == FILETYPE_PEM:
            result_code = _api.PEM_write_bio_X509(bio, cert._x509)
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_X509_bio(bio, cert._x509)
        elif type == FILETYPE_TEXT:
            result_code = _api.X509_print_ex(bio, cert._x509, 0, 0)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        # Same failure convention as dump_privatekey: a zero result means
        # the writer failed.
        if result_code == 0:
            _raise_current_error()

        # _bio_to_string copies the data, so freeing the BIO afterwards is
        # safe.
        return _bio_to_string(bio)
    finally:
        _api.BIO_free(bio)
986
987
988
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
    """
    Dump a private key to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param pkey: The PKey to dump
    :param cipher: (optional) if encrypted PEM format, the cipher to
                   use
    :param passphrase: (optional) if encrypted PEM format, this can be either
                       the passphrase to use, or a callback for providing the
                       passphrase.
    :return: The buffer with the dumped key in
    :rtype: :py:data:`str`
    :raise ValueError: if the cipher name or file type is unknown, or a
        passphrase is given for a non-PEM file type.
    :raise TypeError: if passphrase is neither a byte string nor callable.
    :raise Error: if OpenSSL fails to write the key.
    """
    # Validate the arguments before allocating anything from OpenSSL.
    if cipher is not None:
        cipher_obj = _api.EVP_get_cipherbyname(cipher)
        if cipher_obj == _api.NULL:
            raise ValueError("Invalid cipher name")
    else:
        cipher_obj = _api.NULL

    helper = _PassphraseHelper(type, passphrase)

    bio = _api.BIO_new(_api.BIO_s_mem())
    if bio == _api.NULL:
        _raise_current_error()

    # Release the BIO on every path out of the function.
    try:
        if type == FILETYPE_PEM:
            # Hold the callback in a local so it stays referenced for the
            # whole duration of the OpenSSL call.
            callback = helper.callback
            callback_args = helper.callback_args
            result_code = _api.PEM_write_bio_PrivateKey(
                bio, pkey._pkey, cipher_obj, _api.NULL, 0,
                callback, callback_args)
            helper.raise_if_problem()
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_PrivateKey_bio(bio, pkey._pkey)
        elif type == FILETYPE_TEXT:
            rsa = _api.EVP_PKEY_get1_RSA(pkey._pkey)
            if rsa == _api.NULL:
                # Not an RSA key; do not hand a NULL pointer to RSA_print.
                _raise_current_error()
            result_code = _api.RSA_print(bio, rsa, 0)
            # TODO RSA_free(rsa)?
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        if result_code == 0:
            _raise_current_error()

        # _bio_to_string copies the data, so freeing the BIO afterwards is
        # safe.
        return _bio_to_string(bio)
    finally:
        _api.BIO_free(bio)
1035
1036
1037
class _PassphraseHelper(object):
    """
    Adapt a user-supplied passphrase (``None``, a byte string, or a callable)
    to the ``callback`` / ``callback_args`` pair that is passed to the
    OpenSSL PEM read and write functions.

    Exceptions raised while running a passphrase callable are recorded rather
    than propagated; call :py:meth:`raise_if_problem` after the OpenSSL call
    to re-raise the first one.
    """
    def __init__(self, type, passphrase):
        """
        :param type: The file type of the key being read or written
        :param passphrase: ``None``, a byte string, or a callable which is
                           called with the rwflag and returns the passphrase
                           as a byte string
        :raise ValueError: If a passphrase is given for a type other than
                           FILETYPE_PEM
        """
        if type != FILETYPE_PEM and passphrase is not None:
            raise ValueError("only FILETYPE_PEM key format supports encryption")
        self._passphrase = passphrase
        # Exceptions captured by _read_passphrase, in the order they occurred.
        self._problems = []


    @property
    def callback(self):
        """
        The value to pass as the password callback argument: NULL unless the
        passphrase is a callable, in which case a ``pem_password_cb``
        callback wrapping :py:meth:`_read_passphrase`.

        :raise TypeError: If the passphrase is neither ``None``, a byte
                          string, nor callable
        """
        if self._passphrase is None:
            return _api.NULL
        elif isinstance(self._passphrase, bytes):
            return _api.NULL
        elif callable(self._passphrase):
            return _api.callback("pem_password_cb", self._read_passphrase)
        else:
            raise TypeError("Last argument must be string or callable")


    @property
    def callback_args(self):
        """
        The value to pass as the callback's user data argument: the
        passphrase itself when it is a byte string, NULL otherwise.

        :raise TypeError: If the passphrase is neither ``None``, a byte
                          string, nor callable
        """
        if self._passphrase is None:
            return _api.NULL
        elif isinstance(self._passphrase, bytes):
            return self._passphrase
        elif callable(self._passphrase):
            return _api.NULL
        else:
            raise TypeError("Last argument must be string or callable")


    def raise_if_problem(self):
        """
        Re-raise the first exception recorded by :py:meth:`_read_passphrase`,
        if there is one.  Otherwise do nothing.
        """
        if self._problems:
            # Drain the OpenSSL error queue; the recorded exception is the
            # one reported to the caller, so the queue contents are dropped
            # on purpose.
            try:
                _raise_current_error()
            except Error:
                pass
            raise self._problems[0]


    def _read_passphrase(self, buf, size, rwflag, userdata):
        """
        Call the user's passphrase callable and copy its result into buf.

        :param buf: The buffer to copy the passphrase into
        :param size: The capacity of buf
        :param rwflag: Passed through to the user's callable
        :param userdata: Unused
        :return: The length of the passphrase, or 0 if the callable raised
                 or returned an unusable value (the exception is recorded
                 for :py:meth:`raise_if_problem`)
        """
        try:
            result = self._passphrase(rwflag)
            if not isinstance(result, bytes):
                raise ValueError("String expected")
            if len(result) > size:
                raise ValueError("passphrase returned by callback is too long")
            for i in range(len(result)):
                # Slice rather than index: indexing bytes yields an int on
                # Python 3, while a one-byte slice is a byte string on both
                # Python 2 and 3.
                buf[i] = result[i:i + 1]
            return len(result)
        except Exception as e:
            # Nothing may propagate out of the callback; keep the exception
            # so it can be raised once control is back in Python.
            self._problems.append(e)
            return 0
1092
1093
1094
def load_privatekey(type, buffer, passphrase=None):
    """
    Load a private key from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase.

    :return: The PKey object
    :raise ValueError: If a passphrase is given for a non-PEM type, or if
                       type is not FILETYPE_PEM or FILETYPE_ASN1
    :raise Error: If OpenSSL fails to load the key
    """
    # NOTE(review): nothing in this function releases the BIO -- confirm
    # whether it is freed elsewhere.
    bio = _api.BIO_new_mem_buf(buffer, len(buffer))
    if bio == _api.NULL:
        # Report the allocation failure through the OpenSSL error queue
        # rather than an unrelated ZeroDivisionError.
        _raise_current_error()

    # The helper rejects a passphrase combined with a non-PEM type.
    helper = _PassphraseHelper(type, passphrase)
    if type == FILETYPE_PEM:
        evp_pkey = _api.PEM_read_bio_PrivateKey(
            bio, _api.NULL, helper.callback, helper.callback_args)
        # Re-raise any exception recorded while running a passphrase callback.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        evp_pkey = _api.d2i_PrivateKey_bio(bio, _api.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if evp_pkey == _api.NULL:
        _raise_current_error()

    # Wrap the loaded key without running PKey.__init__, which would
    # allocate a fresh EVP_PKEY.
    pkey = PKey.__new__(PKey)
    pkey._pkey = evp_pkey
    return pkey
1127
1128
1129
def dump_certificate_request(type, req):
    """
    Dump a certificate request to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
                 FILETYPE_TEXT)
    :param req: The certificate request to dump
    :return: The buffer with the dumped certificate request in
    :raise ValueError: If type is not one of the supported file types
    :raise Error: If OpenSSL reports a failure
    """
    # NOTE(review): nothing in this function releases the BIO -- confirm
    # whether it is freed elsewhere.
    bio = _api.BIO_new(_api.BIO_s_mem())
    if bio == _api.NULL:
        # Report the allocation failure through the OpenSSL error queue
        # rather than an unrelated ZeroDivisionError.
        _raise_current_error()

    if type == FILETYPE_PEM:
        result_code = _api.PEM_write_bio_X509_REQ(bio, req._req)
    elif type == FILETYPE_ASN1:
        # NOTE(review): assumes the binding exposes i2d_X509_REQ_bio --
        # confirm.
        result_code = _api.i2d_X509_REQ_bio(bio, req._req)
    elif type == FILETYPE_TEXT:
        # NOTE(review): assumes the binding exposes X509_REQ_print_ex --
        # confirm.  The two zeros select the default name and certificate
        # print flags.
        result_code = _api.X509_REQ_print_ex(bio, req._req, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT")

    if result_code == 0:
        _raise_current_error()

    return _bio_to_string(bio)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001155
1156
1157
def load_certificate_request(type, buffer):
    """
    Load a certificate request from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the certificate request is stored in
    :return: The X509Req object
    :raise ValueError: If type is not FILETYPE_PEM or FILETYPE_ASN1
    :raise Error: If OpenSSL fails to load the certificate request
    """
    # NOTE(review): nothing in this function releases the BIO -- confirm
    # whether it is freed elsewhere.
    bio = _api.BIO_new_mem_buf(buffer, len(buffer))
    if bio == _api.NULL:
        # Report the allocation failure through the OpenSSL error queue
        # rather than an unrelated ZeroDivisionError.
        _raise_current_error()

    if type == FILETYPE_PEM:
        req = _api.PEM_read_bio_X509_REQ(bio, _api.NULL, _api.NULL, _api.NULL)
    elif type == FILETYPE_ASN1:
        # NOTE(review): assumes the binding exposes d2i_X509_REQ_bio --
        # confirm.
        req = _api.d2i_X509_REQ_bio(bio, _api.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if req == _api.NULL:
        _raise_current_error()

    # Wrap the loaded request without running X509Req.__init__.
    x509req = X509Req.__new__(X509Req)
    x509req._req = req
    return x509req