API: Python and Unix domain socket improvement
Handle the case where buffer overflows. Then SOCK_SEQPACKET assumption that multiple API messages are not returned by recv() is broken. Use SOCK_STREAM for API exchanges instead. Add support for running tests over sockets. make test SOCKET=1 Change-Id: Ibe5fd69b1bf617de4c7ba6cce0a7c2b3f97a2821 Signed-off-by: Ole Troan <ot@cisco.com>
This commit is contained in:
@@ -215,8 +215,6 @@ service {
|
||||
*/
|
||||
define igmp_event
|
||||
{
|
||||
u32 context;
|
||||
|
||||
u32 sw_if_index;
|
||||
vl_api_filter_mode_t filter;
|
||||
vl_api_ip4_address_t saddr;
|
||||
|
||||
@@ -60,7 +60,7 @@ _(IGMP_CLEAR_INTERFACE, igmp_clear_interface) \
|
||||
_(IGMP_CLEAR_INTERFACE, igmp_clear_interface) \
|
||||
_(IGMP_GROUP_PREFIX_SET, igmp_group_prefix_set) \
|
||||
_(IGMP_GROUP_PREFIX_DUMP, igmp_group_prefix_dump) \
|
||||
_(WANT_IGMP_EVENTS, want_igmp_events) \
|
||||
_(WANT_IGMP_EVENTS, want_igmp_events)
|
||||
|
||||
static void
|
||||
vl_api_igmp_listen_t_handler (vl_api_igmp_listen_t * mp)
|
||||
@@ -373,7 +373,7 @@ vl_api_want_igmp_events_t_handler (vl_api_want_igmp_events_t * mp)
|
||||
}
|
||||
rv = VNET_API_ERROR_INVALID_REGISTRATION;
|
||||
|
||||
done:;
|
||||
done:
|
||||
REPLY_MACRO (VL_API_WANT_IGMP_EVENTS_REPLY + im->msg_id_base);
|
||||
}
|
||||
|
||||
@@ -399,7 +399,6 @@ VL_MSG_API_REAPER_FUNCTION (want_igmp_events_reaper);
|
||||
|
||||
void
|
||||
send_igmp_event (vl_api_registration_t * rp,
|
||||
u32 context,
|
||||
igmp_filter_mode_t filter,
|
||||
u32 sw_if_index,
|
||||
const ip46_address_t * saddr, const ip46_address_t * gaddr)
|
||||
@@ -408,7 +407,6 @@ send_igmp_event (vl_api_registration_t * rp,
|
||||
clib_memset (mp, 0, sizeof (*mp));
|
||||
|
||||
mp->_vl_msg_id = ntohs ((VL_API_IGMP_EVENT) + igmp_main.msg_id_base);
|
||||
mp->context = context;
|
||||
mp->sw_if_index = htonl (sw_if_index);
|
||||
mp->filter = htonl (filter);
|
||||
clib_memcpy (&mp->saddr, &saddr->ip4, sizeof (ip4_address_t));
|
||||
@@ -440,7 +438,7 @@ igmp_event (igmp_filter_mode_t filter,
|
||||
({
|
||||
rp = vl_api_client_index_to_registration (api_client->client_index);
|
||||
if (rp)
|
||||
send_igmp_event (rp, 0, filter, sw_if_index, saddr, gaddr);
|
||||
send_igmp_event (rp, filter, sw_if_index, saddr, gaddr);
|
||||
}));
|
||||
/* *INDENT-ON* */
|
||||
}
|
||||
|
||||
@@ -722,8 +722,7 @@ vl_sock_api_init (vlib_main_t * vm)
|
||||
vec_free (tmp);
|
||||
}
|
||||
|
||||
sock->flags = CLIB_SOCKET_F_IS_SERVER | CLIB_SOCKET_F_SEQPACKET |
|
||||
CLIB_SOCKET_F_ALLOW_GROUP_WRITE;
|
||||
sock->flags = CLIB_SOCKET_F_IS_SERVER | CLIB_SOCKET_F_ALLOW_GROUP_WRITE;
|
||||
error = clib_socket_init (sock);
|
||||
if (error)
|
||||
return error;
|
||||
|
||||
@@ -374,8 +374,7 @@ vl_socket_client_connect (char *socket_path, char *client_name,
|
||||
|
||||
sock = &scm->client_socket;
|
||||
sock->config = socket_path;
|
||||
sock->flags = CLIB_SOCKET_F_IS_CLIENT
|
||||
| CLIB_SOCKET_F_SEQPACKET | CLIB_SOCKET_F_NON_BLOCKING_CONNECT;
|
||||
sock->flags = CLIB_SOCKET_F_IS_CLIENT | CLIB_SOCKET_F_NON_BLOCKING_CONNECT;
|
||||
|
||||
if ((error = clib_socket_init (sock)))
|
||||
{
|
||||
|
||||
@@ -70,7 +70,7 @@ class VppTransport(object):
|
||||
def connect(self, name, pfx, msg_handler, rx_qlen):
|
||||
|
||||
# Create a UDS socket
|
||||
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
|
||||
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.socket.settimeout(self.read_timeout)
|
||||
|
||||
# Connect the socket to the port where the server is listening
|
||||
@@ -150,36 +150,28 @@ class VppTransport(object):
|
||||
n = self.socket.send(buf)
|
||||
|
||||
def _read(self):
|
||||
# Header and message
|
||||
try:
|
||||
msg = self.socket.recv(4096)
|
||||
if len(msg) == 0:
|
||||
return None
|
||||
except socket.error as message:
|
||||
logging.error(message)
|
||||
raise
|
||||
|
||||
(_, l, _) = self.header.unpack(msg[:16])
|
||||
hdr = self.socket.recv(16)
|
||||
if not hdr:
|
||||
return
|
||||
(_, l, _) = self.header.unpack(hdr) # If at head of message
|
||||
|
||||
# Read rest of message
|
||||
msg = self.socket.recv(l)
|
||||
if l > len(msg):
|
||||
buf = bytearray(l + 16)
|
||||
nbytes = len(msg)
|
||||
buf = bytearray(l)
|
||||
view = memoryview(buf)
|
||||
view[:4096] = msg
|
||||
view = view[4096:]
|
||||
# Read rest of message
|
||||
remaining_bytes = l - 4096 + 16
|
||||
while remaining_bytes > 0:
|
||||
bytes_to_read = (remaining_bytes if remaining_bytes
|
||||
<= 4096 else 4096)
|
||||
nbytes = self.socket.recv_into(view, bytes_to_read)
|
||||
if nbytes == 0:
|
||||
logging.error('recv failed')
|
||||
break
|
||||
view[:nbytes] = msg
|
||||
view = view[nbytes:]
|
||||
left = l - nbytes
|
||||
while left:
|
||||
nbytes = self.socket.recv_into(view, left)
|
||||
view = view[nbytes:]
|
||||
remaining_bytes -= nbytes
|
||||
else:
|
||||
buf = msg
|
||||
return buf[16:]
|
||||
left -= nbytes
|
||||
return buf
|
||||
if l == len(msg):
|
||||
return msg
|
||||
raise VPPTransportSocketIOError(1, 'Unknown socket read error')
|
||||
|
||||
def read(self):
|
||||
if not self.connected:
|
||||
|
||||
@@ -297,6 +297,8 @@ help:
|
||||
@echo ""
|
||||
@echo " SKIP_AARCH64=1 - skip tests that are failing on the ARM platorm in FD.io CI"
|
||||
@echo ""
|
||||
@echo " SOCKET=1 - Communicate with VPP over Unix domain socket instead of SHM"
|
||||
@echo ""
|
||||
@echo "Creating test documentation"
|
||||
@echo " test-doc - generate documentation for test framework"
|
||||
@echo " test-wipe-doc - wipe documentation for test framework"
|
||||
|
||||
+5
-2
@@ -313,8 +313,10 @@ class VppTestCase(unittest.TestCase):
|
||||
coredump_size, "runtime-dir", cls.tempdir, "}",
|
||||
"api-trace", "{", "on", "}", "api-segment", "{",
|
||||
"prefix", cls.shm_prefix, "}", "cpu", "{",
|
||||
"main-core", str(cpu_core_number), "}", "statseg",
|
||||
"{", "socket-name", cls.stats_sock, "}", "plugins",
|
||||
"main-core", str(cpu_core_number), "}",
|
||||
"statseg", "{", "socket-name", cls.stats_sock, "}",
|
||||
"socksvr", "{", "socket-name", cls.api_sock, "}",
|
||||
"plugins",
|
||||
"{", "plugin", "dpdk_plugin.so", "{", "disable",
|
||||
"}", "plugin", "rdma_plugin.so", "{", "disable",
|
||||
"}", "plugin", "unittest_plugin.so", "{", "enable",
|
||||
@@ -415,6 +417,7 @@ class VppTestCase(unittest.TestCase):
|
||||
cls.tempdir = tempfile.mkdtemp(
|
||||
prefix='vpp-unittest-%s-' % cls.__name__)
|
||||
cls.stats_sock = "%s/stats.sock" % cls.tempdir
|
||||
cls.api_sock = "%s/api.sock" % cls.tempdir
|
||||
cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
|
||||
cls.file_handler.setFormatter(
|
||||
Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
|
||||
|
||||
@@ -226,8 +226,16 @@ class VppPapiProvider(object):
|
||||
if 'VPP_API_DIR' not in os.environ:
|
||||
os.environ['VPP_API_DIR'] = os.getenv('VPP_INSTALL_PATH')
|
||||
|
||||
use_socket = False
|
||||
try:
|
||||
if os.environ['SOCKET'] == '1':
|
||||
use_socket = True
|
||||
except:
|
||||
pass
|
||||
self.vpp = VPP(logger=test_class.logger,
|
||||
read_timeout=read_timeout)
|
||||
read_timeout=read_timeout,
|
||||
use_socket=use_socket,
|
||||
server_address=test_class.api_sock)
|
||||
self._events = deque()
|
||||
|
||||
def __enter__(self):
|
||||
|
||||
Reference in New Issue
Block a user