
We add the possibility to bind the destination UDP port of a Scapy SA to the ESP layer in the IPsec tunnel protection tests, even if it is not the default port for ESP (4500). This allows to test IPSec tunnel protection with ports other than 4500 in the UDP header, without hardcoding them in the Scapy patch (ex: 4545) Type: improvement Change-Id: I1eea3d4660ed1b59d827250a419af6b7b41c4a72 Signed-off-by: Arthur de Kerhor <arthurdekerhor@gmail.com>
203 lines
8.3 KiB
Diff
203 lines
8.3 KiB
Diff
diff --git a/scapy/layers/ipsec.py b/scapy/layers/ipsec.py
|
|
index ae057ee1..55d0dd53 100644
|
|
--- a/scapy/layers/ipsec.py
|
|
+++ b/scapy/layers/ipsec.py
|
|
@@ -56,6 +56,7 @@ from scapy.fields import ByteEnumField, ByteField, IntField, PacketField, \
|
|
ShortField, StrField, XIntField, XStrField, XStrLenField
|
|
from scapy.packet import Packet, bind_layers, Raw
|
|
from scapy.layers.inet import IP, UDP
|
|
+from scapy.contrib.mpls import MPLS
|
|
import scapy.modules.six as six
|
|
from scapy.modules.six.moves import range
|
|
from scapy.layers.inet6 import IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \
|
|
@@ -359,11 +360,8 @@ class CryptAlgo(object):
|
|
encryptor = cipher.encryptor()
|
|
|
|
if self.is_aead:
|
|
- if esn_en:
|
|
- aad = struct.pack('!LLL', esp.spi, esn, esp.seq)
|
|
- else:
|
|
- aad = struct.pack('!LL', esp.spi, esp.seq)
|
|
- encryptor.authenticate_additional_data(aad)
|
|
+ encryptor.authenticate_additional_data(sa.build_aead(esp))
|
|
+
|
|
data = encryptor.update(data) + encryptor.finalize()
|
|
data += encryptor.tag[:self.icv_size]
|
|
else:
|
|
@@ -400,12 +398,7 @@ class CryptAlgo(object):
|
|
|
|
if self.is_aead:
|
|
# Tag value check is done during the finalize method
|
|
- if esn_en:
|
|
- decryptor.authenticate_additional_data(
|
|
- struct.pack('!LLL', esp.spi, esn, esp.seq))
|
|
- else:
|
|
- decryptor.authenticate_additional_data(
|
|
- struct.pack('!LL', esp.spi, esp.seq))
|
|
+ decryptor.authenticate_additional_data(sa.build_aead(esp))
|
|
try:
|
|
data = decryptor.update(data) + decryptor.finalize()
|
|
except InvalidTag as err:
|
|
@@ -445,6 +438,7 @@ if algorithms:
|
|
CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR',
|
|
cipher=algorithms.AES,
|
|
mode=modes.CTR,
|
|
+ block_size=1,
|
|
iv_size=8,
|
|
salt_size=4,
|
|
format_mode_iv=_aes_ctr_format_mode_iv)
|
|
@@ -452,6 +446,7 @@ if algorithms:
|
|
CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM',
|
|
cipher=algorithms.AES,
|
|
mode=modes.GCM,
|
|
+ block_size=1,
|
|
salt_size=4,
|
|
iv_size=8,
|
|
icv_size=16,
|
|
@@ -460,6 +455,7 @@ if algorithms:
|
|
CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM',
|
|
cipher=algorithms.AES,
|
|
mode=modes.CCM,
|
|
+ block_size=1,
|
|
iv_size=8,
|
|
salt_size=3,
|
|
icv_size=16,
|
|
@@ -544,7 +540,7 @@ class AuthAlgo(object):
|
|
else:
|
|
return self.mac(key, self.digestmod(), default_backend())
|
|
|
|
- def sign(self, pkt, key):
|
|
+ def sign(self, pkt, key, trailer=None):
|
|
"""
|
|
Sign an IPsec (ESP or AH) packet with this algo.
|
|
|
|
@@ -560,16 +556,20 @@ class AuthAlgo(object):
|
|
|
|
if pkt.haslayer(ESP):
|
|
mac.update(raw(pkt[ESP]))
|
|
+ if trailer:
|
|
+ mac.update(trailer)
|
|
pkt[ESP].data += mac.finalize()[:self.icv_size]
|
|
|
|
elif pkt.haslayer(AH):
|
|
clone = zero_mutable_fields(pkt.copy(), sending=True)
|
|
mac.update(raw(clone))
|
|
+ if trailer:
|
|
+ mac.update(trailer)
|
|
pkt[AH].icv = mac.finalize()[:self.icv_size]
|
|
|
|
return pkt
|
|
|
|
- def verify(self, pkt, key):
|
|
+ def verify(self, pkt, key, trailer):
|
|
"""
|
|
Check that the integrity check value (icv) of a packet is valid.
|
|
|
|
@@ -600,6 +600,8 @@ class AuthAlgo(object):
|
|
clone = zero_mutable_fields(pkt.copy(), sending=False)
|
|
|
|
mac.update(raw(clone))
|
|
+ if trailer:
|
|
+ mac.update(trailer) # bytearray(4)) #raw(trailer))
|
|
computed_icv = mac.finalize()[:self.icv_size]
|
|
|
|
# XXX: Cannot use mac.verify because the ICV can be truncated
|
|
@@ -788,7 +790,7 @@ class SecurityAssociation(object):
|
|
This class is responsible of "encryption" and "decryption" of IPsec packets. # noqa: E501
|
|
"""
|
|
|
|
- SUPPORTED_PROTOS = (IP, IPv6)
|
|
+ SUPPORTED_PROTOS = (IP, IPv6, MPLS)
|
|
|
|
def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None,
|
|
auth_algo=None, auth_key=None, tunnel_header=None, nat_t_header=None, esn_en=False, esn=0): # noqa: E501
|
|
@@ -862,6 +864,23 @@ class SecurityAssociation(object):
|
|
raise TypeError('nat_t_header must be %s' % UDP.name)
|
|
self.nat_t_header = nat_t_header
|
|
|
|
+ def build_aead(self, esp):
|
|
+ if self.esn_en:
|
|
+ return (struct.pack('!LLL', esp.spi, self.seq_num >> 32, esp.seq))
|
|
+ else:
|
|
+ return (struct.pack('!LL', esp.spi, esp.seq))
|
|
+
|
|
+ def build_seq_num(self, num):
|
|
+ # only lower order bits are transmitted
|
|
+ # higher order bits are used in the ICV
|
|
+ lower = num & 0xffffffff
|
|
+ upper = num >> 32
|
|
+
|
|
+ if self.esn_en:
|
|
+ return lower, struct.pack("!I", upper)
|
|
+ else:
|
|
+ return lower, None
|
|
+
|
|
def check_spi(self, pkt):
|
|
if pkt.spi != self.spi:
|
|
raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' %
|
|
@@ -875,7 +894,8 @@ class SecurityAssociation(object):
|
|
if len(iv) != self.crypt_algo.iv_size:
|
|
raise TypeError('iv length must be %s' % self.crypt_algo.iv_size) # noqa: E501
|
|
|
|
- esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv)
|
|
+ low_seq_num, high_seq_num = self.build_seq_num(seq_num or self.seq_num)
|
|
+ esp = _ESPPlain(spi=self.spi, seq=low_seq_num, iv=iv)
|
|
|
|
if self.tunnel_header:
|
|
tunnel = self.tunnel_header.copy()
|
|
@@ -899,7 +919,7 @@ class SecurityAssociation(object):
|
|
esn_en=esn_en or self.esn_en,
|
|
esn=esn or self.esn)
|
|
|
|
- self.auth_algo.sign(esp, self.auth_key)
|
|
+ self.auth_algo.sign(esp, self.auth_key, high_seq_num)
|
|
|
|
if self.nat_t_header:
|
|
nat_t_header = self.nat_t_header.copy()
|
|
@@ -926,7 +946,8 @@ class SecurityAssociation(object):
|
|
|
|
def _encrypt_ah(self, pkt, seq_num=None):
|
|
|
|
- ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
|
|
+ low_seq_num, high_seq_num = self.build_seq_num(seq_num or self.seq_num)
|
|
+ ah = AH(spi=self.spi, seq=low_seq_num,
|
|
icv=b"\x00" * self.auth_algo.icv_size)
|
|
|
|
if self.tunnel_header:
|
|
@@ -966,7 +987,8 @@ class SecurityAssociation(object):
|
|
else:
|
|
ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)
|
|
|
|
- signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key) # noqa: E501
|
|
+ signed_pkt = self.auth_algo.sign(ip_header / ah / payload,
|
|
+ self.auth_key, high_seq_num) # noqa: E501
|
|
|
|
# sequence number must always change, unless specified by the user
|
|
if seq_num is None:
|
|
@@ -1003,11 +1025,12 @@ class SecurityAssociation(object):
|
|
|
|
def _decrypt_esp(self, pkt, verify=True, esn_en=None, esn=None):
|
|
|
|
+ low_seq_num, high_seq_num = self.build_seq_num(self.seq_num)
|
|
encrypted = pkt[ESP]
|
|
|
|
if verify:
|
|
self.check_spi(pkt)
|
|
- self.auth_algo.verify(encrypted, self.auth_key)
|
|
+ self.auth_algo.verify(encrypted, self.auth_key, high_seq_num)
|
|
|
|
esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key,
|
|
self.crypt_algo.icv_size or
|
|
@@ -1048,9 +1071,10 @@ class SecurityAssociation(object):
|
|
|
|
def _decrypt_ah(self, pkt, verify=True):
|
|
|
|
+ low_seq_num, high_seq_num = self.build_seq_num(self.seq_num)
|
|
if verify:
|
|
self.check_spi(pkt)
|
|
- self.auth_algo.verify(pkt, self.auth_key)
|
|
+ self.auth_algo.verify(pkt, self.auth_key, high_seq_num)
|
|
|
|
ah = pkt[AH]
|
|
payload = ah.payload
|