vpp/test/test_syslog.py

233 lines
7.2 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
import unittest
from framework import VppTestCase, VppTestRunner
from util import ppp
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
from vpp_papi import VppEnum
class TestSyslog(VppTestCase):
"""Syslog Protocol Test Cases"""
@property
def SYSLOG_SEVERITY(self):
return VppEnum.vl_api_syslog_severity_t
@classmethod
def setUpClass(cls):
super(TestSyslog, cls).setUpClass()
try:
(cls.pg0,) = cls.create_pg_interfaces(range(1))
cls.pg0.admin_up()
cls.pg0.config_ip4()
cls.pg0.resolve_arp()
except Exception:
super(TestSyslog, cls).tearDownClass()
raise
@classmethod
def tearDownClass(cls):
super(TestSyslog, cls).tearDownClass()
def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
"""
Generate syslog message
:param facility: facility value
:param severity: severity level
:param appname: application name that originate message
:param msgid: message identifier
:param sd: structured data (optional)
:param msg: free-form message (optional)
"""
facility_str = [
"kernel",
"user-level",
"mail-system",
"system-daemons",
"security-authorization",
"syslogd",
"line-printer",
"network-news",
"uucp",
"clock-daemon",
"",
"ftp-daemon",
"ntp-subsystem",
"log-audit",
"log-alert",
"",
"local0",
"local1",
"local2",
"local3",
"local4",
"local5",
"local6",
"local7",
]
severity_str = [
"emergency",
"alert",
"critical",
"error",
"warning",
"notice",
"informational",
"debug",
]
cli_str = "test syslog %s %s %s %s" % (
facility_str[facility],
severity_str[severity],
appname,
msgid,
)
if sd is not None:
for sd_id, sd_params in sd.items():
cli_str += " sd-id %s" % (sd_id)
for name, value in sd_params.items():
cli_str += " sd-param %s %s" % (name, value)
if msg is not None:
cli_str += " %s" % (msg)
self.vapi.cli(cli_str)
def syslog_verify(
self, data, facility, severity, appname, msgid, sd=None, msg=None
):
"""
Verify syslog message
:param data: syslog message
:param facility: facility value
:param severity: severity level
:param appname: application name that originate message
:param msgid: message identifier
:param sd: structured data (optional)
:param msg: free-form message (optional)
"""
message = data.decode("utf-8")
if sd is None:
sd = {}
try:
message = SyslogMessage.parse(message)
except ParseError as e:
self.logger.error(e)
raise
else:
self.assertEqual(message.facility, facility)
self.assertEqual(message.severity, severity)
self.assertEqual(message.appname, appname)
self.assertEqual(message.msgid, msgid)
self.assertEqual(message.msg, msg)
self.assertEqual(message.sd, sd)
self.assertEqual(message.version, 1)
self.assertEqual(message.hostname, self.pg0.local_ip4)
def test_syslog(self):
"""Syslog Protocol test"""
self.vapi.syslog_set_sender(
src_address=self.pg0.local_ip4, collector_address=self.pg0.remote_ip4
)
config = self.vapi.syslog_get_sender()
self.assertEqual(str(config.collector_address), self.pg0.remote_ip4)
self.assertEqual(config.collector_port, 514)
self.assertEqual(str(config.src_address), self.pg0.local_ip4)
self.assertEqual(config.vrf_id, 0)
self.assertEqual(config.max_msg_size, 480)
appname = "test"
msgid = "testMsg"
msg = "this is message"
sd1 = {
"exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
}
sd2 = {
"exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
"examplePriority@32473": {"class": "high"},
}
self.pg_enable_capture(self.pg_interfaces)
self.syslog_generate(
SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
)
capture = self.pg0.get_capture(1)
try:
self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
self.assertEqual(capture[0][UDP].dport, 514)
self.assert_packet_checksums_valid(capture[0], False)
except:
self.logger.error(ppp("invalid packet:", capture[0]))
raise
self.syslog_verify(
capture[0][Raw].load,
SyslogFacility.local7,
SyslogSeverity.info,
appname,
msgid,
None,
msg,
)
self.pg_enable_capture(self.pg_interfaces)
self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
filter = self.vapi.syslog_get_filter()
self.assertEqual(filter.severity, self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
self.syslog_generate(
SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
)
self.pg0.assert_nothing_captured()
self.pg_enable_capture(self.pg_interfaces)
self.syslog_generate(
SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
)
capture = self.pg0.get_capture(1)
self.syslog_verify(
capture[0][Raw].load,
SyslogFacility.local6,
SyslogSeverity.warning,
appname,
msgid,
sd1,
msg,
)
self.vapi.syslog_set_sender(
self.pg0.local_ip4, self.pg0.remote_ip4, collector_port=12345
)
config = self.vapi.syslog_get_sender()
self.assertEqual(config.collector_port, 12345)
self.pg_enable_capture(self.pg_interfaces)
self.syslog_generate(
SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
)
capture = self.pg0.get_capture(1)
try:
self.assertEqual(capture[0][UDP].dport, 12345)
except:
self.logger.error(ppp("invalid packet:", capture[0]))
raise
self.syslog_verify(
capture[0][Raw].load,
SyslogFacility.local5,
SyslogSeverity.err,
appname,
msgid,
sd2,
None,
)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)