blob: f4df8b5ce050d9b0f62bf7a4f25ba3503335245c [file] [log] [blame]
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001from time import time
2
Jean-Paul Calderonea46fdd12013-02-21 10:51:09 -08003from OpenSSL.xcrypto import PKCS7Type, load_pkcs7_data
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08004
5from tls.c import api as _api
6
7FILETYPE_PEM = _api.SSL_FILETYPE_PEM
8FILETYPE_ASN1 = _api.SSL_FILETYPE_ASN1
9
10# TODO This was an API mistake. OpenSSL has no such constant.
11FILETYPE_TEXT = 2 ** 16 - 1
12
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -080013TYPE_RSA = _api.EVP_PKEY_RSA
14TYPE_DSA = _api.EVP_PKEY_DSA
15
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080016
17def _bio_to_string(bio):
18 """
19 Copy the contents of an OpenSSL BIO object into a Python byte string.
20 """
21 result_buffer = _api.new('char**')
22 buffer_length = _api.BIO_get_mem_data(bio, result_buffer)
23 return _api.buffer(result_buffer[0], buffer_length)[:]
24
25
26
Jean-Paul Calderone57122982013-02-21 08:47:05 -080027def _set_asn1_time(boundary, when):
28 if not isinstance(when, bytes):
29 raise TypeError("when must be a byte string")
30
31 set_result = _api.ASN1_GENERALIZEDTIME_set_string(
32 _api.cast('ASN1_GENERALIZEDTIME*', boundary), when)
33 if set_result == 0:
34 dummy = _api.ASN1_STRING_new()
35 _api.ASN1_STRING_set(dummy, when, len(when))
36 check_result = _api.ASN1_GENERALIZEDTIME_check(
37 _api.cast('ASN1_GENERALIZEDTIME*', dummy))
38 if not check_result:
39 raise ValueError("Invalid string")
40 else:
41 # TODO No tests for this case
42 raise RuntimeError("Unknown ASN1_GENERALIZEDTIME_set_string failure")
43
44
45
46def _get_asn1_time(timestamp):
47 string_timestamp = _api.cast('ASN1_STRING*', timestamp)
48 if _api.ASN1_STRING_length(string_timestamp) == 0:
49 return None
50 elif _api.ASN1_STRING_type(string_timestamp) == _api.V_ASN1_GENERALIZEDTIME:
51 return _api.string(_api.ASN1_STRING_data(string_timestamp))
52 else:
53 generalized_timestamp = _api.new("ASN1_GENERALIZEDTIME**")
54 _api.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
55 if generalized_timestamp[0] == _api.NULL:
56 1/0
57 else:
58 string_timestamp = _api.cast(
59 "ASN1_STRING*", generalized_timestamp[0])
60 string_data = _api.ASN1_STRING_data(string_timestamp)
61 string_result = _api.string(string_data)
62 _api.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
63 return string_result
64
65
66
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080067def _raise_current_error():
68 errors = []
69 while True:
70 error = _api.ERR_get_error()
71 if error == 0:
72 break
73 errors.append((
74 _api.string(_api.ERR_lib_error_string(error)),
75 _api.string(_api.ERR_func_error_string(error)),
76 _api.string(_api.ERR_reason_error_string(error))))
77
78 raise Error(errors)
79
80_exception_from_error_queue = _raise_current_error
81
82
83class Error(Exception):
84 pass
85
86
87
88class PKey(object):
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -080089 _only_public = False
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -080090 _initialized = True
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -080091
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080092 def __init__(self):
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -080093 self._pkey = _api.EVP_PKEY_new()
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -080094 self._initialized = False
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -080095
96
97 def generate_key(self, type, bits):
98 """
99 Generate a key of a given type, with a given number of a bits
100
101 :param type: The key type (TYPE_RSA or TYPE_DSA)
102 :param bits: The number of bits
103
104 :return: None
105 """
106 if not isinstance(type, int):
107 raise TypeError("type must be an integer")
108
109 if not isinstance(bits, int):
110 raise TypeError("bits must be an integer")
111
112 exponent = _api.new("BIGNUM**")
113 # TODO Check error return
114 # TODO Free the exponent[0]
115 _api.BN_hex2bn(exponent, "10001")
116
117 if type == TYPE_RSA:
118 if bits <= 0:
119 raise ValueError("Invalid number of bits")
120
121 rsa = _api.RSA_new();
122
123 # TODO Release GIL?
124 result = _api.RSA_generate_key_ex(rsa, bits, exponent[0], _api.NULL)
125 if result == -1:
126 1/0
127
128 result = _api.EVP_PKEY_assign_RSA(self._pkey, rsa)
129 if not result:
130 1/0
131
132 elif type == TYPE_DSA:
Jean-Paul Calderonec86fcaf2013-02-20 12:38:33 -0800133 dsa = _api.DSA_generate_parameters(
134 bits, _api.NULL, 0, _api.NULL, _api.NULL, _api.NULL, _api.NULL)
135 if dsa == _api.NULL:
136 1/0
137 if not _api.DSA_generate_key(dsa):
138 1/0
139 if not _api.EVP_PKEY_assign_DSA(self._pkey, dsa):
140 1/0
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800141 else:
142 raise Error("No such key type")
143
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -0800144 self._initialized = True
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800145
146
147 def check(self):
148 """
149 Check the consistency of an RSA private key.
150
151 :return: True if key is consistent.
152 :raise Error: if the key is inconsistent.
153 :raise TypeError: if the key is of a type which cannot be checked.
154 Only RSA keys can currently be checked.
155 """
Jean-Paul Calderonec86fcaf2013-02-20 12:38:33 -0800156 if self._only_public:
157 raise TypeError("public key only")
158
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800159 if _api.EVP_PKEY_type(self._pkey.type) != _api.EVP_PKEY_RSA:
160 raise TypeError("key type unsupported")
161
162 rsa = _api.EVP_PKEY_get1_RSA(self._pkey)
163 result = _api.RSA_check_key(rsa)
164 if result:
165 return True
166 _raise_current_error()
167
168
Jean-Paul Calderonec86fcaf2013-02-20 12:38:33 -0800169 def type(self):
170 """
171 Returns the type of the key
172
173 :return: The type of the key.
174 """
175 return self._pkey.type
176
177
178 def bits(self):
179 """
180 Returns the number of bits of the key
181
182 :return: The number of bits of the key.
183 """
184 return _api.EVP_PKEY_bits(self._pkey)
185PKeyType = PKey
186
187
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800188
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800189class X509Name(object):
190 def __init__(self, name):
191 """
192 Create a new X509Name, copying the given X509Name instance.
193
194 :param name: An X509Name object to copy
195 """
196 self._name = _api.X509_NAME_dup(name._name)
197
198
199 def __setattr__(self, name, value):
200 if name.startswith('_'):
201 return super(X509Name, self).__setattr__(name, value)
202
203 if type(name) is not str:
204 raise TypeError("attribute name must be string, not '%.200s'" % (
205 type(value).__name__,))
206
207 nid = _api.OBJ_txt2nid(name)
208 if nid == _api.NID_undef:
209 try:
210 _raise_current_error()
211 except Error:
212 pass
213 raise AttributeError("No such attribute")
214
215 # If there's an old entry for this NID, remove it
216 for i in range(_api.X509_NAME_entry_count(self._name)):
217 ent = _api.X509_NAME_get_entry(self._name, i)
218 ent_obj = _api.X509_NAME_ENTRY_get_object(ent)
219 ent_nid = _api.OBJ_obj2nid(ent_obj)
220 if nid == ent_nid:
221 ent = _api.X509_NAME_delete_entry(self._name, i)
222 _api.X509_NAME_ENTRY_free(ent)
223 break
224
225 if isinstance(value, unicode):
226 value = value.encode('utf-8')
227
228 add_result = _api.X509_NAME_add_entry_by_NID(
229 self._name, nid, _api.MBSTRING_UTF8, value, -1, -1, 0)
230 if not add_result:
231 # TODO Untested
232 1/0
233
234
235 def __getattr__(self, name):
236 """
237 Find attribute. An X509Name object has the following attributes:
238 countryName (alias C), stateOrProvince (alias ST), locality (alias L),
239 organization (alias O), organizationalUnit (alias OU), commonName (alias
240 CN) and more...
241 """
242 nid = _api.OBJ_txt2nid(name)
243 if nid == _api.NID_undef:
244 # This is a bit weird. OBJ_txt2nid indicated failure, but it seems
245 # a lower level function, a2d_ASN1_OBJECT, also feels the need to
246 # push something onto the error queue. If we don't clean that up
247 # now, someone else will bump into it later and be quite confused.
248 # See lp#314814.
249 try:
250 _raise_current_error()
251 except Error:
252 pass
253 return super(X509Name, self).__getattr__(name)
254
255 entry_index = _api.X509_NAME_get_index_by_NID(self._name, nid, -1)
256 if entry_index == -1:
257 return None
258
259 entry = _api.X509_NAME_get_entry(self._name, entry_index)
260 data = _api.X509_NAME_ENTRY_get_data(entry)
261
262 result_buffer = _api.new("unsigned char**")
263 data_length = _api.ASN1_STRING_to_UTF8(result_buffer, data)
264 if data_length < 0:
265 1/0
266
267 result = _api.buffer(result_buffer[0], data_length)[:].decode('utf-8')
268 _api.OPENSSL_free(result_buffer[0])
269 return result
270
271
272 def __cmp__(self, other):
273 if not isinstance(other, X509Name):
274 return NotImplemented
275
276 result = _api.X509_NAME_cmp(self._name, other._name)
277 # TODO result == -2 is an error case that maybe should be checked for
278 return result
279
280
281 def __repr__(self):
282 """
283 String representation of an X509Name
284 """
285 result_buffer = _api.new("char[]", 512);
286 format_result = _api.X509_NAME_oneline(
287 self._name, result_buffer, len(result_buffer))
288
289 if format_result == _api.NULL:
290 1/0
291
292 return "<X509Name object '%s'>" % (_api.string(result_buffer),)
293
294
295 def hash(self):
296 """
297 Return the hash value of this name
298
299 :return: None
300 """
301 return _api.X509_NAME_hash(self._name)
302
303
304 def der(self):
305 """
306 Return the DER encoding of this name
307
308 :return: A :py:class:`bytes` instance giving the DER encoded form of
309 this name.
310 """
311 result_buffer = _api.new('unsigned char**')
312 encode_result = _api.i2d_X509_NAME(self._name, result_buffer)
313 if encode_result < 0:
314 1/0
315
316 string_result = _api.buffer(result_buffer[0], encode_result)[:]
317 _api.OPENSSL_free(result_buffer[0])
318 return string_result
319
320
321 def get_components(self):
322 """
323 Returns the split-up components of this name.
324
325 :return: List of tuples (name, value).
326 """
327 result = []
328 for i in range(_api.X509_NAME_entry_count(self._name)):
329 ent = _api.X509_NAME_get_entry(self._name, i)
330
331 fname = _api.X509_NAME_ENTRY_get_object(ent)
332 fval = _api.X509_NAME_ENTRY_get_data(ent)
333
334 nid = _api.OBJ_obj2nid(fname)
335 name = _api.OBJ_nid2sn(nid)
336
337 result.append((
338 _api.string(name),
339 _api.string(
340 _api.ASN1_STRING_data(fval),
341 _api.ASN1_STRING_length(fval))))
342
343 return result
344X509NameType = X509Name
345
346
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800347class X509Extension(object):
348 def __init__(self, type_name, critical, value, subject=None, issuer=None):
349 """
350 :param typename: The name of the extension to create.
351 :type typename: :py:data:`str`
352
353 :param critical: A flag indicating whether this is a critical extension.
354
355 :param value: The value of the extension.
356 :type value: :py:data:`str`
357
358 :param subject: Optional X509 cert to use as subject.
359 :type subject: :py:class:`X509`
360
361 :param issuer: Optional X509 cert to use as issuer.
362 :type issuer: :py:class:`X509`
363
364 :return: The X509Extension object
365 """
366 ctx = _api.new("X509V3_CTX*")
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800367
368 # A context is necessary for any extension which uses the r2i conversion
369 # method. That is, X509V3_EXT_nconf may segfault if passed a NULL ctx.
370 # Start off by initializing most of the fields to NULL.
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800371 _api.X509V3_set_ctx(ctx, _api.NULL, _api.NULL, _api.NULL, _api.NULL, 0)
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800372
373 # We have no configuration database - but perhaps we should (some
374 # extensions may require it).
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800375 _api.X509V3_set_ctx_nodb(ctx)
376
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800377 # Initialize the subject and issuer, if appropriate. ctx is a local,
378 # and as far as I can tell none of the X509V3_* APIs invoked here steal
379 # any references, so no need to mess with reference counts or duplicates.
380 if issuer is not None:
381 if not isinstance(issuer, X509):
382 raise TypeError("issuer must be an X509 instance")
383 ctx.issuer_cert = issuer._x509
384 if subject is not None:
385 if not isinstance(subject, X509):
386 raise TypeError("subject must be an X509 instance")
387 ctx.subject_cert = subject._x509
388
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800389 if critical:
390 # There are other OpenSSL APIs which would let us pass in critical
391 # separately, but they're harder to use, and since value is already
392 # a pile of crappy junk smuggling a ton of utterly important
393 # structured data, what's the point of trying to avoid nasty stuff
394 # with strings? (However, X509V3_EXT_i2d in particular seems like it
395 # would be a better API to invoke. I do not know where to get the
396 # ext_struc it desires for its last parameter, though.)
397 value = "critical," + value
398
399 self._extension = _api.X509V3_EXT_nconf(
400 _api.NULL, ctx, type_name, value)
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800401 if self._extension == _api.NULL:
402 _raise_current_error()
403
404
405 def __str__(self):
406 """
407 :return: a nice text representation of the extension
408 """
409 bio = _api.BIO_new(_api.BIO_s_mem())
410 if bio == _api.NULL:
411 1/0
412
413 print_result = _api.X509V3_EXT_print(bio, self._extension, 0, 0)
414 if not print_result:
415 1/0
416
417 return _bio_to_string(bio)
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800418
419
420 def get_critical(self):
421 """
422 Returns the critical field of the X509Extension
423
424 :return: The critical field.
425 """
426 return _api.X509_EXTENSION_get_critical(self._extension)
427
428
429 def get_short_name(self):
430 """
431 Returns the short version of the type name of the X509Extension
432
433 :return: The short type name.
434 """
435 obj = _api.X509_EXTENSION_get_object(self._extension)
436 nid = _api.OBJ_obj2nid(obj)
437 return _api.string(_api.OBJ_nid2sn(nid))
438
439
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800440 def get_data(self):
441 """
442 Returns the data of the X509Extension
443
444 :return: A :py:data:`str` giving the X509Extension's ASN.1 encoded data.
445 """
446 octet_result = _api.X509_EXTENSION_get_data(self._extension)
447 string_result = _api.cast('ASN1_STRING*', octet_result)
448 char_result = _api.ASN1_STRING_data(string_result)
449 result_length = _api.ASN1_STRING_length(string_result)
450 return _api.buffer(char_result, result_length)[:]
451
452X509ExtensionType = X509Extension
453
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800454
Jean-Paul Calderone066f0572013-02-20 13:43:44 -0800455class X509Req(object):
Jean-Paul Calderone4328d472013-02-20 14:28:46 -0800456 def __init__(self):
457 self._req = _api.X509_REQ_new()
458
459
460 def set_pubkey(self, pkey):
461 """
462 Set the public key of the certificate request
463
464 :param pkey: The public key to use
465 :return: None
466 """
467 set_result = _api.X509_REQ_set_pubkey(self._req, pkey._pkey)
468 if not set_result:
469 1/0
470
471
472 def get_pubkey(self):
473 """
474 Get the public key from the certificate request
475
476 :return: The public key
477 """
478 pkey = PKey.__new__(PKey)
479 pkey._pkey = _api.X509_REQ_get_pubkey(self._req)
480 if pkey._pkey == _api.NULL:
481 1/0
482 pkey._only_public = True
483 return pkey
484
485
486 def set_version(self, version):
487 """
488 Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
489 request.
490
491 :param version: The version number
492 :return: None
493 """
494 set_result = _api.X509_REQ_set_version(self._req, version)
495 if not set_result:
496 _raise_current_error()
497
498
499 def get_version(self):
500 """
501 Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
502 request.
503
504 :return: an integer giving the value of the version subfield
505 """
506 return _api.X509_REQ_get_version(self._req)
507
508
509 def get_subject(self):
510 """
511 Create an X509Name object for the subject of the certificate request
512
513 :return: An X509Name object
514 """
515 name = X509Name.__new__(X509Name)
516 name._name = _api.X509_REQ_get_subject_name(self._req)
517 if name._name == _api.NULL:
518 1/0
519 return name
520
521
522 def add_extensions(self, extensions):
523 """
524 Add extensions to the request.
525
526 :param extensions: a sequence of X509Extension objects
527 :return: None
528 """
529 stack = _api.sk_X509_EXTENSION_new_null()
530 if stack == _api.NULL:
531 1/0
532
533 for ext in extensions:
534 if not isinstance(ext, X509Extension):
Jean-Paul Calderonec2154b72013-02-20 14:29:37 -0800535 raise ValueError("One of the elements is not an X509Extension")
Jean-Paul Calderone4328d472013-02-20 14:28:46 -0800536
537 _api.sk_X509_EXTENSION_push(stack, ext._extension)
538
539 add_result = _api.X509_REQ_add_extensions(self._req, stack)
540 if not add_result:
541 1/0
542
543
544 def sign(self, pkey, digest):
545 """
546 Sign the certificate request using the supplied key and digest
547
548 :param pkey: The key to sign with
549 :param digest: The message digest to use
550 :return: None
551 """
552 if pkey._only_public:
553 raise ValueError("Key has only public part")
554
555 if not pkey._initialized:
556 raise ValueError("Key is uninitialized")
557
558 digest_obj = _api.EVP_get_digestbyname(digest)
559 if digest_obj == _api.NULL:
560 raise ValueError("No such digest method")
561
562 sign_result = _api.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
563 if not sign_result:
564 1/0
565
566
Jean-Paul Calderone066f0572013-02-20 13:43:44 -0800567X509ReqType = X509Req
568
569
570
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800571class X509(object):
572 def __init__(self):
573 # TODO Allocation failure? And why not __new__ instead of __init__?
574 self._x509 = _api.X509_new()
575
576
577 def set_version(self, version):
578 """
579 Set version number of the certificate
580
581 :param version: The version number
582 :type version: :py:class:`int`
583
584 :return: None
585 """
586 if not isinstance(version, int):
587 raise TypeError("version must be an integer")
588
589 _api.X509_set_version(self._x509, version)
590
591
592 def get_version(self):
593 """
594 Return version number of the certificate
595
596 :return: Version number as a Python integer
597 """
598 return _api.X509_get_version(self._x509)
599
600
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -0800601 def get_pubkey(self):
602 """
603 Get the public key of the certificate
604
605 :return: The public key
606 """
607 pkey = PKey.__new__(PKey)
608 pkey._pkey = _api.X509_get_pubkey(self._x509)
609 if pkey._pkey == _api.NULL:
610 _raise_current_error()
611 pkey._only_public = True
612 return pkey
613
614
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800615 def set_pubkey(self, pkey):
616 """
617 Set the public key of the certificate
618
619 :param pkey: The public key
620
621 :return: None
622 """
623 if not isinstance(pkey, PKey):
624 raise TypeError("pkey must be a PKey instance")
625
626 set_result = _api.X509_set_pubkey(self._x509, pkey._pkey)
627 if not set_result:
628 _raise_current_error()
629
630
631 def sign(self, pkey, digest):
632 """
633 Sign the certificate using the supplied key and digest
634
635 :param pkey: The key to sign with
636 :param digest: The message digest to use
637 :return: None
638 """
639 if not isinstance(pkey, PKey):
640 raise TypeError("pkey must be a PKey instance")
641
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -0800642 if pkey._only_public:
643 raise ValueError("Key only has public part")
644
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -0800645 if not pkey._initialized:
646 raise ValueError("Key is uninitialized")
647
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800648 evp_md = _api.EVP_get_digestbyname(digest)
649 if evp_md == _api.NULL:
650 raise ValueError("No such digest method")
651
652 sign_result = _api.X509_sign(self._x509, pkey._pkey, evp_md)
653 if not sign_result:
654 _raise_current_error()
655
656
Jean-Paul Calderonee4aa3fa2013-02-19 12:12:53 -0800657 def get_signature_algorithm(self):
658 """
659 Retrieve the signature algorithm used in the certificate
660
661 :return: A byte string giving the name of the signature algorithm used in
662 the certificate.
663 :raise ValueError: If the signature algorithm is undefined.
664 """
665 alg = self._x509.cert_info.signature.algorithm
666 nid = _api.OBJ_obj2nid(alg)
667 if nid == _api.NID_undef:
668 raise ValueError("Undefined signature algorithm")
669 return _api.string(_api.OBJ_nid2ln(nid))
670
671
Jean-Paul Calderoneb4078722013-02-19 12:01:55 -0800672 def digest(self, digest_name):
673 """
674 Return the digest of the X509 object.
675
676 :param digest_name: The name of the digest algorithm to use.
677 :type digest_name: :py:class:`bytes`
678
679 :return: The digest of the object
680 """
681 digest = _api.EVP_get_digestbyname(digest_name)
682 if digest == _api.NULL:
683 raise ValueError("No such digest method")
684
685 result_buffer = _api.new("char[]", _api.EVP_MAX_MD_SIZE)
686 result_length = _api.new("unsigned int[]", 1)
687 result_length[0] = len(result_buffer)
688
689 digest_result = _api.X509_digest(
690 self._x509, digest, result_buffer, result_length)
691
692 if not digest_result:
693 1/0
694
695 return ':'.join([
696 ch.encode('hex').upper() for ch
697 in _api.buffer(result_buffer, result_length[0])])
698
699
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800700 def subject_name_hash(self):
701 """
702 Return the hash of the X509 subject.
703
704 :return: The hash of the subject.
705 """
706 return _api.X509_subject_name_hash(self._x509)
707
708
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800709 def set_serial_number(self, serial):
710 """
711 Set serial number of the certificate
712
713 :param serial: The serial number
714 :type serial: :py:class:`int`
715
716 :return: None
717 """
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800718 if not isinstance(serial, (int, long)):
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800719 raise TypeError("serial must be an integer")
720
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800721 hex_serial = hex(serial)[2:]
722 if not isinstance(hex_serial, bytes):
723 hex_serial = hex_serial.encode('ascii')
724
725 bignum_serial = _api.new("BIGNUM**")
726
727 # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like
728 # it. If bignum is still NULL after this call, then the return value is
729 # actually the result. I hope. -exarkun
730 small_serial = _api.BN_hex2bn(bignum_serial, hex_serial)
731
732 if bignum_serial[0] == _api.NULL:
733 set_result = ASN1_INTEGER_set(
734 _api.X509_get_serialNumber(self._x509), small_serial)
735 if set_result:
736 # TODO Not tested
737 _raise_current_error()
738 else:
739 asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
740 _api.BN_free(bignum_serial[0])
741 if asn1_serial == _api.NULL:
742 # TODO Not tested
743 _raise_current_error()
744 set_result = _api.X509_set_serialNumber(self._x509, asn1_serial)
745 if not set_result:
746 # TODO Not tested
747 _raise_current_error()
748
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800749
750 def get_serial_number(self):
751 """
752 Return serial number of the certificate
753
754 :return: Serial number as a Python integer
755 """
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800756 asn1_serial = _api.X509_get_serialNumber(self._x509)
757 bignum_serial = _api.ASN1_INTEGER_to_BN(asn1_serial, _api.NULL)
758 try:
759 hex_serial = _api.BN_bn2hex(bignum_serial)
760 try:
761 hexstring_serial = _api.string(hex_serial)
762 serial = int(hexstring_serial, 16)
763 return serial
764 finally:
765 _api.OPENSSL_free(hex_serial)
766 finally:
767 _api.BN_free(bignum_serial)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800768
769
770 def gmtime_adj_notAfter(self, amount):
771 """
772 Adjust the time stamp for when the certificate stops being valid
773
774 :param amount: The number of seconds by which to adjust the ending
775 validity time.
776 :type amount: :py:class:`int`
777
778 :return: None
779 """
780 if not isinstance(amount, int):
781 raise TypeError("amount must be an integer")
782
783 notAfter = _api.X509_get_notAfter(self._x509)
784 _api.X509_gmtime_adj(notAfter, amount)
785
786
Jean-Paul Calderone662afe52013-02-20 08:41:11 -0800787 def gmtime_adj_notBefore(self, amount):
788 """
789 Change the timestamp for when the certificate starts being valid to the current
790 time plus an offset.
791
792 :param amount: The number of seconds by which to adjust the starting validity
793 time.
794 :return: None
795 """
796 if not isinstance(amount, int):
797 raise TypeError("amount must be an integer")
798
799 notBefore = _api.X509_get_notBefore(self._x509)
800 _api.X509_gmtime_adj(notBefore, amount)
801
802
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800803 def has_expired(self):
804 """
805 Check whether the certificate has expired.
806
807 :return: True if the certificate has expired, false otherwise
808 """
809 now = int(time())
810 notAfter = _api.X509_get_notAfter(self._x509)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800811 return _api.ASN1_UTCTIME_cmp_time_t(
812 _api.cast('ASN1_UTCTIME*', notAfter), now) < 0
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800813
814
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800815 def _get_boundary_time(self, which):
Jean-Paul Calderone57122982013-02-21 08:47:05 -0800816 return _get_asn1_time(which(self._x509))
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800817
818
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800819 def get_notBefore(self):
820 """
821 Retrieve the time stamp for when the certificate starts being valid
822
823 :return: A string giving the timestamp, in the format::
824
825 YYYYMMDDhhmmssZ
826 YYYYMMDDhhmmss+hhmm
827 YYYYMMDDhhmmss-hhmm
828
829 or None if there is no value set.
830 """
831 return self._get_boundary_time(_api.X509_get_notBefore)
832
833
834 def _set_boundary_time(self, which, when):
Jean-Paul Calderone57122982013-02-21 08:47:05 -0800835 return _set_asn1_time(which(self._x509), when)
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800836
837
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800838 def set_notBefore(self, when):
839 """
840 Set the time stamp for when the certificate starts being valid
841
842 :param when: A string giving the timestamp, in the format:
843
844 YYYYMMDDhhmmssZ
845 YYYYMMDDhhmmss+hhmm
846 YYYYMMDDhhmmss-hhmm
847 :type when: :py:class:`bytes`
848
849 :return: None
850 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800851 return self._set_boundary_time(_api.X509_get_notBefore, when)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800852
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800853
854 def get_notAfter(self):
855 """
856 Retrieve the time stamp for when the certificate stops being valid
857
858 :return: A string giving the timestamp, in the format::
859
860 YYYYMMDDhhmmssZ
861 YYYYMMDDhhmmss+hhmm
862 YYYYMMDDhhmmss-hhmm
863
864 or None if there is no value set.
865 """
866 return self._get_boundary_time(_api.X509_get_notAfter)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800867
868
869 def set_notAfter(self, when):
870 """
871 Set the time stamp for when the certificate stops being valid
872
873 :param when: A string giving the timestamp, in the format:
874
875 YYYYMMDDhhmmssZ
876 YYYYMMDDhhmmss+hhmm
877 YYYYMMDDhhmmss-hhmm
878 :type when: :py:class:`bytes`
879
880 :return: None
881 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800882 return self._set_boundary_time(_api.X509_get_notAfter, when)
883
884
885 def _get_name(self, which):
886 name = X509Name.__new__(X509Name)
887 name._name = which(self._x509)
888 if name._name == _api.NULL:
889 1/0
890 return name
891
892
893 def _set_name(self, which, name):
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800894 if not isinstance(name, X509Name):
895 raise TypeError("name must be an X509Name")
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800896 set_result = which(self._x509, name._name)
897 if not set_result:
898 1/0
899
900
901 def get_issuer(self):
902 """
903 Create an X509Name object for the issuer of the certificate
904
905 :return: An X509Name object
906 """
907 return self._get_name(_api.X509_get_issuer_name)
908
909
910 def set_issuer(self, issuer):
911 """
912 Set the issuer of the certificate
913
914 :param issuer: The issuer name
915 :type issuer: :py:class:`X509Name`
916
917 :return: None
918 """
919 return self._set_name(_api.X509_set_issuer_name, issuer)
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800920
921
922 def get_subject(self):
923 """
924 Create an X509Name object for the subject of the certificate
925
926 :return: An X509Name object
927 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800928 return self._get_name(_api.X509_get_subject_name)
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800929
930
931 def set_subject(self, subject):
932 """
933 Set the subject of the certificate
934
935 :param subject: The subject name
936 :type subject: :py:class:`X509Name`
937 :return: None
938 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800939 return self._set_name(_api.X509_set_subject_name, subject)
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800940
941
942 def get_extension_count(self):
943 """
944 Get the number of extensions on the certificate.
945
946 :return: The number of extensions as an integer.
947 """
948 return _api.X509_get_ext_count(self._x509)
949
950
951 def add_extensions(self, extensions):
952 """
953 Add extensions to the certificate.
954
955 :param extensions: a sequence of X509Extension objects
956 :return: None
957 """
958 for ext in extensions:
959 if not isinstance(ext, X509Extension):
960 raise ValueError("One of the elements is not an X509Extension")
961
962 add_result = _api.X509_add_ext(self._x509, ext._extension, -1)
963 if not add_result:
964 _raise_current_error()
965
966
967 def get_extension(self, index):
968 """
969 Get a specific extension of the certificate by index.
970
971 :param index: The index of the extension to retrieve.
972 :return: The X509Extension object at the specified index.
973 """
974 ext = X509Extension.__new__(X509Extension)
975 ext._extension = _api.X509_get_ext(self._x509, index)
976 if ext._extension == _api.NULL:
977 raise IndexError("extension index out of bounds")
978
979 ext._extension = _api.X509_EXTENSION_dup(ext._extension)
980 return ext
981
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800982X509Type = X509
983
984
985
986def load_certificate(type, buffer):
987 """
988 Load a certificate from a buffer
989
990 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
991
992 :param buffer: The buffer the certificate is stored in
993 :type buffer: :py:class:`bytes`
994
995 :return: The X509 object
996 """
997 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
Jean-Paul Calderone066f0572013-02-20 13:43:44 -0800998 if bio == _api.NULL:
999 1/0
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001000
1001 try:
1002 if type == FILETYPE_PEM:
1003 x509 = _api.PEM_read_bio_X509(bio, _api.NULL, _api.NULL, _api.NULL)
1004 elif type == FILETYPE_ASN1:
1005 x509 = _api.d2i_X509_bio(bio, _api.NULL);
1006 else:
1007 raise ValueError(
1008 "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1009 finally:
1010 _api.BIO_free(bio)
1011
1012 if x509 == _api.NULL:
1013 _raise_current_error()
1014
1015 cert = X509.__new__(X509)
1016 cert._x509 = x509
1017 return cert
1018
1019
1020def dump_certificate(type, cert):
1021 """
1022 Dump a certificate to a buffer
1023
1024 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1025 :param cert: The certificate to dump
1026 :return: The buffer with the dumped certificate in
1027 """
1028 bio = _api.BIO_new(_api.BIO_s_mem())
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001029 if bio == _api.NULL:
1030 1/0
1031
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001032 if type == FILETYPE_PEM:
1033 result_code = _api.PEM_write_bio_X509(bio, cert._x509)
1034 elif type == FILETYPE_ASN1:
1035 result_code = _api.i2d_X509_bio(bio, cert._x509)
1036 elif type == FILETYPE_TEXT:
1037 result_code = _api.X509_print_ex(bio, cert._x509, 0, 0)
1038 else:
1039 raise ValueError(
1040 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
1041 "FILETYPE_TEXT")
1042
1043 return _bio_to_string(bio)
1044
1045
1046
1047def dump_privatekey(type, pkey, cipher=None, passphrase=None):
1048 """
1049 Dump a private key to a buffer
1050
1051 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1052 :param pkey: The PKey to dump
1053 :param cipher: (optional) if encrypted PEM format, the cipher to
1054 use
1055 :param passphrase: (optional) if encrypted PEM format, this can be either
1056 the passphrase to use, or a callback for providing the
1057 passphrase.
1058 :return: The buffer with the dumped key in
1059 :rtype: :py:data:`str`
1060 """
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001061 bio = _api.BIO_new(_api.BIO_s_mem())
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001062 if bio == _api.NULL:
1063 1/0
1064
1065 if cipher is not None:
1066 cipher_obj = _api.EVP_get_cipherbyname(cipher)
1067 if cipher_obj == _api.NULL:
1068 raise ValueError("Invalid cipher name")
1069 else:
1070 cipher_obj = _api.NULL
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001071
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001072 helper = _PassphraseHelper(type, passphrase)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001073 if type == FILETYPE_PEM:
1074 result_code = _api.PEM_write_bio_PrivateKey(
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001075 bio, pkey._pkey, cipher_obj, _api.NULL, 0,
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001076 helper.callback, helper.callback_args)
1077 helper.raise_if_problem()
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001078 elif type == FILETYPE_ASN1:
1079 result_code = _api.i2d_PrivateKey_bio(bio, pkey._pkey)
1080 elif type == FILETYPE_TEXT:
1081 rsa = _api.EVP_PKEY_get1_RSA(pkey._pkey)
1082 result_code = _api.RSA_print(bio, rsa, 0)
1083 # TODO RSA_free(rsa)?
1084 else:
1085 raise ValueError(
1086 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
1087 "FILETYPE_TEXT")
1088
1089 if result_code == 0:
1090 _raise_current_error()
1091
1092 return _bio_to_string(bio)
1093
1094
1095
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001096def _X509_REVOKED_dup(original):
1097 copy = _api.X509_REVOKED_new()
1098 if copy == _api.NULL:
1099 1/0
1100
1101 if original.serialNumber != _api.NULL:
1102 copy.serialNumber = _api.ASN1_INTEGER_dup(original.serialNumber)
1103
1104 if original.revocationDate != _api.NULL:
1105 copy.revocationDate = _api.M_ASN1_TIME_dup(original.revocationDate)
1106
1107 if original.extensions != _api.NULL:
1108 extension_stack = _api.sk_X509_EXTENSION_new_null()
1109 for i in range(_api.sk_X509_EXTENSION_num(original.extensions)):
1110 original_ext = _api.sk_X509_EXTENSION_value(original.extensions, i)
1111 copy_ext = _api.X509_EXTENSION_dup(original_ext)
1112 _api.sk_X509_EXTENSION_push(extension_stack, copy_ext)
1113 copy.extensions = extension_stack
1114
1115 copy.sequence = original.sequence
1116 return copy
1117
1118
1119
1120class Revoked(object):
1121 # http://www.openssl.org/docs/apps/x509v3_config.html#CRL_distribution_points_
1122 # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches
1123 # OCSP_crl_reason_str. We use the latter, just like the command line
1124 # program.
1125 _crl_reasons = [
1126 "unspecified",
1127 "keyCompromise",
1128 "CACompromise",
1129 "affiliationChanged",
1130 "superseded",
1131 "cessationOfOperation",
1132 "certificateHold",
1133 # "removeFromCRL",
1134 ]
1135
1136 def __init__(self):
1137 self._revoked = _api.X509_REVOKED_new()
1138
1139
1140 def set_serial(self, hex_str):
1141 """
1142 Set the serial number of a revoked Revoked structure
1143
1144 :param hex_str: The new serial number.
1145 :type hex_str: :py:data:`str`
1146 :return: None
1147 """
1148 bignum_serial = _api.new("BIGNUM**")
1149 bn_result = _api.BN_hex2bn(bignum_serial, hex_str)
1150 if not bn_result:
1151 raise ValueError("bad hex string")
1152
1153 asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
1154 _api.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)
1155
1156
1157 def get_serial(self):
1158 """
1159 Return the serial number of a Revoked structure
1160
1161 :return: The serial number as a string
1162 """
1163 bio = _api.BIO_new(_api.BIO_s_mem())
1164 if bio == _api.NULL:
1165 1/0
1166
1167 result = _api.i2a_ASN1_INTEGER(bio, self._revoked.serialNumber)
1168 if result < 0:
1169 1/0
1170
1171 return _bio_to_string(bio)
1172
1173
1174 def _delete_reason(self):
1175 stack = self._revoked.extensions
1176 for i in range(_api.sk_X509_EXTENSION_num(stack)):
1177 ext = _api.sk_X509_EXTENSION_value(stack, i)
1178 if _api.OBJ_obj2nid(ext.object) == _api.NID_crl_reason:
1179 _api.sk_X509_EXTENSION_delete(stack, i)
1180 break
1181
1182
1183 def set_reason(self, reason):
1184 """
1185 Set the reason of a Revoked object.
1186
1187 If :py:data:`reason` is :py:data:`None`, delete the reason instead.
1188
1189 :param reason: The reason string.
1190 :type reason: :py:class:`str` or :py:class:`NoneType`
1191 :return: None
1192 """
1193 if reason is None:
1194 self._delete_reason()
1195 elif not isinstance(reason, bytes):
1196 raise TypeError("reason must be None or a byte string")
1197 else:
1198 reason = reason.lower().replace(' ', '')
1199 reason_code = [r.lower() for r in self._crl_reasons].index(reason)
1200
1201 new_reason_ext = _api.ASN1_ENUMERATED_new()
1202 if new_reason_ext == _api.NULL:
1203 1/0
1204
1205 set_result = _api.ASN1_ENUMERATED_set(new_reason_ext, reason_code)
1206 if set_result == _api.NULL:
1207 1/0
1208
1209 self._delete_reason()
1210 add_result = _api.X509_REVOKED_add1_ext_i2d(
1211 self._revoked, _api.NID_crl_reason, new_reason_ext, 0, 0)
1212
1213 if not add_result:
1214 1/0
1215
1216
1217 def get_reason(self):
1218 """
1219 Return the reason of a Revoked object.
1220
1221 :return: The reason as a string
1222 """
1223 extensions = self._revoked.extensions
1224 for i in range(_api.sk_X509_EXTENSION_num(extensions)):
1225 ext = _api.sk_X509_EXTENSION_value(extensions, i)
1226 if _api.OBJ_obj2nid(ext.object) == _api.NID_crl_reason:
1227 bio = _api.BIO_new(_api.BIO_s_mem())
1228 if bio == _api.NULL:
1229 1/0
1230
1231 print_result = _api.X509V3_EXT_print(bio, ext, 0, 0)
1232 if not print_result:
1233 print_result = _api.M_ASN1_OCTET_STRING_print(bio, ext.value)
1234 if print_result == 0:
1235 1/0
1236
1237 return _bio_to_string(bio)
1238
1239
1240 def all_reasons(self):
1241 """
1242 Return a list of all the supported reason strings.
1243
1244 :return: A list of reason strings.
1245 """
1246 return self._crl_reasons[:]
1247
1248
1249 def set_rev_date(self, when):
1250 """
1251 Set the revocation timestamp
1252
1253 :param when: A string giving the timestamp, in the format:
1254
1255 YYYYMMDDhhmmssZ
1256 YYYYMMDDhhmmss+hhmm
1257 YYYYMMDDhhmmss-hhmm
1258
1259 :return: None
1260 """
1261 return _set_asn1_time(self._revoked.revocationDate, when)
1262
1263
1264 def get_rev_date(self):
1265 """
1266 Retrieve the revocation date
1267
1268 :return: A string giving the timestamp, in the format:
1269
1270 YYYYMMDDhhmmssZ
1271 YYYYMMDDhhmmss+hhmm
1272 YYYYMMDDhhmmss-hhmm
1273 """
1274 return _get_asn1_time(self._revoked.revocationDate)
1275
1276
1277
1278class CRL(object):
1279 def __init__(self):
1280 """
1281 Create a new empty CRL object.
1282 """
1283 self._crl = _api.X509_CRL_new()
1284
1285
1286 def get_revoked(self):
1287 """
1288 Return revoked portion of the CRL structure (by value not reference).
1289
1290 :return: A tuple of Revoked objects.
1291 """
1292 results = []
1293 revoked_stack = self._crl.crl.revoked
1294 for i in range(_api.sk_X509_REVOKED_num(revoked_stack)):
1295 revoked = _api.sk_X509_REVOKED_value(revoked_stack, i)
1296 revoked_copy = _X509_REVOKED_dup(revoked)
1297 pyrev = Revoked.__new__(Revoked)
1298 pyrev._revoked = revoked_copy
1299 results.append(pyrev)
Jean-Paul Calderone85b74eb2013-02-21 09:15:01 -08001300 if results:
1301 return tuple(results)
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001302
1303
1304 def add_revoked(self, revoked):
1305 """
1306 Add a revoked (by value not reference) to the CRL structure
1307
1308 :param revoked: The new revoked.
1309 :type revoked: :class:`X509`
1310
1311 :return: None
1312 """
1313 copy = _X509_REVOKED_dup(revoked._revoked)
1314 if copy == _api.NULL:
1315 1/0
1316
1317 add_result = _api.X509_CRL_add0_revoked(self._crl, copy)
1318 # TODO what check on add_result?
1319
1320
1321 def export(self, cert, key, type=FILETYPE_PEM, days=100):
1322 """
1323 export a CRL as a string
1324
1325 :param cert: Used to sign CRL.
1326 :type cert: :class:`X509`
1327
1328 :param key: Used to sign CRL.
1329 :type key: :class:`PKey`
1330
1331 :param type: The export format, either :py:data:`FILETYPE_PEM`, :py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`.
1332
1333 :param days: The number of days until the next update of this CRL.
1334 :type days: :py:data:`int`
1335
1336 :return: :py:data:`str`
1337 """
Jean-Paul Calderone85b74eb2013-02-21 09:15:01 -08001338 if not isinstance(cert, X509):
1339 raise TypeError("cert must be an X509 instance")
1340 if not isinstance(key, PKey):
1341 raise TypeError("key must be a PKey instance")
1342 if not isinstance(type, int):
1343 raise TypeError("type must be an integer")
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001344
Jean-Paul Calderone85b74eb2013-02-21 09:15:01 -08001345 bio = _api.BIO_new(_api.BIO_s_mem())
1346 if bio == _api.NULL:
1347 1/0
1348
1349 # A scratch time object to give different values to different CRL fields
1350 sometime = _api.ASN1_TIME_new()
1351 if sometime == _api.NULL:
1352 1/0
1353
1354 _api.X509_gmtime_adj(sometime, 0)
1355 _api.X509_CRL_set_lastUpdate(self._crl, sometime)
1356
1357 _api.X509_gmtime_adj(sometime, days * 24 * 60 * 60)
1358 _api.X509_CRL_set_nextUpdate(self._crl, sometime)
1359
1360 _api.X509_CRL_set_issuer_name(self._crl, _api.X509_get_subject_name(cert._x509))
1361
1362 sign_result = _api.X509_CRL_sign(self._crl, key._pkey, _api.EVP_md5())
1363 if not sign_result:
1364 _raise_current_error()
1365
1366 if type == FILETYPE_PEM:
1367 ret = _api.PEM_write_bio_X509_CRL(bio, self._crl)
1368 elif type == FILETYPE_ASN1:
1369 ret = _api.i2d_X509_CRL_bio(bio, self._crl)
1370 elif type == FILETYPE_TEXT:
1371 ret = _api.X509_CRL_print(bio, self._crl)
1372 else:
1373 raise ValueError(
1374 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")
1375
1376 if not ret:
1377 1/0
1378
1379 return _bio_to_string(bio)
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001380CRLType = CRL
1381
1382
Jean-Paul Calderone3b89f472013-02-21 09:32:25 -08001383
Jean-Paul Calderonee5912ce2013-02-21 10:49:35 -08001384class PKCS12(object):
1385 def __init__(self):
1386 self._pkey = None
1387 self._cert = None
1388 self._cacerts = None
1389 self._friendlyname = None
1390
1391
1392 def get_certificate(self):
1393 """
1394 Return certificate portion of the PKCS12 structure
1395
1396 :return: X509 object containing the certificate
1397 """
1398 return self._cert
1399
1400
1401 def set_certificate(self, cert):
1402 """
1403 Replace the certificate portion of the PKCS12 structure
1404
1405 :param cert: The new certificate.
1406 :type cert: :py:class:`X509` or :py:data:`None`
1407 :return: None
1408 """
1409 if not isinstance(cert, X509):
1410 raise TypeError("cert must be an X509 instance")
1411 self._cert = cert
1412
1413
1414 def get_privatekey(self):
1415 """
1416 Return private key portion of the PKCS12 structure
1417
1418 :returns: PKey object containing the private key
1419 """
1420 return self._pkey
1421
1422
1423 def set_privatekey(self, pkey):
1424 """
1425 Replace or set the certificate portion of the PKCS12 structure
1426
1427 :param pkey: The new private key.
1428 :type pkey: :py:class:`PKey`
1429 :return: None
1430 """
1431 if not isinstance(pkey, PKey):
1432 raise TypeError("pkey must be a PKey instance")
1433 self._pkey = pkey
1434
1435
1436 def get_ca_certificates(self):
1437 """
1438 Return CA certificates within of the PKCS12 object
1439
1440 :return: A newly created tuple containing the CA certificates in the chain,
1441 if any are present, or None if no CA certificates are present.
1442 """
1443 if self._cacerts is not None:
1444 return tuple(self._cacerts)
1445
1446
1447 def set_ca_certificates(self, cacerts):
1448 """
1449 Replace or set the CA certificates withing the PKCS12 object.
1450
1451 :param cacerts: The new CA certificates.
1452 :type cacerts: :py:data:`None` or an iterable of :py:class:`X509`
1453 :return: None
1454 """
1455 if cacerts is None:
1456 self._cacerts = None
1457 else:
1458 cacerts = list(cacerts)
1459 for cert in cacerts:
1460 if not isinstance(cert, X509):
1461 raise TypeError("iterable must only contain X509 instances")
1462 self._cacerts = cacerts
1463
1464
1465 def set_friendlyname(self, name):
1466 """
1467 Replace or set the certificate portion of the PKCS12 structure
1468
1469 :param name: The new friendly name.
1470 :type name: :py:class:`bytes`
1471 :return: None
1472 """
1473 if name is None:
1474 self._friendlyname = None
1475 elif not isinstance(name, bytes):
1476 raise TypeError("name must be a byte string or None (not %r)" % (name,))
1477 self._friendlyname = name
1478
1479
1480 def get_friendlyname(self):
1481 """
1482 Return friendly name portion of the PKCS12 structure
1483
1484 :returns: String containing the friendlyname
1485 """
1486 return self._friendlyname
1487
1488
1489 def export(self, passphrase=None, iter=2048, maciter=1):
1490 """
1491 Dump a PKCS12 object as a string. See also "man PKCS12_create".
1492
1493 :param passphrase: used to encrypt the PKCS12
1494 :type passphrase: :py:data:`bytes`
1495
1496 :param iter: How many times to repeat the encryption
1497 :type iter: :py:data:`int`
1498
1499 :param maciter: How many times to repeat the MAC
1500 :type maciter: :py:data:`int`
1501
1502 :return: The string containing the PKCS12
1503 """
1504 if self._cacerts is None:
1505 cacerts = _api.NULL
1506 else:
1507 cacerts = _api.sk_X509_new_null()
1508 for cert in self._cacerts:
1509 _api.sk_X509_push(cacerts, cert._x509)
1510
1511 if passphrase is None:
1512 passphrase = _api.NULL
1513
1514 friendlyname = self._friendlyname
1515 if friendlyname is None:
1516 friendlyname = _api.NULL
1517
1518 if self._pkey is None:
1519 pkey = _api.NULL
1520 else:
1521 pkey = self._pkey._pkey
1522
1523 if self._cert is None:
1524 cert = _api.NULL
1525 else:
1526 cert = self._cert._x509
1527
1528 pkcs12 = _api.PKCS12_create(
1529 passphrase, friendlyname, pkey, cert, cacerts,
1530 _api.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
1531 _api.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
1532 iter, maciter, 0)
1533 if pkcs12 == _api.NULL:
1534 _raise_current_error()
1535
1536 bio = _api.BIO_new(_api.BIO_s_mem())
1537 if bio == _api.NULL:
1538 1/0
1539
1540 _api.i2d_PKCS12_bio(bio, pkcs12)
1541 return _bio_to_string(bio)
1542PKCS12Type = PKCS12
1543
1544
1545
Jean-Paul Calderone3b89f472013-02-21 09:32:25 -08001546class NetscapeSPKI(object):
1547 def __init__(self):
1548 self._spki = _api.NETSCAPE_SPKI_new()
1549
1550
1551 def sign(self, pkey, digest):
1552 """
1553 Sign the certificate request using the supplied key and digest
1554
1555 :param pkey: The key to sign with
1556 :param digest: The message digest to use
1557 :return: None
1558 """
1559 if pkey._only_public:
1560 raise ValueError("Key has only public part")
1561
1562 if not pkey._initialized:
1563 raise ValueError("Key is uninitialized")
1564
1565 digest_obj = _api.EVP_get_digestbyname(digest)
1566 if digest_obj == _api.NULL:
1567 raise ValueError("No such digest method")
1568
1569 sign_result = _api.NETSCAPE_SPKI_sign(self._spki, pkey._pkey, digest_obj)
1570 if not sign_result:
1571 1/0
1572
1573
1574 def verify(self, key):
1575 """
1576 Verifies a certificate request using the supplied public key
1577
1578 :param key: a public key
1579 :return: True if the signature is correct.
1580 :raise OpenSSL.crypto.Error: If the signature is invalid or there is a
1581 problem verifying the signature.
1582 """
1583 answer = _api.NETSCAPE_SPKI_verify(self._spki, key._pkey)
1584 if answer <= 0:
1585 _raise_current_error()
1586 return True
1587
1588
1589 def b64_encode(self):
1590 """
1591 Generate a base64 encoded string from an SPKI
1592
1593 :return: The base64 encoded string
1594 """
1595 return _api.string(_api.NETSCAPE_SPKI_b64_encode(self._spki))
1596
1597
1598 def get_pubkey(self):
1599 """
1600 Get the public key of the certificate
1601
1602 :return: The public key
1603 """
1604 pkey = PKey.__new__(PKey)
1605 pkey._pkey = _api.NETSCAPE_SPKI_get_pubkey(self._spki)
1606 if pkey._pkey == _api.NULL:
1607 1/0
1608 pkey._only_public = True
1609 return pkey
1610
1611
1612 def set_pubkey(self, pkey):
1613 """
1614 Set the public key of the certificate
1615
1616 :param pkey: The public key
1617 :return: None
1618 """
1619 set_result = _api.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey)
1620 if not set_result:
1621 1/0
1622NetscapeSPKIType = NetscapeSPKI
1623
1624
Jean-Paul Calderonee41f05c2013-02-20 13:28:16 -08001625class _PassphraseHelper(object):
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001626 def __init__(self, type, passphrase):
1627 if type != FILETYPE_PEM and passphrase is not None:
1628 raise ValueError("only FILETYPE_PEM key format supports encryption")
Jean-Paul Calderonee41f05c2013-02-20 13:28:16 -08001629 self._passphrase = passphrase
1630 self._problems = []
1631
1632
1633 @property
1634 def callback(self):
1635 if self._passphrase is None:
1636 return _api.NULL
1637 elif isinstance(self._passphrase, bytes):
1638 return _api.NULL
1639 elif callable(self._passphrase):
1640 return _api.callback("pem_password_cb", self._read_passphrase)
1641 else:
1642 raise TypeError("Last argument must be string or callable")
1643
1644
1645 @property
1646 def callback_args(self):
1647 if self._passphrase is None:
1648 return _api.NULL
1649 elif isinstance(self._passphrase, bytes):
1650 return self._passphrase
1651 elif callable(self._passphrase):
1652 return _api.NULL
1653 else:
1654 raise TypeError("Last argument must be string or callable")
1655
1656
1657 def raise_if_problem(self):
1658 if self._problems:
1659 try:
1660 _raise_current_error()
1661 except Error:
1662 pass
1663 raise self._problems[0]
1664
1665
1666 def _read_passphrase(self, buf, size, rwflag, userdata):
1667 try:
1668 result = self._passphrase(rwflag)
1669 if not isinstance(result, bytes):
1670 raise ValueError("String expected")
1671 if len(result) > size:
1672 raise ValueError("passphrase returned by callback is too long")
1673 for i in range(len(result)):
1674 buf[i] = result[i]
1675 return len(result)
1676 except Exception as e:
1677 self._problems.append(e)
1678 return 0
1679
1680
1681
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001682def load_privatekey(type, buffer, passphrase=None):
1683 """
1684 Load a private key from a buffer
1685
1686 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1687 :param buffer: The buffer the key is stored in
1688 :param passphrase: (optional) if encrypted PEM format, this can be
1689 either the passphrase to use, or a callback for
1690 providing the passphrase.
1691
1692 :return: The PKey object
1693 """
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001694 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001695 if bio == _api.NULL:
1696 1/0
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001697
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001698 helper = _PassphraseHelper(type, passphrase)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001699 if type == FILETYPE_PEM:
Jean-Paul Calderonee41f05c2013-02-20 13:28:16 -08001700 evp_pkey = _api.PEM_read_bio_PrivateKey(
1701 bio, _api.NULL, helper.callback, helper.callback_args)
1702 helper.raise_if_problem()
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001703 elif type == FILETYPE_ASN1:
1704 evp_pkey = _api.d2i_PrivateKey_bio(bio, _api.NULL)
1705 else:
1706 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1707
Jean-Paul Calderone31393aa2013-02-20 13:22:21 -08001708 if evp_pkey == _api.NULL:
1709 _raise_current_error()
1710
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001711 pkey = PKey.__new__(PKey)
1712 pkey._pkey = evp_pkey
1713 return pkey
1714
1715
1716
1717def dump_certificate_request(type, req):
1718 """
1719 Dump a certificate request to a buffer
1720
1721 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1722 :param req: The certificate request to dump
1723 :return: The buffer with the dumped certificate request in
1724 """
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001725 bio = _api.BIO_new(_api.BIO_s_mem())
1726 if bio == _api.NULL:
1727 1/0
1728
1729 if type == FILETYPE_PEM:
1730 result_code = _api.PEM_write_bio_X509_REQ(bio, req._req)
1731 elif type == FILETYPE_ASN1:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001732 result_code = _api.i2d_X509_REQ_bio(bio, req._req)
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001733 elif type == FILETYPE_TEXT:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001734 result_code = _api.X509_REQ_print_ex(bio, req._req, 0, 0)
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001735 else:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001736 raise ValueError("type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001737
1738 if result_code == 0:
1739 1/0
1740
1741 return _bio_to_string(bio)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001742
1743
1744
1745def load_certificate_request(type, buffer):
1746 """
1747 Load a certificate request from a buffer
1748
1749 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1750 :param buffer: The buffer the certificate request is stored in
1751 :return: The X509Req object
1752 """
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001753 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
1754 if bio == _api.NULL:
1755 1/0
1756
1757 if type == FILETYPE_PEM:
1758 req = _api.PEM_read_bio_X509_REQ(bio, _api.NULL, _api.NULL, _api.NULL)
1759 elif type == FILETYPE_ASN1:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001760 req = _api.d2i_X509_REQ_bio(bio, _api.NULL)
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001761 else:
1762 1/0
1763
1764 if req == _api.NULL:
1765 1/0
1766
1767 x509req = X509Req.__new__(X509Req)
1768 x509req._req = req
1769 return x509req
Jean-Paul Calderone8cf4f802013-02-20 16:45:02 -08001770
1771
1772
1773def sign(pkey, data, digest):
1774 """
1775 Sign data with a digest
1776
1777 :param pkey: Pkey to sign with
1778 :param data: data to be signed
1779 :param digest: message digest to use
1780 :return: signature
1781 """
1782 digest_obj = _api.EVP_get_digestbyname(digest)
1783 if digest_obj == _api.NULL:
1784 raise ValueError("No such digest method")
1785
1786 md_ctx = _api.new("EVP_MD_CTX*")
1787
1788 _api.EVP_SignInit(md_ctx, digest_obj)
1789 _api.EVP_SignUpdate(md_ctx, data, len(data))
1790
1791 signature_buffer = _api.new("unsigned char[]", 512)
1792 signature_length = _api.new("unsigned int*")
1793 signature_length[0] = len(signature_buffer)
1794 final_result = _api.EVP_SignFinal(
1795 md_ctx, signature_buffer, signature_length, pkey._pkey)
1796
1797 if final_result != 1:
1798 1/0
1799
1800 return _api.buffer(signature_buffer, signature_length[0])[:]
1801
1802
1803
1804def verify(cert, signature, data, digest):
1805 """
1806 Verify a signature
1807
1808 :param cert: signing certificate (X509 object)
1809 :param signature: signature returned by sign function
1810 :param data: data to be verified
1811 :param digest: message digest to use
1812 :return: None if the signature is correct, raise exception otherwise
1813 """
1814 digest_obj = _api.EVP_get_digestbyname(digest)
1815 if digest_obj == _api.NULL:
1816 raise ValueError("No such digest method")
1817
1818 pkey = _api.X509_get_pubkey(cert._x509)
1819 if pkey == _api.NULL:
1820 1/0
1821
1822 md_ctx = _api.new("EVP_MD_CTX*")
1823
1824 _api.EVP_VerifyInit(md_ctx, digest_obj)
1825 _api.EVP_VerifyUpdate(md_ctx, data, len(data))
1826 verify_result = _api.EVP_VerifyFinal(md_ctx, signature, len(signature), pkey)
1827
1828 if verify_result != 1:
1829 _raise_current_error()
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001830
1831
1832
1833def load_crl(type, buffer):
1834 """
1835 Load a certificate revocation list from a buffer
1836
1837 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1838 :param buffer: The buffer the CRL is stored in
1839
1840 :return: The PKey object
1841 """
1842 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
1843 if bio == _api.NULL:
1844 1/0
1845
1846 if type == FILETYPE_PEM:
1847 crl = _api.PEM_read_bio_X509_CRL(bio, _api.NULL, _api.NULL, _api.NULL)
1848 elif type == FILETYPE_ASN1:
1849 crl = _api.d2i_X509_CRL_bio(bio, _api.NULL)
1850 else:
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001851 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1852
1853 if crl == _api.NULL:
1854 _raise_current_error()
1855
1856 result = CRL.__new__(CRL)
1857 result._crl = crl
1858 return result
Jean-Paul Calderonee5912ce2013-02-21 10:49:35 -08001859
1860
1861
1862def load_pkcs12(buffer, passphrase):
1863 """
1864 Load a PKCS12 object from a buffer
1865
1866 :param buffer: The buffer the certificate is stored in
1867 :param passphrase: (Optional) The password to decrypt the PKCS12 lump
1868 :returns: The PKCS12 object
1869 """
1870 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
1871 if bio == _api.NULL:
1872 1/0
1873
1874 p12 = _api.d2i_PKCS12_bio(bio, _api.NULL)
1875 if p12 == _api.NULL:
1876 _raise_current_error()
1877
1878 pkey = _api.new("EVP_PKEY**")
1879 cert = _api.new("X509**")
1880 cacerts = _api.new("struct stack_st_X509**")
1881
1882 parse_result = _api.PKCS12_parse(p12, passphrase, pkey, cert, cacerts)
1883 if not parse_result:
1884 _raise_current_error()
1885
1886 # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the
1887 # queue for no particular reason. This error isn't interesting to anyone
1888 # outside this function. It's not even interesting to us. Get rid of it.
1889 try:
1890 _raise_current_error()
1891 except Error:
1892 pass
1893
1894 if pkey[0] == _api.NULL:
1895 pykey = None
1896 else:
1897 pykey = PKey.__new__(PKey)
1898 pykey._pkey = pkey[0]
1899
1900 if cert[0] == _api.NULL:
1901 pycert = None
1902 friendlyname = None
1903 else:
1904 pycert = X509.__new__(X509)
1905 pycert._x509 = cert[0]
1906
1907 friendlyname_length = _api.new("int*")
1908 friendlyname_buffer = _api.X509_alias_get0(cert[0], friendlyname_length)
1909 friendlyname = _api.buffer(friendlyname_buffer, friendlyname_length[0])[:]
1910 if friendlyname_buffer == _api.NULL:
1911 friendlyname = None
1912
1913 pycacerts = []
1914 for i in range(_api.sk_X509_num(cacerts[0])):
1915 pycacert = X509.__new__(X509)
1916 pycacert._x509 = _api.sk_X509_value(cacerts[0], i)
1917 pycacerts.append(pycacert)
1918 if not pycacerts:
1919 pycacerts = None
1920
1921 pkcs12 = PKCS12.__new__(PKCS12)
1922 pkcs12._pkey = pykey
1923 pkcs12._cert = pycert
1924 pkcs12._cacerts = pycacerts
1925 pkcs12._friendlyname = friendlyname
1926 return pkcs12