# Copyright (C) 2002-2007 Python Software Foundation
# Author: Ben Gertzfield, Barry Warsaw
# Contact: email-sig@python.org

"""Header encoding and decoding functionality."""

# Public names exported by a star-import of this module.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]

import re
import binascii

import email.quoprimime
import email.base64mime

from email.errors import HeaderParseError
from email.charset import Charset

# Small string/bytes constants shared by the header code below.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''

# Default maximum line length used by Header when none is given.
MAXLINELEN = 78

USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  (?=[ \t]|$)           # whitespace or the end of the string
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal
# 041) to tilde (octal 176).  For use with .match()
fcre = re.compile(r'[\041-\176]+:$')



# Helpers
_max_append = email.quoprimime._max_append



def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (string, charset) pairs containing each of the decoded
    parts of the header.  Charset is None for non-encoded parts of the header,
    otherwise a lower-case string containing the name of the character set
    specified in the encoded string.

    If the header contains no encoded words at all, the result is the single
    pair (header, None) with the header value returned unchanged.  Otherwise
    any part that is still a str after decoding is converted to bytes with
    the raw-unicode-escape codec, and consecutive parts that share a charset
    are merged into one pair (unencoded runs are joined with a single space,
    encoded runs are concatenated directly).

    An email.errors.HeaderParseError may be raised when certain decoding error
    occurs (e.g. a base64 decoding exception).
    """
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so split() yields
        # [text, charset, encoding, encoded, text, charset, ...] and always
        # ends with a (possibly empty) text element.
        parts = ecre.split(line)
        for i in range(0, len(parts), 4):
            unencoded = parts[i].strip()
            if unencoded:
                words.append((unencoded, None, None))
            if i + 3 < len(parts):
                charset, encoding, encoded = parts[i + 1:i + 4]
                words.append((encoded, encoding.lower(), charset.lower()))
    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = email.quoprimime.header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            # Be liberal in what we accept: encoded words whose trailing
            # '=' padding was dropped are padded back to a multiple of four
            # characters before decoding.
            paderr = len(encoded_string) % 4
            if paderr:
                encoded_string += '==='[:4 - paderr]
            try:
                word = email.base64mime.decode(encoded_string)
            except binascii.Error as err:
                raise HeaderParseError('Base64 decoding error') from err
            else:
                decoded_words.append((word, charset))
        else:
            # ecre only admits 'q' or 'b', so this should be unreachable.
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded words are separated by one space.
            last_word += BSPACE + word
        else:
            # Adjacent encoded words of one charset are joined directly.
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed



def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Create a Header from a sequence of pairs as returned by decode_header()

    decode_header() takes a header value string and returns a sequence of
    pairs of the format (decoded_string, charset) where charset is the string
    name of the character set.

    This function takes one of those sequence of pairs and returns a Header
    instance.  Optional maxlinelen, header_name, and continuation_ws are as in
    the Header constructor.

    A charset that is neither None nor already a Charset instance is wrapped
    in Charset() before the pair is appended to the header.
    """
    h = Header(maxlinelen=maxlinelen, header_name=header_name,
               continuation_ws=continuation_ws)
    for s, charset in decoded_seq:
        # None means us-ascii but we can simply pass it on to h.append()
        if charset is not None and not isinstance(charset, Charset):
            charset = Charset(charset)
        h.append(s, charset)
    return h



