blob: 1d97f8f5ea1084000e0ca59ea55660dbdf7b0980 [file] [log] [blame]
# Copyright (C) 2002-2007 Python Software Foundation
# Author: Ben Gertzfield, Barry Warsaw
# Contact: email-sig@python.org
4
5"""Header encoding and decoding functionality."""
6
# Public names exported by a wildcard import of this module.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]
12
13import re
14import binascii
15
16import email.quoprimime
17import email.base64mime
18
19from email.errors import HeaderParseError
20from email.charset import Charset
21
# Small string/bytes fragments shared by the code below.
NL = '\n'
SPACE = ' '
BSPACE = b' '           # bytes separator used when merging decoded words
SPACE8 = ' ' * 8        # what a tab is replaced with when measuring width
EMPTYSTRING = ''

# Line length used by Header when no maxlinelen is supplied.
MAXLINELEN = 76

# Ready-made Charset instances; USASCII is the default charset of a Header.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')
32
# Match encoded-word strings in the form =?charset?q?Hello_World?=
# The pattern exposes three named groups (charset, encoding, encoded), so
# ecre.split() yields them in that order between the unencoded fragments.
# Because of re.MULTILINE, the trailing `$` also matches at the end of each
# line, not only at the end of the whole string.
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  (?=[ \t]|$)           # whitespace or the end of the string
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)
44
# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal
# 041) to tilde (octal 176), i.e. the printable non-space ASCII characters.
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')
49
50
51
# Helpers
# Module-level alias for the quoprimime helper of the same name.  It is not
# referenced by any other code visible in this file.
_max_append = email.quoprimime._max_append
54
55
56
def decode_header(header):
    """Split a header value into its decoded parts, leaving charsets alone.

    The result is a list of (decoded_part, charset) pairs.  For a part that
    was not an encoded word, charset is None; for an encoded word it is the
    lower-cased charset name taken from the word itself.  Adjacent parts
    that share a charset are merged into one pair (unencoded neighbours are
    joined with a single space, encoded neighbours are concatenated).

    When the header contains no encoded word at all, the header is returned
    unchanged as the only part: [(header, None)].  Otherwise every part is
    returned as bytes; a decoded word that is a str is converted with one
    byte per character.

    Raises HeaderParseError when the base64 payload of an encoded word
    cannot be decoded.
    """
    # Fast path: nothing in the header looks like an encoded word.
    if ecre.search(header) is None:
        return [(header, None)]

    # Pass 1: collect (text, encoding, charset) triples.  ecre.split() gives
    # [plain, charset, encoding, encoded, plain, charset, ...], so every
    # fourth field is unencoded text and the three after it describe one
    # encoded word.  Unencoded text carries None for encoding and charset.
    words = []
    for line in header.splitlines():
        fields = ecre.split(line)
        for start in range(0, len(fields), 4):
            plain = fields[start].strip()
            if plain:
                words.append((plain, None, None))
            if start + 1 < len(fields):
                charset, encoding, encoded = fields[start + 1:start + 4]
                words.append((encoded, encoding.lower(), charset.lower()))

    # Pass 2: undo the quoted-printable / base64 transfer encoding.
    decoded_words = []
    for payload, encoding, charset in words:
        if encoding is None:
            # Plain text needs no decoding.
            decoded = payload
        elif encoding == 'q':
            decoded = email.quoprimime.header_decode(payload)
        elif encoding == 'b':
            try:
                decoded = email.base64mime.decode(payload)
            except binascii.Error:
                raise HeaderParseError('Base64 decoding error')
        else:
            raise AssertionError('Unexpected encoding: ' + encoding)
        decoded_words.append((decoded, charset))

    # Pass 3: turn everything into bytes and merge neighbouring parts that
    # have the same charset.  Each run is a mutable [bytes, charset] pair.
    runs = []
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(ord(c) for c in word)
        if runs and charset == runs[-1][1]:
            if runs[-1][1] is None:
                # Unencoded words were separated by whitespace originally.
                runs[-1][0] += BSPACE + word
            else:
                runs[-1][0] += word
        else:
            runs.append([word, charset])
    return [(data, charset) for data, charset in runs]
