blob: 3215a80f9b6db1730ae3f6bee60ae4be4cca6f14 [file] [log] [blame]
Guido van Rossum8b3febe2007-08-30 01:15:14 +00001# Copyright (C) 2002-2007 Python Software Foundation
2# Author: Ben Gertzfield, Barry Warsaw
3# Contact: email-sig@python.org
4
5"""Header encoding and decoding functionality."""
6
7__all__ = [
8 'Header',
9 'decode_header',
10 'make_header',
11 ]
12
13import re
14import binascii
15
16import email.quoprimime
17import email.base64mime
18
19from email.errors import HeaderParseError
20from email.charset import Charset
21
22NL = '\n'
23SPACE = ' '
24BSPACE = b' '
25SPACE8 = ' ' * 8
26EMPTYSTRING = ''
Guido van Rossum9604e662007-08-30 03:46:43 +000027MAXLINELEN = 78
Guido van Rossum8b3febe2007-08-30 01:15:14 +000028
29USASCII = Charset('us-ascii')
30UTF8 = Charset('utf-8')
31
32# Match encoded-word strings in the form =?charset?q?Hello_World?=
33ecre = re.compile(r'''
34 =\? # literal =?
35 (?P<charset>[^?]*?) # non-greedy up to the next ? is the charset
36 \? # literal ?
37 (?P<encoding>[qb]) # either a "q" or a "b", case insensitive
38 \? # literal ?
39 (?P<encoded>.*?) # non-greedy up to the next ?= is the encoded string
40 \?= # literal ?=
41 (?=[ \t]|$) # whitespace or the end of the string
42 ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)
43
44# Field name regexp, including trailing colon, but not separating whitespace,
45# according to RFC 2822. Character range is from tilde to exclamation mark.
46# For use with .match()
47fcre = re.compile(r'[\041-\176]+:$')
48
49
50
51# Helpers
52_max_append = email.quoprimime._max_append
53
54
55
56def decode_header(header):
57 """Decode a message header value without converting charset.
58
59 Returns a list of (string, charset) pairs containing each of the decoded
60 parts of the header. Charset is None for non-encoded parts of the header,
61 otherwise a lower-case string containing the name of the character set
62 specified in the encoded string.
63
64 An email.Errors.HeaderParseError may be raised when certain decoding error
65 occurs (e.g. a base64 decoding exception).
66 """
67 # If no encoding, just return the header with no charset.
68 if not ecre.search(header):
69 return [(header, None)]
70 # First step is to parse all the encoded parts into triplets of the form
71 # (encoded_string, encoding, charset). For unencoded strings, the last
72 # two parts will be None.
73 words = []
74 for line in header.splitlines():
75 parts = ecre.split(line)
76 while parts:
77 unencoded = parts.pop(0).strip()
78 if unencoded:
79 words.append((unencoded, None, None))
80 if parts:
81 charset = parts.pop(0).lower()
82 encoding = parts.pop(0).lower()
83 encoded = parts.pop(0)
84 words.append((encoded, encoding, charset))
85 # The next step is to decode each encoded word by applying the reverse
86 # base64 or quopri transformation. decoded_words is now a list of the
87 # form (decoded_word, charset).
88 decoded_words = []
89 for encoded_string, encoding, charset in words:
90 if encoding is None:
91 # This is an unencoded word.
92 decoded_words.append((encoded_string, charset))
93 elif encoding == 'q':
94 word = email.quoprimime.header_decode(encoded_string)
95 decoded_words.append((word, charset))
96 elif encoding == 'b':
97 try:
98 word = email.base64mime.decode(encoded_string)
99 except binascii.Error:
100 raise HeaderParseError('Base64 decoding error')
101 else:
102 decoded_words.append((word, charset))
103 else:
104 raise AssertionError('Unexpected encoding: ' + encoding)
105 # Now convert all words to bytes and collapse consecutive runs of
106 # similarly encoded words.
107 collapsed = []
108 last_word = last_charset = None
109 for word, charset in decoded_words:
110 if isinstance(word, str):
Guido van Rossum9604e662007-08-30 03:46:43 +0000111 word = bytes(word, 'raw-unicode-escape')
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000112 if last_word is None:
113 last_word = word
114 last_charset = charset
115 elif charset != last_charset:
116 collapsed.append((last_word, last_charset))
117 last_word = word
118 last_charset = charset
119 elif last_charset is None:
120 last_word += BSPACE + word
121 else:
122 last_word += word
123 collapsed.append((last_word, last_charset))
124 return collapsed
125
126
127
128def make_header(decoded_seq, maxlinelen=None, header_name=None,
129 continuation_ws=' '):
130 """Create a Header from a sequence of pairs as returned by decode_header()
131
132 decode_header() takes a header value string and returns a sequence of
133 pairs of the format (decoded_string, charset) where charset is the string
134 name of the character set.
135
136 This function takes one of those sequence of pairs and returns a Header
137 instance. Optional maxlinelen, header_name, and continuation_ws are as in
138 the Header constructor.
139 """
140 h = Header(maxlinelen=maxlinelen, header_name=header_name,
141 continuation_ws=continuation_ws)
142 for s, charset in decoded_seq:
143 # None means us-ascii but we can simply pass it on to h.append()
144 if charset is not None and not isinstance(charset, Charset):
145 charset = Charset(charset)
146 h.append(s, charset)
147 return h
148
149
150
151class Header:
152 def __init__(self, s=None, charset=None,
153 maxlinelen=None, header_name=None,
154 continuation_ws=' ', errors='strict'):
155 """Create a MIME-compliant header that can contain many character sets.
156
157 Optional s is the initial header value. If None, the initial header
158 value is not set. You can later append to the header with .append()
159 method calls. s may be a byte string or a Unicode string, but see the
160 .append() documentation for semantics.
161
162 Optional charset serves two purposes: it has the same meaning as the
163 charset argument to the .append() method. It also sets the default
164 character set for all subsequent .append() calls that omit the charset
165 argument. If charset is not provided in the constructor, the us-ascii
166 charset is used both as s's initial charset and as the default for
167 subsequent .append() calls.
168
169 The maximum line length can be specified explicit via maxlinelen. For
170 splitting the first line to a shorter value (to account for the field
171 header which isn't included in s, e.g. `Subject') pass in the name of
Guido van Rossum9604e662007-08-30 03:46:43 +0000172 the field in header_name. The default maxlinelen is 78 as recommended
173 by RFC 2822.
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000174
175 continuation_ws must be RFC 2822 compliant folding whitespace (usually
176 either a space or a hard tab) which will be prepended to continuation
177 lines.
178
179 errors is passed through to the .append() call.
180 """
181 if charset is None:
182 charset = USASCII
183 elif not isinstance(charset, Charset):
184 charset = Charset(charset)
185 self._charset = charset
186 self._continuation_ws = continuation_ws
187 self._chunks = []
188 if s is not None:
189 self.append(s, charset, errors)
190 if maxlinelen is None:
191 maxlinelen = MAXLINELEN
192 self._maxlinelen = maxlinelen
193 if header_name is None:
194 self._headerlen = 0
195 else:
196 # Take the separating colon and space into account.
197 self._headerlen = len(header_name) + 2
198
199 def __str__(self):
200 """Return the string value of the header."""
Guido van Rossum9604e662007-08-30 03:46:43 +0000201 self._normalize()
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000202 uchunks = []
203 lastcs = None
Guido van Rossum9604e662007-08-30 03:46:43 +0000204 for string, charset in self._chunks:
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000205 # We must preserve spaces between encoded and non-encoded word
206 # boundaries, which means for us we need to add a space when we go
207 # from a charset to None/us-ascii, or from None/us-ascii to a
208 # charset. Only do this for the second and subsequent chunks.
209 nextcs = charset
210 if uchunks:
211 if lastcs not in (None, 'us-ascii'):
212 if nextcs in (None, 'us-ascii'):
213 uchunks.append(SPACE)
214 nextcs = None
215 elif nextcs not in (None, 'us-ascii'):
216 uchunks.append(SPACE)
217 lastcs = nextcs
Guido van Rossum9604e662007-08-30 03:46:43 +0000218 uchunks.append(string)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000219 return EMPTYSTRING.join(uchunks)
220
221 # Rich comparison operators for equality only. BAW: does it make sense to
222 # have or explicitly disable <, <=, >, >= operators?
223 def __eq__(self, other):
224 # other may be a Header or a string. Both are fine so coerce
Guido van Rossum9604e662007-08-30 03:46:43 +0000225 # ourselves to a unicode (of the unencoded header value), swap the
226 # args and do another comparison.
227 return other == str(self)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000228
229 def __ne__(self, other):
230 return not self == other
231
232 def append(self, s, charset=None, errors='strict'):
233 """Append a string to the MIME header.
234
235 Optional charset, if given, should be a Charset instance or the name
236 of a character set (which will be converted to a Charset instance). A
237 value of None (the default) means that the charset given in the
238 constructor is used.
239
240 s may be a byte string or a Unicode string. If it is a byte string
241 (i.e. isinstance(s, str) is true), then charset is the encoding of
242 that byte string, and a UnicodeError will be raised if the string
243 cannot be decoded with that charset. If s is a Unicode string, then
244 charset is a hint specifying the character set of the characters in
245 the string. In this case, when producing an RFC 2822 compliant header
246 using RFC 2047 rules, the Unicode string will be encoded using the
247 following charsets in order: us-ascii, the charset hint, utf-8. The
248 first character set not to provoke a UnicodeError is used.
249
250 Optional `errors' is passed as the third argument to any unicode() or
251 ustr.encode() call.
252 """
253 if charset is None:
254 charset = self._charset
255 elif not isinstance(charset, Charset):
256 charset = Charset(charset)
257 if isinstance(s, str):
258 # Convert the string from the input character set to the output
259 # character set and store the resulting bytes and the charset for
260 # composition later.
261 input_charset = charset.input_codec or 'us-ascii'
262 input_bytes = s.encode(input_charset, errors)
263 else:
264 # We already have the bytes we will store internally.
265 input_bytes = s
266 # Ensure that the bytes we're storing can be decoded to the output
267 # character set, otherwise an early error is thrown.
268 output_charset = charset.output_codec or 'us-ascii'
269 output_string = input_bytes.decode(output_charset, errors)
270 self._chunks.append((output_string, charset))
271
Guido van Rossum9604e662007-08-30 03:46:43 +0000272 def encode(self, splitchars=';, \t', maxlinelen=None):
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000273 """Encode a message header into an RFC-compliant format.
274
275 There are many issues involved in converting a given string for use in
276 an email header. Only certain character sets are readable in most
277 email clients, and as header strings can only contain a subset of
278 7-bit ASCII, care must be taken to properly convert and encode (with
279 Base64 or quoted-printable) header strings. In addition, there is a
280 75-character length limit on any given encoded header field, so
281 line-wrapping must be performed, even with double-byte character sets.
282
283 This method will do its best to convert the string to the correct
284 character set used in email, and encode and line wrap it safely with
285 the appropriate scheme for that character set.
286
287 If the given charset is not known or an error occurs during
288 conversion, this function will return the header untouched.
289
290 Optional splitchars is a string containing characters to split long
291 ASCII lines on, in rough support of RFC 2822's `highest level
292 syntactic breaks'. This doesn't affect RFC 2047 encoded lines.
293 """
294 self._normalize()
Guido van Rossum9604e662007-08-30 03:46:43 +0000295 if maxlinelen is None:
296 maxlinelen = self._maxlinelen
297 # A maxlinelen of 0 means don't wrap. For all practical purposes,
298 # choosing a huge number here accomplishes that and makes the
299 # _ValueFormatter algorithm much simpler.
300 if maxlinelen == 0:
301 maxlinelen = 1000000
302 formatter = _ValueFormatter(self._headerlen, maxlinelen,
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000303 self._continuation_ws, splitchars)
304 for string, charset in self._chunks:
305 lines = string.splitlines()
306 for line in lines:
307 formatter.feed(line, charset)
308 if len(lines) > 1:
309 formatter.newline()
Barry Warsaw00b34222007-08-31 02:35:00 +0000310 formatter.add_transition()
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000311 return str(formatter)
312
313 def _normalize(self):
Guido van Rossum9604e662007-08-30 03:46:43 +0000314 # Step 1: Normalize the chunks so that all runs of identical charsets
315 # get collapsed into a single unicode string.
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000316 chunks = []
317 last_charset = None
318 last_chunk = []
319 for string, charset in self._chunks:
320 if charset == last_charset:
321 last_chunk.append(string)
322 else:
323 if last_charset is not None:
324 chunks.append((SPACE.join(last_chunk), last_charset))
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000325 last_chunk = [string]
326 last_charset = charset
327 if last_chunk:
328 chunks.append((SPACE.join(last_chunk), last_charset))
329 self._chunks = chunks
330
331
332
333class _ValueFormatter:
334 def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
335 self._maxlen = maxlen
336 self._continuation_ws = continuation_ws
337 self._continuation_ws_len = len(continuation_ws.replace('\t', SPACE8))
338 self._splitchars = splitchars
339 self._lines = []
340 self._current_line = _Accumulator(headerlen)
341
342 def __str__(self):
343 self.newline()
344 return NL.join(self._lines)
345
346 def newline(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000347 end_of_line = self._current_line.pop()
348 if end_of_line is not None:
349 self._current_line.push(end_of_line)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000350 if len(self._current_line) > 0:
351 self._lines.append(str(self._current_line))
352 self._current_line.reset()
353
Barry Warsaw00b34222007-08-31 02:35:00 +0000354 def add_transition(self):
355 self._current_line.push(None)
356
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000357 def feed(self, string, charset):
358 # If the string itself fits on the current line in its encoded format,
359 # then add it now and be done with it.
360 encoded_string = charset.header_encode(string)
361 if len(encoded_string) + len(self._current_line) <= self._maxlen:
362 self._current_line.push(encoded_string)
363 return
Guido van Rossum9604e662007-08-30 03:46:43 +0000364 # If the charset has no header encoding (i.e. it is an ASCII encoding)
365 # then we must split the header at the "highest level syntactic break"
366 # possible. Note that we don't have a lot of smarts about field
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000367 # syntax; we just try to break on semi-colons, then commas, then
Guido van Rossum9604e662007-08-30 03:46:43 +0000368 # whitespace. Eventually, this should be pluggable.
369 if charset.header_encoding is None:
370 for ch in self._splitchars:
371 if ch in string:
372 break
373 else:
374 ch = None
375 # If there's no available split character then regardless of
376 # whether the string fits on the line, we have to put it on a line
377 # by itself.
378 if ch is None:
379 if not self._current_line.is_onlyws():
380 self._lines.append(str(self._current_line))
381 self._current_line.reset(self._continuation_ws)
382 self._current_line.push(encoded_string)
383 else:
384 self._ascii_split(string, ch)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000385 return
Guido van Rossum9604e662007-08-30 03:46:43 +0000386 # Otherwise, we're doing either a Base64 or a quoted-printable
387 # encoding which means we don't need to split the line on syntactic
388 # breaks. We can basically just find enough characters to fit on the
389 # current line, minus the RFC 2047 chrome. What makes this trickier
390 # though is that we have to split at octet boundaries, not character
391 # boundaries but it's only safe to split at character boundaries so at
392 # best we can only get close.
393 encoded_lines = charset.header_encode_lines(string, self._maxlengths())
394 # The first element extends the current line, but if it's None then
395 # nothing more fit on the current line so start a new line.
396 try:
397 first_line = encoded_lines.pop(0)
398 except IndexError:
399 # There are no encoded lines, so we're done.
400 return
401 if first_line is not None:
402 self._current_line.push(first_line)
403 self._lines.append(str(self._current_line))
404 self._current_line.reset(self._continuation_ws)
405 try:
406 last_line = encoded_lines.pop()
407 except IndexError:
408 # There was only one line.
409 return
410 self._current_line.push(last_line)
Guido van Rossum9604e662007-08-30 03:46:43 +0000411 # Everything else are full lines in themselves.
412 for line in encoded_lines:
413 self._lines.append(self._continuation_ws + line)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000414
Guido van Rossum9604e662007-08-30 03:46:43 +0000415 def _maxlengths(self):
416 # The first line's length.
417 yield self._maxlen - len(self._current_line)
418 while True:
419 yield self._maxlen - self._continuation_ws_len
420
421 def _ascii_split(self, string, ch):
422 holding = _Accumulator()
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000423 # Split the line on the split character, preserving it. If the split
424 # character is whitespace RFC 2822 $2.2.3 requires us to fold on the
425 # whitespace, so that the line leads with the original whitespace we
426 # split on. However, if a higher syntactic break is used instead
427 # (e.g. comma or semicolon), the folding should happen after the split
428 # character. But then in that case, we need to add our own
429 # continuation whitespace -- although won't that break unfolding?
430 for part, splitpart, nextpart in _spliterator(ch, string):
431 if not splitpart:
432 # No splitpart means this is the last chunk. Put this part
433 # either on the current line or the next line depending on
434 # whether it fits.
435 holding.push(part)
436 if len(holding) + len(self._current_line) <= self._maxlen:
437 # It fits, but we're done.
438 self._current_line.push(str(holding))
439 else:
440 # It doesn't fit, but we're done. Before pushing a new
441 # line, watch out for the current line containing only
442 # whitespace.
443 holding.pop()
Guido van Rossum9604e662007-08-30 03:46:43 +0000444 if self._current_line.is_onlyws() and holding.is_onlyws():
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000445 # Don't start a new line.
446 holding.push(part)
447 part = None
448 self._current_line.push(str(holding))
449 self._lines.append(str(self._current_line))
450 if part is None:
451 self._current_line.reset()
452 else:
453 holding.reset(part)
454 self._current_line.reset(str(holding))
455 return
456 elif not nextpart:
457 # There must be some trailing split characters because we
458 # found a split character but no next part. In this case we
459 # must treat the thing to fit as the part + splitpart because
460 # if splitpart is whitespace it's not allowed to be the only
461 # thing on the line, and if it's not whitespace we must split
462 # after the syntactic break. In either case, we're done.
463 holding_prelen = len(holding)
464 holding.push(part + splitpart)
465 if len(holding) + len(self._current_line) <= self._maxlen:
466 self._current_line.push(str(holding))
467 elif holding_prelen == 0:
468 # This is the only chunk left so it has to go on the
469 # current line.
470 self._current_line.push(str(holding))
471 else:
472 save_part = holding.pop()
473 self._current_line.push(str(holding))
474 self._lines.append(str(self._current_line))
475 holding.reset(save_part)
476 self._current_line.reset(str(holding))
477 return
478 elif not part:
479 # We're leading with a split character. See if the splitpart
480 # and nextpart fits on the current line.
481 holding.push(splitpart + nextpart)
482 holding_len = len(holding)
483 # We know we're not leaving the nextpart on the stack.
484 holding.pop()
485 if holding_len + len(self._current_line) <= self._maxlen:
486 holding.push(splitpart)
487 else:
488 # It doesn't fit. Since there's no current part really
489 # the best we can do is start a new line and push the
490 # split part onto it.
491 self._current_line.push(str(holding))
492 holding.reset()
493 if len(self._current_line) > 0 and self._lines:
494 self._lines.append(str(self._current_line))
495 self._current_line.reset()
496 holding.push(splitpart)
497 else:
498 # All three parts are present. First let's see if all three
499 # parts will fit on the current line. If so, we don't need to
500 # split it.
501 holding.push(part + splitpart + nextpart)
502 holding_len = len(holding)
503 # Pop the part because we'll push nextpart on the next
504 # iteration through the loop.
505 holding.pop()
506 if holding_len + len(self._current_line) <= self._maxlen:
507 holding.push(part + splitpart)
508 else:
509 # The entire thing doesn't fit. See if we need to split
510 # before or after the split characters.
511 if splitpart.isspace():
512 # Split before whitespace. Remember that the
513 # whitespace becomes the continuation whitespace of
514 # the next line so it goes to current_line not holding.
515 holding.push(part)
516 self._current_line.push(str(holding))
517 holding.reset()
518 self._lines.append(str(self._current_line))
519 self._current_line.reset(splitpart)
520 else:
521 # Split after non-whitespace. The continuation
522 # whitespace comes from the instance variable.
523 holding.push(part + splitpart)
524 self._current_line.push(str(holding))
525 holding.reset()
526 self._lines.append(str(self._current_line))
527 if nextpart[0].isspace():
528 self._current_line.reset()
529 else:
530 self._current_line.reset(self._continuation_ws)
531 # Get the last of the holding part
532 self._current_line.push(str(holding))
533
534
535
536def _spliterator(character, string):
537 parts = list(reversed(re.split('(%s)' % character, string)))
538 while parts:
539 part = parts.pop()
540 splitparts = (parts.pop() if parts else None)
541 nextpart = (parts.pop() if parts else None)
542 yield (part, splitparts, nextpart)
543 if nextpart is not None:
544 parts.append(nextpart)
545
546
547class _Accumulator:
Guido van Rossum9604e662007-08-30 03:46:43 +0000548 def __init__(self, initial_size=0):
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000549 self._initial_size = initial_size
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000550 self._current = []
551
552 def push(self, string):
553 self._current.append(string)
554
555 def pop(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000556 if not self._current:
557 return None
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000558 return self._current.pop()
559
560 def __len__(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000561 return sum(((1 if string is None else len(string))
562 for string in self._current),
Guido van Rossum9604e662007-08-30 03:46:43 +0000563 self._initial_size)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000564
565 def __str__(self):
Barry Warsaw00b34222007-08-31 02:35:00 +0000566 if self._current and self._current[-1] is None:
567 self._current.pop()
568 return EMPTYSTRING.join((' ' if string is None else string)
569 for string in self._current)
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000570
571 def reset(self, string=None):
572 self._current = []
Guido van Rossum8b3febe2007-08-30 01:15:14 +0000573 self._initial_size = 0
574 if string is not None:
575 self.push(string)
Guido van Rossum9604e662007-08-30 03:46:43 +0000576
577 def is_onlyws(self):
578 return len(self) == 0 or str(self).isspace()