class Header:
    """A header value assembled from chunks that may use different charsets.

    Chunks are kept in self._chunks as (string, Charset) pairs; str() joins
    them into plain text and encode() folds and encodes them into lines.
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a byte string or a Unicode string, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicit via maxlinelen.  For
        splitting the first line to a shorter value (to account for the field
        header which isn't included in s, e.g. `Subject') pass in the name of
        the field in header_name.  The default maxlinelen is 78 as recommended
        by RFC 2822.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        self._maxlinelen = maxlinelen
        if header_name is None:
            self._headerlen = 0
        else:
            # Take the separating colon and space into account.
            self._headerlen = len(header_name) + 2

    def __str__(self):
        """Return the string value of the header.

        The stored chunks are normalized first, so calling str() rewrites
        self._chunks with runs of equal charsets merged.
        """
        self._normalize()
        uchunks = []
        lastcs = None
        for string, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            nextcs = charset
            if uchunks:
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii'):
                        uchunks.append(SPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii'):
                    uchunks.append(SPACE)
            lastcs = nextcs
            uchunks.append(string)
        return EMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a unicode (of the unencoded header value), swap the
        # args and do another comparison.
        return other == str(self)

    def __ne__(self, other):
        return not self == other

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a Unicode string (str) or a byte string.  A str is first
        encoded to bytes with the charset's input codec (us-ascii when the
        charset defines none); a byte string is taken as is.  In both cases
        the bytes are then decoded with the charset's output codec (again
        us-ascii when none is defined) and the resulting text is stored
        together with the charset.  Doing the round trip here means a value
        that does not fit the charset raises a UnicodeError at append time
        rather than later.

        Optional `errors' is passed as the error handler to both the encode
        and the decode call.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if isinstance(s, str):
            # Convert the string from the input character set to the output
            # character set and store the resulting bytes and the charset for
            # composition later.
            input_charset = charset.input_codec or 'us-ascii'
            input_bytes = s.encode(input_charset, errors)
        else:
            # We already have the bytes we will store internally.
            input_bytes = s
        # Ensure that the bytes we're storing can be decoded to the output
        # character set, otherwise an early error is thrown.
        output_charset = charset.output_codec or 'us-ascii'
        output_string = input_bytes.decode(output_charset, errors)
        self._chunks.append((output_string, charset))

    def encode(self, splitchars=';, \t', maxlinelen=None):
        """Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.  In addition, there is a
        75-character length limit on any given encoded header field, so
        line-wrapping must be performed, even with double-byte character sets.

        This method will do its best to convert the string to the correct
        character set used in email, and encode and line wrap it safely with
        the appropriate scheme for that character set.

        Optional splitchars is a string containing characters to split long
        ASCII lines on, in rough support of RFC 2822's `highest level
        syntactic breaks'.  This doesn't affect RFC 2047 encoded lines.

        Optional maxlinelen overrides the maximum line length given to the
        constructor for this call only.  A value of 0 means don't wrap.
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  For all practical purposes,
        # choosing a huge number here accomplishes that and makes the
        # _ValueFormatter algorithm much simpler.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        for string, charset in self._chunks:
            lines = string.splitlines()
            for line in lines:
                formatter.feed(line, charset)
                # Keep the line breaks of a multi-line chunk.
                if len(lines) > 1:
                    formatter.newline()
            # Mark the boundary between this chunk and the next one.
            formatter.add_transition()
        return str(formatter)

    def _normalize(self):
        # Step 1: Normalize the chunks so that all runs of identical charsets
        # get collapsed into a single unicode string.
        chunks = []
        last_charset = None
        last_chunk = []
        for string, charset in self._chunks:
            if charset == last_charset:
                last_chunk.append(string)
            else:
                # Flush the finished run; nothing to flush before the first
                # chunk, when last_charset is still None.
                if last_charset is not None:
                    chunks.append((SPACE.join(last_chunk), last_charset))
                last_chunk = [string]
                last_charset = charset
        if last_chunk:
            chunks.append((SPACE.join(last_chunk), last_charset))
        self._chunks = chunks



