Files
vpp/test/test_syslog.py
Ole Troan f159f58dbf test framework: add factory function and default parameters
This is the first step towards removing all the API message
wrappers in vpp_papi_provider.py. It allows removing all wrapper
functions that do not override parameters (with defaults different
from zero), and adds a separate dictionary for messages requiring
different defaults.

The general requirement is that all new tests should use named
arguments directly, not positional arguments through the wrapper.

Note when removing functions, the calls in vpp_papi_provider
wrappers do not necessarily follow message order.

Change-Id: If64916c07f8622c138db3a9d7c4a98b93a058e68
Signed-off-by: Ole Troan <ot@cisco.com>
2019-03-04 14:31:08 +00:00

201 lines
7.7 KiB
Python

#!/usr/bin/env python
import unittest
from framework import VppTestCase, VppTestRunner
from util import ppp
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
from vpp_papi_provider import SYSLOG_SEVERITY
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
class TestSyslog(VppTestCase):
    """ Syslog Protocol Test Cases """

    @classmethod
    def setUpClass(cls):
        """Create one packet-generator interface and bring it up with IPv4.

        If interface setup fails, the parent class tear-down is invoked
        before the exception is re-raised.
        """
        super(TestSyslog, cls).setUpClass()

        try:
            # create_pg_interfaces() returns a sequence; exactly one
            # interface is requested, hence the single-element unpack.
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
        except Exception:
            super(TestSyslog, cls).tearDownClass()
            raise

    def syslog_generate(self, facility, severity, appname, msgid, sd=None,
                        msg=None):
        """
        Generate syslog message

        Builds a "test syslog ..." CLI command from the arguments and
        executes it through the CLI.

        :param facility: facility value (used as an index into the list
                         of facility keywords)
        :param severity: severity level (used as an index into the list
                         of severity keywords)
        :param appname: name of the application that originated the message
        :param msgid: message identifier
        :param sd: structured data, a dict mapping each SD-ID to a dict of
                   its parameter names and values (optional)
        :param msg: free-form message (optional)
        """
        # CLI keywords indexed by numeric facility value. Positions 10 and
        # 15 hold empty strings, i.e. no keyword is listed for them here.
        facility_str = ['kernel', 'user-level', 'mail-system',
                        'system-daemons', 'security-authorization', 'syslogd',
                        'line-printer', 'network-news', 'uucp', 'clock-daemon',
                        '', 'ftp-daemon', 'ntp-subsystem', 'log-audit',
                        'log-alert', '', 'local0', 'local1', 'local2',
                        'local3', 'local4', 'local5', 'local6', 'local7']
        # CLI keywords indexed by numeric severity value.
        severity_str = ['emergency', 'alert', 'critical', 'error', 'warning',
                        'notice', 'informational', 'debug']

        # Collect the command fragments and join them once with single
        # spaces, rather than growing the string with repeated "+=".
        parts = ["test syslog %s %s %s %s" % (facility_str[facility],
                                              severity_str[severity],
                                              appname,
                                              msgid)]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                parts.append("sd-id %s" % (sd_id))
                for name, value in sd_params.items():
                    parts.append("sd-param %s %s" % (name, value))
        if msg is not None:
            parts.append("%s" % (msg))
        self.vapi.cli(" ".join(parts))

    def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
                      msg=None):
        """
        Verify syslog message

        Parses the message with SyslogMessage.parse() and compares every
        parsed field with the expected value. The version is expected to
        be 1 and the hostname is expected to be the local IPv4 address of
        pg0.

        :param data: raw syslog message payload (decoded as UTF-8)
        :param facility: expected facility value
        :param severity: expected severity level
        :param appname: expected name of the originating application
        :param msgid: expected message identifier
        :param sd: expected structured data; None is treated as an empty
                   dict (optional)
        :param msg: expected free-form message (optional)
        :raises ParseError: if the message cannot be parsed; the error is
                            logged before being re-raised
        """
        text = data.decode('utf-8')
        expected_sd = {} if sd is None else sd

        try:
            message = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(message.facility, facility)
        self.assertEqual(message.severity, severity)
        self.assertEqual(message.appname, appname)
        self.assertEqual(message.msgid, msgid)
        self.assertEqual(message.msg, msg)
        self.assertEqual(message.sd, expected_sd)
        self.assertEqual(message.version, 1)
        self.assertEqual(message.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """ Syslog Protocol test """
        # --- Sender configuration ------------------------------------
        # Only the addresses are passed; the port, VRF and maximum
        # message size read back below were not set explicitly.
        self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4n,
                                    collector_address=self.pg0.remote_ip4n)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address),
                         self.pg0.remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = 'test'
        msgid = 'testMsg'
        msg = 'this is message'
        # Structured data with one element ...
        sd1 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'}}
        # ... and with two elements.
        sd2 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'},
               'examplePriority@32473': {'class': 'high'}}

        # --- Message without structured data -------------------------
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet to the log, then let the
            # original failure propagate.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local7,
                           SyslogSeverity.info,
                           appname,
                           msgid,
                           None,
                           msg)

        # --- Severity filter -----------------------------------------
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(SYSLOG_SEVERITY.WARN)
        severity_filter = self.vapi.syslog_get_filter()
        self.assertEqual(severity_filter.severity, SYSLOG_SEVERITY.WARN)
        # With the filter set to WARN, an "info" message must not be sent.
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        self.pg0.assert_nothing_captured()

        # A "warning" message is still sent; it also carries one
        # structured-data element.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local6,
                             SyslogSeverity.warning,
                             appname,
                             msgid,
                             sd1,
                             msg)
        capture = self.pg0.get_capture(1)
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local6,
                           SyslogSeverity.warning,
                           appname,
                           msgid,
                           sd1,
                           msg)

        # --- Non-default collector port ------------------------------
        # Named arguments, consistent with the first syslog_set_sender
        # call in this test.
        self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4n,
                                    collector_address=self.pg0.remote_ip4n,
                                    collector_port=12345)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        # Message with two structured-data elements and no free-form text.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local5,
                             SyslogSeverity.err,
                             appname,
                             msgid,
                             sd2,
                             None)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local5,
                           SyslogSeverity.err,
                           appname,
                           msgid,
                           sd2,
                           None)
# Script entry point: run the tests in this module with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)