vpp/test/patches/scapy-2.4/ipsec.patch
Christian Hopps fb7e7ed2cd ipsec: fix padding/alignment for native IPsec encryption
Not all ESP crypto algorithms require padding/alignment to be the same
as the AES block/IV size. CCM, CTR and GCM all have no padding/alignment
requirements, and the RFCs indicate that no padding (beyond ESP's 4-octet
alignment requirement) should be used unless TFC (traffic flow
confidentiality) has been requested.

  CTR: https://tools.ietf.org/html/rfc3686#section-3.2
  GCM: https://tools.ietf.org/html/rfc4106#section-3.2
  CCM: https://tools.ietf.org/html/rfc4309#section-3.2

- VPP is incorrectly using the IV/AES block size to pad CTR and GCM.
These modes do not require padding (beyond ESP's 4-octet requirement). As
a result, packets carry unnecessary padding, which at best wastes
bandwidth and at worst may break network configurations that
have finely tuned MTU settings.

Fix this as well as changing the field names from ".*block_size" to
".*block_align" to better represent their actual (and only) use. Rename
"block_sz" in esp_encrypt to "esp_align" and set it correctly as well.

test: ipsec: Add unit-test to test for RFC correct padding/alignment

test: patch scapy to not incorrectly pad ccm, ctr, gcm modes as well

- Scapy is also incorrectly using the AES block size of 16 to pad CCM,
CTR, and GCM cipher modes. A bug report has been opened with, and
acknowledged by, the upstream scapy project as well:

  https://github.com/secdev/scapy/issues/2322

Ticket: VPP-1928
Type: fix
Signed-off-by: Christian Hopps <chopps@labn.net>
Change-Id: Iaa4d6a325a2e99fdcb2c375a3395bcfe7947770e
2020-09-07 09:43:27 +00:00

216 lines
8.6 KiB
Diff

diff --git a/scapy/layers/ipsec.py b/scapy/layers/ipsec.py
index 69e7ae3b..d9c1705b 100644
--- a/scapy/layers/ipsec.py
+++ b/scapy/layers/ipsec.py
@@ -344,8 +344,7 @@ class CryptAlgo(object):
encryptor = cipher.encryptor()
if self.is_aead:
- aad = struct.pack('!LL', esp.spi, esp.seq)
- encryptor.authenticate_additional_data(aad)
+ encryptor.authenticate_additional_data(sa.build_aead(esp))
data = encryptor.update(data) + encryptor.finalize()
data += encryptor.tag[:self.icv_size]
else:
@@ -380,10 +379,7 @@ class CryptAlgo(object):
if self.is_aead:
# Tag value check is done during the finalize method
- decryptor.authenticate_additional_data(
- struct.pack('!LL', esp.spi, esp.seq)
- )
-
+ decryptor.authenticate_additional_data(sa.build_aead(esp))
try:
data = decryptor.update(data) + decryptor.finalize()
except InvalidTag as err:
@@ -422,6 +418,7 @@ if algorithms:
CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR',
cipher=algorithms.AES,
mode=modes.CTR,
+ block_size=1,
iv_size=8,
salt_size=4,
format_mode_iv=_aes_ctr_format_mode_iv)
@@ -429,6 +426,7 @@ if algorithms:
CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM',
cipher=algorithms.AES,
mode=modes.GCM,
+ block_size=1,
salt_size=4,
iv_size=8,
icv_size=16,
@@ -437,6 +435,7 @@ if algorithms:
CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM',
cipher=algorithms.AES,
mode=modes.CCM,
+ block_size=1,
iv_size=8,
salt_size=3,
icv_size=16,
@@ -518,12 +517,16 @@ class AuthAlgo(object):
else:
return self.mac(key, self.digestmod(), default_backend())
- def sign(self, pkt, key):
+ def sign(self, pkt, key, trailer=None):
"""
Sign an IPsec (ESP or AH) packet with this algo.
@param pkt: a packet that contains a valid encrypted ESP or AH layer
@param key: the authentication key, a byte string
+ @param trailer: additional data appended to the packet for ICV
+ calculation, but not transmitted with the packet.
+ For example, the high order bits of the extended
+ sequence number.
@return: the signed packet
"""
@@ -534,16 +537,20 @@ class AuthAlgo(object):
if pkt.haslayer(ESP):
mac.update(raw(pkt[ESP]))
+ if trailer:
+ mac.update(trailer)
pkt[ESP].data += mac.finalize()[:self.icv_size]
elif pkt.haslayer(AH):
clone = zero_mutable_fields(pkt.copy(), sending=True)
mac.update(raw(clone))
+ if trailer:
+ mac.update(trailer)
pkt[AH].icv = mac.finalize()[:self.icv_size]
return pkt
- def verify(self, pkt, key):
+ def verify(self, pkt, key, trailer):
"""
Check that the integrity check value (icv) of a packet is valid.
@@ -574,6 +581,8 @@ class AuthAlgo(object):
clone = zero_mutable_fields(pkt.copy(), sending=False)
mac.update(raw(clone))
+ if trailer:
+ mac.update(trailer) # bytearray(4)) #raw(trailer))
computed_icv = mac.finalize()[:self.icv_size]
# XXX: Cannot use mac.verify because the ICV can be truncated
@@ -757,7 +766,8 @@ class SecurityAssociation(object):
SUPPORTED_PROTOS = (IP, IPv6)
def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None,
- auth_algo=None, auth_key=None, tunnel_header=None, nat_t_header=None):
+ auth_algo=None, auth_key=None, tunnel_header=None, nat_t_header=None,
+ use_esn=False):
"""
@param proto: the IPsec proto to use (ESP or AH)
@param spi: the Security Parameters Index of this SA
@@ -771,6 +781,7 @@ class SecurityAssociation(object):
to encapsulate the encrypted packets.
@param nat_t_header: an instance of a UDP header that will be used
for NAT-Traversal.
+ @param use_esn: Use Extended Sequence Numbers
"""
if proto not in (ESP, AH, ESP.name, AH.name):
@@ -782,6 +793,7 @@ class SecurityAssociation(object):
self.spi = spi
self.seq_num = seq_num
+ self.use_esn = use_esn
if crypt_algo:
if crypt_algo not in CRYPT_ALGOS:
@@ -827,6 +839,23 @@ class SecurityAssociation(object):
raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' %
(pkt.spi, self.spi))
+ def build_aead(self, esp):
+ if self.use_esn:
+ return (struct.pack('!LLL', esp.spi, self.seq_num >> 32, esp.seq))
+ else:
+ return (struct.pack('!LL', esp.spi, esp.seq))
+
+ def build_seq_num(self, num):
+ # only lower order bits are transmitted
+ # higher order bits are used in the ICV
+ lower = num & 0xffffffff
+ upper = num >> 32
+
+ if self.use_esn:
+ return lower, struct.pack("!I", upper)
+ else:
+ return lower, None
+
def _encrypt_esp(self, pkt, seq_num=None, iv=None):
if iv is None:
@@ -835,7 +864,8 @@ class SecurityAssociation(object):
if len(iv) != self.crypt_algo.iv_size:
raise TypeError('iv length must be %s' % self.crypt_algo.iv_size)
- esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv)
+ low_seq_num, high_seq_num = self.build_seq_num(seq_num or self.seq_num)
+ esp = _ESPPlain(spi=self.spi, seq=low_seq_num, iv=iv)
if self.tunnel_header:
tunnel = self.tunnel_header.copy()
@@ -857,7 +887,7 @@ class SecurityAssociation(object):
esp = self.crypt_algo.pad(esp)
esp = self.crypt_algo.encrypt(self, esp, self.crypt_key)
- self.auth_algo.sign(esp, self.auth_key)
+ self.auth_algo.sign(esp, self.auth_key, high_seq_num)
if self.nat_t_header:
nat_t_header = self.nat_t_header.copy()
@@ -884,7 +914,8 @@ class SecurityAssociation(object):
def _encrypt_ah(self, pkt, seq_num=None):
- ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
+ low_seq_num, high_seq_num = self.build_seq_num(seq_num or self.seq_num)
+ ah = AH(spi=self.spi, seq=low_seq_num,
icv = b"\x00" * self.auth_algo.icv_size)
if self.tunnel_header:
@@ -924,7 +955,8 @@ class SecurityAssociation(object):
else:
ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)
- signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)
+ signed_pkt = self.auth_algo.sign(ip_header / ah / payload,
+ self.auth_key, high_seq_num)
# sequence number must always change, unless specified by the user
if seq_num is None:
@@ -955,11 +987,12 @@ class SecurityAssociation(object):
def _decrypt_esp(self, pkt, verify=True):
+ low_seq_num, high_seq_num = self.build_seq_num(self.seq_num)
encrypted = pkt[ESP]
if verify:
self.check_spi(pkt)
- self.auth_algo.verify(encrypted, self.auth_key)
+ self.auth_algo.verify(encrypted, self.auth_key, high_seq_num)
esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key,
self.crypt_algo.icv_size or
@@ -998,9 +1031,11 @@ class SecurityAssociation(object):
def _decrypt_ah(self, pkt, verify=True):
+ low_seq_num, high_seq_num = self.build_seq_num(self.seq_num)
+
if verify:
self.check_spi(pkt)
- self.auth_algo.verify(pkt, self.auth_key)
+ self.auth_algo.verify(pkt, self.auth_key, high_seq_num)
ah = pkt[AH]
payload = ah.payload