blob: 7a0bb9c5f011a76acc800645ee618f4201fab96a [file] [log] [blame]
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08001from time import time
2
Jean-Paul Calderone084b7522013-02-09 09:53:45 -08003from OpenSSL.xcrypto import *
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -08004
5from tls.c import api as _api
6
# Serialization formats accepted by the load_* / dump_* functions below.
FILETYPE_PEM = _api.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _api.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 0xFFFF

# Key algorithm identifiers accepted by PKey.generate_key.
TYPE_RSA = _api.EVP_PKEY_RSA
TYPE_DSA = _api.EVP_PKEY_DSA
15
Jean-Paul Calderoneabfbab62013-02-09 21:25:02 -080016
def _bio_to_string(bio):
    """
    Return a copy of the data held by an OpenSSL BIO as a Python byte string.

    :param bio: The BIO object whose contents are read.

    :return: A byte string copied out of the BIO's data buffer.
    """
    data_pointer = _api.new('char**')
    size = _api.BIO_get_mem_data(bio, data_pointer)
    view = _api.buffer(data_pointer[0], size)
    # Slicing the buffer copies the bytes into a Python-owned string.
    return view[:]
24
25
26
def _raise_current_error():
    """
    Drain the OpenSSL error queue and raise its contents as an Error.

    :raise Error: Always.  Its single argument is a list of
        (library, function, reason) string tuples, in the order in which
        ERR_get_error returned them.
    """
    queued = []
    code = _api.ERR_get_error()
    # A code of 0 means the queue is empty.
    while code != 0:
        queued.append((
            _api.string(_api.ERR_lib_error_string(code)),
            _api.string(_api.ERR_func_error_string(code)),
            _api.string(_api.ERR_reason_error_string(code))))
        code = _api.ERR_get_error()

    raise Error(queued)

_exception_from_error_queue = _raise_current_error
41
42
class Error(Exception):
    """
    Exception raised by this module when an OpenSSL operation fails.
    """