126
127
128
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Build a Header from (value, charset) pairs such as decode_header()'s.

    Each pair of decoded_seq is appended to a fresh Header in order.  A
    charset that is neither None nor already a Charset instance is wrapped
    in Charset() first; None is handed to Header.append() unchanged.

    maxlinelen, header_name and continuation_ws are forwarded to the Header
    constructor.  Returns the populated Header instance.
    """
    header = Header(maxlinelen=maxlinelen, header_name=header_name,
                    continuation_ws=continuation_ws)
    for value, charset in decoded_seq:
        # None is passed straight through; Header.append() handles it.
        already_usable = charset is None or isinstance(charset, Charset)
        header.append(value, charset if already_usable else Charset(charset))
    return header
149
150
151
class Header:
    """A header value built from chunks that may use different charsets.

    Chunks are stored as (text, Charset) pairs in the order they were
    appended.  str() joins them for display; encode() formats them through
    a _ValueFormatter.
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a bytes object or a str, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicitly via maxlinelen.
        For splitting the first line to a shorter value (to account for the
        field header which isn't included in s, e.g. `Subject') pass in the
        name of the field in header_name.  The default maxlinelen is 76.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        self._maxlinelen = maxlinelen
        if header_name is None:
            self._headerlen = 0
        else:
            # Take the separating colon and space into account.
            self._headerlen = len(header_name) + 2

    def __str__(self):
        """Return the string value of the header."""
        uchunks = []
        lastcs = None
        for s, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            nextcs = charset
            if uchunks:
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii'):
                        uchunks.append(SPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii'):
                    uchunks.append(SPACE)
            lastcs = nextcs
            uchunks.append(s)
        return EMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a string, swap the args and do another comparison.
        return other == self.encode()

    def __ne__(self, other):
        return not self == other

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a str or a bytes-like value.  A str is first encoded with
        the charset's input codec (us-ascii when the charset has none); any
        other value is taken to already be the encoded bytes.  The bytes are
        then decoded with the charset's output codec (again us-ascii when
        there is none) and the resulting text is stored together with the
        charset.  Doing the round trip here means a value that does not fit
        the charset fails at append time rather than later.

        Optional `errors' is passed as the error-handling argument to the
        encode() and decode() calls; with the default 'strict' a value that
        cannot be converted raises a UnicodeError.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if isinstance(s, str):
            # Convert the string from the input character set to the output
            # character set and store the resulting bytes and the charset for
            # composition later.
            input_charset = charset.input_codec or 'us-ascii'
            input_bytes = s.encode(input_charset, errors)
        else:
            # We already have the bytes we will store internally.
            input_bytes = s
        # Ensure that the bytes we're storing can be decoded to the output
        # character set, otherwise an early error is thrown.
        output_charset = charset.output_codec or 'us-ascii'
        output_string = input_bytes.decode(output_charset, errors)
        self._chunks.append((output_string, charset))

    def encode(self, splitchars=';, \t'):
        """Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.

        The stored chunks are normalized (see _normalize()) and every line of
        every normalized chunk is fed, together with its charset, to a
        _ValueFormatter that does the encoding and line wrapping.  The
        formatter's string value is returned.

        The stored chunks themselves are left untouched, so calling encode()
        (or comparing the header with ==, which calls encode()) repeatedly
        always gives the same result and never changes str(header).

        Optional splitchars is a string containing characters to split long
        ASCII lines on, in rough support of RFC 2822's `highest level
        syntactic breaks'.
        """
        chunks = self._normalize()
        formatter = _ValueFormatter(self._headerlen, self._maxlinelen,
                                    self._continuation_ws, splitchars)
        for string, charset in chunks:
            lines = string.splitlines()
            for line in lines:
                formatter.feed(line, charset)
                if len(lines) > 1:
                    formatter.newline()
        return str(formatter)

    def _normalize(self):
        """Return the chunks with runs of identical charsets collapsed.

        Consecutive chunks that share a charset are joined with a single
        space into one chunk.  You need a space between encoded words, or
        between encoded and unencoded words, so a (' ', USASCII) chunk is
        placed between two runs unless both are us-ascii.

        A new list is returned and self._chunks is not modified.  Writing
        the result back would make the operation non-repeatable: the
        separator chunks would be merged into their neighbours and inserted
        again on every later call, growing the whitespace each time.
        """
        chunks = []
        last_charset = None
        last_chunk = []
        for string, charset in self._chunks:
            if charset == last_charset:
                last_chunk.append(string)
            else:
                if last_charset is not None:
                    chunks.append((SPACE.join(last_chunk), last_charset))
                    if last_charset != USASCII or charset != USASCII:
                        chunks.append((' ', USASCII))
                last_chunk = [string]
                last_charset = charset
        if last_chunk:
            chunks.append((SPACE.join(last_chunk), last_charset))
        return chunks
323
324
325
class _ValueFormatter:
    """Accumulate encoded header text into lines bounded by a maximum length.

    Text is supplied through feed(); finished lines are collected in
    self._lines while the line under construction lives in
    self._current_line (an _Accumulator).  str() flushes the line under
    construction and joins all lines with newlines.
    """

    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        """Set up an empty formatter.

        headerlen is used as the initial size of the first line's
        accumulator, so it counts against maxlen on that line.  maxlen is the
        length limit checked before text is added to the current line.
        continuation_ws starts a new line when the formatter itself has to
        supply the leading whitespace.  splitchars are the candidate split
        characters, tried in the order given.
        """
        self._maxlen = maxlen
        self._continuation_ws = continuation_ws
        # Width of the continuation whitespace with each tab counted as
        # eight columns.
        self._continuation_ws_len = len(continuation_ws.replace('\t', SPACE8))
        self._splitchars = splitchars
        self._lines = []
        self._current_line = _Accumulator(headerlen)

    def __str__(self):
        """Flush the current line and return all lines joined by newlines."""
        self.newline()
        return NL.join(self._lines)

    def newline(self):
        """Move the current line to the finished lines and start a new one.

        Nothing is appended when the current line's length is zero; note
        that the accumulator's length includes its initial size.
        """
        if len(self._current_line) > 0:
            self._lines.append(str(self._current_line))
        self._current_line.reset()

    def feed(self, string, charset):
        """Add string, encoded via charset.header_encode(), to the output.

        The encoded string goes on the current line when it fits.  Otherwise
        the first character of splitchars that occurs in string is used to
        split it (see _spliterate); when none occurs, the encoded string is
        placed on a new line that starts with the continuation whitespace.
        """
        # If the string itself fits on the current line in its encoded format,
        # then add it now and be done with it.
        encoded_string = charset.header_encode(string)
        if len(encoded_string) + len(self._current_line) <= self._maxlen:
            self._current_line.push(encoded_string)
            return
        # Attempt to split the line at the highest-level syntactic break
        # possible.  Note that we don't have a lot of smarts about field
        # syntax; we just try to break on semi-colons, then commas, then
        # whitespace.  Eventually, we'll allow this to be pluggable.
        for ch in self._splitchars:
            if ch in string:
                break
        else:
            # We can't split the string to fit on the current line, so just
            # put it on a line by itself.
            self._lines.append(str(self._current_line))
            self._current_line.reset(self._continuation_ws)
            self._current_line.push(encoded_string)
            return
        self._spliterate(string, ch, charset)

    def _spliterate(self, string, ch, charset):
        """Distribute string over lines, breaking at occurrences of ch.

        The string is walked as (part, splitpart, nextpart) triples from
        _spliterator().  Pieces that still fit on the current line are
        gathered in `holding`, an accumulator whose length and string value
        are those of the charset-encoded text; when the next piece would
        overflow, the gathered text is flushed and a new line is started.
        """
        holding = _Accumulator(transformfunc=charset.header_encode)
        # Split the line on the split character, preserving it.  If the split
        # character is whitespace RFC 2822 $2.2.3 requires us to fold on the
        # whitespace, so that the line leads with the original whitespace we
        # split on.  However, if a higher syntactic break is used instead
        # (e.g. comma or semicolon), the folding should happen after the split
        # character.  But then in that case, we need to add our own
        # continuation whitespace -- although won't that break unfolding?
        for part, splitpart, nextpart in _spliterator(ch, string):
            if not splitpart:
                # No splitpart means this is the last chunk.  Put this part
                # either on the current line or the next line depending on
                # whether it fits.
                holding.push(part)
                if len(holding) + len(self._current_line) <= self._maxlen:
                    # It fits, but we're done.
                    self._current_line.push(str(holding))
                else:
                    # It doesn't fit, but we're done.  Before pushing a new
                    # line, watch out for the current line containing only
                    # whitespace.
                    holding.pop()
                    if len(self._current_line) == 0 and (
                        len(holding) == 0 or str(holding).isspace()):
                        # Don't start a new line.
                        holding.push(part)
                        # part is now accounted for; None tells the code
                        # below not to carry it over to the next line.
                        part = None
                    self._current_line.push(str(holding))
                    self._lines.append(str(self._current_line))
                    if part is None:
                        self._current_line.reset()
                    else:
                        holding.reset(part)
                        self._current_line.reset(str(holding))
                return
            elif not nextpart:
                # There must be some trailing split characters because we
                # found a split character but no next part.  In this case we
                # must treat the thing to fit as the part + splitpart because
                # if splitpart is whitespace it's not allowed to be the only
                # thing on the line, and if it's not whitespace we must split
                # after the syntactic break.  In either case, we're done.
                holding_prelen = len(holding)
                holding.push(part + splitpart)
                if len(holding) + len(self._current_line) <= self._maxlen:
                    self._current_line.push(str(holding))
                elif holding_prelen == 0:
                    # This is the only chunk left so it has to go on the
                    # current line.
                    self._current_line.push(str(holding))
                else:
                    save_part = holding.pop()
                    self._current_line.push(str(holding))
                    self._lines.append(str(self._current_line))
                    holding.reset(save_part)
                    self._current_line.reset(str(holding))
                return
            elif not part:
                # We're leading with a split character.  See if the splitpart
                # and nextpart fits on the current line.
                holding.push(splitpart + nextpart)
                holding_len = len(holding)
                # We know we're not leaving the nextpart on the stack.
                holding.pop()
                if holding_len + len(self._current_line) <= self._maxlen:
                    holding.push(splitpart)
                else:
                    # It doesn't fit.  Since there's no current part really
                    # the best we can do is start a new line and push the
                    # split part onto it.
                    self._current_line.push(str(holding))
                    holding.reset()
                    if len(self._current_line) > 0 and self._lines:
                        self._lines.append(str(self._current_line))
                        self._current_line.reset()
                    holding.push(splitpart)
            else:
                # All three parts are present.  First let's see if all three
                # parts will fit on the current line.  If so, we don't need to
                # split it.
                holding.push(part + splitpart + nextpart)
                holding_len = len(holding)
                # Pop the part because we'll push nextpart on the next
                # iteration through the loop.
                holding.pop()
                if holding_len + len(self._current_line) <= self._maxlen:
                    holding.push(part + splitpart)
                else:
                    # The entire thing doesn't fit.  See if we need to split
                    # before or after the split characters.
                    if splitpart.isspace():
                        # Split before whitespace.  Remember that the
                        # whitespace becomes the continuation whitespace of
                        # the next line so it goes to current_line not holding.
                        holding.push(part)
                        self._current_line.push(str(holding))
                        holding.reset()
                        self._lines.append(str(self._current_line))
                        self._current_line.reset(splitpart)
                    else:
                        # Split after non-whitespace.  The continuation
                        # whitespace comes from the instance variable.
                        holding.push(part + splitpart)
                        self._current_line.push(str(holding))
                        holding.reset()
                        self._lines.append(str(self._current_line))
                        if nextpart[0].isspace():
                            self._current_line.reset()
                        else:
                            self._current_line.reset(self._continuation_ws)
        # Get the last of the holding part
        self._current_line.push(str(holding))
480
481
482
483def _spliterator(character, string):
484 parts = list(reversed(re.split('(%s)' % character, string)))
485 while parts:
486 part = parts.pop()
487 splitparts = (parts.pop() if parts else None)
488 nextpart = (parts.pop() if parts else None)
489 yield (part, splitparts, nextpart)
490 if nextpart is not None:
491 parts.append(nextpart)
492
493
494class _Accumulator:
495 def __init__(self, initial_size=0, transformfunc=None):
496 self._initial_size = initial_size
497 if transformfunc is None:
498 self._transformfunc = lambda string: string
499 else:
500 self._transformfunc = transformfunc
501 self._current = []
502
503 def push(self, string):
504 self._current.append(string)
505
506 def pop(self):
507 return self._current.pop()
508
509 def __len__(self):
510 return len(str(self)) + self._initial_size
511
512 def __str__(self):
513 return self._transformfunc(EMPTYSTRING.join(self._current))
514
515 def reset(self, string=None):
516 self._current = []
517 self._current_len = 0
518 self._initial_size = 0
519 if string is not None:
520 self.push(string)