vpp/test/test_punt.py

310 lines
9.6 KiB
Python
Raw Normal View History

#!/usr/bin/env python
import binascii
import random
import socket
import unittest
import os
import scapy.layers.inet6 as inet6
from util import ppp, ppc
from re import compile
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
from framework import VppTestCase, VppTestRunner
class TestPuntSocket(VppTestCase):
    """ Punt Socket """

    # Base directory used to build punt socket paths. Empty here;
    # presumably populated by the test framework -- TODO confirm.
    tempdir = ""
    # Client-side UNIX datagram socket; None until socket_client_create().
    sock = None
    # Pattern applied to each "show errors" line; captures three groups
    # that show_errors() unpacks as (count, node, reason).
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    @classmethod
    def setUpConstants(cls):
        """Set the punt socket config, then run the base setUpConstants.

        Stores a ``punt { socket <tempdir>/socket_punt }`` token list in
        ``cls.extra_vpp_punt_config`` before delegating to the parent class.
        """
        cls.extra_vpp_punt_config = [
            "punt", "{", "socket", cls.tempdir+"/socket_punt", "}"]
        super(TestPuntSocket, cls).setUpConstants()

    def process_cli(self, exp, ptr):
        """Run CLI command *exp* and yield regex groups of matching lines.

        The first line of the CLI output is skipped. Every remaining line
        is stripped and matched against the compiled pattern *ptr*; lines
        that do not match are ignored.
        """
        for line in self.vapi.cli(exp).split('\n')[1:]:
            m = ptr.match(line.strip())
            if m:
                yield m.groups()

    def show_errors(self):
        """Yield ``(count, node, reason)`` for each "show errors" row."""
        for pack in self.process_cli("show errors", self.err_ptr):
            try:
                count, node, reason = pack
            except ValueError:
                # Row did not yield exactly three fields; skip it.
                continue
            yield count, node, reason

    def get_punt_count(self, counter):
        """Return the 'Socket TX' count reported for node *counter*.

        Returns 0 when no matching row is present in "show errors".
        """
        for count, node, reason in self.show_errors():
            if node == counter and reason == u'Socket TX':
                return int(count)
        return 0

    def socket_client_create(self, sock_name):
        """Create a UNIX datagram socket and bind it to path *sock_name*.

        Any existing file at *sock_name* is removed first; failure to
        remove it (e.g. the file does not exist) is only logged.
        """
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            os.unlink(sock_name)
        except OSError:
            # Only filesystem errors are expected here; anything else
            # (including KeyboardInterrupt) must propagate.
            self.logger.debug("Unlink socket faild")
        self.sock.bind(sock_name)

    def socket_client_close(self):
        """Close the client socket, if one was created."""
        if self.sock is not None:
            self.sock.close()
class TestIP4PuntSocket(TestPuntSocket):
    """ Punt Socket for IPv4 """

    def setUp(self):
        # Create two pg interfaces, bring each up, configure IPv4 on it
        # and resolve ARP.
        super(TestIP4PuntSocket, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        # Base-class teardown runs first, then the per-interface IPv4
        # configuration is removed and the interfaces are shut down.
        super(TestIP4PuntSocket, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()

    def test_punt_socket_dump(self):
        """ Punt socket registration"""

        # Argument 0 selects the IPv4 registrations in this test; the
        # IPv6 test class passes 1.
        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
        self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222")
        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 2)
        # NOTE(review): only the ports are checked. The previously disabled
        # pathname asserts referenced "/tmp/punt_socket_udp_1234" and
        # "/tmp/punt_socket_udp_5678", which do not match the paths
        # registered above.
        self.assertEqual(punts[0].punt.l4_port, 1111)
        self.assertEqual(punts[1].punt.l4_port, 2222)

        #
        # deregister a punt socket
        #
        self.vapi.punt_socket_deregister(1111)
        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 1)

        #
        # configure a punt socket again
        #
        self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
        self.vapi.punt_socket_register(3333, self.tempdir+"/socket_punt_3333")
        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 3)

        #
        # deregister all punt socket
        #
        self.vapi.punt_socket_deregister(1111)
        self.vapi.punt_socket_deregister(2222)
        self.vapi.punt_socket_deregister(3333)
        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 0)

    def test_punt_socket_traffic(self):
        """ Punt socket traffic"""

        nr_packets = 8
        sock_path = self.tempdir+"/socket_punt_1234"
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
             UDP(sport=9876, dport=1234) /
             Raw('\xa5' * 100))
        pkts = p * nr_packets

        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 0)

        #
        # expect ICMP - port unreachable for all packets
        #
        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg0.get_capture(nr_packets)
        for reply in rx:
            self.assertEqual(int(reply[IP].proto), 1)    # ICMP
            self.assertEqual(int(reply[ICMP].code), 3)   # unreachable

        #
        # configure a punt socket
        #
        self.socket_client_create(sock_path)
        try:
            # Everything between create and close can raise (assertions,
            # capture checks); the finally clause keeps the client socket
            # from leaking into later tests when that happens.
            self.vapi.punt_socket_register(1234, sock_path)
            punts = self.vapi.punt_socket_dump(0)
            self.assertEqual(len(punts), 1)

            #
            # expect punt socket and no packets on pg0
            #
            self.vapi.cli("clear errors")
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(0)
        finally:
            self.socket_client_close()

        #
        # remove punt socket. expect ICMP - port unreachable for all packets
        #
        self.vapi.punt_socket_deregister(1234)
        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # FIXME - when punt socket deregister is implemented
        # self.pg0.get_capture(nr_packets)
