blob: aac622c5f3b9a97796b6e957c4a89eedba491e3e [file] [log] [blame]
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001from time import time
2
Jean-Paul Calderone084b7522013-02-09 09:53:45 -08003from OpenSSL.xcrypto import *
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08004
5from tls.c import api as _api
6
# File format selectors understood by the load_* / dump_* functions below.
FILETYPE_PEM, FILETYPE_ASN1 = (
    _api.SSL_FILETYPE_PEM, _api.SSL_FILETYPE_ASN1)

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 0xFFFF

# Key algorithm selectors understood by PKey.generate_key.
TYPE_RSA, TYPE_DSA = _api.EVP_PKEY_RSA, _api.EVP_PKEY_DSA
15
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080016
def _bio_to_string(bio):
    """
    Return the data held by an OpenSSL BIO object as a Python byte string.

    :param bio: The BIO whose contents should be copied out.

    :return: A byte string holding the data ``BIO_get_mem_data`` reports for
        ``bio``.
    """
    data_pointer = _api.new('char**')
    size = _api.BIO_get_mem_data(bio, data_pointer)
    view = _api.buffer(data_pointer[0], size)
    # Slice the whole view so the caller gets its own string rather than a
    # window onto the BIO's storage.
    return view[:]
24
25
26
def _raise_current_error():
    """
    Drain the OpenSSL error queue and raise its contents as an ``Error``.

    Each queued error code is turned into a ``(library, function, reason)``
    tuple of strings.  The list of those tuples (empty if nothing was queued)
    is the single argument of the raised exception.

    :raise Error: Always.
    """
    def _text(charp):
        # The ERR_*_error_string lookups return NULL when no string is
        # registered for an error code.  Map that to an empty string so that
        # reporting the original failure does not itself fail.
        # NOTE(review): assumes _api.string, like cffi's ffi.string, rejects
        # NULL -- confirm against the tls.c binding.
        if charp == _api.NULL:
            return ""
        return _api.string(charp)

    errors = []
    while True:
        error = _api.ERR_get_error()
        if error == 0:
            # A zero code means the queue is empty.
            break
        errors.append((
            _text(_api.ERR_lib_error_string(error)),
            _text(_api.ERR_func_error_string(error)),
            _text(_api.ERR_reason_error_string(error))))

    raise Error(errors)

# Alternate name for the same function.
_exception_from_error_queue = _raise_current_error
41
42
class Error(Exception):
    """
    Exception raised by this module when an OpenSSL operation fails.
    """
