blob: 7289e456e43964901e059e445d8fd4db456c8eea [file] [log] [blame]
Gregory P. Smith1d499262009-05-01 19:59:52 +00001# Copyright 2007 Google Inc.
2# Licensed to PSF under a Contributor Agreement.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied. See the License for the specific language governing
14# permissions and limitations under the License.
15#
16# See also: http://code.google.com/p/ipaddr-py/
17
18"""An IPv4/IPv6 manipulation library in Python.
19
20This library is used to create/poke/manipulate IPv4 and IPv6 addresses
21and prefixes.
22
23"""
24
25__version__ = '1.0.2'
26
27import struct
28
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +000029
Gregory P. Smith1d499262009-05-01 19:59:52 +000030class Error(Exception):
31
32 """Base class for exceptions."""
33
34
35class IPTypeError(Error):
36
37 """Tried to perform a v4 action on v6 object or vice versa."""
38
39
40class IPAddressExclusionError(Error):
41
42 """An Error we should never see occurred in address exclusion."""
43
44
45class IPv4IpValidationError(Error):
46
47 """Raised when an IPv4 address is invalid."""
48
49 def __init__(self, ip):
50 Error.__init__(self)
51 self.ip = ip
52
53 def __str__(self):
54 return repr(self.ip) + ' is not a valid IPv4 address'
55
56
57class IPv4NetmaskValidationError(Error):
58
59 """Raised when a netmask is invalid."""
60
61 def __init__(self, netmask):
62 Error.__init__(self)
63 self.netmask = netmask
64
65 def __str__(self):
66 return repr(self.netmask) + ' is not a valid IPv4 netmask'
67
68
69class IPv6IpValidationError(Error):
70
71 """Raised when an IPv6 address is invalid."""
72
73 def __init__(self, ip):
74 Error.__init__(self)
75 self.ip = ip
76
77 def __str__(self):
78 return repr(self.ip) + ' is not a valid IPv6 address'
79
80
81class IPv6NetmaskValidationError(Error):
82
83 """Raised when an IPv6 netmask is invalid."""
84
85 def __init__(self, netmask):
86 Error.__init__(self)
87 self.netmask = netmask
88
89 def __str__(self):
90 return repr(self.netmask) + ' is not a valid IPv6 netmask'
91
92
93class PrefixlenDiffInvalidError(Error):
94
95 """Raised when Sub/Supernets is called with a bad prefixlen_diff."""
96
97 def __init__(self, error_str):
98 Error.__init__(self)
99 self.error_str = error_str
100
101
102def IP(ipaddr):
103 """Take an IP string/int and return an object of the correct type.
104
105 Args:
106 ipaddr: A string or integer, the IP address. Either IPv4 or
107 IPv6 addresses may be supplied; integers less than 2**32 will
108 be considered to be IPv4.
109
110 Returns:
111 An IPv4 or IPv6 object.
112
113 Raises:
114 ValueError: if the string passed isn't either a v4 or a v6
115 address.
116
117 """
118
119 try:
120 return IPv4(ipaddr)
121 except (IPv4IpValidationError, IPv4NetmaskValidationError):
122 pass
123
124 try:
125 return IPv6(ipaddr)
126 except (IPv6IpValidationError, IPv6NetmaskValidationError):
127 pass
128
129 raise ValueError('%r does not appear to be an IPv4 or IPv6 address' %
130 ipaddr)
131
132
133def _collapse_address_list_recursive(addresses):
134 """Loops through the addresses, collapsing concurrent netblocks.
135
136 Example:
137
138 ip1 = IPv4('1.1.0.0/24')
139 ip2 = IPv4('1.1.1.0/24')
140 ip3 = IPv4('1.1.2.0/24')
141 ip4 = IPv4('1.1.3.0/24')
142 ip5 = IPv4('1.1.4.0/24')
143 ip6 = IPv4('1.1.0.1/22')
144
145 _collapse_address_list_recursive([ip1, ip2, ip3, ip4, ip5, ip6]) ->
146 [IPv4('1.1.0.0/22'), IPv4('1.1.4.0/24')]
147
148 This shouldn't be called directly; it is called via
149 collapse_address_list([]).
150
151 Args:
152 addresses: A list of IPv4 or IPv6 objects.
153
154 Returns:
155 A list of IPv4 or IPv6 objects depending on what we were passed.
156
157 """
158 ret_array = []
159 optimized = False
160
161 for cur_addr in addresses:
162 if not ret_array:
163 ret_array.append(cur_addr)
164 continue
165 if cur_addr in ret_array[-1]:
166 optimized = True
167 elif cur_addr == ret_array[-1].supernet().subnet()[1]:
168 ret_array.append(ret_array.pop().supernet())
169 optimized = True
170 else:
171 ret_array.append(cur_addr)
172
173 if optimized:
174 return _collapse_address_list_recursive(ret_array)
175
176 return ret_array
177
178
179def collapse_address_list(addresses):
180 """Collapse a list of IP objects.
181
182 Example:
183 collapse_address_list([IPv4('1.1.0.0/24'), IPv4('1.1.1.0/24')]) ->
184 [IPv4('1.1.0.0/23')]
185
186 Args:
187 addresses: A list of IPv4 or IPv6 objects.
188
189 Returns:
190 A list of IPv4 or IPv6 objects depending on what we were passed.
191
192 """
193 return _collapse_address_list_recursive(
194 sorted(addresses, key=BaseIP._get_networks_key))
195
196
Gregory P. Smith1d499262009-05-01 19:59:52 +0000197class BaseIP(object):
198
199 """A generic IP object.
200
201 This IP class contains most of the methods which are used by
202 the IPv4 and IPv6 classes.
203
204 """
205
206 def __getitem__(self, n):
207 if n >= 0:
208 if self.network + n > self.broadcast:
209 raise IndexError
210 return self._string_from_ip_int(self.network + n)
211 else:
Gregory P. Smith5edb1a12009-05-08 23:16:47 +0000212 n += 1
Gregory P. Smith1d499262009-05-01 19:59:52 +0000213 if self.broadcast + n < self.network:
214 raise IndexError
215 return self._string_from_ip_int(self.broadcast + n)
216
217 def __lt__(self, other):
218 try:
219 return (self.version < other.version
220 or self.ip < other.ip
221 or self.netmask < other.netmask)
222 except AttributeError:
223 return NotImplemented
224
225 def __gt__(self, other):
226 try:
227 return (self.version > other.version
228 or self.ip > other.ip
229 or self.netmask > other.netmask)
230 except AttributeError:
231 return NotImplemented
232
233 def __eq__(self, other):
234 try:
235 return (self.version == other.version
236 and self.ip == other.ip
237 and self.netmask == other.netmask)
238 except AttributeError:
239 return NotImplemented
240
241 def __ne__(self, other):
242 eq = self.__eq__(other)
243 if eq is NotImplemented:
244 return NotImplemented
245 return not eq
246
247 def __le__(self, other):
248 gt = self.__gt__(other)
249 if gt is NotImplemented:
250 return NotImplemented
251 return not gt
252
253 def __ge__(self, other):
254 lt = self.__lt__(other)
255 if lt is NotImplemented:
256 return NotImplemented
257 return not lt
258
259 def __repr__(self):
260 return '%s(%r)' % (self.__class__.__name__, str(self))
261
262 def __index__(self):
263 return self.ip
264
265 def __int__(self):
266 return self.ip
267
268 def __hex__(self):
269 return hex(int(self))
270
271 def address_exclude(self, other):
272 """Remove an address from a larger block.
273
274 For example:
275
276 addr1 = IP('10.1.1.0/24')
277 addr2 = IP('10.1.1.0/26')
278 addr1.address_exclude(addr2) =
279 [IP('10.1.1.64/26'), IP('10.1.1.128/25')]
280
281 or IPv6:
282
283 addr1 = IP('::1/32')
284 addr2 = IP('::1/128')
285 addr1.address_exclude(addr2) = [IP('::0/128'),
286 IP('::2/127'),
287 IP('::4/126'),
288 IP('::8/125'),
289 ...
290 IP('0:0:8000::/33')]
291
292 Args:
293 other: An IP object of the same type.
294
295 Returns:
296 A sorted list of IP objects addresses which is self minus
297 other.
298
299 Raises:
300 IPTypeError: If self and other are of difffering address
301 versions.
302 IPAddressExclusionError: There was some unknown error in the
303 address exclusion process. This likely points to a bug
304 elsewhere in this code.
305 ValueError: If other is not completely contained by self.
306
307 """
308 if not self.version == other.version:
309 raise IPTypeError("%s and %s aren't of the same version" % (
310 str(self), str(other)))
311
312 if other not in self:
313 raise ValueError('%s not contained in %s' % (str(other),
314 str(self)))
315
316 ret_addrs = []
317
318 # Make sure we're comparing the network of other.
319 other = IP(other.network_ext + '/' + str(other.prefixlen))
320
321 s1, s2 = self.subnet()
322 while s1 != other and s2 != other:
323 if other in s1:
324 ret_addrs.append(s2)
325 s1, s2 = s1.subnet()
326 elif other in s2:
327 ret_addrs.append(s1)
328 s1, s2 = s2.subnet()
329 else:
330 # If we got here, there's a bug somewhere.
331 raise IPAddressExclusionError('Error performing exclusion: '
332 's1: %s s2: %s other: %s' %
333 (str(s1), str(s2), str(other)))
334 if s1 == other:
335 ret_addrs.append(s2)
336 elif s2 == other:
337 ret_addrs.append(s1)
338 else:
339 # If we got here, there's a bug somewhere.
340 raise IPAddressExclusionError('Error performing exclusion: '
341 's1: %s s2: %s other: %s' %
342 (str(s1), str(s2), str(other)))
343
344 return sorted(ret_addrs, key=BaseIP._get_networks_key)
345
346 def compare_networks(self, other):
347 """Compare two IP objects.
348
349 This is only concerned about the comparison of the integer
350 representation of the network addresses. This means that the
351 host bits aren't considered at all in this method. If you want
352 to compare host bits, you can easily enough do a
353 'HostA.ip < HostB.ip'
354
355 Args:
356 other: An IP object.
357
358 Returns:
359 If the IP versions of self and other are the same, returns:
360
361 -1 if self < other:
362 eg: IPv4('1.1.1.0/24') < IPv4('1.1.2.0/24')
363 IPv6('1080::200C:417A') < IPv6('1080::200B:417B')
364 0 if self == other
365 eg: IPv4('1.1.1.1/24') == IPv4('1.1.1.2/24')
366 IPv6('1080::200C:417A/96') == IPv6('1080::200C:417B/96')
367 1 if self > other
368 eg: IPv4('1.1.1.0/24') > IPv4('1.1.0.0/24')
369 IPv6('1080::1:200C:417A/112') >
370 IPv6('1080::0:200C:417A/112')
371
372 If the IP versions of self and other are different, returns:
373
374 -1 if self.version < other.version
375 eg: IPv4('10.0.0.1/24') < IPv6('::1/128')
376 1 if self.version > other.version
377 eg: IPv6('::1/128') > IPv4('255.255.255.0/24')
378
379 """
380 if self.version < other.version:
381 return -1
382 if self.version > other.version:
383 return 1
384 # self.version == other.version below here:
385 if self.network < other.network:
386 return -1
387 if self.network > other.network:
388 return 1
389 # self.network == other.network below here:
390 if self.netmask < other.netmask:
391 return -1
392 if self.netmask > other.netmask:
393 return 1
394 # self.network == other.network and self.netmask == other.netmask
395 return 0
396
397 def _get_networks_key(self):
398 """Network-only key function.
399
400 Returns an object that identifies this address' network and
401 netmask. This function is a suitable "key" argument for sorted()
402 and list.sort().
403
404 """
405 return (self.version, self.network, self.netmask)
406
407 prefixlen = property(
408 fget=lambda self: self._prefixlen,
409 fset=lambda self, prefixlen: self._set_prefix(prefixlen))
410
411 def __str__(self):
412 return '%s/%s' % (self._string_from_ip_int(self.ip),
413 str(self.prefixlen))
414
415 def __hash__(self):
416 return hash(self.ip ^ self.netmask)
417
418 def __contains__(self, other):
419 return self.network <= other.ip and self.broadcast >= other.broadcast
420
421 @property
422 def ip_ext(self):
423 """Dotted decimal or colon string version of the IP address."""
424 return self._string_from_ip_int(self.ip)
425
426 @property
427 def ip_ext_full(self):
428 """Canonical string version of the IP address."""
429 return self.ip_ext
430
431 @property
432 def broadcast(self):
433 """Integer representation of the broadcast address."""
434 return self.ip | self.hostmask
435
436 @property
437 def broadcast_ext(self):
438 """Dotted decimal or colon string version of the broadcast."""
439 return self._string_from_ip_int(self.broadcast)
440
441 @property
442 def hostmask(self):
443 """Integer representation of the hostmask."""
444 return self.netmask ^ self._ALL_ONES
445
446 @property
447 def hostmask_ext(self):
448 """Dotted decimal or colon string version of the hostmask."""
449 return self._string_from_ip_int(self.hostmask)
450
451 @property
452 def network(self):
453 """Integer representation of the network."""
454 return self.ip & self.netmask
455
456 @property
457 def network_ext(self):
458 """Dotted decimal or colon string version of the network."""
459 return self._string_from_ip_int(self.network)
460
461 @property
462 def netmask_ext(self):
463 """Dotted decimal or colon string version of the netmask."""
464 return self._string_from_ip_int(self.netmask)
465
466 @property
467 def numhosts(self):
468 """Number of hosts in the current subnet."""
469 return self.broadcast - self.network + 1
470
471 @property
472 def version(self):
473 raise NotImplementedError('BaseIP has no version')
474
475 def _ip_int_from_prefix(self, prefixlen=None):
476 """Turn the prefix length netmask into a int for comparison.
477
478 Args:
479 prefixlen: An integer, the prefix length.
480
481 Returns:
482 An integer.
483
484 """
485 if not prefixlen and prefixlen != 0:
486 prefixlen = self.prefixlen
487 return self._ALL_ONES ^ (self._ALL_ONES >> prefixlen)
488
489 def _prefix_from_ip_int(self, ip_int, mask=32):
490 """Return prefix length from the decimal netmask.
491
492 Args:
493 ip_int: An integer, the IP address.
494 mask: The netmask. Defaults to 32.
495
496 Returns:
497 An integer, the prefix length.
498
499 """
500 while mask:
501 if ip_int & 1 == 1:
502 break
503 ip_int >>= 1
504 mask -= 1
505
506 return mask
507
508 def _ip_string_from_prefix(self, prefixlen=None):
509 """Turn a prefix length into a dotted decimal string.
510
511 Args:
512 prefixlen: An integer, the netmask prefix length.
513
514 Returns:
515 A string, the dotted decimal netmask string.
516
517 """
518 if not prefixlen:
519 prefixlen = self.prefixlen
520 return self._string_from_ip_int(self._ip_int_from_prefix(prefixlen))
521
522
523class IPv4(BaseIP):
524
525 """This class represents and manipulates 32-bit IPv4 addresses.
526
527 Attributes: [examples for IPv4('1.2.3.4/27')]
528 .ip: 16909060
529 .ip_ext: '1.2.3.4'
530 .ip_ext_full: '1.2.3.4'
531 .network: 16909056L
532 .network_ext: '1.2.3.0'
533 .hostmask: 31L (0x1F)
534 .hostmask_ext: '0.0.0.31'
535 .broadcast: 16909087L (0x102031F)
536 .broadcast_ext: '1.2.3.31'
537 .netmask: 4294967040L (0xFFFFFFE0)
538 .netmask_ext: '255.255.255.224'
539 .prefixlen: 27
540
541 """
542
543 # Equivalent to 255.255.255.255 or 32 bits of 1's.
544 _ALL_ONES = 0xffffffff
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +0000545 _version = 4
Gregory P. Smith1d499262009-05-01 19:59:52 +0000546
547 def __init__(self, ipaddr):
548 """Instantiate a new IPv4 object.
549
550 Args:
551 ipaddr: A string or integer representing the IP [& network].
552 '192.168.1.1/32'
553 '192.168.1.1/255.255.255.255'
554 '192.168.1.1/0.0.0.255'
555 '192.168.1.1'
556 are all functionally the same in IPv4. That is to say,
557 failing to provide a subnetmask will create an object with
558 a mask of /32. A netmask of '255.255.255.255' is assumed
559 to be /32 and '0.0.0.0' is assumed to be /0, even though
560 other netmasks can be expressed both as host- and
561 net-masks. (255.0.0.0 == 0.255.255.255)
562
563 Additionally, an integer can be passed, so
564 IPv4('192.168.1.1') == IPv4(3232235777).
565 or, more generally
566 IPv4(IPv4('192.168.1.1').ip) == IPv4('192.168.1.1')
567
568 Raises:
569 IPv4IpValidationError: If ipaddr isn't a valid IPv4 address.
570 IPv4NetmaskValidationError: If the netmask isn't valid for
571 an IPv4 address.
572
573 """
574 BaseIP.__init__(self)
Gregory P. Smith1d499262009-05-01 19:59:52 +0000575
576 # Efficient constructor from integer.
577 if isinstance(ipaddr, int) or isinstance(ipaddr, long):
578 self.ip = ipaddr
579 self._prefixlen = 32
580 self.netmask = self._ALL_ONES
581 if ipaddr < 0 or ipaddr > self._ALL_ONES:
582 raise IPv4IpValidationError(ipaddr)
583 return
584
Gregory P. Smith1d499262009-05-01 19:59:52 +0000585 # Assume input argument to be string or any object representation
586 # which converts into a formatted IP prefix string.
587 addr = str(ipaddr).split('/')
588
589 if len(addr) > 2:
590 raise IPv4IpValidationError(ipaddr)
591
592 if not self._is_valid_ip(addr[0]):
593 raise IPv4IpValidationError(addr[0])
594
595 self.ip = self._ip_int_from_string(addr[0])
596
597 if len(addr) == 2:
598 mask = addr[1].split('.')
599 if len(mask) == 4:
600 # We have dotted decimal netmask.
601 if not self._is_valid_netmask(addr[1]):
602 raise IPv4NetmaskValidationError(addr[1])
603 if self._is_hostmask(addr[1]):
604 self.netmask = (
605 self._ip_int_from_string(addr[1]) ^ self._ALL_ONES)
606 else:
607 self.netmask = self._ip_int_from_string(addr[1])
608 self._prefixlen = self._prefix_from_ip_int(self.netmask)
609 else:
610 # We have a netmask in prefix length form.
611 if not self._is_valid_netmask(addr[1]):
612 raise IPv4NetmaskValidationError(addr[1])
613 self._prefixlen = int(addr[1])
614 self.netmask = self._ip_int_from_prefix(self._prefixlen)
615 else:
616 self._prefixlen = 32
617 self.netmask = self._ip_int_from_prefix(self._prefixlen)
618
619 def _set_prefix(self, prefixlen):
620 """Change the prefix length.
621
622 Args:
623 prefixlen: An integer, the new prefix length.
624
625 Raises:
626 IPv4NetmaskValidationError: If prefixlen is out of bounds.
627
628 """
629 if not 0 <= prefixlen <= 32:
630 raise IPv4NetmaskValidationError(prefixlen)
631 self._prefixlen = prefixlen
632 self.netmask = self._ip_int_from_prefix(self._prefixlen)
633
634 def subnet(self, prefixlen_diff=1):
635 """The subnets which join to make the current subnet.
636
637 In the case that self contains only one IP
638 (self._prefixlen == 32), return a list with just ourself.
639
640 Args:
641 prefixlen_diff: An integer, the amount the prefix length
642 should be increased by. Given a /24 network and a
643 prefixlen_diff of 3, for example, 8 subnets of size /27
644 will be returned. The default value of 1 splits the
645 current network into two halves.
646
647 Returns:
648 A list of IPv4 objects.
649
650 Raises:
651 PrefixlenDiffInvalidError: The prefixlen_diff is too small
652 or too large.
653
654 """
655 if self._prefixlen == 32:
656 return [self]
657
658 if prefixlen_diff < 0:
659 raise PrefixlenDiffInvalidError('prefix length diff must be > 0')
660 new_prefixlen = self.prefixlen + prefixlen_diff
661
662 if not self._is_valid_netmask(str(new_prefixlen)):
663 raise PrefixlenDiffInvalidError(
664 'prefix length diff %d is invalid for netblock %s' % (
665 new_prefixlen, str(self)))
666
667 first = IPv4(
668 self._string_from_ip_int(self.network) + '/' +
669 str(self._prefixlen + prefixlen_diff))
670 subnets = [first]
671 current = first
672 while True:
673 broadcast = current.broadcast
674 if broadcast == self.broadcast:
675 break
676 current = IPv4(self._string_from_ip_int(broadcast + 1) + '/' +
677 str(new_prefixlen))
678 subnets.append(current)
679
680 return subnets
681
682 def supernet(self, prefixlen_diff=1):
683 """The supernet containing the current network.
684
685 Args:
686 prefixlen_diff: An integer, the amount the prefix length of
687 the network should be decreased by. For example, given a
688 /24 network and a prefixlen_diff of 3, a supernet with a
689 /21 netmask is returned.
690
691 Returns:
692 An IPv4 object.
693
694 Raises:
695 PrefixlenDiffInvalidError: If
696 self.prefixlen - prefixlen_diff < 0. I.e., you have a
697 negative prefix length.
698
699 """
700 if self.prefixlen == 0:
701 return self
702 if self.prefixlen - prefixlen_diff < 0:
703 raise PrefixlenDiffInvalidError(
704 'current prefixlen is %d, cannot have a prefixlen_diff of %d' %
705 (self.prefixlen, prefixlen_diff))
706 return IPv4(self.ip_ext + '/' + str(self.prefixlen - prefixlen_diff))
707
708 @property
709 def is_private(self):
710 """Test if this address is allocated for private networks.
711
712 Returns:
713 A boolean, True if the address is reserved per RFC 1918.
714
715 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +0000716 for network in _IPV4_RFC1918_NETWORKS:
717 if self in network:
718 return True
719 return False
Gregory P. Smith1d499262009-05-01 19:59:52 +0000720
721 @property
722 def is_multicast(self):
723 """Test if the address is reserved for multicast use.
724
725 Returns:
726 A boolean, True if the address is multicast.
727 See RFC 3171 for details.
728
729 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +0000730 return self in _IPV4_RFC3171_MULTICAST
Gregory P. Smith1d499262009-05-01 19:59:52 +0000731
732 @property
733 def is_loopback(self):
734 """Test if the address is a loopback adddress.
735
736 Returns:
737 A boolean, True if the address is a loopback per RFC 3330.
738
739 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +0000740 return self in _IPV4_RFC3330_LOOPBACK
Gregory P. Smith1d499262009-05-01 19:59:52 +0000741
742 @property
743 def is_link_local(self):
744 """Test if the address is reserved for link-local.
745
746 Returns:
747 A boolean, True if the address is link-local per RFC 3927.
748
749 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +0000750 return self in _IPV4_RFC3927_LINK_LOCAL
Gregory P. Smith1d499262009-05-01 19:59:52 +0000751
752 @property
753 def version(self):
754 return self._version
755
756 @property
757 def packed(self):
758 """The binary representation of this address."""
759 return struct.pack('!I', self.ip)
760
761 def _is_hostmask(self, ip_str):
762 """Test if the IP string is a hostmask (rather than a netmask).
763
764 Args:
765 ip_str: A string, the potential hostmask.
766
767 Returns:
768 A boolean, True if the IP string is a hostmask.
769
770 """
771 parts = [int(x) for x in ip_str.split('.')]
772 if parts[0] < parts[-1]:
773 return True
774 return False
775
776 def _ip_int_from_string(self, ip_str):
777 """Turn the given IP string into an integer for comparison.
778
779 Args:
780 ip_str: A string, the IP address.
781
782 Returns:
783 The IP address as an integer.
784
785 """
786 packed_ip = 0
787 for oc in ip_str.split('.'):
788 packed_ip = (packed_ip << 8) | int(oc)
789 return packed_ip
790
791 def _string_from_ip_int(self, ip_int):
792 """Turns a 32-bit integer into dotted decimal notation.
793
794 Args:
795 ip_int: An integer, the IP address.
796
797 Returns:
798 The IP address as a string in dotted decimal notation.
799
800 """
801 octets = []
802 for _ in xrange(4):
803 octets.insert(0, str(ip_int & 0xFF))
804 ip_int >>= 8
805 return '.'.join(octets)
806
807 def _is_valid_ip(self, ip_str):
808 """Validate the dotted decimal notation IP/netmask string.
809
810 Args:
811 ip_str: A string, the IP address.
812
813 Returns:
814 A boolean, True if the string is a valid dotted decimal IP
815 string.
816
817 """
818 octets = ip_str.split('.')
819 if len(octets) == 1:
820 # We have an integer rather than a dotted decimal IP.
821 try:
822 return int(ip_str) >= 0 and int(ip_str) <= self._ALL_ONES
823 except ValueError:
824 return False
825
826 if len(octets) != 4:
827 return False
828
829 for octet in octets:
830 try:
831 if not 0 <= int(octet) <= 255:
832 return False
833 except ValueError:
834 return False
835 return True
836
837 def _is_valid_netmask(self, netmask):
838 """Verify that the netmask is valid.
839
840 Args:
841 netmask: A string, either a prefix or dotted decimal
842 netmask.
843
844 Returns:
845 A boolean, True if the prefix represents a valid IPv4
846 netmask.
847
848 """
849 if len(netmask.split('.')) == 4:
850 return self._is_valid_ip(netmask)
851 try:
852 netmask = int(netmask)
853 except ValueError:
854 return False
855 return 0 <= netmask <= 32
856
857
858class IPv6(BaseIP):
859
860 """This class respresents and manipulates 128-bit IPv6 addresses.
861
862 Attributes: [examples for IPv6('2001:658:22A:CAFE:200::1/64')]
863 .ip: 42540616829182469433547762482097946625L
864 .ip_ext: '2001:658:22a:cafe:200::1'
865 .ip_ext_full: '2001:0658:022a:cafe:0200:0000:0000:0001'
866 .network: 42540616829182469433403647294022090752L
867 .network_ext: '2001:658:22a:cafe::'
868 .hostmask: 18446744073709551615L
869 .hostmask_ext: '::ffff:ffff:ffff:ffff'
870 .broadcast: 42540616829182469451850391367731642367L
871 .broadcast_ext: '2001:658:22a:cafe:ffff:ffff:ffff:ffff'
872 .netmask: 340282366920938463444927863358058659840L
873 .netmask_ext: 64
874 .prefixlen: 64
875
876 """
877
878 _ALL_ONES = (2**128) - 1
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +0000879 _version = 6
Gregory P. Smith1d499262009-05-01 19:59:52 +0000880
881 def __init__(self, ipaddr):
882 """Instantiate a new IPv6 object.
883
884 Args:
885 ipaddr: A string or integer representing the IP or the IP
886 and prefix/netmask.
887 '2001:4860::/128'
888 '2001:4860:0000:0000:0000:0000:0000:0000/128'
889 '2001:4860::'
890 are all functionally the same in IPv6. That is to say,
891 failing to provide a subnetmask will create an object with
892 a mask of /128.
893
894 Additionally, an integer can be passed, so
895 IPv6('2001:4860::') ==
896 IPv6(42541956101370907050197289607612071936L).
897 or, more generally
898 IPv6(IPv6('2001:4860::').ip) == IPv6('2001:4860::')
899
900 Raises:
901 IPv6IpValidationError: If ipaddr isn't a valid IPv6 address.
902 IPv6NetmaskValidationError: If the netmask isn't valid for
903 an IPv6 address.
904
905 """
906 BaseIP.__init__(self)
Gregory P. Smith1d499262009-05-01 19:59:52 +0000907
908 # Efficient constructor from integer.
909 if isinstance(ipaddr, long) or isinstance(ipaddr, int):
910 self.ip = ipaddr
911 self._prefixlen = 128
912 self.netmask = self._ALL_ONES
913 if ipaddr < 0 or ipaddr > self._ALL_ONES:
914 raise IPv6IpValidationError(ipaddr)
915 return
916
Gregory P. Smith1d499262009-05-01 19:59:52 +0000917 # Assume input argument to be string or any object representation
918 # which converts into a formatted IP prefix string.
919 addr_str = str(ipaddr)
920 if not addr_str:
921 raise IPv6IpValidationError('')
922 addr = addr_str.split('/')
923 if len(addr) > 1:
924 if self._is_valid_netmask(addr[1]):
925 self._prefixlen = int(addr[1])
926 else:
927 raise IPv6NetmaskValidationError(addr[1])
928 else:
929 self._prefixlen = 128
930
931 self.netmask = self._ip_int_from_prefix(self._prefixlen)
932
933 if not self._is_valid_ip(addr[0]):
934 raise IPv6IpValidationError(addr[0])
935
936 self.ip = self._ip_int_from_string(addr[0])
937
938 @property
939 def ip_ext_full(self):
940 """Returns the expanded version of the IPv6 string."""
941 return self._explode_shorthand_ip_string(self.ip_ext)
942
943 def _set_prefix(self, prefixlen):
944 """Change the prefix length.
945
946 Args:
947 prefixlen: An integer, the new prefix length.
948
949 Raises:
950 IPv6NetmaskValidationError: If prefixlen is out of bounds.
951
952 """
953 if not 0 <= prefixlen <= 128:
954 raise IPv6NetmaskValidationError(prefixlen)
955 self._prefixlen = prefixlen
956 self.netmask = self._ip_int_from_prefix(self.prefixlen)
957
958 def subnet(self, prefixlen_diff=1):
959 """The subnets which join to make the current subnet.
960
961 In the case that self contains only one IP
962 (self._prefixlen == 128), return a list with just ourself.
963
964 Args:
965 prefixlen_diff: An integer, the amount the prefix length
966 should be increased by.
967
968 Returns:
969 A list of IPv6 objects.
970
971 Raises:
972 PrefixlenDiffInvalidError: The prefixlen_diff is too small
973 or too large.
974
975 """
976 # Preserve original functionality (return [self] if
977 # self.prefixlen == 128).
978 if self.prefixlen == 128:
979 return [self]
980
981 if prefixlen_diff < 0:
982 raise PrefixlenDiffInvalidError('Prefix length diff must be > 0')
983 new_prefixlen = self.prefixlen + prefixlen_diff
984 if not self._is_valid_netmask(str(new_prefixlen)):
985 raise PrefixlenDiffInvalidError(
986 'Prefix length diff %d is invalid for netblock %s' % (
987 new_prefixlen, str(self)))
988 first = IPv6(
989 self._string_from_ip_int(self.network) + '/' +
990 str(self._prefixlen + prefixlen_diff))
991 subnets = [first]
992 current = first
993 while True:
994 broadcast = current.broadcast
995 if current.broadcast == self.broadcast:
996 break
997 current = IPv6(self._string_from_ip_int(broadcast + 1) + '/' +
998 str(new_prefixlen))
999 subnets.append(current)
1000
1001 return subnets
1002
1003 def supernet(self, prefixlen_diff=1):
1004 """The supernet containing the current network.
1005
1006 Args:
1007 prefixlen_diff: An integer, the amount the prefix length of the
1008 network should be decreased by. For example, given a /96
1009 network and a prefixlen_diff of 3, a supernet with a /93
1010 netmask is returned.
1011
1012 Returns:
1013 An IPv6 object.
1014
1015 Raises:
1016 PrefixlenDiffInvalidError: If
1017 self._prefixlen - prefixlen_diff < 0. I.e., you have a
1018 negative prefix length.
1019
1020 """
1021 if self.prefixlen == 0:
1022 return self
1023 if self.prefixlen - prefixlen_diff < 0:
1024 raise PrefixlenDiffInvalidError(
1025 'current prefixlen is %d, cannot have a prefixlen_diff of %d' %
1026 (self.prefixlen, prefixlen_diff))
1027 return IPv6(self.ip_ext + '/' + str(self.prefixlen - prefixlen_diff))
1028
1029 @property
1030 def is_multicast(self):
1031 """Test if the address is reserved for multicast use.
1032
1033 Returns:
1034 A boolean, True if the address is a multicast address.
1035 See RFC 2373 2.7 for details.
1036
1037 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +00001038 return self in _IPV6_RFC2373_MULTICAST
Gregory P. Smith1d499262009-05-01 19:59:52 +00001039
1040 @property
1041 def is_unspecified(self):
1042 """Test if the address is unspecified.
1043
1044 Returns:
1045 A boolean, True if this is the unspecified address as defined in
1046 RFC 2373 2.5.2.
1047
1048 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +00001049 return self == _IPV6_RFC2373_UNSPECIFIED
Gregory P. Smith1d499262009-05-01 19:59:52 +00001050
1051 @property
1052 def is_loopback(self):
1053 """Test if the address is a loopback adddress.
1054
1055 Returns:
1056 A boolean, True if the address is a loopback address as defined in
1057 RFC 2373 2.5.3.
1058
1059 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +00001060 return self == _IPV6_RFC2373_LOOPBACK
Gregory P. Smith1d499262009-05-01 19:59:52 +00001061
1062 @property
1063 def is_link_local(self):
1064 """Test if the address is reserved for link-local.
1065
1066 Returns:
1067 A boolean, True if the address is reserved per RFC 4291.
1068
1069 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +00001070 return self in _IPV6_RFC4291_LINK_LOCAL
Gregory P. Smith1d499262009-05-01 19:59:52 +00001071
1072 @property
1073 def is_site_local(self):
1074 """Test if the address is reserved for site-local.
1075
1076 Note that the site-local address space has been deprecated by RFC 3879.
1077 Use is_private to test if this address is in the space of unique local
1078 addresses as defined by RFC 4193.
1079
1080 Returns:
1081 A boolean, True if the address is reserved per RFC 3513 2.5.6.
1082
1083 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +00001084 return self in _IPV6_RFC3513_SITE_LOCAL
Gregory P. Smith1d499262009-05-01 19:59:52 +00001085
1086 @property
1087 def is_private(self):
1088 """Test if this address is allocated for private networks.
1089
1090 Returns:
1091 A boolean, True if the address is reserved per RFC 4193.
1092
1093 """
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +00001094 return self in _IPV6_RFC4193_PRIVATE
Gregory P. Smith1d499262009-05-01 19:59:52 +00001095
1096 @property
1097 def version(self):
1098 return self._version
1099
1100 @property
1101 def packed(self):
1102 """The binary representation of this address."""
1103 return struct.pack('!QQ', self.ip >> 64, self.ip & (2**64 - 1))
1104
1105 def _is_shorthand_ip(self, ip_str=None):
1106 """Determine if the address is shortened.
1107
1108 Args:
1109 ip_str: A string, the IPv6 address.
1110
1111 Returns:
1112 A boolean, True if the address is shortened.
1113
1114 """
1115 if ip_str.count('::') == 1:
1116 return True
1117 return False
1118
1119 def _explode_shorthand_ip_string(self, ip_str):
1120 """Expand a shortened IPv6 address.
1121
1122 Args:
1123 ip_str: A string, the IPv6 address.
1124
1125 Returns:
1126 A string, the expanded IPv6 address.
1127
1128 """
1129 if self._is_shorthand_ip(ip_str):
1130 new_ip = []
1131 hextet = ip_str.split('::')
1132 sep = len(hextet[0].split(':')) + len(hextet[1].split(':'))
1133 new_ip = hextet[0].split(':')
1134
1135 for _ in xrange(8 - sep):
1136 new_ip.append('0000')
1137 new_ip += hextet[1].split(':')
1138
1139 # Now need to make sure every hextet is 4 lower case characters.
1140 # If a hextet is < 4 characters, we've got missing leading 0's.
1141 ret_ip = []
1142 for hextet in new_ip:
1143 ret_ip.append(('0' * (4 - len(hextet)) + hextet).lower())
1144 return ':'.join(ret_ip)
1145 # We've already got a longhand ip_str.
1146 return ip_str
1147
1148 def _is_valid_ip(self, ip_str=None):
1149 """Ensure we have a valid IPv6 address.
1150
1151 Probably not as exhaustive as it should be.
1152
1153 Args:
1154 ip_str: A string, the IPv6 address.
1155
1156 Returns:
1157 A boolean, True if this is a valid IPv6 address.
1158
1159 """
1160 if not ip_str:
1161 ip_str = self.ip_ext
1162
1163 # We need to have at least one ':'.
1164 if ':' not in ip_str:
1165 return False
1166
1167 # We can only have one '::' shortener.
1168 if ip_str.count('::') > 1:
1169 return False
1170
1171 # '::' should be encompassed by start, digits or end.
1172 if ':::' in ip_str:
1173 return False
1174
1175 # A single colon can neither start nor end an address.
1176 if ((ip_str.startswith(':') and not ip_str.startswith('::')) or
1177 (ip_str.endswith(':') and not ip_str.endswith('::'))):
1178 return False
1179
1180 # If we have no concatenation, we need to have 8 fields with 7 ':'.
1181 if '::' not in ip_str and ip_str.count(':') != 7:
1182 # We might have an IPv4 mapped address.
1183 if ip_str.count('.') != 3:
1184 return False
1185
1186 ip_str = self._explode_shorthand_ip_string(ip_str)
1187
1188 # Now that we have that all squared away, let's check that each of the
1189 # hextets are between 0x0 and 0xFFFF.
1190 for hextet in ip_str.split(':'):
1191 if hextet.count('.') == 3:
1192 # If we have an IPv4 mapped address, the IPv4 portion has to be
1193 # at the end of the IPv6 portion.
1194 if not ip_str.split(':')[-1] == hextet:
1195 return False
1196 try:
1197 IPv4(hextet)
1198 except IPv4IpValidationError:
1199 return False
1200 elif int(hextet, 16) < 0x0 or int(hextet, 16) > 0xFFFF:
1201 return False
1202 return True
1203
1204 def _is_valid_netmask(self, prefixlen):
1205 """Verify that the netmask/prefixlen is valid.
1206
1207 Args:
1208 prefixlen: A string, the netmask in prefix length format.
1209
1210 Returns:
1211 A boolean, True if the prefix represents a valid IPv6
1212 netmask.
1213
1214 """
1215 try:
1216 prefixlen = int(prefixlen)
1217 except ValueError:
1218 return False
1219 return 0 <= prefixlen <= 128
1220
1221 def _ip_int_from_string(self, ip_str=None):
1222 """Turn an IPv6 address into an integer.
1223
1224 Args:
1225 ip_str: A string, the IPv6 address.
1226
1227 Returns:
1228 A long, the IPv6 address.
1229
1230 """
1231 if not ip_str:
1232 ip_str = self.ip_ext
1233
1234 ip_int = 0
1235
1236 fields = self._explode_shorthand_ip_string(ip_str).split(':')
1237
1238 # Do we have an IPv4 mapped (::ffff:a.b.c.d) or compact (::a.b.c.d)
1239 # address?
1240 if fields[-1].count('.') == 3:
1241 ipv4_string = fields.pop()
1242 ipv4_int = IPv4(ipv4_string).ip
1243 octets = []
1244 for _ in xrange(2):
1245 octets.append(hex(ipv4_int & 0xFFFF).lstrip('0x').rstrip('L'))
1246 ipv4_int >>= 16
1247 fields.extend(reversed(octets))
1248
1249 for field in fields:
1250 ip_int = (ip_int << 16) + int(field, 16)
1251
1252 return ip_int
1253
1254 def _compress_hextets(self, hextets):
1255 """Compresses a list of hextets.
1256
1257 Compresses a list of strings, replacing the longest continuous
1258 sequence of "0" in the list with "" and adding empty strings at
1259 the beginning or at the end of the string such that subsequently
1260 calling ":".join(hextets) will produce the compressed version of
1261 the IPv6 address.
1262
1263 Args:
1264 hextets: A list of strings, the hextets to compress.
1265
1266 Returns:
1267 A list of strings.
1268
1269 """
1270 best_doublecolon_start = -1
1271 best_doublecolon_len = 0
1272 doublecolon_start = -1
1273 doublecolon_len = 0
1274 for index in range(len(hextets)):
1275 if hextets[index] == '0':
1276 doublecolon_len += 1
1277 if doublecolon_start == -1:
1278 # Start of a sequence of zeros.
1279 doublecolon_start = index
1280 if doublecolon_len > best_doublecolon_len:
1281 # This is the longest sequence of zeros so far.
1282 best_doublecolon_len = doublecolon_len
1283 best_doublecolon_start = doublecolon_start
1284 else:
1285 doublecolon_len = 0
1286 doublecolon_start = -1
1287
1288 if best_doublecolon_len > 1:
1289 best_doublecolon_end = (best_doublecolon_start +
1290 best_doublecolon_len)
1291 # For zeros at the end of the address.
1292 if best_doublecolon_end == len(hextets):
1293 hextets += ['']
1294 hextets[best_doublecolon_start:best_doublecolon_end] = ['']
1295 # For zeros at the beginning of the address.
1296 if best_doublecolon_start == 0:
1297 hextets = [''] + hextets
1298
1299 return hextets
1300
1301 def _string_from_ip_int(self, ip_int=None):
1302 """Turns a 128-bit integer into hexadecimal notation.
1303
1304 Args:
1305 ip_int: An integer, the IP address.
1306
1307 Returns:
1308 A string, the hexadecimal representation of the address.
1309
1310 Raises:
1311 ValueError: The address is bigger than 128 bits of all ones.
1312
1313 """
1314 if not ip_int and ip_int != 0:
1315 ip_int = self.ip
1316
1317 if ip_int > self._ALL_ONES:
1318 raise ValueError('IPv6 address is too large')
1319
1320 hex_str = '%032x' % ip_int
1321 hextets = []
1322 for x in range(0, 32, 4):
1323 hextets.append('%x' % int(hex_str[x:x+4], 16))
1324
1325 hextets = self._compress_hextets(hextets)
1326 return ':'.join(hextets)
1327
1328 @property
1329 def netmask_ext(self):
1330 """IPv6 extended netmask.
1331
1332 We don't deal with netmasks in IPv6 like we do in IPv4. This is
1333 here strictly for IPv4 compatibility. We simply return the
1334 prefix length.
1335
1336 Returns:
1337 An integer.
1338
1339 """
1340 return self.prefixlen
Gregory P. Smith9ac3ee62009-05-03 19:37:05 +00001341
1342
1343# IPv4 constants.
1344_IPV4_RFC1918_NETWORKS = (IPv4('10.0.0.0/8'),
1345 IPv4('172.16.0.0/12'),
1346 IPv4('192.168.0.0/16'))
1347_IPV4_RFC3171_MULTICAST = IPv4('224.0.0.0/4')
1348_IPV4_RFC3330_LOOPBACK = IPv4('127.0.0.0/8')
1349_IPV4_RFC3927_LINK_LOCAL = IPv4('169.254.0.0/16')
1350
1351# IPv6 constants.
1352_IPV6_RFC2373_MULTICAST = IPv6('ff00::/8')
1353_IPV6_RFC2373_UNSPECIFIED = IPv6('::')
1354_IPV6_RFC2373_LOOPBACK = IPv6('::1')
1355_IPV6_RFC4291_LINK_LOCAL = IPv6('fe80::/10')
1356_IPV6_RFC3513_SITE_LOCAL = IPv6('fec0::/10') # Deprecated by RFC3879.
1357_IPV6_RFC4193_PRIVATE = IPv6('fc00::/7')