334class _ValueFormatter:
335 def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
336 self._maxlen = maxlen
337 self._continuation_ws = continuation_ws
338 self._continuation_ws_len = len(continuation_ws.replace('\t', SPACE8))
339 self._splitchars = splitchars
340 self._lines = []
341 self._current_line = _Accumulator(headerlen)
342
343 def __str__(self):
344 self.newline()
345 return NL.join(self._lines)
346
347 def newline(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000348 end_of_line = self._current_line.pop()
349 if end_of_line is not None:
350 self._current_line.push(end_of_line)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000351 if len(self._current_line) > 0:
352 self._lines.append(str(self._current_line))
353 self._current_line.reset()
354
Barry Warsaw00b34222007-08-31 02:35:00 +0000355 def add_transition(self):
356 self._current_line.push(None)
357
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000358 def feed(self, string, charset):
359 # If the string itself fits on the current line in its encoded format,
360 # then add it now and be done with it.
361 encoded_string = charset.header_encode(string)
362 if len(encoded_string) + len(self._current_line) <= self._maxlen:
363 self._current_line.push(encoded_string)
364 return
Guido van Rossum9604e662007-08-30 03:46:43 +0000365 # If the charset has no header encoding (i.e. it is an ASCII encoding)
366 # then we must split the header at the "highest level syntactic break"
367 # possible. Note that we don't have a lot of smarts about field
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000368 # syntax; we just try to break on semi-colons, then commas, then
Guido van Rossum9604e662007-08-30 03:46:43 +0000369 # whitespace. Eventually, this should be pluggable.
370 if charset.header_encoding is None:
371 for ch in self._splitchars:
372 if ch in string:
373 break
374 else:
375 ch = None
376 # If there's no available split character then regardless of
377 # whether the string fits on the line, we have to put it on a line
378 # by itself.
379 if ch is None:
380 if not self._current_line.is_onlyws():
381 self._lines.append(str(self._current_line))
382 self._current_line.reset(self._continuation_ws)
383 self._current_line.push(encoded_string)
384 else:
385 self._ascii_split(string, ch)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000386 return
Guido van Rossum9604e662007-08-30 03:46:43 +0000387 # Otherwise, we're doing either a Base64 or a quoted-printable
388 # encoding which means we don't need to split the line on syntactic
389 # breaks. We can basically just find enough characters to fit on the
390 # current line, minus the RFC 2047 chrome. What makes this trickier
391 # though is that we have to split at octet boundaries, not character
392 # boundaries but it's only safe to split at character boundaries so at
393 # best we can only get close.
394 encoded_lines = charset.header_encode_lines(string, self._maxlengths())
395 # The first element extends the current line, but if it's None then
396 # nothing more fit on the current line so start a new line.
397 try:
398 first_line = encoded_lines.pop(0)
399 except IndexError:
400 # There are no encoded lines, so we're done.
401 return
402 if first_line is not None:
403 self._current_line.push(first_line)
404 self._lines.append(str(self._current_line))
405 self._current_line.reset(self._continuation_ws)
406 try:
407 last_line = encoded_lines.pop()
408 except IndexError:
409 # There was only one line.
410 return
411 self._current_line.push(last_line)
Guido van Rossum9604e662007-08-30 03:46:43 +0000412 # Everything else are full lines in themselves.
413 for line in encoded_lines:
414 self._lines.append(self._continuation_ws + line)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000415
Guido van Rossum9604e662007-08-30 03:46:43 +0000416 def _maxlengths(self):
417 # The first line's length.
418 yield self._maxlen - len(self._current_line)
419 while True:
420 yield self._maxlen - self._continuation_ws_len
421
422 def _ascii_split(self, string, ch):
423 holding = _Accumulator()
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000424 # Split the line on the split character, preserving it. If the split
425 # character is whitespace RFC 2822 $2.2.3 requires us to fold on the
426 # whitespace, so that the line leads with the original whitespace we
427 # split on. However, if a higher syntactic break is used instead
428 # (e.g. comma or semicolon), the folding should happen after the split
429 # character. But then in that case, we need to add our own
430 # continuation whitespace -- although won't that break unfolding?
431 for part, splitpart, nextpart in _spliterator(ch, string):
432 if not splitpart:
433 # No splitpart means this is the last chunk. Put this part
434 # either on the current line or the next line depending on
435 # whether it fits.
436 holding.push(part)
437 if len(holding) + len(self._current_line) <= self._maxlen:
438 # It fits, but we're done.
439 self._current_line.push(str(holding))
440 else:
441 # It doesn't fit, but we're done. Before pushing a new
442 # line, watch out for the current line containing only
443 # whitespace.
444 holding.pop()
Guido van Rossum9604e662007-08-30 03:46:43 +0000445 if self._current_line.is_onlyws() and holding.is_onlyws():
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000446 # Don't start a new line.
447 holding.push(part)
448 part = None
449 self._current_line.push(str(holding))
450 self._lines.append(str(self._current_line))
451 if part is None:
452 self._current_line.reset()
453 else:
454 holding.reset(part)
455 self._current_line.reset(str(holding))
456 return
457 elif not nextpart:
458 # There must be some trailing split characters because we
459 # found a split character but no next part. In this case we
460 # must treat the thing to fit as the part + splitpart because
461 # if splitpart is whitespace it's not allowed to be the only
462 # thing on the line, and if it's not whitespace we must split
463 # after the syntactic break. In either case, we're done.
464 holding_prelen = len(holding)
465 holding.push(part + splitpart)
466 if len(holding) + len(self._current_line) <= self._maxlen:
467 self._current_line.push(str(holding))
468 elif holding_prelen == 0:
469 # This is the only chunk left so it has to go on the
470 # current line.
471 self._current_line.push(str(holding))
472 else:
473 save_part = holding.pop()
474 self._current_line.push(str(holding))
475 self._lines.append(str(self._current_line))
476 holding.reset(save_part)
477 self._current_line.reset(str(holding))
478 return
479 elif not part:
480 # We're leading with a split character. See if the splitpart
481 # and nextpart fits on the current line.
482 holding.push(splitpart + nextpart)
483 holding_len = len(holding)
484 # We know we're not leaving the nextpart on the stack.
485 holding.pop()
486 if holding_len + len(self._current_line) <= self._maxlen:
487 holding.push(splitpart)
488 else:
489 # It doesn't fit. Since there's no current part really
490 # the best we can do is start a new line and push the
491 # split part onto it.
492 self._current_line.push(str(holding))
493 holding.reset()
494 if len(self._current_line) > 0 and self._lines:
495 self._lines.append(str(self._current_line))
496 self._current_line.reset()
497 holding.push(splitpart)
498 else:
499 # All three parts are present. First let's see if all three
500 # parts will fit on the current line. If so, we don't need to
501 # split it.
502 holding.push(part + splitpart + nextpart)
503 holding_len = len(holding)
504 # Pop the part because we'll push nextpart on the next
505 # iteration through the loop.
506 holding.pop()
507 if holding_len + len(self._current_line) <= self._maxlen:
508 holding.push(part + splitpart)
509 else:
510 # The entire thing doesn't fit. See if we need to split
511 # before or after the split characters.
512 if splitpart.isspace():
513 # Split before whitespace. Remember that the
514 # whitespace becomes the continuation whitespace of
515 # the next line so it goes to current_line not holding.
516 holding.push(part)
517 self._current_line.push(str(holding))
518 holding.reset()
519 self._lines.append(str(self._current_line))
520 self._current_line.reset(splitpart)
521 else:
522 # Split after non-whitespace. The continuation
523 # whitespace comes from the instance variable.
524 holding.push(part + splitpart)
525 self._current_line.push(str(holding))
526 holding.reset()
527 self._lines.append(str(self._current_line))
528 if nextpart[0].isspace():
529 self._current_line.reset()
530 else:
531 self._current_line.reset(self._continuation_ws)
532 # Get the last of the holding part
533 self._current_line.push(str(holding))
534
535
536
537def _spliterator(character, string):
538 parts = list(reversed(re.split('(%s)' % character, string)))
539 while parts:
540 part = parts.pop()
541 splitparts = (parts.pop() if parts else None)
542 nextpart = (parts.pop() if parts else None)
543 yield (part, splitparts, nextpart)
544 if nextpart is not None:
545 parts.append(nextpart)
546
547
548class _Accumulator:
Guido van Rossum9604e662007-08-30 03:46:43 +0000549 def __init__(self, initial_size=0):
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000550 self._initial_size = initial_size
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000551 self._current = []
552
553 def push(self, string):
554 self._current.append(string)
555
556 def pop(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000557 if not self._current:
558 return None
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000559 return self._current.pop()
560
561 def __len__(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000562 return sum(((1 if string is None else len(string))
563 for string in self._current),
Guido van Rossum9604e662007-08-30 03:46:43 +0000564 self._initial_size)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000565
566 def __str__(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000567 if self._current and self._current[-1] is None:
568 self._current.pop()
569 return EMPTYSTRING.join((' ' if string is None else string)
570 for string in self._current)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000571
572 def reset(self, string=None):
573 self._current = []
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000574 self._initial_size = 0
575 if string is not None:
576 self.push(string)
Guido van Rossum9604e662007-08-30 03:46:43 +0000577
578 def is_onlyws(self):
579 return len(self) == 0 or str(self).isspace()