45
46
47
class PKey(object):
    """
    A wrapper around an OpenSSL EVP_PKEY structure.
    """
    # Set to True on instances that only carry the public half of a key.
    _only_public = False

    # Class-level default, so that instances built with PKey.__new__ (which
    # skips __init__) count as initialized.  __init__ resets it to False until
    # generate_key has produced key material.
    _initialized = True

    def __init__(self):
        self._pkey = _api.EVP_PKEY_new()
        self._initialized = False


    def generate_key(self, type, bits):
        """
        Generate a key of a given type, with a given number of a bits

        :param type: The key type (TYPE_RSA or TYPE_DSA)
        :param bits: The number of bits

        :return: None
        :raise TypeError: If type or bits is not an integer.
        :raise ValueError: If bits is not positive (RSA only).
        :raise Error: If the key type is unknown or OpenSSL fails to generate
            the key.
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")
            self._generate_rsa(bits)
        elif type == TYPE_DSA:
            # TODO DSA key generation is not implemented.  No key material
            # exists at this point, so the key must not be flagged as
            # initialized; X509.sign relies on that flag to reject empty keys.
            return
        else:
            raise Error("No such key type")

        self._initialized = True


    def _generate_rsa(self, bits):
        """
        Generate an RSA key with public exponent 0x10001 and store it in
        this object's EVP_PKEY.

        :param bits: The (already validated) modulus size in bits.

        :raise Error: If any of the OpenSSL calls fails.
        """
        exponent = _api.new("BIGNUM**")
        # BN_hex2bn returns 0 on failure.
        if not _api.BN_hex2bn(exponent, "10001"):
            _raise_current_error()

        try:
            rsa = _api.RSA_new()
            if rsa == _api.NULL:
                _raise_current_error()

            # TODO Release GIL?
            result = _api.RSA_generate_key_ex(
                rsa, bits, exponent[0], _api.NULL)
            # Success is reported as 1; anything else is a failure.
            if result != 1:
                _raise_current_error()
        finally:
            # The exponent is only needed for the generation call itself.
            _api.BN_free(exponent[0])

        if not _api.EVP_PKEY_assign_RSA(self._pkey, rsa):
            _raise_current_error()


    def check(self):
        """
        Check the consistency of an RSA private key.

        :return: True if key is consistent.
        :raise Error: if the key is inconsistent.
        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked, and only when the
            private part is present.
        """
        if self._only_public:
            # There are no private components to verify.
            raise TypeError("public key only")

        if _api.EVP_PKEY_type(self._pkey.type) != _api.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        # TODO EVP_PKEY_get1_RSA hands back a new reference which is never
        # released here.
        rsa = _api.EVP_PKEY_get1_RSA(self._pkey)
        if _api.RSA_check_key(rsa):
            return True
        _raise_current_error()
117
118
119
class X509Name(object):
    """
    A wrapper around an OpenSSL X509_NAME structure.  Name components are
    exposed as attributes (for example ``name.CN``).
    """
    def __init__(self, name):
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: An X509Name object to copy
        """
        self._name = _api.X509_NAME_dup(name._name)


    def __setattr__(self, name, value):
        """
        Set a name component.  Attribute names starting with an underscore
        are stored as ordinary Python attributes; every other name is looked
        up as an OpenSSL object name and replaces any existing entry of the
        same kind.

        :raise TypeError: If the attribute name is not a string.
        :raise AttributeError: If OpenSSL does not know the attribute name.
        :raise Error: If OpenSSL fails to add the entry.
        """
        if type(name) is not str:
            raise TypeError("attribute name must be string, not '%.200s'" % (
                    type(name).__name__,))

        if name.startswith('_'):
            return super(X509Name, self).__setattr__(name, value)

        nid = _api.OBJ_txt2nid(name)
        if nid == _api.NID_undef:
            # The failed lookup may have left entries on the error queue;
            # deliberately flush and discard them.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_api.X509_NAME_entry_count(self._name)):
            ent = _api.X509_NAME_get_entry(self._name, i)
            ent_obj = _api.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _api.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _api.X509_NAME_delete_entry(self._name, i)
                _api.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, unicode):
            value = value.encode('utf-8')

        add_result = _api.X509_NAME_add_entry_by_NID(
            self._name, nid, _api.MBSTRING_UTF8, value, -1, -1, 0)
        if not add_result:
            # TODO Untested
            _raise_current_error()


    def __getattr__(self, name):
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName (alias
        CN) and more...

        :return: The decoded value of the component, or None if the name has
            no such component.
        :raise AttributeError: If OpenSSL does not know the attribute name.
        """
        nid = _api.OBJ_txt2nid(name)
        if nid == _api.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            # object defines no __getattr__ to delegate to, so report the
            # missing attribute directly.
            raise AttributeError("'%s' object has no attribute '%s'" % (
                    type(self).__name__, name))

        entry_index = _api.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _api.X509_NAME_get_entry(self._name, entry_index)
        data = _api.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _api.new("unsigned char**")
        data_length = _api.ASN1_STRING_to_UTF8(result_buffer, data)
        if data_length < 0:
            _raise_current_error()

        try:
            return _api.buffer(
                result_buffer[0], data_length)[:].decode('utf-8')
        finally:
            # Release the converted string even if decoding fails.
            _api.OPENSSL_free(result_buffer[0])


    def __cmp__(self, other):
        """
        Compare two names using X509_NAME_cmp.
        """
        if not isinstance(other, X509Name):
            return NotImplemented

        result = _api.X509_NAME_cmp(self._name, other._name)
        # TODO result == -2 is an error case that maybe should be checked for
        return result


    def __repr__(self):
        """
        String representation of an X509Name
        """
        result_buffer = _api.new("char[]", 512)
        format_result = _api.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer))

        if format_result == _api.NULL:
            _raise_current_error()

        return "<X509Name object '%s'>" % (_api.string(result_buffer),)


    def hash(self):
        """
        Return the hash value of this name

        :return: The value computed by X509_NAME_hash for this name.
        """
        return _api.X509_NAME_hash(self._name)


    def der(self):
        """
        Return the DER encoding of this name

        :return: A :py:class:`bytes` instance giving the DER encoded form of
            this name.
        :raise Error: If OpenSSL fails to encode the name.
        """
        result_buffer = _api.new('unsigned char**')
        encode_result = _api.i2d_X509_NAME(self._name, result_buffer)
        if encode_result < 0:
            _raise_current_error()

        try:
            return _api.buffer(result_buffer[0], encode_result)[:]
        finally:
            _api.OPENSSL_free(result_buffer[0])


    def get_components(self):
        """
        Returns the split-up components of this name.

        :return: List of tuples (name, value).
        """
        result = []
        for i in range(_api.X509_NAME_entry_count(self._name)):
            ent = _api.X509_NAME_get_entry(self._name, i)

            fname = _api.X509_NAME_ENTRY_get_object(ent)
            fval = _api.X509_NAME_ENTRY_get_data(ent)

            nid = _api.OBJ_obj2nid(fname)
            name = _api.OBJ_nid2sn(nid)

            # Copy exactly ASN1_STRING_length bytes.  _api.string would stop
            # at the first NUL byte and silently truncate values that
            # contain one.
            value = _api.buffer(
                _api.ASN1_STRING_data(fval),
                _api.ASN1_STRING_length(fval))[:]

            result.append((_api.string(name), value))

        return result
X509NameType = X509Name
276
277
278
class X509(object):
    """
    A wrapper around an OpenSSL X509 certificate structure.
    """
    def __init__(self):
        # TODO Allocation failure?  And why not __new__ instead of __init__?
        self._x509 = _api.X509_new()


    def set_version(self, version):
        """
        Set version number of the certificate

        :param version: The version number
        :type version: :py:class:`int`

        :return: None
        :raise TypeError: If version is not an integer.
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        _api.X509_set_version(self._x509, version)


    def get_version(self):
        """
        Return version number of the certificate

        :return: Version number as a Python integer
        """
        return _api.X509_get_version(self._x509)


    def get_pubkey(self):
        """
        Get the public key of the certificate

        :return: The public key
        :raise Error: If OpenSSL cannot provide the key.
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _api.X509_get_pubkey(self._x509)
        if pkey._pkey == _api.NULL:
            _raise_current_error()
        pkey._only_public = True
        return pkey


    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate

        :param pkey: The public key

        :return: None
        :raise TypeError: If pkey is not a PKey instance.
        :raise Error: If OpenSSL rejects the key.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _api.X509_set_pubkey(self._x509, pkey._pkey)
        if not set_result:
            _raise_current_error()


    def sign(self, pkey, digest):
        """
        Sign the certificate using the supplied key and digest

        :param pkey: The key to sign with
        :param digest: The message digest to use
        :return: None
        :raise TypeError: If pkey is not a PKey instance.
        :raise ValueError: If the key has no private part, is uninitialized,
            or the digest name is unknown.
        :raise Error: If OpenSSL fails to sign the certificate.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _api.EVP_get_digestbyname(digest)
        if evp_md == _api.NULL:
            raise ValueError("No such digest method")

        sign_result = _api.X509_sign(self._x509, pkey._pkey, evp_md)
        if not sign_result:
            _raise_current_error()


    def get_signature_algorithm(self):
        """
        Retrieve the signature algorithm used in the certificate

        :return: A byte string giving the name of the signature algorithm used in
                 the certificate.
        :raise ValueError: If the signature algorithm is undefined.
        """
        alg = self._x509.cert_info.signature.algorithm
        nid = _api.OBJ_obj2nid(alg)
        if nid == _api.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _api.string(_api.OBJ_nid2ln(nid))


    def digest(self, digest_name):
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`bytes`

        :return: The digest of the object, as upper-case hexadecimal byte
            pairs separated by colons.
        :raise ValueError: If the digest name is unknown.
        :raise Error: If OpenSSL fails to compute the digest.
        """
        digest = _api.EVP_get_digestbyname(digest_name)
        if digest == _api.NULL:
            raise ValueError("No such digest method")

        result_buffer = _api.new("char[]", _api.EVP_MAX_MD_SIZE)
        result_length = _api.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _api.X509_digest(
            self._x509, digest, result_buffer, result_length)

        if not digest_result:
            _raise_current_error()

        return ':'.join([
                ch.encode('hex').upper() for ch
                in _api.buffer(result_buffer, result_length[0])])


    def subject_name_hash(self):
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject.
        """
        return _api.X509_subject_name_hash(self._x509)


    def set_serial_number(self, serial):
        """
        Set serial number of the certificate

        :param serial: The serial number
        :type serial: :py:class:`int`

        :return: None
        :raise TypeError: If serial is not an integer.
        :raise Error: If OpenSSL fails to convert or store the serial number.
        """
        if not isinstance(serial, (int, long)):
            raise TypeError("serial must be an integer")

        # "%x" yields plain hexadecimal digits (with a leading "-" for
        # negative values), unlike slicing the output of hex(), which leaves
        # a trailing "L" on longs and mangles negative numbers.
        hex_serial = "%x" % (serial,)
        if not isinstance(hex_serial, bytes):
            hex_serial = hex_serial.encode('ascii')

        bignum_serial = _api.new("BIGNUM**")

        # BN_hex2bn allocates a BIGNUM and stores it in bignum_serial[0].
        # Its return value is a count of parsed characters (0 on failure),
        # not the number itself.
        _api.BN_hex2bn(bignum_serial, hex_serial)

        if bignum_serial[0] == _api.NULL:
            # No BIGNUM was produced, so the conversion failed.
            # TODO Not tested
            _raise_current_error()

        asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
        _api.BN_free(bignum_serial[0])
        if asn1_serial == _api.NULL:
            # TODO Not tested
            _raise_current_error()

        # TODO asn1_serial is never released here; confirm whether
        # X509_set_serialNumber takes ownership or copies it.
        set_result = _api.X509_set_serialNumber(self._x509, asn1_serial)
        if not set_result:
            # TODO Not tested
            _raise_current_error()


    def get_serial_number(self):
        """
        Return serial number of the certificate

        :return: Serial number as a Python integer
        """
        asn1_serial = _api.X509_get_serialNumber(self._x509)
        bignum_serial = _api.ASN1_INTEGER_to_BN(asn1_serial, _api.NULL)
        try:
            hex_serial = _api.BN_bn2hex(bignum_serial)
            try:
                hexstring_serial = _api.string(hex_serial)
                serial = int(hexstring_serial, 16)
                return serial
            finally:
                _api.OPENSSL_free(hex_serial)
        finally:
            _api.BN_free(bignum_serial)


    def gmtime_adj_notAfter(self, amount):
        """
        Adjust the time stamp for when the certificate stops being valid

        :param amount: The number of seconds by which to adjust the ending
                       validity time.
        :type amount: :py:class:`int`

        :return: None
        :raise TypeError: If amount is not an integer.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _api.X509_get_notAfter(self._x509)
        _api.X509_gmtime_adj(notAfter, amount)


    def has_expired(self):
        """
        Check whether the certificate has expired.

        :return: True if the certificate has expired, false otherwise
        """
        now = int(time())
        notAfter = _api.X509_get_notAfter(self._x509)
        return _api.ASN1_UTCTIME_cmp_time_t(
            _api.cast('ASN1_UTCTIME*', notAfter), now) < 0


    def _get_boundary_time(self, which):
        """
        Read one of the certificate's validity timestamps.

        :param which: The accessor (X509_get_notBefore or X509_get_notAfter)
            used to fetch the timestamp from the certificate.

        :return: The timestamp in GENERALIZEDTIME string form, or None if it
            is empty.
        :raise Error: If OpenSSL cannot convert the timestamp.
        """
        timestamp = which(self._x509)
        string_timestamp = _api.cast('ASN1_STRING*', timestamp)
        if _api.ASN1_STRING_length(string_timestamp) == 0:
            return None

        if (_api.ASN1_STRING_type(string_timestamp) ==
            _api.V_ASN1_GENERALIZEDTIME):
            return _api.string(_api.ASN1_STRING_data(string_timestamp))

        generalized_timestamp = _api.new("ASN1_GENERALIZEDTIME**")
        _api.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
        if generalized_timestamp[0] == _api.NULL:
            _raise_current_error()

        try:
            return _api.string(
                _api.cast("char*", generalized_timestamp[0].data))
        finally:
            _api.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])


    def get_notBefore(self):
        """
        Retrieve the time stamp for when the certificate starts being valid

        :return: A string giving the timestamp, in the format::

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm

                 or None if there is no value set.
        """
        return self._get_boundary_time(_api.X509_get_notBefore)


    def _set_boundary_time(self, which, when):
        """
        Overwrite one of the certificate's validity timestamps.

        :param which: The accessor (X509_get_notBefore or X509_get_notAfter)
            used to fetch the timestamp object to update.
        :param when: The new timestamp, as a byte string.

        :raise TypeError: If when is not a byte string.
        :raise ValueError: If when is not a valid GENERALIZEDTIME string.
        :raise RuntimeError: If setting fails for any other reason.
        """
        if not isinstance(when, bytes):
            raise TypeError("when must be a byte string")

        boundary = which(self._x509)
        set_result = _api.ASN1_GENERALIZEDTIME_set_string(
            _api.cast('ASN1_GENERALIZEDTIME*', boundary), when)
        if set_result == 0:
            # Work out why it failed: validate the text on a scratch string
            # to tell a malformed timestamp from some other failure.
            # TODO The scratch string is never released.
            dummy = _api.ASN1_STRING_new()
            _api.ASN1_STRING_set(dummy, when, len(when))
            check_result = _api.ASN1_GENERALIZEDTIME_check(
                _api.cast('ASN1_GENERALIZEDTIME*', dummy))
            if not check_result:
                raise ValueError("Invalid string")
            else:
                # TODO No tests for this case
                raise RuntimeError(
                    "Unknown ASN1_GENERALIZEDTIME_set_string failure")


    def set_notBefore(self, when):
        """
        Set the time stamp for when the certificate starts being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        """
        return self._set_boundary_time(_api.X509_get_notBefore, when)


    def get_notAfter(self):
        """
        Retrieve the time stamp for when the certificate stops being valid

        :return: A string giving the timestamp, in the format::

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm

                 or None if there is no value set.
        """
        return self._get_boundary_time(_api.X509_get_notAfter)


    def set_notAfter(self, when):
        """
        Set the time stamp for when the certificate stops being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        """
        return self._set_boundary_time(_api.X509_get_notAfter, when)


    def _get_name(self, which):
        """
        Wrap one of the certificate's names in an X509Name.

        :param which: The accessor (X509_get_issuer_name or
            X509_get_subject_name) used to fetch the name.

        :return: An X509Name wrapping the pointer returned by the accessor.
        :raise Error: If the accessor returns NULL.
        """
        name = X509Name.__new__(X509Name)
        name._name = which(self._x509)
        if name._name == _api.NULL:
            _raise_current_error()
        return name


    def _set_name(self, which, name):
        """
        Replace one of the certificate's names.

        :param which: The setter (X509_set_issuer_name or
            X509_set_subject_name) to call.
        :param name: The X509Name to store.

        :raise TypeError: If name is not an X509Name instance.
        :raise Error: If the setter reports failure.
        """
        if not isinstance(name, X509Name):
            raise TypeError("name must be an X509Name")

        set_result = which(self._x509, name._name)
        if not set_result:
            _raise_current_error()


    def get_issuer(self):
        """
        Create an X509Name object for the issuer of the certificate

        :return: An X509Name object
        """
        return self._get_name(_api.X509_get_issuer_name)


    def set_issuer(self, issuer):
        """
        Set the issuer of the certificate

        :param issuer: The issuer name
        :type issuer: :py:class:`X509Name`

        :return: None
        """
        return self._set_name(_api.X509_set_issuer_name, issuer)


    def get_subject(self):
        """
        Create an X509Name object for the subject of the certificate

        :return: An X509Name object
        """
        return self._get_name(_api.X509_get_subject_name)


    def set_subject(self, subject):
        """
        Set the subject of the certificate

        :param subject: The subject name
        :type subject: :py:class:`X509Name`
        :return: None
        """
        return self._set_name(_api.X509_set_subject_name, subject)
X509Type = X509
661
662
663
def load_certificate(type, buffer):
    """
    Parse a certificate out of a byte string.

    :param type: The encoding of the data (FILETYPE_PEM or FILETYPE_ASN1)

    :param buffer: The encoded certificate
    :type buffer: :py:class:`bytes`

    :return: An X509 object wrapping the parsed certificate
    :raise ValueError: If type is neither FILETYPE_PEM nor FILETYPE_ASN1.
    :raise Error: If OpenSSL cannot parse the data.
    """
    source = _api.BIO_new_mem_buf(buffer, len(buffer))

    try:
        if type == FILETYPE_PEM:
            handle = _api.PEM_read_bio_X509(
                source, _api.NULL, _api.NULL, _api.NULL)
        elif type == FILETYPE_ASN1:
            handle = _api.d2i_X509_bio(source, _api.NULL)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
    finally:
        # The BIO is only needed while parsing.
        _api.BIO_free(source)

    if handle == _api.NULL:
        _raise_current_error()

    # Bypass __init__ so no fresh X509 structure is allocated.
    certificate = X509.__new__(X509)
    certificate._x509 = handle
    return certificate
694
695
def dump_certificate(type, cert):
    """
    Dump a certificate to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in
    :raise ValueError: If type is not one of the supported file types.
    :raise Error: If OpenSSL fails to serialize the certificate.
    """
    bio = _api.BIO_new(_api.BIO_s_mem())
    try:
        if type == FILETYPE_PEM:
            result_code = _api.PEM_write_bio_X509(bio, cert._x509)
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_X509_bio(bio, cert._x509)
        elif type == FILETYPE_TEXT:
            result_code = _api.X509_print_ex(bio, cert._x509, 0, 0)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        # Same failure convention as dump_privatekey: 0 means the write
        # failed, so do not hand back a partial buffer.
        if result_code == 0:
            _raise_current_error()

        # _bio_to_string copies the data, so the BIO can be released after.
        return _bio_to_string(bio)
    finally:
        _api.BIO_free(bio)
717
718
719
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
    """
    Dump a private key to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param pkey: The PKey to dump
    :param cipher: (optional) if encrypted PEM format, the cipher to
                   use.  Encryption is not implemented yet.
    :param passphrase: (optional) if encrypted PEM format, this can be either
                       the passphrase to use, or a callback for providing the
                       passphrase.
    :return: The buffer with the dumped key in
    :rtype: :py:data:`str`
    :raise NotImplementedError: If a cipher is given.
    :raise ValueError: If type is not one of the supported file types.
    :raise Error: If OpenSSL fails to serialize the key.
    """
    # TODO incomplete
    if cipher is not None:
        # Refuse rather than silently return the key unencrypted when the
        # caller asked for it to be protected.
        raise NotImplementedError(
            "encrypting dumped private keys is not implemented")

    bio = _api.BIO_new(_api.BIO_s_mem())
    try:
        if type == FILETYPE_PEM:
            result_code = _api.PEM_write_bio_PrivateKey(
                bio, pkey._pkey, _api.NULL, _api.NULL, 0, _api.NULL,
                _api.NULL)
        elif type == FILETYPE_ASN1:
            result_code = _api.i2d_PrivateKey_bio(bio, pkey._pkey)
        elif type == FILETYPE_TEXT:
            rsa = _api.EVP_PKEY_get1_RSA(pkey._pkey)
            result_code = _api.RSA_print(bio, rsa, 0)
            # TODO RSA_free(rsa)?
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
                "FILETYPE_TEXT")

        if result_code == 0:
            _raise_current_error()

        # _bio_to_string copies the data, so the BIO can be released after.
        return _bio_to_string(bio)
    finally:
        _api.BIO_free(bio)
755
756
757
def load_privatekey(type, buffer, passphrase=None):
    """
    Load a private key from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase.  Currently unused.

    :return: The PKey object
    :raise ValueError: If type is neither FILETYPE_PEM nor FILETYPE_ASN1.
    :raise Error: If OpenSSL cannot parse the key.
    """
    # TODO incomplete: passphrase is not passed on to OpenSSL yet.
    bio = _api.BIO_new_mem_buf(buffer, len(buffer))

    try:
        if type == FILETYPE_PEM:
            evp_pkey = _api.PEM_read_bio_PrivateKey(
                bio, _api.NULL, _api.NULL, _api.NULL)
        elif type == FILETYPE_ASN1:
            evp_pkey = _api.d2i_PrivateKey_bio(bio, _api.NULL)
        else:
            raise ValueError(
                "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
    finally:
        # The BIO is only needed while parsing.
        _api.BIO_free(bio)

    # Same handling as load_certificate: a NULL result means parsing failed,
    # so report it instead of wrapping a NULL key.
    if evp_pkey == _api.NULL:
        _raise_current_error()

    # Bypass __init__ so no fresh EVP_PKEY is allocated.
    pkey = PKey.__new__(PKey)
    pkey._pkey = evp_pkey
    return pkey
783
784
785
def dump_certificate_request(type, req):
    """
    Serialize a certificate request into a buffer.

    :param type: The encoding to produce (FILETYPE_PEM or FILETYPE_ASN1)
    :param req: The certificate request to serialize
    :return: The buffer holding the serialized certificate request
    """
    # TODO Not implemented yet; nothing is serialized and None is returned.
    return None
794
795
796
797def load_certificate_request(type, buffer):
798 """
799 Load a certificate request from a buffer
800
801 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
802 :param buffer: The buffer the certificate request is stored in
803 :return: The X509Req object
804 """