blob: 8d6ce8da3c9a1fad5d95dcc7df8394457b419b41 [file] [log] [blame]
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001from time import time
2
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08003from tls.c import api as _api
4
5FILETYPE_PEM = _api.SSL_FILETYPE_PEM
6FILETYPE_ASN1 = _api.SSL_FILETYPE_ASN1
7
8# TODO This was an API mistake. OpenSSL has no such constant.
9FILETYPE_TEXT = 2 ** 16 - 1
10
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -080011TYPE_RSA = _api.EVP_PKEY_RSA
12TYPE_DSA = _api.EVP_PKEY_DSA
13
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080014
15def _bio_to_string(bio):
16 """
17 Copy the contents of an OpenSSL BIO object into a Python byte string.
18 """
19 result_buffer = _api.new('char**')
20 buffer_length = _api.BIO_get_mem_data(bio, result_buffer)
21 return _api.buffer(result_buffer[0], buffer_length)[:]
22
23
24
Jean-Paul Calderone57122982013-02-21 08:47:05 -080025def _set_asn1_time(boundary, when):
26 if not isinstance(when, bytes):
27 raise TypeError("when must be a byte string")
28
29 set_result = _api.ASN1_GENERALIZEDTIME_set_string(
30 _api.cast('ASN1_GENERALIZEDTIME*', boundary), when)
31 if set_result == 0:
32 dummy = _api.ASN1_STRING_new()
33 _api.ASN1_STRING_set(dummy, when, len(when))
34 check_result = _api.ASN1_GENERALIZEDTIME_check(
35 _api.cast('ASN1_GENERALIZEDTIME*', dummy))
36 if not check_result:
37 raise ValueError("Invalid string")
38 else:
39 # TODO No tests for this case
40 raise RuntimeError("Unknown ASN1_GENERALIZEDTIME_set_string failure")
41
42
43
44def _get_asn1_time(timestamp):
45 string_timestamp = _api.cast('ASN1_STRING*', timestamp)
46 if _api.ASN1_STRING_length(string_timestamp) == 0:
47 return None
48 elif _api.ASN1_STRING_type(string_timestamp) == _api.V_ASN1_GENERALIZEDTIME:
49 return _api.string(_api.ASN1_STRING_data(string_timestamp))
50 else:
51 generalized_timestamp = _api.new("ASN1_GENERALIZEDTIME**")
52 _api.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
53 if generalized_timestamp[0] == _api.NULL:
54 1/0
55 else:
56 string_timestamp = _api.cast(
57 "ASN1_STRING*", generalized_timestamp[0])
58 string_data = _api.ASN1_STRING_data(string_timestamp)
59 string_result = _api.string(string_data)
60 _api.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
61 return string_result
62
63
64
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080065def _raise_current_error():
66 errors = []
67 while True:
68 error = _api.ERR_get_error()
69 if error == 0:
70 break
71 errors.append((
72 _api.string(_api.ERR_lib_error_string(error)),
73 _api.string(_api.ERR_func_error_string(error)),
74 _api.string(_api.ERR_reason_error_string(error))))
75
76 raise Error(errors)
77
78_exception_from_error_queue = _raise_current_error
79
80
81class Error(Exception):
82 pass
83
84
85
86class PKey(object):
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -080087 _only_public = False
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -080088 _initialized = True
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -080089
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080090 def __init__(self):
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -080091 self._pkey = _api.EVP_PKEY_new()
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -080092 self._initialized = False
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -080093
94
95 def generate_key(self, type, bits):
96 """
97 Generate a key of a given type, with a given number of a bits
98
99 :param type: The key type (TYPE_RSA or TYPE_DSA)
100 :param bits: The number of bits
101
102 :return: None
103 """
104 if not isinstance(type, int):
105 raise TypeError("type must be an integer")
106
107 if not isinstance(bits, int):
108 raise TypeError("bits must be an integer")
109
110 exponent = _api.new("BIGNUM**")
111 # TODO Check error return
112 # TODO Free the exponent[0]
113 _api.BN_hex2bn(exponent, "10001")
114
115 if type == TYPE_RSA:
116 if bits <= 0:
117 raise ValueError("Invalid number of bits")
118
119 rsa = _api.RSA_new();
120
121 # TODO Release GIL?
122 result = _api.RSA_generate_key_ex(rsa, bits, exponent[0], _api.NULL)
123 if result == -1:
124 1/0
125
126 result = _api.EVP_PKEY_assign_RSA(self._pkey, rsa)
127 if not result:
128 1/0
129
130 elif type == TYPE_DSA:
Jean-Paul Calderonec86fcaf2013-02-20 12:38:33 -0800131 dsa = _api.DSA_generate_parameters(
132 bits, _api.NULL, 0, _api.NULL, _api.NULL, _api.NULL, _api.NULL)
133 if dsa == _api.NULL:
134 1/0
135 if not _api.DSA_generate_key(dsa):
136 1/0
137 if not _api.EVP_PKEY_assign_DSA(self._pkey, dsa):
138 1/0
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800139 else:
140 raise Error("No such key type")
141
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -0800142 self._initialized = True
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800143
144
145 def check(self):
146 """
147 Check the consistency of an RSA private key.
148
149 :return: True if key is consistent.
150 :raise Error: if the key is inconsistent.
151 :raise TypeError: if the key is of a type which cannot be checked.
152 Only RSA keys can currently be checked.
153 """
Jean-Paul Calderonec86fcaf2013-02-20 12:38:33 -0800154 if self._only_public:
155 raise TypeError("public key only")
156
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800157 if _api.EVP_PKEY_type(self._pkey.type) != _api.EVP_PKEY_RSA:
158 raise TypeError("key type unsupported")
159
160 rsa = _api.EVP_PKEY_get1_RSA(self._pkey)
161 result = _api.RSA_check_key(rsa)
162 if result:
163 return True
164 _raise_current_error()
165
166
Jean-Paul Calderonec86fcaf2013-02-20 12:38:33 -0800167 def type(self):
168 """
169 Returns the type of the key
170
171 :return: The type of the key.
172 """
173 return self._pkey.type
174
175
176 def bits(self):
177 """
178 Returns the number of bits of the key
179
180 :return: The number of bits of the key.
181 """
182 return _api.EVP_PKEY_bits(self._pkey)
183PKeyType = PKey
184
185
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800186
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800187class X509Name(object):
188 def __init__(self, name):
189 """
190 Create a new X509Name, copying the given X509Name instance.
191
192 :param name: An X509Name object to copy
193 """
194 self._name = _api.X509_NAME_dup(name._name)
195
196
197 def __setattr__(self, name, value):
198 if name.startswith('_'):
199 return super(X509Name, self).__setattr__(name, value)
200
201 if type(name) is not str:
202 raise TypeError("attribute name must be string, not '%.200s'" % (
203 type(value).__name__,))
204
205 nid = _api.OBJ_txt2nid(name)
206 if nid == _api.NID_undef:
207 try:
208 _raise_current_error()
209 except Error:
210 pass
211 raise AttributeError("No such attribute")
212
213 # If there's an old entry for this NID, remove it
214 for i in range(_api.X509_NAME_entry_count(self._name)):
215 ent = _api.X509_NAME_get_entry(self._name, i)
216 ent_obj = _api.X509_NAME_ENTRY_get_object(ent)
217 ent_nid = _api.OBJ_obj2nid(ent_obj)
218 if nid == ent_nid:
219 ent = _api.X509_NAME_delete_entry(self._name, i)
220 _api.X509_NAME_ENTRY_free(ent)
221 break
222
223 if isinstance(value, unicode):
224 value = value.encode('utf-8')
225
226 add_result = _api.X509_NAME_add_entry_by_NID(
227 self._name, nid, _api.MBSTRING_UTF8, value, -1, -1, 0)
228 if not add_result:
229 # TODO Untested
230 1/0
231
232
233 def __getattr__(self, name):
234 """
235 Find attribute. An X509Name object has the following attributes:
236 countryName (alias C), stateOrProvince (alias ST), locality (alias L),
237 organization (alias O), organizationalUnit (alias OU), commonName (alias
238 CN) and more...
239 """
240 nid = _api.OBJ_txt2nid(name)
241 if nid == _api.NID_undef:
242 # This is a bit weird. OBJ_txt2nid indicated failure, but it seems
243 # a lower level function, a2d_ASN1_OBJECT, also feels the need to
244 # push something onto the error queue. If we don't clean that up
245 # now, someone else will bump into it later and be quite confused.
246 # See lp#314814.
247 try:
248 _raise_current_error()
249 except Error:
250 pass
251 return super(X509Name, self).__getattr__(name)
252
253 entry_index = _api.X509_NAME_get_index_by_NID(self._name, nid, -1)
254 if entry_index == -1:
255 return None
256
257 entry = _api.X509_NAME_get_entry(self._name, entry_index)
258 data = _api.X509_NAME_ENTRY_get_data(entry)
259
260 result_buffer = _api.new("unsigned char**")
261 data_length = _api.ASN1_STRING_to_UTF8(result_buffer, data)
262 if data_length < 0:
263 1/0
264
265 result = _api.buffer(result_buffer[0], data_length)[:].decode('utf-8')
266 _api.OPENSSL_free(result_buffer[0])
267 return result
268
269
270 def __cmp__(self, other):
271 if not isinstance(other, X509Name):
272 return NotImplemented
273
274 result = _api.X509_NAME_cmp(self._name, other._name)
275 # TODO result == -2 is an error case that maybe should be checked for
276 return result
277
278
279 def __repr__(self):
280 """
281 String representation of an X509Name
282 """
283 result_buffer = _api.new("char[]", 512);
284 format_result = _api.X509_NAME_oneline(
285 self._name, result_buffer, len(result_buffer))
286
287 if format_result == _api.NULL:
288 1/0
289
290 return "<X509Name object '%s'>" % (_api.string(result_buffer),)
291
292
293 def hash(self):
294 """
295 Return the hash value of this name
296
297 :return: None
298 """
299 return _api.X509_NAME_hash(self._name)
300
301
302 def der(self):
303 """
304 Return the DER encoding of this name
305
306 :return: A :py:class:`bytes` instance giving the DER encoded form of
307 this name.
308 """
309 result_buffer = _api.new('unsigned char**')
310 encode_result = _api.i2d_X509_NAME(self._name, result_buffer)
311 if encode_result < 0:
312 1/0
313
314 string_result = _api.buffer(result_buffer[0], encode_result)[:]
315 _api.OPENSSL_free(result_buffer[0])
316 return string_result
317
318
319 def get_components(self):
320 """
321 Returns the split-up components of this name.
322
323 :return: List of tuples (name, value).
324 """
325 result = []
326 for i in range(_api.X509_NAME_entry_count(self._name)):
327 ent = _api.X509_NAME_get_entry(self._name, i)
328
329 fname = _api.X509_NAME_ENTRY_get_object(ent)
330 fval = _api.X509_NAME_ENTRY_get_data(ent)
331
332 nid = _api.OBJ_obj2nid(fname)
333 name = _api.OBJ_nid2sn(nid)
334
335 result.append((
336 _api.string(name),
337 _api.string(
338 _api.ASN1_STRING_data(fval),
339 _api.ASN1_STRING_length(fval))))
340
341 return result
342X509NameType = X509Name
343
344
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800345class X509Extension(object):
346 def __init__(self, type_name, critical, value, subject=None, issuer=None):
347 """
348 :param typename: The name of the extension to create.
349 :type typename: :py:data:`str`
350
351 :param critical: A flag indicating whether this is a critical extension.
352
353 :param value: The value of the extension.
354 :type value: :py:data:`str`
355
356 :param subject: Optional X509 cert to use as subject.
357 :type subject: :py:class:`X509`
358
359 :param issuer: Optional X509 cert to use as issuer.
360 :type issuer: :py:class:`X509`
361
362 :return: The X509Extension object
363 """
364 ctx = _api.new("X509V3_CTX*")
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800365
366 # A context is necessary for any extension which uses the r2i conversion
367 # method. That is, X509V3_EXT_nconf may segfault if passed a NULL ctx.
368 # Start off by initializing most of the fields to NULL.
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800369 _api.X509V3_set_ctx(ctx, _api.NULL, _api.NULL, _api.NULL, _api.NULL, 0)
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800370
371 # We have no configuration database - but perhaps we should (some
372 # extensions may require it).
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800373 _api.X509V3_set_ctx_nodb(ctx)
374
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800375 # Initialize the subject and issuer, if appropriate. ctx is a local,
376 # and as far as I can tell none of the X509V3_* APIs invoked here steal
377 # any references, so no need to mess with reference counts or duplicates.
378 if issuer is not None:
379 if not isinstance(issuer, X509):
380 raise TypeError("issuer must be an X509 instance")
381 ctx.issuer_cert = issuer._x509
382 if subject is not None:
383 if not isinstance(subject, X509):
384 raise TypeError("subject must be an X509 instance")
385 ctx.subject_cert = subject._x509
386
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800387 if critical:
388 # There are other OpenSSL APIs which would let us pass in critical
389 # separately, but they're harder to use, and since value is already
390 # a pile of crappy junk smuggling a ton of utterly important
391 # structured data, what's the point of trying to avoid nasty stuff
392 # with strings? (However, X509V3_EXT_i2d in particular seems like it
393 # would be a better API to invoke. I do not know where to get the
394 # ext_struc it desires for its last parameter, though.)
395 value = "critical," + value
396
397 self._extension = _api.X509V3_EXT_nconf(
398 _api.NULL, ctx, type_name, value)
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800399 if self._extension == _api.NULL:
400 _raise_current_error()
401
402
403 def __str__(self):
404 """
405 :return: a nice text representation of the extension
406 """
407 bio = _api.BIO_new(_api.BIO_s_mem())
408 if bio == _api.NULL:
409 1/0
410
411 print_result = _api.X509V3_EXT_print(bio, self._extension, 0, 0)
412 if not print_result:
413 1/0
414
415 return _bio_to_string(bio)
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800416
417
418 def get_critical(self):
419 """
420 Returns the critical field of the X509Extension
421
422 :return: The critical field.
423 """
424 return _api.X509_EXTENSION_get_critical(self._extension)
425
426
427 def get_short_name(self):
428 """
429 Returns the short version of the type name of the X509Extension
430
431 :return: The short type name.
432 """
433 obj = _api.X509_EXTENSION_get_object(self._extension)
434 nid = _api.OBJ_obj2nid(obj)
435 return _api.string(_api.OBJ_nid2sn(nid))
436
437
Jean-Paul Calderoned418a9c2013-02-20 16:24:55 -0800438 def get_data(self):
439 """
440 Returns the data of the X509Extension
441
442 :return: A :py:data:`str` giving the X509Extension's ASN.1 encoded data.
443 """
444 octet_result = _api.X509_EXTENSION_get_data(self._extension)
445 string_result = _api.cast('ASN1_STRING*', octet_result)
446 char_result = _api.ASN1_STRING_data(string_result)
447 result_length = _api.ASN1_STRING_length(string_result)
448 return _api.buffer(char_result, result_length)[:]
449
450X509ExtensionType = X509Extension
451
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800452
Jean-Paul Calderone066f0572013-02-20 13:43:44 -0800453class X509Req(object):
Jean-Paul Calderone4328d472013-02-20 14:28:46 -0800454 def __init__(self):
455 self._req = _api.X509_REQ_new()
456
457
458 def set_pubkey(self, pkey):
459 """
460 Set the public key of the certificate request
461
462 :param pkey: The public key to use
463 :return: None
464 """
465 set_result = _api.X509_REQ_set_pubkey(self._req, pkey._pkey)
466 if not set_result:
467 1/0
468
469
470 def get_pubkey(self):
471 """
472 Get the public key from the certificate request
473
474 :return: The public key
475 """
476 pkey = PKey.__new__(PKey)
477 pkey._pkey = _api.X509_REQ_get_pubkey(self._req)
478 if pkey._pkey == _api.NULL:
479 1/0
480 pkey._only_public = True
481 return pkey
482
483
484 def set_version(self, version):
485 """
486 Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
487 request.
488
489 :param version: The version number
490 :return: None
491 """
492 set_result = _api.X509_REQ_set_version(self._req, version)
493 if not set_result:
494 _raise_current_error()
495
496
497 def get_version(self):
498 """
499 Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
500 request.
501
502 :return: an integer giving the value of the version subfield
503 """
504 return _api.X509_REQ_get_version(self._req)
505
506
507 def get_subject(self):
508 """
509 Create an X509Name object for the subject of the certificate request
510
511 :return: An X509Name object
512 """
513 name = X509Name.__new__(X509Name)
514 name._name = _api.X509_REQ_get_subject_name(self._req)
515 if name._name == _api.NULL:
516 1/0
517 return name
518
519
520 def add_extensions(self, extensions):
521 """
522 Add extensions to the request.
523
524 :param extensions: a sequence of X509Extension objects
525 :return: None
526 """
527 stack = _api.sk_X509_EXTENSION_new_null()
528 if stack == _api.NULL:
529 1/0
530
531 for ext in extensions:
532 if not isinstance(ext, X509Extension):
Jean-Paul Calderonec2154b72013-02-20 14:29:37 -0800533 raise ValueError("One of the elements is not an X509Extension")
Jean-Paul Calderone4328d472013-02-20 14:28:46 -0800534
535 _api.sk_X509_EXTENSION_push(stack, ext._extension)
536
537 add_result = _api.X509_REQ_add_extensions(self._req, stack)
538 if not add_result:
539 1/0
540
541
542 def sign(self, pkey, digest):
543 """
544 Sign the certificate request using the supplied key and digest
545
546 :param pkey: The key to sign with
547 :param digest: The message digest to use
548 :return: None
549 """
550 if pkey._only_public:
551 raise ValueError("Key has only public part")
552
553 if not pkey._initialized:
554 raise ValueError("Key is uninitialized")
555
556 digest_obj = _api.EVP_get_digestbyname(digest)
557 if digest_obj == _api.NULL:
558 raise ValueError("No such digest method")
559
560 sign_result = _api.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
561 if not sign_result:
562 1/0
563
564
Jean-Paul Calderone066f0572013-02-20 13:43:44 -0800565X509ReqType = X509Req
566
567
568
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800569class X509(object):
570 def __init__(self):
571 # TODO Allocation failure? And why not __new__ instead of __init__?
572 self._x509 = _api.X509_new()
573
574
575 def set_version(self, version):
576 """
577 Set version number of the certificate
578
579 :param version: The version number
580 :type version: :py:class:`int`
581
582 :return: None
583 """
584 if not isinstance(version, int):
585 raise TypeError("version must be an integer")
586
587 _api.X509_set_version(self._x509, version)
588
589
590 def get_version(self):
591 """
592 Return version number of the certificate
593
594 :return: Version number as a Python integer
595 """
596 return _api.X509_get_version(self._x509)
597
598
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -0800599 def get_pubkey(self):
600 """
601 Get the public key of the certificate
602
603 :return: The public key
604 """
605 pkey = PKey.__new__(PKey)
606 pkey._pkey = _api.X509_get_pubkey(self._x509)
607 if pkey._pkey == _api.NULL:
608 _raise_current_error()
609 pkey._only_public = True
610 return pkey
611
612
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800613 def set_pubkey(self, pkey):
614 """
615 Set the public key of the certificate
616
617 :param pkey: The public key
618
619 :return: None
620 """
621 if not isinstance(pkey, PKey):
622 raise TypeError("pkey must be a PKey instance")
623
624 set_result = _api.X509_set_pubkey(self._x509, pkey._pkey)
625 if not set_result:
626 _raise_current_error()
627
628
629 def sign(self, pkey, digest):
630 """
631 Sign the certificate using the supplied key and digest
632
633 :param pkey: The key to sign with
634 :param digest: The message digest to use
635 :return: None
636 """
637 if not isinstance(pkey, PKey):
638 raise TypeError("pkey must be a PKey instance")
639
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -0800640 if pkey._only_public:
641 raise ValueError("Key only has public part")
642
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -0800643 if not pkey._initialized:
644 raise ValueError("Key is uninitialized")
645
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800646 evp_md = _api.EVP_get_digestbyname(digest)
647 if evp_md == _api.NULL:
648 raise ValueError("No such digest method")
649
650 sign_result = _api.X509_sign(self._x509, pkey._pkey, evp_md)
651 if not sign_result:
652 _raise_current_error()
653
654
Jean-Paul Calderonee4aa3fa2013-02-19 12:12:53 -0800655 def get_signature_algorithm(self):
656 """
657 Retrieve the signature algorithm used in the certificate
658
659 :return: A byte string giving the name of the signature algorithm used in
660 the certificate.
661 :raise ValueError: If the signature algorithm is undefined.
662 """
663 alg = self._x509.cert_info.signature.algorithm
664 nid = _api.OBJ_obj2nid(alg)
665 if nid == _api.NID_undef:
666 raise ValueError("Undefined signature algorithm")
667 return _api.string(_api.OBJ_nid2ln(nid))
668
669
Jean-Paul Calderoneb4078722013-02-19 12:01:55 -0800670 def digest(self, digest_name):
671 """
672 Return the digest of the X509 object.
673
674 :param digest_name: The name of the digest algorithm to use.
675 :type digest_name: :py:class:`bytes`
676
677 :return: The digest of the object
678 """
679 digest = _api.EVP_get_digestbyname(digest_name)
680 if digest == _api.NULL:
681 raise ValueError("No such digest method")
682
683 result_buffer = _api.new("char[]", _api.EVP_MAX_MD_SIZE)
684 result_length = _api.new("unsigned int[]", 1)
685 result_length[0] = len(result_buffer)
686
687 digest_result = _api.X509_digest(
688 self._x509, digest, result_buffer, result_length)
689
690 if not digest_result:
691 1/0
692
693 return ':'.join([
694 ch.encode('hex').upper() for ch
695 in _api.buffer(result_buffer, result_length[0])])
696
697
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800698 def subject_name_hash(self):
699 """
700 Return the hash of the X509 subject.
701
702 :return: The hash of the subject.
703 """
704 return _api.X509_subject_name_hash(self._x509)
705
706
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800707 def set_serial_number(self, serial):
708 """
709 Set serial number of the certificate
710
711 :param serial: The serial number
712 :type serial: :py:class:`int`
713
714 :return: None
715 """
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800716 if not isinstance(serial, (int, long)):
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800717 raise TypeError("serial must be an integer")
718
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800719 hex_serial = hex(serial)[2:]
720 if not isinstance(hex_serial, bytes):
721 hex_serial = hex_serial.encode('ascii')
722
723 bignum_serial = _api.new("BIGNUM**")
724
725 # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like
726 # it. If bignum is still NULL after this call, then the return value is
727 # actually the result. I hope. -exarkun
728 small_serial = _api.BN_hex2bn(bignum_serial, hex_serial)
729
730 if bignum_serial[0] == _api.NULL:
731 set_result = ASN1_INTEGER_set(
732 _api.X509_get_serialNumber(self._x509), small_serial)
733 if set_result:
734 # TODO Not tested
735 _raise_current_error()
736 else:
737 asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
738 _api.BN_free(bignum_serial[0])
739 if asn1_serial == _api.NULL:
740 # TODO Not tested
741 _raise_current_error()
742 set_result = _api.X509_set_serialNumber(self._x509, asn1_serial)
743 if not set_result:
744 # TODO Not tested
745 _raise_current_error()
746
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800747
748 def get_serial_number(self):
749 """
750 Return serial number of the certificate
751
752 :return: Serial number as a Python integer
753 """
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800754 asn1_serial = _api.X509_get_serialNumber(self._x509)
755 bignum_serial = _api.ASN1_INTEGER_to_BN(asn1_serial, _api.NULL)
756 try:
757 hex_serial = _api.BN_bn2hex(bignum_serial)
758 try:
759 hexstring_serial = _api.string(hex_serial)
760 serial = int(hexstring_serial, 16)
761 return serial
762 finally:
763 _api.OPENSSL_free(hex_serial)
764 finally:
765 _api.BN_free(bignum_serial)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800766
767
768 def gmtime_adj_notAfter(self, amount):
769 """
770 Adjust the time stamp for when the certificate stops being valid
771
772 :param amount: The number of seconds by which to adjust the ending
773 validity time.
774 :type amount: :py:class:`int`
775
776 :return: None
777 """
778 if not isinstance(amount, int):
779 raise TypeError("amount must be an integer")
780
781 notAfter = _api.X509_get_notAfter(self._x509)
782 _api.X509_gmtime_adj(notAfter, amount)
783
784
Jean-Paul Calderone662afe52013-02-20 08:41:11 -0800785 def gmtime_adj_notBefore(self, amount):
786 """
787 Change the timestamp for when the certificate starts being valid to the current
788 time plus an offset.
789
790 :param amount: The number of seconds by which to adjust the starting validity
791 time.
792 :return: None
793 """
794 if not isinstance(amount, int):
795 raise TypeError("amount must be an integer")
796
797 notBefore = _api.X509_get_notBefore(self._x509)
798 _api.X509_gmtime_adj(notBefore, amount)
799
800
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800801 def has_expired(self):
802 """
803 Check whether the certificate has expired.
804
805 :return: True if the certificate has expired, false otherwise
806 """
807 now = int(time())
808 notAfter = _api.X509_get_notAfter(self._x509)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800809 return _api.ASN1_UTCTIME_cmp_time_t(
810 _api.cast('ASN1_UTCTIME*', notAfter), now) < 0
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800811
812
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800813 def _get_boundary_time(self, which):
Jean-Paul Calderone57122982013-02-21 08:47:05 -0800814 return _get_asn1_time(which(self._x509))
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800815
816
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800817 def get_notBefore(self):
818 """
819 Retrieve the time stamp for when the certificate starts being valid
820
821 :return: A string giving the timestamp, in the format::
822
823 YYYYMMDDhhmmssZ
824 YYYYMMDDhhmmss+hhmm
825 YYYYMMDDhhmmss-hhmm
826
827 or None if there is no value set.
828 """
829 return self._get_boundary_time(_api.X509_get_notBefore)
830
831
832 def _set_boundary_time(self, which, when):
Jean-Paul Calderone57122982013-02-21 08:47:05 -0800833 return _set_asn1_time(which(self._x509), when)
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800834
835
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800836 def set_notBefore(self, when):
837 """
838 Set the time stamp for when the certificate starts being valid
839
840 :param when: A string giving the timestamp, in the format:
841
842 YYYYMMDDhhmmssZ
843 YYYYMMDDhhmmss+hhmm
844 YYYYMMDDhhmmss-hhmm
845 :type when: :py:class:`bytes`
846
847 :return: None
848 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800849 return self._set_boundary_time(_api.X509_get_notBefore, when)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800850
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800851
852 def get_notAfter(self):
853 """
854 Retrieve the time stamp for when the certificate stops being valid
855
856 :return: A string giving the timestamp, in the format::
857
858 YYYYMMDDhhmmssZ
859 YYYYMMDDhhmmss+hhmm
860 YYYYMMDDhhmmss-hhmm
861
862 or None if there is no value set.
863 """
864 return self._get_boundary_time(_api.X509_get_notAfter)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800865
866
867 def set_notAfter(self, when):
868 """
869 Set the time stamp for when the certificate stops being valid
870
871 :param when: A string giving the timestamp, in the format:
872
873 YYYYMMDDhhmmssZ
874 YYYYMMDDhhmmss+hhmm
875 YYYYMMDDhhmmss-hhmm
876 :type when: :py:class:`bytes`
877
878 :return: None
879 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800880 return self._set_boundary_time(_api.X509_get_notAfter, when)
881
882
883 def _get_name(self, which):
884 name = X509Name.__new__(X509Name)
885 name._name = which(self._x509)
886 if name._name == _api.NULL:
887 1/0
888 return name
889
890
891 def _set_name(self, which, name):
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800892 if not isinstance(name, X509Name):
893 raise TypeError("name must be an X509Name")
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800894 set_result = which(self._x509, name._name)
895 if not set_result:
896 1/0
897
898
899 def get_issuer(self):
900 """
901 Create an X509Name object for the issuer of the certificate
902
903 :return: An X509Name object
904 """
905 return self._get_name(_api.X509_get_issuer_name)
906
907
908 def set_issuer(self, issuer):
909 """
910 Set the issuer of the certificate
911
912 :param issuer: The issuer name
913 :type issuer: :py:class:`X509Name`
914
915 :return: None
916 """
917 return self._set_name(_api.X509_set_issuer_name, issuer)
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800918
919
920 def get_subject(self):
921 """
922 Create an X509Name object for the subject of the certificate
923
924 :return: An X509Name object
925 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800926 return self._get_name(_api.X509_get_subject_name)
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800927
928
929 def set_subject(self, subject):
930 """
931 Set the subject of the certificate
932
933 :param subject: The subject name
934 :type subject: :py:class:`X509Name`
935 :return: None
936 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800937 return self._set_name(_api.X509_set_subject_name, subject)
Jean-Paul Calderone83d22eb2013-02-20 12:19:43 -0800938
939
940 def get_extension_count(self):
941 """
942 Get the number of extensions on the certificate.
943
944 :return: The number of extensions as an integer.
945 """
946 return _api.X509_get_ext_count(self._x509)
947
948
949 def add_extensions(self, extensions):
950 """
951 Add extensions to the certificate.
952
953 :param extensions: a sequence of X509Extension objects
954 :return: None
955 """
956 for ext in extensions:
957 if not isinstance(ext, X509Extension):
958 raise ValueError("One of the elements is not an X509Extension")
959
960 add_result = _api.X509_add_ext(self._x509, ext._extension, -1)
961 if not add_result:
962 _raise_current_error()
963
964
965 def get_extension(self, index):
966 """
967 Get a specific extension of the certificate by index.
968
969 :param index: The index of the extension to retrieve.
970 :return: The X509Extension object at the specified index.
971 """
972 ext = X509Extension.__new__(X509Extension)
973 ext._extension = _api.X509_get_ext(self._x509, index)
974 if ext._extension == _api.NULL:
975 raise IndexError("extension index out of bounds")
976
977 ext._extension = _api.X509_EXTENSION_dup(ext._extension)
978 return ext
979
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800980X509Type = X509
981
982
983
984def load_certificate(type, buffer):
985 """
986 Load a certificate from a buffer
987
988 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
989
990 :param buffer: The buffer the certificate is stored in
991 :type buffer: :py:class:`bytes`
992
993 :return: The X509 object
994 """
995 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
Jean-Paul Calderone066f0572013-02-20 13:43:44 -0800996 if bio == _api.NULL:
997 1/0
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800998
999 try:
1000 if type == FILETYPE_PEM:
1001 x509 = _api.PEM_read_bio_X509(bio, _api.NULL, _api.NULL, _api.NULL)
1002 elif type == FILETYPE_ASN1:
1003 x509 = _api.d2i_X509_bio(bio, _api.NULL);
1004 else:
1005 raise ValueError(
1006 "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1007 finally:
1008 _api.BIO_free(bio)
1009
1010 if x509 == _api.NULL:
1011 _raise_current_error()
1012
1013 cert = X509.__new__(X509)
1014 cert._x509 = x509
1015 return cert
1016
1017
1018def dump_certificate(type, cert):
1019 """
1020 Dump a certificate to a buffer
1021
1022 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1023 :param cert: The certificate to dump
1024 :return: The buffer with the dumped certificate in
1025 """
1026 bio = _api.BIO_new(_api.BIO_s_mem())
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001027 if bio == _api.NULL:
1028 1/0
1029
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001030 if type == FILETYPE_PEM:
1031 result_code = _api.PEM_write_bio_X509(bio, cert._x509)
1032 elif type == FILETYPE_ASN1:
1033 result_code = _api.i2d_X509_bio(bio, cert._x509)
1034 elif type == FILETYPE_TEXT:
1035 result_code = _api.X509_print_ex(bio, cert._x509, 0, 0)
1036 else:
1037 raise ValueError(
1038 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
1039 "FILETYPE_TEXT")
1040
1041 return _bio_to_string(bio)
1042
1043
1044
1045def dump_privatekey(type, pkey, cipher=None, passphrase=None):
1046 """
1047 Dump a private key to a buffer
1048
1049 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1050 :param pkey: The PKey to dump
1051 :param cipher: (optional) if encrypted PEM format, the cipher to
1052 use
1053 :param passphrase: (optional) if encrypted PEM format, this can be either
1054 the passphrase to use, or a callback for providing the
1055 passphrase.
1056 :return: The buffer with the dumped key in
1057 :rtype: :py:data:`str`
1058 """
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001059 bio = _api.BIO_new(_api.BIO_s_mem())
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001060 if bio == _api.NULL:
1061 1/0
1062
1063 if cipher is not None:
1064 cipher_obj = _api.EVP_get_cipherbyname(cipher)
1065 if cipher_obj == _api.NULL:
1066 raise ValueError("Invalid cipher name")
1067 else:
1068 cipher_obj = _api.NULL
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001069
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001070 helper = _PassphraseHelper(type, passphrase)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001071 if type == FILETYPE_PEM:
1072 result_code = _api.PEM_write_bio_PrivateKey(
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001073 bio, pkey._pkey, cipher_obj, _api.NULL, 0,
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001074 helper.callback, helper.callback_args)
1075 helper.raise_if_problem()
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001076 elif type == FILETYPE_ASN1:
1077 result_code = _api.i2d_PrivateKey_bio(bio, pkey._pkey)
1078 elif type == FILETYPE_TEXT:
1079 rsa = _api.EVP_PKEY_get1_RSA(pkey._pkey)
1080 result_code = _api.RSA_print(bio, rsa, 0)
1081 # TODO RSA_free(rsa)?
1082 else:
1083 raise ValueError(
1084 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
1085 "FILETYPE_TEXT")
1086
1087 if result_code == 0:
1088 _raise_current_error()
1089
1090 return _bio_to_string(bio)
1091
1092
1093
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001094def _X509_REVOKED_dup(original):
1095 copy = _api.X509_REVOKED_new()
1096 if copy == _api.NULL:
1097 1/0
1098
1099 if original.serialNumber != _api.NULL:
1100 copy.serialNumber = _api.ASN1_INTEGER_dup(original.serialNumber)
1101
1102 if original.revocationDate != _api.NULL:
1103 copy.revocationDate = _api.M_ASN1_TIME_dup(original.revocationDate)
1104
1105 if original.extensions != _api.NULL:
1106 extension_stack = _api.sk_X509_EXTENSION_new_null()
1107 for i in range(_api.sk_X509_EXTENSION_num(original.extensions)):
1108 original_ext = _api.sk_X509_EXTENSION_value(original.extensions, i)
1109 copy_ext = _api.X509_EXTENSION_dup(original_ext)
1110 _api.sk_X509_EXTENSION_push(extension_stack, copy_ext)
1111 copy.extensions = extension_stack
1112
1113 copy.sequence = original.sequence
1114 return copy
1115
1116
1117
1118class Revoked(object):
1119 # http://www.openssl.org/docs/apps/x509v3_config.html#CRL_distribution_points_
1120 # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches
1121 # OCSP_crl_reason_str. We use the latter, just like the command line
1122 # program.
1123 _crl_reasons = [
1124 "unspecified",
1125 "keyCompromise",
1126 "CACompromise",
1127 "affiliationChanged",
1128 "superseded",
1129 "cessationOfOperation",
1130 "certificateHold",
1131 # "removeFromCRL",
1132 ]
1133
1134 def __init__(self):
1135 self._revoked = _api.X509_REVOKED_new()
1136
1137
1138 def set_serial(self, hex_str):
1139 """
1140 Set the serial number of a revoked Revoked structure
1141
1142 :param hex_str: The new serial number.
1143 :type hex_str: :py:data:`str`
1144 :return: None
1145 """
1146 bignum_serial = _api.new("BIGNUM**")
1147 bn_result = _api.BN_hex2bn(bignum_serial, hex_str)
1148 if not bn_result:
1149 raise ValueError("bad hex string")
1150
1151 asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
1152 _api.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)
1153
1154
1155 def get_serial(self):
1156 """
1157 Return the serial number of a Revoked structure
1158
1159 :return: The serial number as a string
1160 """
1161 bio = _api.BIO_new(_api.BIO_s_mem())
1162 if bio == _api.NULL:
1163 1/0
1164
1165 result = _api.i2a_ASN1_INTEGER(bio, self._revoked.serialNumber)
1166 if result < 0:
1167 1/0
1168
1169 return _bio_to_string(bio)
1170
1171
1172 def _delete_reason(self):
1173 stack = self._revoked.extensions
1174 for i in range(_api.sk_X509_EXTENSION_num(stack)):
1175 ext = _api.sk_X509_EXTENSION_value(stack, i)
1176 if _api.OBJ_obj2nid(ext.object) == _api.NID_crl_reason:
1177 _api.sk_X509_EXTENSION_delete(stack, i)
1178 break
1179
1180
1181 def set_reason(self, reason):
1182 """
1183 Set the reason of a Revoked object.
1184
1185 If :py:data:`reason` is :py:data:`None`, delete the reason instead.
1186
1187 :param reason: The reason string.
1188 :type reason: :py:class:`str` or :py:class:`NoneType`
1189 :return: None
1190 """
1191 if reason is None:
1192 self._delete_reason()
1193 elif not isinstance(reason, bytes):
1194 raise TypeError("reason must be None or a byte string")
1195 else:
1196 reason = reason.lower().replace(' ', '')
1197 reason_code = [r.lower() for r in self._crl_reasons].index(reason)
1198
1199 new_reason_ext = _api.ASN1_ENUMERATED_new()
1200 if new_reason_ext == _api.NULL:
1201 1/0
1202
1203 set_result = _api.ASN1_ENUMERATED_set(new_reason_ext, reason_code)
1204 if set_result == _api.NULL:
1205 1/0
1206
1207 self._delete_reason()
1208 add_result = _api.X509_REVOKED_add1_ext_i2d(
1209 self._revoked, _api.NID_crl_reason, new_reason_ext, 0, 0)
1210
1211 if not add_result:
1212 1/0
1213
1214
1215 def get_reason(self):
1216 """
1217 Return the reason of a Revoked object.
1218
1219 :return: The reason as a string
1220 """
1221 extensions = self._revoked.extensions
1222 for i in range(_api.sk_X509_EXTENSION_num(extensions)):
1223 ext = _api.sk_X509_EXTENSION_value(extensions, i)
1224 if _api.OBJ_obj2nid(ext.object) == _api.NID_crl_reason:
1225 bio = _api.BIO_new(_api.BIO_s_mem())
1226 if bio == _api.NULL:
1227 1/0
1228
1229 print_result = _api.X509V3_EXT_print(bio, ext, 0, 0)
1230 if not print_result:
1231 print_result = _api.M_ASN1_OCTET_STRING_print(bio, ext.value)
1232 if print_result == 0:
1233 1/0
1234
1235 return _bio_to_string(bio)
1236
1237
1238 def all_reasons(self):
1239 """
1240 Return a list of all the supported reason strings.
1241
1242 :return: A list of reason strings.
1243 """
1244 return self._crl_reasons[:]
1245
1246
1247 def set_rev_date(self, when):
1248 """
1249 Set the revocation timestamp
1250
1251 :param when: A string giving the timestamp, in the format:
1252
1253 YYYYMMDDhhmmssZ
1254 YYYYMMDDhhmmss+hhmm
1255 YYYYMMDDhhmmss-hhmm
1256
1257 :return: None
1258 """
1259 return _set_asn1_time(self._revoked.revocationDate, when)
1260
1261
1262 def get_rev_date(self):
1263 """
1264 Retrieve the revocation date
1265
1266 :return: A string giving the timestamp, in the format:
1267
1268 YYYYMMDDhhmmssZ
1269 YYYYMMDDhhmmss+hhmm
1270 YYYYMMDDhhmmss-hhmm
1271 """
1272 return _get_asn1_time(self._revoked.revocationDate)
1273
1274
1275
1276class CRL(object):
1277 def __init__(self):
1278 """
1279 Create a new empty CRL object.
1280 """
1281 self._crl = _api.X509_CRL_new()
1282
1283
1284 def get_revoked(self):
1285 """
1286 Return revoked portion of the CRL structure (by value not reference).
1287
1288 :return: A tuple of Revoked objects.
1289 """
1290 results = []
1291 revoked_stack = self._crl.crl.revoked
1292 for i in range(_api.sk_X509_REVOKED_num(revoked_stack)):
1293 revoked = _api.sk_X509_REVOKED_value(revoked_stack, i)
1294 revoked_copy = _X509_REVOKED_dup(revoked)
1295 pyrev = Revoked.__new__(Revoked)
1296 pyrev._revoked = revoked_copy
1297 results.append(pyrev)
Jean-Paul Calderone85b74eb2013-02-21 09:15:01 -08001298 if results:
1299 return tuple(results)
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001300
1301
1302 def add_revoked(self, revoked):
1303 """
1304 Add a revoked (by value not reference) to the CRL structure
1305
1306 :param revoked: The new revoked.
1307 :type revoked: :class:`X509`
1308
1309 :return: None
1310 """
1311 copy = _X509_REVOKED_dup(revoked._revoked)
1312 if copy == _api.NULL:
1313 1/0
1314
1315 add_result = _api.X509_CRL_add0_revoked(self._crl, copy)
1316 # TODO what check on add_result?
1317
1318
1319 def export(self, cert, key, type=FILETYPE_PEM, days=100):
1320 """
1321 export a CRL as a string
1322
1323 :param cert: Used to sign CRL.
1324 :type cert: :class:`X509`
1325
1326 :param key: Used to sign CRL.
1327 :type key: :class:`PKey`
1328
1329 :param type: The export format, either :py:data:`FILETYPE_PEM`, :py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`.
1330
1331 :param days: The number of days until the next update of this CRL.
1332 :type days: :py:data:`int`
1333
1334 :return: :py:data:`str`
1335 """
Jean-Paul Calderone85b74eb2013-02-21 09:15:01 -08001336 if not isinstance(cert, X509):
1337 raise TypeError("cert must be an X509 instance")
1338 if not isinstance(key, PKey):
1339 raise TypeError("key must be a PKey instance")
1340 if not isinstance(type, int):
1341 raise TypeError("type must be an integer")
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001342
Jean-Paul Calderone85b74eb2013-02-21 09:15:01 -08001343 bio = _api.BIO_new(_api.BIO_s_mem())
1344 if bio == _api.NULL:
1345 1/0
1346
1347 # A scratch time object to give different values to different CRL fields
1348 sometime = _api.ASN1_TIME_new()
1349 if sometime == _api.NULL:
1350 1/0
1351
1352 _api.X509_gmtime_adj(sometime, 0)
1353 _api.X509_CRL_set_lastUpdate(self._crl, sometime)
1354
1355 _api.X509_gmtime_adj(sometime, days * 24 * 60 * 60)
1356 _api.X509_CRL_set_nextUpdate(self._crl, sometime)
1357
1358 _api.X509_CRL_set_issuer_name(self._crl, _api.X509_get_subject_name(cert._x509))
1359
1360 sign_result = _api.X509_CRL_sign(self._crl, key._pkey, _api.EVP_md5())
1361 if not sign_result:
1362 _raise_current_error()
1363
1364 if type == FILETYPE_PEM:
1365 ret = _api.PEM_write_bio_X509_CRL(bio, self._crl)
1366 elif type == FILETYPE_ASN1:
1367 ret = _api.i2d_X509_CRL_bio(bio, self._crl)
1368 elif type == FILETYPE_TEXT:
1369 ret = _api.X509_CRL_print(bio, self._crl)
1370 else:
1371 raise ValueError(
1372 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")
1373
1374 if not ret:
1375 1/0
1376
1377 return _bio_to_string(bio)
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001378CRLType = CRL
1379
1380
Jean-Paul Calderone3b89f472013-02-21 09:32:25 -08001381
Jean-Paul Calderone4e8be1c2013-02-21 18:31:12 -08001382class PKCS7(object):
1383 def type_is_signed(self):
1384 """
1385 Check if this NID_pkcs7_signed object
1386
1387 :return: True if the PKCS7 is of type signed
1388 """
1389 if _api.PKCS7_type_is_signed(self._pkcs7):
1390 return True
1391 return False
1392
1393
1394 def type_is_enveloped(self):
1395 """
1396 Check if this NID_pkcs7_enveloped object
1397
1398 :returns: True if the PKCS7 is of type enveloped
1399 """
1400 if _api.PKCS7_type_is_enveloped(self._pkcs7):
1401 return True
1402 return False
1403
1404
1405 def type_is_signedAndEnveloped(self):
1406 """
1407 Check if this NID_pkcs7_signedAndEnveloped object
1408
1409 :returns: True if the PKCS7 is of type signedAndEnveloped
1410 """
1411 if _api.PKCS7_type_is_signedAndEnveloped(self._pkcs7):
1412 return True
1413 return False
1414
1415
1416 def type_is_data(self):
1417 """
1418 Check if this NID_pkcs7_data object
1419
1420 :return: True if the PKCS7 is of type data
1421 """
1422 if _api.PKCS7_type_is_data(self._pkcs7):
1423 return True
1424 return False
1425
1426
1427 def get_type_name(self):
1428 """
1429 Returns the type name of the PKCS7 structure
1430
1431 :return: A string with the typename
1432 """
1433 nid = _api.OBJ_obj2nid(self._pkcs7.type)
1434 string_type = _api.OBJ_nid2sn(nid)
1435 return _api.string(string_type)
1436
1437PKCS7Type = PKCS7
1438
1439
1440
Jean-Paul Calderonee5912ce2013-02-21 10:49:35 -08001441class PKCS12(object):
1442 def __init__(self):
1443 self._pkey = None
1444 self._cert = None
1445 self._cacerts = None
1446 self._friendlyname = None
1447
1448
1449 def get_certificate(self):
1450 """
1451 Return certificate portion of the PKCS12 structure
1452
1453 :return: X509 object containing the certificate
1454 """
1455 return self._cert
1456
1457
1458 def set_certificate(self, cert):
1459 """
1460 Replace the certificate portion of the PKCS12 structure
1461
1462 :param cert: The new certificate.
1463 :type cert: :py:class:`X509` or :py:data:`None`
1464 :return: None
1465 """
1466 if not isinstance(cert, X509):
1467 raise TypeError("cert must be an X509 instance")
1468 self._cert = cert
1469
1470
1471 def get_privatekey(self):
1472 """
1473 Return private key portion of the PKCS12 structure
1474
1475 :returns: PKey object containing the private key
1476 """
1477 return self._pkey
1478
1479
1480 def set_privatekey(self, pkey):
1481 """
1482 Replace or set the certificate portion of the PKCS12 structure
1483
1484 :param pkey: The new private key.
1485 :type pkey: :py:class:`PKey`
1486 :return: None
1487 """
1488 if not isinstance(pkey, PKey):
1489 raise TypeError("pkey must be a PKey instance")
1490 self._pkey = pkey
1491
1492
1493 def get_ca_certificates(self):
1494 """
1495 Return CA certificates within of the PKCS12 object
1496
1497 :return: A newly created tuple containing the CA certificates in the chain,
1498 if any are present, or None if no CA certificates are present.
1499 """
1500 if self._cacerts is not None:
1501 return tuple(self._cacerts)
1502
1503
1504 def set_ca_certificates(self, cacerts):
1505 """
1506 Replace or set the CA certificates withing the PKCS12 object.
1507
1508 :param cacerts: The new CA certificates.
1509 :type cacerts: :py:data:`None` or an iterable of :py:class:`X509`
1510 :return: None
1511 """
1512 if cacerts is None:
1513 self._cacerts = None
1514 else:
1515 cacerts = list(cacerts)
1516 for cert in cacerts:
1517 if not isinstance(cert, X509):
1518 raise TypeError("iterable must only contain X509 instances")
1519 self._cacerts = cacerts
1520
1521
1522 def set_friendlyname(self, name):
1523 """
1524 Replace or set the certificate portion of the PKCS12 structure
1525
1526 :param name: The new friendly name.
1527 :type name: :py:class:`bytes`
1528 :return: None
1529 """
1530 if name is None:
1531 self._friendlyname = None
1532 elif not isinstance(name, bytes):
1533 raise TypeError("name must be a byte string or None (not %r)" % (name,))
1534 self._friendlyname = name
1535
1536
1537 def get_friendlyname(self):
1538 """
1539 Return friendly name portion of the PKCS12 structure
1540
1541 :returns: String containing the friendlyname
1542 """
1543 return self._friendlyname
1544
1545
1546 def export(self, passphrase=None, iter=2048, maciter=1):
1547 """
1548 Dump a PKCS12 object as a string. See also "man PKCS12_create".
1549
1550 :param passphrase: used to encrypt the PKCS12
1551 :type passphrase: :py:data:`bytes`
1552
1553 :param iter: How many times to repeat the encryption
1554 :type iter: :py:data:`int`
1555
1556 :param maciter: How many times to repeat the MAC
1557 :type maciter: :py:data:`int`
1558
1559 :return: The string containing the PKCS12
1560 """
1561 if self._cacerts is None:
1562 cacerts = _api.NULL
1563 else:
1564 cacerts = _api.sk_X509_new_null()
1565 for cert in self._cacerts:
1566 _api.sk_X509_push(cacerts, cert._x509)
1567
1568 if passphrase is None:
1569 passphrase = _api.NULL
1570
1571 friendlyname = self._friendlyname
1572 if friendlyname is None:
1573 friendlyname = _api.NULL
1574
1575 if self._pkey is None:
1576 pkey = _api.NULL
1577 else:
1578 pkey = self._pkey._pkey
1579
1580 if self._cert is None:
1581 cert = _api.NULL
1582 else:
1583 cert = self._cert._x509
1584
1585 pkcs12 = _api.PKCS12_create(
1586 passphrase, friendlyname, pkey, cert, cacerts,
1587 _api.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
1588 _api.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
1589 iter, maciter, 0)
1590 if pkcs12 == _api.NULL:
1591 _raise_current_error()
1592
1593 bio = _api.BIO_new(_api.BIO_s_mem())
1594 if bio == _api.NULL:
1595 1/0
1596
1597 _api.i2d_PKCS12_bio(bio, pkcs12)
1598 return _bio_to_string(bio)
1599PKCS12Type = PKCS12
1600
1601
1602
Jean-Paul Calderone3b89f472013-02-21 09:32:25 -08001603class NetscapeSPKI(object):
1604 def __init__(self):
1605 self._spki = _api.NETSCAPE_SPKI_new()
1606
1607
1608 def sign(self, pkey, digest):
1609 """
1610 Sign the certificate request using the supplied key and digest
1611
1612 :param pkey: The key to sign with
1613 :param digest: The message digest to use
1614 :return: None
1615 """
1616 if pkey._only_public:
1617 raise ValueError("Key has only public part")
1618
1619 if not pkey._initialized:
1620 raise ValueError("Key is uninitialized")
1621
1622 digest_obj = _api.EVP_get_digestbyname(digest)
1623 if digest_obj == _api.NULL:
1624 raise ValueError("No such digest method")
1625
1626 sign_result = _api.NETSCAPE_SPKI_sign(self._spki, pkey._pkey, digest_obj)
1627 if not sign_result:
1628 1/0
1629
1630
1631 def verify(self, key):
1632 """
1633 Verifies a certificate request using the supplied public key
1634
1635 :param key: a public key
1636 :return: True if the signature is correct.
1637 :raise OpenSSL.crypto.Error: If the signature is invalid or there is a
1638 problem verifying the signature.
1639 """
1640 answer = _api.NETSCAPE_SPKI_verify(self._spki, key._pkey)
1641 if answer <= 0:
1642 _raise_current_error()
1643 return True
1644
1645
1646 def b64_encode(self):
1647 """
1648 Generate a base64 encoded string from an SPKI
1649
1650 :return: The base64 encoded string
1651 """
1652 return _api.string(_api.NETSCAPE_SPKI_b64_encode(self._spki))
1653
1654
1655 def get_pubkey(self):
1656 """
1657 Get the public key of the certificate
1658
1659 :return: The public key
1660 """
1661 pkey = PKey.__new__(PKey)
1662 pkey._pkey = _api.NETSCAPE_SPKI_get_pubkey(self._spki)
1663 if pkey._pkey == _api.NULL:
1664 1/0
1665 pkey._only_public = True
1666 return pkey
1667
1668
1669 def set_pubkey(self, pkey):
1670 """
1671 Set the public key of the certificate
1672
1673 :param pkey: The public key
1674 :return: None
1675 """
1676 set_result = _api.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey)
1677 if not set_result:
1678 1/0
1679NetscapeSPKIType = NetscapeSPKI
1680
1681
Jean-Paul Calderonee41f05c2013-02-20 13:28:16 -08001682class _PassphraseHelper(object):
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001683 def __init__(self, type, passphrase):
1684 if type != FILETYPE_PEM and passphrase is not None:
1685 raise ValueError("only FILETYPE_PEM key format supports encryption")
Jean-Paul Calderonee41f05c2013-02-20 13:28:16 -08001686 self._passphrase = passphrase
1687 self._problems = []
1688
1689
1690 @property
1691 def callback(self):
1692 if self._passphrase is None:
1693 return _api.NULL
1694 elif isinstance(self._passphrase, bytes):
1695 return _api.NULL
1696 elif callable(self._passphrase):
1697 return _api.callback("pem_password_cb", self._read_passphrase)
1698 else:
1699 raise TypeError("Last argument must be string or callable")
1700
1701
1702 @property
1703 def callback_args(self):
1704 if self._passphrase is None:
1705 return _api.NULL
1706 elif isinstance(self._passphrase, bytes):
1707 return self._passphrase
1708 elif callable(self._passphrase):
1709 return _api.NULL
1710 else:
1711 raise TypeError("Last argument must be string or callable")
1712
1713
1714 def raise_if_problem(self):
1715 if self._problems:
1716 try:
1717 _raise_current_error()
1718 except Error:
1719 pass
1720 raise self._problems[0]
1721
1722
1723 def _read_passphrase(self, buf, size, rwflag, userdata):
1724 try:
1725 result = self._passphrase(rwflag)
1726 if not isinstance(result, bytes):
1727 raise ValueError("String expected")
1728 if len(result) > size:
1729 raise ValueError("passphrase returned by callback is too long")
1730 for i in range(len(result)):
1731 buf[i] = result[i]
1732 return len(result)
1733 except Exception as e:
1734 self._problems.append(e)
1735 return 0
1736
1737
1738
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001739def load_privatekey(type, buffer, passphrase=None):
1740 """
1741 Load a private key from a buffer
1742
1743 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1744 :param buffer: The buffer the key is stored in
1745 :param passphrase: (optional) if encrypted PEM format, this can be
1746 either the passphrase to use, or a callback for
1747 providing the passphrase.
1748
1749 :return: The PKey object
1750 """
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001751 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001752 if bio == _api.NULL:
1753 1/0
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001754
Jean-Paul Calderone23478b32013-02-20 13:31:38 -08001755 helper = _PassphraseHelper(type, passphrase)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001756 if type == FILETYPE_PEM:
Jean-Paul Calderonee41f05c2013-02-20 13:28:16 -08001757 evp_pkey = _api.PEM_read_bio_PrivateKey(
1758 bio, _api.NULL, helper.callback, helper.callback_args)
1759 helper.raise_if_problem()
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001760 elif type == FILETYPE_ASN1:
1761 evp_pkey = _api.d2i_PrivateKey_bio(bio, _api.NULL)
1762 else:
1763 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1764
Jean-Paul Calderone31393aa2013-02-20 13:22:21 -08001765 if evp_pkey == _api.NULL:
1766 _raise_current_error()
1767
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001768 pkey = PKey.__new__(PKey)
1769 pkey._pkey = evp_pkey
1770 return pkey
1771
1772
1773
1774def dump_certificate_request(type, req):
1775 """
1776 Dump a certificate request to a buffer
1777
1778 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1779 :param req: The certificate request to dump
1780 :return: The buffer with the dumped certificate request in
1781 """
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001782 bio = _api.BIO_new(_api.BIO_s_mem())
1783 if bio == _api.NULL:
1784 1/0
1785
1786 if type == FILETYPE_PEM:
1787 result_code = _api.PEM_write_bio_X509_REQ(bio, req._req)
1788 elif type == FILETYPE_ASN1:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001789 result_code = _api.i2d_X509_REQ_bio(bio, req._req)
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001790 elif type == FILETYPE_TEXT:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001791 result_code = _api.X509_REQ_print_ex(bio, req._req, 0, 0)
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001792 else:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001793 raise ValueError("type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001794
1795 if result_code == 0:
1796 1/0
1797
1798 return _bio_to_string(bio)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001799
1800
1801
1802def load_certificate_request(type, buffer):
1803 """
1804 Load a certificate request from a buffer
1805
1806 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1807 :param buffer: The buffer the certificate request is stored in
1808 :return: The X509Req object
1809 """
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001810 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
1811 if bio == _api.NULL:
1812 1/0
1813
1814 if type == FILETYPE_PEM:
1815 req = _api.PEM_read_bio_X509_REQ(bio, _api.NULL, _api.NULL, _api.NULL)
1816 elif type == FILETYPE_ASN1:
Jean-Paul Calderonec9a395f2013-02-20 16:59:21 -08001817 req = _api.d2i_X509_REQ_bio(bio, _api.NULL)
Jean-Paul Calderone066f0572013-02-20 13:43:44 -08001818 else:
1819 1/0
1820
1821 if req == _api.NULL:
1822 1/0
1823
1824 x509req = X509Req.__new__(X509Req)
1825 x509req._req = req
1826 return x509req
Jean-Paul Calderone8cf4f802013-02-20 16:45:02 -08001827
1828
1829
1830def sign(pkey, data, digest):
1831 """
1832 Sign data with a digest
1833
1834 :param pkey: Pkey to sign with
1835 :param data: data to be signed
1836 :param digest: message digest to use
1837 :return: signature
1838 """
1839 digest_obj = _api.EVP_get_digestbyname(digest)
1840 if digest_obj == _api.NULL:
1841 raise ValueError("No such digest method")
1842
1843 md_ctx = _api.new("EVP_MD_CTX*")
1844
1845 _api.EVP_SignInit(md_ctx, digest_obj)
1846 _api.EVP_SignUpdate(md_ctx, data, len(data))
1847
1848 signature_buffer = _api.new("unsigned char[]", 512)
1849 signature_length = _api.new("unsigned int*")
1850 signature_length[0] = len(signature_buffer)
1851 final_result = _api.EVP_SignFinal(
1852 md_ctx, signature_buffer, signature_length, pkey._pkey)
1853
1854 if final_result != 1:
1855 1/0
1856
1857 return _api.buffer(signature_buffer, signature_length[0])[:]
1858
1859
1860
1861def verify(cert, signature, data, digest):
1862 """
1863 Verify a signature
1864
1865 :param cert: signing certificate (X509 object)
1866 :param signature: signature returned by sign function
1867 :param data: data to be verified
1868 :param digest: message digest to use
1869 :return: None if the signature is correct, raise exception otherwise
1870 """
1871 digest_obj = _api.EVP_get_digestbyname(digest)
1872 if digest_obj == _api.NULL:
1873 raise ValueError("No such digest method")
1874
1875 pkey = _api.X509_get_pubkey(cert._x509)
1876 if pkey == _api.NULL:
1877 1/0
1878
1879 md_ctx = _api.new("EVP_MD_CTX*")
1880
1881 _api.EVP_VerifyInit(md_ctx, digest_obj)
1882 _api.EVP_VerifyUpdate(md_ctx, data, len(data))
1883 verify_result = _api.EVP_VerifyFinal(md_ctx, signature, len(signature), pkey)
1884
1885 if verify_result != 1:
1886 _raise_current_error()
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001887
1888
1889
1890def load_crl(type, buffer):
1891 """
1892 Load a certificate revocation list from a buffer
1893
1894 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
1895 :param buffer: The buffer the CRL is stored in
1896
1897 :return: The PKey object
1898 """
1899 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
1900 if bio == _api.NULL:
1901 1/0
1902
1903 if type == FILETYPE_PEM:
1904 crl = _api.PEM_read_bio_X509_CRL(bio, _api.NULL, _api.NULL, _api.NULL)
1905 elif type == FILETYPE_ASN1:
1906 crl = _api.d2i_X509_CRL_bio(bio, _api.NULL)
1907 else:
Jean-Paul Calderone57122982013-02-21 08:47:05 -08001908 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1909
1910 if crl == _api.NULL:
1911 _raise_current_error()
1912
1913 result = CRL.__new__(CRL)
1914 result._crl = crl
1915 return result
Jean-Paul Calderonee5912ce2013-02-21 10:49:35 -08001916
1917
1918
Jean-Paul Calderone4e8be1c2013-02-21 18:31:12 -08001919def load_pkcs7_data(type, buffer):
1920 """
1921 Load pkcs7 data from a buffer
1922
1923 :param type: The file type (one of FILETYPE_PEM or FILETYPE_ASN1)
1924 :param buffer: The buffer with the pkcs7 data.
1925 :return: The PKCS7 object
1926 """
1927 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
1928 if bio == _api.NULL:
1929 1/0
1930
1931 if type == FILETYPE_PEM:
1932 pkcs7 = _api.PEM_read_bio_PKCS7(bio, _api.NULL, _api.NULL, _api.NULL)
1933 elif type == FILETYPE_ASN1:
1934 pass
1935 else:
1936 1/0
1937 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
1938
1939 if pkcs7 == _api.NULL:
1940 1/0
1941
1942 pypkcs7 = PKCS7.__new__(PKCS7)
1943 pypkcs7._pkcs7 = pkcs7
1944 return pypkcs7
1945
1946
1947
Jean-Paul Calderonee5912ce2013-02-21 10:49:35 -08001948def load_pkcs12(buffer, passphrase):
1949 """
1950 Load a PKCS12 object from a buffer
1951
1952 :param buffer: The buffer the certificate is stored in
1953 :param passphrase: (Optional) The password to decrypt the PKCS12 lump
1954 :returns: The PKCS12 object
1955 """
1956 bio = _api.BIO_new_mem_buf(buffer, len(buffer))
1957 if bio == _api.NULL:
1958 1/0
1959
1960 p12 = _api.d2i_PKCS12_bio(bio, _api.NULL)
1961 if p12 == _api.NULL:
1962 _raise_current_error()
1963
1964 pkey = _api.new("EVP_PKEY**")
1965 cert = _api.new("X509**")
1966 cacerts = _api.new("struct stack_st_X509**")
1967
1968 parse_result = _api.PKCS12_parse(p12, passphrase, pkey, cert, cacerts)
1969 if not parse_result:
1970 _raise_current_error()
1971
1972 # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the
1973 # queue for no particular reason. This error isn't interesting to anyone
1974 # outside this function. It's not even interesting to us. Get rid of it.
1975 try:
1976 _raise_current_error()
1977 except Error:
1978 pass
1979
1980 if pkey[0] == _api.NULL:
1981 pykey = None
1982 else:
1983 pykey = PKey.__new__(PKey)
1984 pykey._pkey = pkey[0]
1985
1986 if cert[0] == _api.NULL:
1987 pycert = None
1988 friendlyname = None
1989 else:
1990 pycert = X509.__new__(X509)
1991 pycert._x509 = cert[0]
1992
1993 friendlyname_length = _api.new("int*")
1994 friendlyname_buffer = _api.X509_alias_get0(cert[0], friendlyname_length)
1995 friendlyname = _api.buffer(friendlyname_buffer, friendlyname_length[0])[:]
1996 if friendlyname_buffer == _api.NULL:
1997 friendlyname = None
1998
1999 pycacerts = []
2000 for i in range(_api.sk_X509_num(cacerts[0])):
2001 pycacert = X509.__new__(X509)
2002 pycacert._x509 = _api.sk_X509_value(cacerts[0], i)
2003 pycacerts.append(pycacert)
2004 if not pycacerts:
2005 pycacerts = None
2006
2007 pkcs12 = PKCS12.__new__(PKCS12)
2008 pkcs12._pkey = pykey
2009 pkcs12._cert = pycert
2010 pkcs12._cacerts = pycacerts
2011 pkcs12._friendlyname = friendlyname
2012 return pkcs12