blob: e03e42d6c1410ba25f07c8547af298920d084eb9 [file] [log] [blame]
Guido van Rossum8b3febe2007-08-30 01:15:14 +00001# Copyright (C) 2002-2007 Python Software Foundation
2# Author: Ben Gertzfield, Barry Warsaw
3# Contact: email-sig@python.org
4
5"""Header encoding and decoding functionality."""
6
# Names exported by a star-import of this module.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]
12
13import re
14import binascii
15
16import email.quoprimime
17import email.base64mime
18
19from email.errors import HeaderParseError
20from email.charset import Charset
21
# Small text/bytes constants shared by the helpers in this module.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = SPACE * 8
EMPTYSTRING = ''

# Line length used by Header when the caller does not pass maxlinelen.
MAXLINELEN = 78

# Ready-made Charset instances; USASCII is Header's default charset.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Unique marker pushed onto a line accumulator after an encoded chunk.  The
# accumulator renders it as a single space and leaves it out of its length.
TRANSITIONAL_SPACE = object()
# Match encoded-word strings in the form =?charset?q?Hello_World?=
# The pattern has three named groups (charset, encoding, encoded), so
# ecre.split() returns them in that order between the unencoded stretches.
ecre = re.compile(r'''
 =\? # literal =?
 (?P<charset>[^?]*?) # non-greedy up to the next ? is the charset
 \? # literal ?
 (?P<encoding>[qb]) # either a "q" or a "b", case insensitive
 \? # literal ?
 (?P<encoded>.*?) # non-greedy up to the next ?= is the encoded string
 \?= # literal ?=
 (?=[ \t]|$) # whitespace or the end of the string
 ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal
# 041) to tilde (octal 176).
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')
50
51
52
# Helpers
# Module-level alias for the private helper in email.quoprimime.  Nothing in
# the portion of the file visible here calls it.
_max_append = email.quoprimime._max_append
55
56
57
def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (value, charset) pairs, one per run of consecutive
    words that share a charset.  charset is None for non-encoded parts of the
    header, otherwise a lower-case string containing the name of the
    character set specified in the encoded word.

    When the header contains at least one encoded word, every value is a
    bytes object (str words are converted with the raw-unicode-escape codec).
    A header without any encoded word is returned unchanged as
    [(header, None)].

    An email.errors.HeaderParseError is raised when a base64 encoded word
    cannot be decoded; the underlying binascii.Error is attached as its
    cause.
    """
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so split() returns
        # [text, charset, encoding, encoded, text, ...]: a stride of four
        # with one trailing text element.  Walking by index avoids the
        # quadratic cost of repeatedly popping from the front of the list.
        fields = ecre.split(line)
        for start in range(0, len(fields), 4):
            unencoded = fields[start].strip()
            if unencoded:
                words.append((unencoded, None, None))
            if start + 3 < len(fields):
                charset, encoding, encoded = fields[start + 1:start + 4]
                words.append((encoded, encoding.lower(), charset.lower()))
    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = email.quoprimime.header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            try:
                word = email.base64mime.decode(encoded_string)
            except binascii.Error as err:
                # Chain explicitly so the original decoding failure is
                # reported as the direct cause.
                raise HeaderParseError('Base64 decoding error') from err
            decoded_words.append((word, charset))
        else:
            # ecre only admits "q" or "b", so this should be unreachable.
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded words are re-joined with a single space.
            last_word += BSPACE + word
        else:
            # Adjacent encoded words of one charset are concatenated as-is.
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed
127
128
129
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from a sequence of (value, charset) pairs.

    decoded_seq has the shape produced by decode_header(): each item is a
    pair whose second element is None, a character set name, or a Charset
    instance.  Every pair is appended, in order, to a new Header.

    Optional maxlinelen, header_name, and continuation_ws are passed to the
    Header constructor unchanged.  The populated Header is returned.
    """
    header = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for value, charset in decoded_seq:
        # None is handed to append() as-is (it then falls back to the
        # header's own default charset); names are wrapped in a Charset.
        if not (charset is None or isinstance(charset, Charset)):
            charset = Charset(charset)
        header.append(value, charset)
    return header
150
151
152
class Header:
    """A header value built from chunks that may use different charsets."""

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, no initial value
        is set; more text can be added later with .append().  See .append()
        for how str and bytes values are treated.

        Optional charset is used for s and also becomes the default for all
        later .append() calls that omit a charset.  It may be a Charset
        instance or a character set name; when omitted, us-ascii is used.

        maxlinelen is the maximum line length used by .encode(); when
        omitted it defaults to MAXLINELEN (78, as recommended by RFC 2822).
        header_name is the name of the field (e.g. `Subject'); its length
        plus two (for the colon and space) is reserved on the first line.

        continuation_ws must be RFC 2822 compliant folding whitespace
        (usually either a space or a hard tab) which will be prepended to
        continuation lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        self._maxlinelen = MAXLINELEN if maxlinelen is None else maxlinelen
        # Take the separating colon and space into account.
        self._headerlen = 0 if header_name is None else len(header_name) + 2

    def __str__(self):
        """Return the unencoded string value of the header.

        Chunks are normalized first (which rewrites the stored chunk list)
        and then concatenated.
        """
        self._normalize()
        ascii_like = (None, 'us-ascii')
        pieces = []
        previous = None
        for text, charset in self._chunks:
            # A space is inserted whenever the text moves between a
            # None/us-ascii chunk and a chunk in any other charset, in
            # either direction.  The first chunk never gets one.
            current = charset
            if pieces:
                if previous not in ascii_like:
                    if current in ascii_like:
                        pieces.append(SPACE)
                        current = None
                elif current not in ascii_like:
                    pieces.append(SPACE)
            previous = current
            pieces.append(text)
        return EMPTYSTRING.join(pieces)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        # other may be a Header or a string.  Either way, compare it against
        # our own unencoded string value with the operands swapped.
        return other == str(self)

    def __ne__(self, other):
        return not self == other

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).
        A value of None (the default) means that the charset given in the
        constructor is used.

        s may be a str or a bytes-like value.  A str is first encoded with
        the charset's input codec (us-ascii when the charset has none); any
        other value is taken to be the encoded bytes already.  The bytes are
        then decoded with the charset's output codec (again us-ascii when
        there is none), so unsuitable input fails here rather than later.
        The decoded text is stored together with the charset.

        Optional `errors' is passed as the error-handling argument to both
        the encode and the decode call.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if isinstance(s, str):
            raw = s.encode(charset.input_codec or 'us-ascii', errors)
        else:
            # We already have the bytes we will work from.
            raw = s
        # Decoding up front makes an incompatible charset fail early.
        text = raw.decode(charset.output_codec or 'us-ascii', errors)
        self._chunks.append((text, charset))

    def encode(self, splitchars=';, \t', maxlinelen=None):
        """Encode a message header into an RFC-compliant format.

        The chunks are normalized and each one is fed, line by line and with
        its charset, to a line formatter that encodes and wraps the text.
        The formatted lines are returned joined by newlines.

        Optional splitchars is a string containing characters to split long
        ASCII lines on, in rough support of RFC 2822's `highest level
        syntactic breaks'.  This doesn't affect RFC 2047 encoded lines.

        Optional maxlinelen overrides the maximum line length given to the
        constructor for this call.  A value of 0 means don't wrap.
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  For all practical purposes,
        # choosing a huge number here accomplishes that and keeps the
        # formatter simple.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        for text, charset in self._chunks:
            text_lines = text.splitlines()
            multiline = len(text_lines) > 1
            for text_line in text_lines:
                formatter.feed(text_line, charset)
                if multiline:
                    formatter.newline()
        return str(formatter)

    def _normalize(self):
        # Collapse every run of consecutive chunks that share a charset into
        # one chunk whose text is the run joined by single spaces.
        merged = []
        run_charset = None
        run = []
        for text, charset in self._chunks:
            if charset == run_charset:
                run.append(text)
                continue
            # Charset changed: flush the finished run (there is none yet
            # while run_charset is still None) and start a new one.
            if run_charset is not None:
                merged.append((SPACE.join(run), run_charset))
            run = [text]
            run_charset = charset
        if run:
            merged.append((SPACE.join(run), run_charset))
        self._chunks = merged
331
332
333
class _ValueFormatter:
    """Collect header text fed chunk by chunk and fold it into lines."""

    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        # headerlen is counted against the first line only (it seeds the
        # first accumulator); maxlen bounds every line.
        self._maxlen = maxlen
        self._continuation_ws = continuation_ws
        # A tab in the continuation whitespace is counted as eight columns.
        self._continuation_ws_len = len(continuation_ws.replace('\t', SPACE8))
        self._splitchars = splitchars
        self._lines = []
        self._current_line = _Accumulator(headerlen)

    def __str__(self):
        # Drop a trailing TRANSITIONAL_SPACE, but put anything else back.
        # NOTE(review): if nothing has been pushed since the last reset, the
        # outcome here is whatever _Accumulator.pop() does on an empty
        # stack -- verify.
        tail = self._current_line.pop()
        if tail is not TRANSITIONAL_SPACE:
            self._current_line.push(tail)
        self.newline()
        return NL.join(self._lines)

    def newline(self):
        """Flush the current line (if it has any length) and start afresh."""
        if len(self._current_line) > 0:
            self._lines.append(str(self._current_line))
        self._current_line.reset()

    def feed(self, string, charset):
        """Add string, encoded according to charset, to the output."""
        # If the string itself fits on the current line in its encoded
        # format, then add it now and be done with it.
        encoded_string = charset.header_encode(string)
        if len(encoded_string) + len(self._current_line) <= self._maxlen:
            self._current_line.push(encoded_string)
            return
        # If the charset has no header encoding (i.e. it is an ASCII
        # encoding) then we must split the header at the "highest level
        # syntactic break" possible.  We have no real knowledge of field
        # syntax; the split characters are simply tried in the order given.
        if charset.header_encoding is None:
            ch = next((c for c in self._splitchars if c in string), None)
            if ch is None:
                # No split character is available, so regardless of whether
                # the string fits it has to go on a line by itself.
                if not self._current_line.is_onlyws():
                    self._lines.append(str(self._current_line))
                    self._current_line.reset(self._continuation_ws)
                self._current_line.push(encoded_string)
            else:
                self._ascii_split(string, ch)
            return
        # Otherwise, we're doing either a Base64 or a quoted-printable
        # encoding, so syntactic breaks don't matter; the charset is asked
        # to produce lines that fit the remaining space on each line.
        encoded_lines = charset.header_encode_lines(string, self._maxlengths())
        # The first element extends the current line, but if it's None then
        # nothing more fit on the current line so start a new line.
        try:
            first_line = encoded_lines.pop(0)
        except IndexError:
            # There are no encoded lines, so we're done.
            return
        if first_line is not None:
            self._current_line.push(first_line)
        self._lines.append(str(self._current_line))
        self._current_line.reset(self._continuation_ws)
        try:
            last_line = encoded_lines.pop()
        except IndexError:
            # There was only one line.
            return
        # The final encoded line stays open so later chunks can extend it.
        self._current_line.push(last_line)
        self._current_line.push(TRANSITIONAL_SPACE)
        # Everything else are full lines in themselves.
        for line in encoded_lines:
            self._lines.append(self._continuation_ws + line)

    def _maxlengths(self):
        # The first line's length: whatever is left on the current line.
        yield self._maxlen - len(self._current_line)
        # Every following line has the continuation whitespace in front.
        while True:
            yield self._maxlen - self._continuation_ws_len

    def _ascii_split(self, string, ch):
        # Split the line on the split character, preserving it.  If the
        # split character is whitespace RFC 2822 $2.2.3 requires us to fold
        # on the whitespace, so that the line leads with the original
        # whitespace we split on.  However, if a higher syntactic break is
        # used instead (e.g. comma or semicolon), the folding should happen
        # after the split character.  But then in that case, we need to add
        # our own continuation whitespace -- although won't that break
        # unfolding?
        holding = _Accumulator()
        for part, splitpart, nextpart in _spliterator(ch, string):
            if not splitpart:
                # No splitpart means this is the last chunk.  Put this part
                # either on the current line or the next line depending on
                # whether it fits.
                holding.push(part)
                if len(holding) + len(self._current_line) <= self._maxlen:
                    # It fits, but we're done.
                    self._current_line.push(str(holding))
                else:
                    # It doesn't fit, but we're done.  Before pushing a new
                    # line, watch out for the current line containing only
                    # whitespace.
                    holding.pop()
                    if self._current_line.is_onlyws() and holding.is_onlyws():
                        # Don't start a new line.
                        holding.push(part)
                        part = None
                    self._current_line.push(str(holding))
                    self._lines.append(str(self._current_line))
                    if part is None:
                        self._current_line.reset()
                    else:
                        holding.reset(part)
                        self._current_line.reset(str(holding))
                return
            elif not nextpart:
                # There must be some trailing split characters because we
                # found a split character but no next part.  In this case we
                # must treat the thing to fit as the part + splitpart because
                # if splitpart is whitespace it's not allowed to be the only
                # thing on the line, and if it's not whitespace we must split
                # after the syntactic break.  In either case, we're done.
                holding_prelen = len(holding)
                holding.push(part + splitpart)
                if len(holding) + len(self._current_line) <= self._maxlen:
                    self._current_line.push(str(holding))
                elif holding_prelen == 0:
                    # This is the only chunk left so it has to go on the
                    # current line.
                    self._current_line.push(str(holding))
                else:
                    save_part = holding.pop()
                    self._current_line.push(str(holding))
                    self._lines.append(str(self._current_line))
                    holding.reset(save_part)
                    self._current_line.reset(str(holding))
                return
            elif not part:
                # We're leading with a split character.  See if the splitpart
                # and nextpart fits on the current line.
                holding.push(splitpart + nextpart)
                holding_len = len(holding)
                # We know we're not leaving the nextpart on the stack.
                holding.pop()
                if holding_len + len(self._current_line) <= self._maxlen:
                    holding.push(splitpart)
                else:
                    # It doesn't fit.  Since there's no current part really
                    # the best we can do is start a new line and push the
                    # split part onto it.
                    self._current_line.push(str(holding))
                    holding.reset()
                    if len(self._current_line) > 0 and self._lines:
                        self._lines.append(str(self._current_line))
                        self._current_line.reset()
                    holding.push(splitpart)
            else:
                # All three parts are present.  First let's see if all three
                # parts will fit on the current line.  If so, we don't need
                # to split it.
                holding.push(part + splitpart + nextpart)
                holding_len = len(holding)
                # Pop the part because we'll push nextpart on the next
                # iteration through the loop.
                holding.pop()
                if holding_len + len(self._current_line) <= self._maxlen:
                    holding.push(part + splitpart)
                else:
                    # The entire thing doesn't fit.  See if we need to split
                    # before or after the split characters.
                    if splitpart.isspace():
                        # Split before whitespace.  Remember that the
                        # whitespace becomes the continuation whitespace of
                        # the next line so it goes to current_line not
                        # holding.
                        holding.push(part)
                        self._current_line.push(str(holding))
                        holding.reset()
                        self._lines.append(str(self._current_line))
                        self._current_line.reset(splitpart)
                    else:
                        # Split after non-whitespace.  The continuation
                        # whitespace comes from the instance variable.
                        holding.push(part + splitpart)
                        self._current_line.push(str(holding))
                        holding.reset()
                        self._lines.append(str(self._current_line))
                        if nextpart[0].isspace():
                            self._current_line.reset()
                        else:
                            self._current_line.reset(self._continuation_ws)
        # Get the last of the holding part
        self._current_line.push(str(holding))
533
534
535
536def _spliterator(character, string):
537 parts = list(reversed(re.split('(%s)' % character, string)))
538 while parts:
539 part = parts.pop()
540 splitparts = (parts.pop() if parts else None)
541 nextpart = (parts.pop() if parts else None)
542 yield (part, splitparts, nextpart)
543 if nextpart is not None:
544 parts.append(nextpart)
545
546
547class _Accumulator:
Guido van Rossum9604e662007-08-30 03:46:43 +0000548 def __init__(self, initial_size=0):
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000549 self._initial_size = initial_size
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000550 self._current = []
551
552 def push(self, string):
553 self._current.append(string)
554
555 def pop(self):
556 return self._current.pop()
557
558 def __len__(self):
Guido van Rossum9604e662007-08-30 03:46:43 +0000559 return sum((len(string)
560 for string in self._current
561 if string is not TRANSITIONAL_SPACE),
562 self._initial_size)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000563
564 def __str__(self):
Guido van Rossum9604e662007-08-30 03:46:43 +0000565 return EMPTYSTRING.join(
566 (' ' if string is TRANSITIONAL_SPACE else string)
567 for string in self._current)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000568
569 def reset(self, string=None):
570 self._current = []
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000571 self._initial_size = 0
572 if string is not None:
573 self.push(string)
Guido van Rossum9604e662007-08-30 03:46:43 +0000574
575 def is_onlyws(self):
576 return len(self) == 0 or str(self).isspace()