45
46
47
class PKey(object):
    """
    Wrapper around an OpenSSL ``EVP_PKEY`` structure.
    """
    # Set to True on instances that only carry a public key (X509.get_pubkey
    # does this); X509.sign and PKey.check refuse such keys.
    _only_public = False

    # The class-level default is True so that instances created with
    # PKey.__new__ around an existing EVP_PKEY count as initialized.
    # __init__ resets it to False until generate_key succeeds.
    _initialized = True

    def __init__(self):
        """
        Create a new, empty key.  Call generate_key to populate it.
        """
        self._pkey = _api.EVP_PKEY_new()
        self._initialized = False


    def generate_key(self, type, bits):
        """
        Generate a key of a given type, with a given number of bits

        :param type: The key type (TYPE_RSA or TYPE_DSA)
        :param bits: The number of bits

        :return: None
        :raise TypeError: If ``type`` or ``bits`` is not an integer.
        :raise ValueError: If ``bits`` is not positive (RSA keys).
        :raise Error: If ``type`` is not a known key type or OpenSSL fails to
            generate the key.
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")
            self._generate_rsa(bits)
        elif type == TYPE_DSA:
            # TODO DSA generation is not implemented yet.  Nothing has been
            # stored in the EVP_PKEY, so do not flag the key as initialized;
            # X509.sign will then reject it with a clear message.
            return
        else:
            raise Error("No such key type")

        self._initialized = True


    def _generate_rsa(self, bits):
        """
        Generate an RSA key of ``bits`` bits and store it in this object.

        :raise Error: If any of the OpenSSL calls reports failure.
        """
        # Allocate the exponent only once the arguments are known to be
        # valid, and always release it: RSA_generate_key_ex does not take
        # ownership of it.
        exponent = _api.new("BIGNUM**")
        # Public exponent 0x10001 (65537).  BN_hex2bn returns 0 on failure.
        if not _api.BN_hex2bn(exponent, b"10001"):
            _raise_current_error()

        try:
            rsa = _api.RSA_new()

            # TODO Release GIL?
            # TODO rsa is not released if one of the calls below fails.
            result = _api.RSA_generate_key_ex(
                rsa, bits, exponent[0], _api.NULL)
            # Success is reported as 1; anything else is a failure.
            if result != 1:
                _raise_current_error()

            result = _api.EVP_PKEY_assign_RSA(self._pkey, rsa)
            if not result:
                _raise_current_error()
        finally:
            _api.BN_free(exponent[0])


    def check(self):
        """
        Check the consistency of an RSA private key.

        :return: True if key is consistent.
        :raise Error: if the key is inconsistent.
        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA private keys can currently be checked.
        """
        # RSA_check_key needs the private components, which a key taken from
        # a certificate does not have.
        if self._only_public:
            raise TypeError("public key only")

        if _api.EVP_PKEY_type(self._pkey.type) != _api.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        # TODO EVP_PKEY_get1_RSA returns a new reference which is never
        # released here.
        rsa = _api.EVP_PKEY_get1_RSA(self._pkey)
        result = _api.RSA_check_key(rsa)
        if result:
            return True
        _raise_current_error()
117
118
119
class X509Name(object):
    """
    Wrapper around an OpenSSL ``X509_NAME``.

    The fields of the name are read and written as Python attributes, for
    example ``name.CN`` or ``name.commonName``.  Attributes whose name starts
    with an underscore are ordinary instance attributes.
    """
    def __init__(self, name):
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: An X509Name object to copy
        """
        # TODO The duplicated X509_NAME is never released.
        self._name = _api.X509_NAME_dup(name._name)


    def __setattr__(self, name, value):
        """
        Set a field of the name, replacing any existing entry for that field.

        :param name: The field name, as understood by ``OBJ_txt2nid``.
        :param value: The new value; unicode values are encoded as UTF-8.

        :raise TypeError: If ``name`` is not a string.
        :raise AttributeError: If ``name`` is not a known field.
        :raise Error: If OpenSSL fails to add the entry.
        """
        # Validate the type before calling any string method on the name,
        # and report the type of the *name*, which is what the message is
        # about.
        if type(name) is not str:
            raise TypeError("attribute name must be string, not '%.200s'" % (
                type(name).__name__,))

        if name.startswith('_'):
            return super(X509Name, self).__setattr__(name, value)

        nid = _api.OBJ_txt2nid(name)
        if nid == _api.NID_undef:
            # Discard whatever the failed lookup left on the error queue.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_api.X509_NAME_entry_count(self._name)):
            ent = _api.X509_NAME_get_entry(self._name, i)
            ent_obj = _api.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _api.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _api.X509_NAME_delete_entry(self._name, i)
                _api.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, unicode):
            value = value.encode('utf-8')

        add_result = _api.X509_NAME_add_entry_by_NID(
            self._name, nid, _api.MBSTRING_UTF8, value, -1, -1, 0)
        if not add_result:
            # TODO Untested
            _raise_current_error()


    def __getattr__(self, name):
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName (alias
        CN) and more...

        :return: The value of the field as a unicode string, or None if the
            name has no entry for it.
        :raise AttributeError: If ``name`` is not a known field.
        """
        # Underscore-prefixed names are plain Python attributes, never name
        # fields; fail fast without involving OpenSSL.
        if name.startswith('_'):
            raise AttributeError(
                "%r object has no attribute %r" % (
                    type(self).__name__, name))

        nid = _api.OBJ_txt2nid(name)
        if nid == _api.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError(
                "%r object has no attribute %r" % (
                    type(self).__name__, name))

        entry_index = _api.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _api.X509_NAME_get_entry(self._name, entry_index)
        data = _api.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _api.new("unsigned char**")
        data_length = _api.ASN1_STRING_to_UTF8(result_buffer, data)
        if data_length < 0:
            _raise_current_error()

        # Free the OpenSSL-allocated buffer even if copying or decoding
        # raises.
        try:
            return _api.buffer(
                result_buffer[0], data_length)[:].decode('utf-8')
        finally:
            _api.OPENSSL_free(result_buffer[0])


    def __cmp__(self, other):
        """
        Compare two names using ``X509_NAME_cmp``.
        """
        if not isinstance(other, X509Name):
            return NotImplemented

        result = _api.X509_NAME_cmp(self._name, other._name)
        # TODO result == -2 is an error case that maybe should be checked for
        return result


    def __repr__(self):
        """
        String representation of an X509Name
        """
        result_buffer = _api.new("char[]", 512)
        format_result = _api.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer))

        if format_result == _api.NULL:
            _raise_current_error()

        return "<X509Name object '%s'>" % (_api.string(result_buffer),)


    def hash(self):
        """
        Return the hash value of this name

        :return: The value computed by ``X509_NAME_hash`` for this name.
        """
        return _api.X509_NAME_hash(self._name)


    def der(self):
        """
        Return the DER encoding of this name

        :return: A :py:class:`bytes` instance giving the DER encoded form of
            this name.
        """
        result_buffer = _api.new('unsigned char**')
        encode_result = _api.i2d_X509_NAME(self._name, result_buffer)
        if encode_result < 0:
            _raise_current_error()

        # Free the OpenSSL-allocated buffer even if the copy raises.
        try:
            return _api.buffer(result_buffer[0], encode_result)[:]
        finally:
            _api.OPENSSL_free(result_buffer[0])


    def get_components(self):
        """
        Returns the split-up components of this name.

        :return: List of tuples (name, value).
        """
        result = []
        for i in range(_api.X509_NAME_entry_count(self._name)):
            ent = _api.X509_NAME_get_entry(self._name, i)

            fname = _api.X509_NAME_ENTRY_get_object(ent)
            fval = _api.X509_NAME_ENTRY_get_data(ent)

            nid = _api.OBJ_obj2nid(fname)
            name = _api.OBJ_nid2sn(nid)

            # Copy exactly ASN1_STRING_length bytes.  _api.string would stop
            # at the first NUL byte and silently truncate the value.
            value = _api.buffer(
                _api.ASN1_STRING_data(fval),
                _api.ASN1_STRING_length(fval))[:]

            result.append((_api.string(name), value))

        return result

# Alternate name for the same class.
X509NameType = X509Name
276
277
278
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800279class X509(object):
280 def __init__(self):
281 # TODO Allocation failure? And why not __new__ instead of __init__?
282 self._x509 = _api.X509_new()
283
284
285 def set_version(self, version):
286 """
287 Set version number of the certificate
288
289 :param version: The version number
290 :type version: :py:class:`int`
291
292 :return: None
293 """
294 if not isinstance(version, int):
295 raise TypeError("version must be an integer")
296
297 _api.X509_set_version(self._x509, version)
298
299
300 def get_version(self):
301 """
302 Return version number of the certificate
303
304 :return: Version number as a Python integer
305 """
306 return _api.X509_get_version(self._x509)
307
308
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -0800309 def get_pubkey(self):
310 """
311 Get the public key of the certificate
312
313 :return: The public key
314 """
315 pkey = PKey.__new__(PKey)
316 pkey._pkey = _api.X509_get_pubkey(self._x509)
317 if pkey._pkey == _api.NULL:
318 _raise_current_error()
319 pkey._only_public = True
320 return pkey
321
322
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800323 def set_pubkey(self, pkey):
324 """
325 Set the public key of the certificate
326
327 :param pkey: The public key
328
329 :return: None
330 """
331 if not isinstance(pkey, PKey):
332 raise TypeError("pkey must be a PKey instance")
333
334 set_result = _api.X509_set_pubkey(self._x509, pkey._pkey)
335 if not set_result:
336 _raise_current_error()
337
338
339 def sign(self, pkey, digest):
340 """
341 Sign the certificate using the supplied key and digest
342
343 :param pkey: The key to sign with
344 :param digest: The message digest to use
345 :return: None
346 """
347 if not isinstance(pkey, PKey):
348 raise TypeError("pkey must be a PKey instance")
349
Jean-Paul Calderoneedafced2013-02-19 11:48:38 -0800350 if pkey._only_public:
351 raise ValueError("Key only has public part")
352
Jean-Paul Calderone09e3bdc2013-02-19 12:15:28 -0800353 if not pkey._initialized:
354 raise ValueError("Key is uninitialized")
355
Jean-Paul Calderone3e29ccf2013-02-19 11:32:46 -0800356 evp_md = _api.EVP_get_digestbyname(digest)
357 if evp_md == _api.NULL:
358 raise ValueError("No such digest method")
359
360 sign_result = _api.X509_sign(self._x509, pkey._pkey, evp_md)
361 if not sign_result:
362 _raise_current_error()
363
364
Jean-Paul Calderonee4aa3fa2013-02-19 12:12:53 -0800365 def get_signature_algorithm(self):
366 """
367 Retrieve the signature algorithm used in the certificate
368
369 :return: A byte string giving the name of the signature algorithm used in
370 the certificate.
371 :raise ValueError: If the signature algorithm is undefined.
372 """
373 alg = self._x509.cert_info.signature.algorithm
374 nid = _api.OBJ_obj2nid(alg)
375 if nid == _api.NID_undef:
376 raise ValueError("Undefined signature algorithm")
377 return _api.string(_api.OBJ_nid2ln(nid))
378
379
Jean-Paul Calderoneb4078722013-02-19 12:01:55 -0800380 def digest(self, digest_name):
381 """
382 Return the digest of the X509 object.
383
384 :param digest_name: The name of the digest algorithm to use.
385 :type digest_name: :py:class:`bytes`
386
387 :return: The digest of the object
388 """
389 digest = _api.EVP_get_digestbyname(digest_name)
390 if digest == _api.NULL:
391 raise ValueError("No such digest method")
392
393 result_buffer = _api.new("char[]", _api.EVP_MAX_MD_SIZE)
394 result_length = _api.new("unsigned int[]", 1)
395 result_length[0] = len(result_buffer)
396
397 digest_result = _api.X509_digest(
398 self._x509, digest, result_buffer, result_length)
399
400 if not digest_result:
401 1/0
402
403 return ':'.join([
404 ch.encode('hex').upper() for ch
405 in _api.buffer(result_buffer, result_length[0])])
406
407
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800408 def subject_name_hash(self):
409 """
410 Return the hash of the X509 subject.
411
412 :return: The hash of the subject.
413 """
414 return _api.X509_subject_name_hash(self._x509)
415
416
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800417 def set_serial_number(self, serial):
418 """
419 Set serial number of the certificate
420
421 :param serial: The serial number
422 :type serial: :py:class:`int`
423
424 :return: None
425 """
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800426 if not isinstance(serial, (int, long)):
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800427 raise TypeError("serial must be an integer")
428
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800429 hex_serial = hex(serial)[2:]
430 if not isinstance(hex_serial, bytes):
431 hex_serial = hex_serial.encode('ascii')
432
433 bignum_serial = _api.new("BIGNUM**")
434
435 # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like
436 # it. If bignum is still NULL after this call, then the return value is
437 # actually the result. I hope. -exarkun
438 small_serial = _api.BN_hex2bn(bignum_serial, hex_serial)
439
440 if bignum_serial[0] == _api.NULL:
441 set_result = ASN1_INTEGER_set(
442 _api.X509_get_serialNumber(self._x509), small_serial)
443 if set_result:
444 # TODO Not tested
445 _raise_current_error()
446 else:
447 asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
448 _api.BN_free(bignum_serial[0])
449 if asn1_serial == _api.NULL:
450 # TODO Not tested
451 _raise_current_error()
452 set_result = _api.X509_set_serialNumber(self._x509, asn1_serial)
453 if not set_result:
454 # TODO Not tested
455 _raise_current_error()
456
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800457
458 def get_serial_number(self):
459 """
460 Return serial number of the certificate
461
462 :return: Serial number as a Python integer
463 """
Jean-Paul Calderone78133852013-02-19 10:41:46 -0800464 asn1_serial = _api.X509_get_serialNumber(self._x509)
465 bignum_serial = _api.ASN1_INTEGER_to_BN(asn1_serial, _api.NULL)
466 try:
467 hex_serial = _api.BN_bn2hex(bignum_serial)
468 try:
469 hexstring_serial = _api.string(hex_serial)
470 serial = int(hexstring_serial, 16)
471 return serial
472 finally:
473 _api.OPENSSL_free(hex_serial)
474 finally:
475 _api.BN_free(bignum_serial)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800476
477
478 def gmtime_adj_notAfter(self, amount):
479 """
480 Adjust the time stamp for when the certificate stops being valid
481
482 :param amount: The number of seconds by which to adjust the ending
483 validity time.
484 :type amount: :py:class:`int`
485
486 :return: None
487 """
488 if not isinstance(amount, int):
489 raise TypeError("amount must be an integer")
490
491 notAfter = _api.X509_get_notAfter(self._x509)
492 _api.X509_gmtime_adj(notAfter, amount)
493
494
495 def has_expired(self):
496 """
497 Check whether the certificate has expired.
498
499 :return: True if the certificate has expired, false otherwise
500 """
501 now = int(time())
502 notAfter = _api.X509_get_notAfter(self._x509)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800503 return _api.ASN1_UTCTIME_cmp_time_t(
504 _api.cast('ASN1_UTCTIME*', notAfter), now) < 0
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800505
506
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800507 def _get_boundary_time(self, which):
508 timestamp = which(self._x509)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800509 string_timestamp = _api.cast('ASN1_STRING*', timestamp)
510 if _api.ASN1_STRING_length(string_timestamp) == 0:
511 return None
512 elif _api.ASN1_STRING_type(string_timestamp) == _api.V_ASN1_GENERALIZEDTIME:
513 return _api.string(_api.ASN1_STRING_data(string_timestamp))
514 else:
515 generalized_timestamp = _api.new("ASN1_GENERALIZEDTIME**")
516 _api.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
517 if generalized_timestamp[0] == _api.NULL:
518 1/0
519 else:
Jean-Paul Calderone19247972013-02-20 08:22:15 -0800520 string_timestamp = _api.cast(
521 "ASN1_STRING*", generalized_timestamp[0])
522 string_data = _api.ASN1_STRING_data(string_timestamp)
523 string_result = _api.string(string_data)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800524 _api.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
Jean-Paul Calderone19247972013-02-20 08:22:15 -0800525 return string_result
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800526
527
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800528 def get_notBefore(self):
529 """
530 Retrieve the time stamp for when the certificate starts being valid
531
532 :return: A string giving the timestamp, in the format::
533
534 YYYYMMDDhhmmssZ
535 YYYYMMDDhhmmss+hhmm
536 YYYYMMDDhhmmss-hhmm
537
538 or None if there is no value set.
539 """
540 return self._get_boundary_time(_api.X509_get_notBefore)
541
542
543 def _set_boundary_time(self, which, when):
544 if not isinstance(when, bytes):
545 raise TypeError("when must be a byte string")
546
547 boundary = which(self._x509)
548 set_result = _api.ASN1_GENERALIZEDTIME_set_string(
549 _api.cast('ASN1_GENERALIZEDTIME*', boundary), when)
550 if set_result == 0:
551 dummy = _api.ASN1_STRING_new()
552 _api.ASN1_STRING_set(dummy, when, len(when))
553 check_result = _api.ASN1_GENERALIZEDTIME_check(
554 _api.cast('ASN1_GENERALIZEDTIME*', dummy))
555 if not check_result:
556 raise ValueError("Invalid string")
557 else:
558 # TODO No tests for this case
559 raise RuntimeError("Unknown ASN1_GENERALIZEDTIME_set_string failure")
560
561
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800562 def set_notBefore(self, when):
563 """
564 Set the time stamp for when the certificate starts being valid
565
566 :param when: A string giving the timestamp, in the format:
567
568 YYYYMMDDhhmmssZ
569 YYYYMMDDhhmmss+hhmm
570 YYYYMMDDhhmmss-hhmm
571 :type when: :py:class:`bytes`
572
573 :return: None
574 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800575 return self._set_boundary_time(_api.X509_get_notBefore, when)
Jean-Paul Calderoned7d81272013-02-19 13:16:03 -0800576
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800577
578 def get_notAfter(self):
579 """
580 Retrieve the time stamp for when the certificate stops being valid
581
582 :return: A string giving the timestamp, in the format::
583
584 YYYYMMDDhhmmssZ
585 YYYYMMDDhhmmss+hhmm
586 YYYYMMDDhhmmss-hhmm
587
588 or None if there is no value set.
589 """
590 return self._get_boundary_time(_api.X509_get_notAfter)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800591
592
593 def set_notAfter(self, when):
594 """
595 Set the time stamp for when the certificate stops being valid
596
597 :param when: A string giving the timestamp, in the format:
598
599 YYYYMMDDhhmmssZ
600 YYYYMMDDhhmmss+hhmm
601 YYYYMMDDhhmmss-hhmm
602 :type when: :py:class:`bytes`
603
604 :return: None
605 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800606 return self._set_boundary_time(_api.X509_get_notAfter, when)
607
608
609 def _get_name(self, which):
610 name = X509Name.__new__(X509Name)
611 name._name = which(self._x509)
612 if name._name == _api.NULL:
613 1/0
614 return name
615
616
617 def _set_name(self, which, name):
618 set_result = which(self._x509, name._name)
619 if not set_result:
620 1/0
621
622
623 def get_issuer(self):
624 """
625 Create an X509Name object for the issuer of the certificate
626
627 :return: An X509Name object
628 """
629 return self._get_name(_api.X509_get_issuer_name)
630
631
632 def set_issuer(self, issuer):
633 """
634 Set the issuer of the certificate
635
636 :param issuer: The issuer name
637 :type issuer: :py:class:`X509Name`
638
639 :return: None
640 """
641 return self._set_name(_api.X509_set_issuer_name, issuer)
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800642
643
644 def get_subject(self):
645 """
646 Create an X509Name object for the subject of the certificate
647
648 :return: An X509Name object
649 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800650 return self._get_name(_api.X509_get_subject_name)
Jean-Paul Calderonea9de1952013-02-19 16:58:42 -0800651
652
653 def set_subject(self, subject):
654 """
655 Set the subject of the certificate
656
657 :param subject: The subject name
658 :type subject: :py:class:`X509Name`
659 :return: None
660 """
Jean-Paul Calderonec2bd4e92013-02-20 08:12:36 -0800661 return self._set_name(_api.X509_set_subject_name, subject)
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -0800662X509Type = X509
663
664
665
def load_certificate(type, buffer):
    """
    Parse a certificate out of an in-memory buffer.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)

    :param buffer: The buffer the certificate is stored in
    :type buffer: :py:class:`bytes`

    :return: The X509 object
    :raise ValueError: If ``type`` is neither FILETYPE_PEM nor FILETYPE_ASN1.
    :raise Error: If OpenSSL cannot parse the buffer.
    """
    source = _api.BIO_new_mem_buf(buffer, len(buffer))

    try:
        if type == FILETYPE_PEM:
            parsed = _api.PEM_read_bio_X509(
                source, _api.NULL, _api.NULL, _api.NULL)
        else:
            if type != FILETYPE_ASN1:
                raise ValueError(
                    "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
            parsed = _api.d2i_X509_bio(source, _api.NULL)
    finally:
        # The BIO is only needed while parsing.
        _api.BIO_free(source)

    if parsed == _api.NULL:
        _raise_current_error()

    # Wrap the parsed structure without running X509.__init__, which would
    # allocate a second, empty certificate.
    certificate = X509.__new__(X509)
    certificate._x509 = parsed
    return certificate
696
697
def dump_certificate(type, cert):
    """
    Dump a certificate to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in
    :raise ValueError: If ``type`` is not one of the supported file types.
    :raise Error: If OpenSSL fails to serialize the certificate.
    """
    bio = _api.BIO_new(_api.BIO_s_mem())
    try:
        if type == FILETYPE_PEM:
            result_code = _api.PEM_write_bio_X509(bio, cert._x509)
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_X509_bio(bio, cert._x509)
        elif type == FILETYPE_TEXT:
            result_code = _api.X509_print_ex(bio, cert._x509, 0, 0)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        # Report a failed write instead of returning whatever partial data
        # ended up in the BIO (same check as dump_privatekey).
        if result_code == 0:
            _raise_current_error()

        return _bio_to_string(bio)
    finally:
        # _bio_to_string copied the data, so the BIO can always be released.
        _api.BIO_free(bio)
719
720
721
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
    """
    Dump a private key to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param pkey: The PKey to dump
    :param cipher: (optional) if encrypted PEM format, the cipher to
                   use.  Not implemented yet; see below.
    :param passphrase: (optional) if encrypted PEM format, this can be either
                       the passphrase to use, or a callback for providing the
                       passphrase.  Currently unused.
    :return: The buffer with the dumped key in
    :rtype: :py:data:`str`
    :raise NotImplementedError: If ``cipher`` is given.
    :raise ValueError: If ``type`` is not one of the supported file types.
    :raise Error: If OpenSSL fails to serialize the key.
    """
    # TODO incomplete
    # Encryption is not wired up yet.  Refuse the request rather than
    # silently handing back an unencrypted private key to a caller who
    # asked for an encrypted one.
    if cipher is not None:
        raise NotImplementedError(
            "encrypting the dumped key with a cipher is not implemented")

    bio = _api.BIO_new(_api.BIO_s_mem())
    try:
        if type == FILETYPE_PEM:
            result_code = _api.PEM_write_bio_PrivateKey(
                bio, pkey._pkey, _api.NULL, _api.NULL, 0, _api.NULL,
                _api.NULL)
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_PrivateKey_bio(bio, pkey._pkey)
        elif type == FILETYPE_TEXT:
            rsa = _api.EVP_PKEY_get1_RSA(pkey._pkey)
            result_code = _api.RSA_print(bio, rsa, 0)
            # TODO RSA_free(rsa)?
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        if result_code == 0:
            _raise_current_error()

        return _bio_to_string(bio)
    finally:
        # _bio_to_string copied the data, so the BIO can always be released.
        _api.BIO_free(bio)
757
758
759
def load_privatekey(type, buffer, passphrase=None):
    """
    Load a private key from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase.  Currently unused.

    :return: The PKey object
    :raise ValueError: If ``type`` is neither FILETYPE_PEM nor FILETYPE_ASN1.
    :raise Error: If OpenSSL cannot parse a key from the buffer.
    """
    # TODO incomplete -- passphrase is not passed on to OpenSSL yet.
    bio = _api.BIO_new_mem_buf(buffer, len(buffer))

    try:
        if type == FILETYPE_PEM:
            evp_pkey = _api.PEM_read_bio_PrivateKey(
                bio, _api.NULL, _api.NULL, _api.NULL)
        elif type == FILETYPE_ASN1:
            evp_pkey = _api.d2i_PrivateKey_bio(bio, _api.NULL)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
    finally:
        # The BIO is only needed while parsing (same as load_certificate).
        _api.BIO_free(bio)

    # Never wrap a NULL key: report the parse failure instead.
    if evp_pkey == _api.NULL:
        _raise_current_error()

    # Wrap the parsed key without running PKey.__init__, which would
    # allocate a second, empty EVP_PKEY.
    pkey = PKey.__new__(PKey)
    pkey._pkey = evp_pkey
    return pkey
785
786
787
def dump_certificate_request(type, req):
    """
    Dump a certificate request to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param req: The certificate request to dump
    :return: The buffer with the dumped certificate request in
    :raise NotImplementedError: Always; this function has no implementation
        yet.
    """
    # TODO Not implemented.  Fail loudly instead of returning None to a
    # caller that expects a buffer.
    raise NotImplementedError(
        "dump_certificate_request is not implemented")
796
797
798
799def load_certificate_request(type, buffer):
800 """
801 Load a certificate request from a buffer
802
803 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
804 :param buffer: The buffer the certificate request is stored in
805 :return: The X509Req object
806 """