class TestIP6PuntSocket(TestPuntSocket):
    """ Punt Socket for IPv6"""

    def setUp(self):
        # Create two pg interfaces, bring each up, configure IPv6 on it
        # and resolve NDP.
        super(TestIP6PuntSocket, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        # Base-class teardown runs first, then the per-interface IPv6
        # configuration is removed and the interfaces are shut down.
        super(TestIP6PuntSocket, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_punt_socket_dump(self):
        """ Punt socket registration """

        # The initial emptiness check uses argument 0 (the value the IPv4
        # test class uses); all later dumps in this test pass 1 for IPv6.
        punts = self.vapi.punt_socket_dump(0)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111",
                                       is_ip4=0)
        self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222",
                                       is_ip4=0)
        punts = self.vapi.punt_socket_dump(1)
        self.assertEqual(len(punts), 2)
        # NOTE(review): only the ports are checked. The previously disabled
        # pathname asserts referenced "/tmp/punt_socket_udp_1234" and
        # "/tmp/punt_socket_udp_5678", which do not match the paths
        # registered above.
        self.assertEqual(punts[0].punt.l4_port, 1111)
        self.assertEqual(punts[1].punt.l4_port, 2222)

        #
        # deregister a punt socket
        #
        self.vapi.punt_socket_deregister(1111, is_ip4=0)
        punts = self.vapi.punt_socket_dump(1)
        self.assertEqual(len(punts), 1)

        #
        # configure a punt socket again
        #
        # Port 3333 is registered here as well, mirroring the IPv4 test,
        # so that the deregistration of 3333 below removes a real entry
        # instead of targeting a port that was never registered.
        self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111",
                                       is_ip4=0)
        self.vapi.punt_socket_register(3333, self.tempdir+"/socket_punt_3333",
                                       is_ip4=0)
        punts = self.vapi.punt_socket_dump(1)
        self.assertEqual(len(punts), 3)

        #
        # deregister all punt socket
        #
        self.vapi.punt_socket_deregister(1111, is_ip4=0)
        self.vapi.punt_socket_deregister(2222, is_ip4=0)
        self.vapi.punt_socket_deregister(3333, is_ip4=0)
        punts = self.vapi.punt_socket_dump(1)
        self.assertEqual(len(punts), 0)

    def test_punt_socket_traffic(self):
        """ Punt socket traffic"""

        nr_packets = 2
        sock_path = self.tempdir+"/socket_punt_1234"
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
             inet6.UDP(sport=9876, dport=1234) /
             Raw('\xa5' * 100))
        pkts = p * nr_packets

        punts = self.vapi.punt_socket_dump(1)
        self.assertEqual(len(punts), 0)

        #
        # expect ICMPv6 - destination unreachable for all packets
        #
        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg0.get_capture(nr_packets)
        for reply in rx:
            self.assertEqual(int(reply[IPv6].nh), 58)                # ICMPv6
            self.assertEqual(int(reply[ICMPv6DestUnreach].code), 4)  # unreachable

        #
        # configure a punt socket
        #
        self.socket_client_create(sock_path)
        try:
            # Everything between create and close can raise (assertions,
            # capture checks); the finally clause keeps the client socket
            # from leaking into later tests when that happens.
            self.vapi.punt_socket_register(1234, sock_path, is_ip4=0)
            punts = self.vapi.punt_socket_dump(1)
            self.assertEqual(len(punts), 1)

            #
            # expect punt socket and no packets on pg0
            #
            self.vapi.cli("clear errors")
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(0)
        finally:
            self.socket_client_close()

        #
        # remove punt socket. expect ICMP - dest. unreachable for all packets
        #
        self.vapi.punt_socket_deregister(1234, is_ip4=0)
        punts = self.vapi.punt_socket_dump(1)
        self.assertEqual(len(punts), 0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # FIXME - when punt socket deregister is implemented
        # self.pg0.get_capture(nr_packets)
if __name__ == '__main__':
    # Executed directly: run this module's tests with VppTestRunner.
    unittest.main(testRunner=VppTestRunner)