session/svm: add want_tx_event flag to fifo
Have applications use explicit flag to request events from vpp when it transmits from a full fifo. Change-Id: I687c8f050a066bd5ce739d880eaec1f286038d95 Signed-off-by: Florin Coras <fcoras@cisco.com>
This commit is contained in:

committed by
Damjan Marion

parent
008dbe109c
commit
0e88e851e0
@ -62,6 +62,7 @@ typedef struct _svm_fifo
|
||||
u32 segment_manager;
|
||||
CLIB_CACHE_LINE_ALIGN_MARK (end_shared);
|
||||
u32 head;
|
||||
volatile u32 want_tx_evt; /**< producer wants nudge */
|
||||
CLIB_CACHE_LINE_ALIGN_MARK (end_consumer);
|
||||
|
||||
/* producer */
|
||||
@ -169,6 +170,18 @@ svm_fifo_unset_event (svm_fifo_t * f)
|
||||
__sync_lock_release (&f->has_event);
|
||||
}
|
||||
|
||||
static inline void
|
||||
svm_fifo_set_want_tx_evt (svm_fifo_t * f, u8 want_evt)
|
||||
{
|
||||
f->want_tx_evt = want_evt;
|
||||
}
|
||||
|
||||
static inline u8
|
||||
svm_fifo_want_tx_evt (svm_fifo_t * f)
|
||||
{
|
||||
return f->want_tx_evt;
|
||||
}
|
||||
|
||||
svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
|
||||
void svm_fifo_free (svm_fifo_t * f);
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include <vcl/sock_test.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/ioctl.h>
|
||||
|
||||
#define SOCK_SERVER_USE_EPOLL 1
|
||||
#define VPPCOM_SESSION_ATTR_UNIT_TEST 0
|
||||
@ -828,6 +829,7 @@ main (int argc, char **argv)
|
||||
if (EPOLLIN & ssm->wait_events[i].events)
|
||||
#endif
|
||||
{
|
||||
read_again:
|
||||
rx_bytes = sock_test_read (client_fd, conn->buf,
|
||||
conn->buf_size, &conn->stats);
|
||||
if (rx_bytes > 0)
|
||||
@ -910,6 +912,8 @@ main (int argc, char **argv)
|
||||
(conn->cfg.test == SOCK_TEST_TYPE_BI))
|
||||
{
|
||||
stream_test_server (conn, rx_bytes);
|
||||
if (ioctl (conn->fd, FIONREAD))
|
||||
goto read_again;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -19,9 +19,10 @@
|
||||
#include <vppinfra/elog.h>
|
||||
|
||||
#define VCL_ELOG 0
|
||||
#define VCL_DBG_ON 1
|
||||
|
||||
#define VDBG(_lvl, _fmt, _args...) \
|
||||
if (vcm->debug > _lvl) \
|
||||
if (VCL_DBG_ON && vcm->debug > _lvl) \
|
||||
clib_warning ("vcl<w%d>: " _fmt, __vcl_worker_index, ##_args)
|
||||
|
||||
#define foreach_vcl_dbg_evt \
|
||||
|
@ -125,9 +125,7 @@ static inline int
|
||||
vcl_test_write (int fd, uint8_t *buf, uint32_t nbytes,
|
||||
sock_test_stats_t *stats, uint32_t verbose)
|
||||
{
|
||||
int tx_bytes = 0;
|
||||
int nbytes_left = nbytes;
|
||||
int rv, errno_val;
|
||||
int tx_bytes = 0, nbytes_left = nbytes, rv;
|
||||
|
||||
do
|
||||
{
|
||||
@ -163,10 +161,7 @@ vcl_test_write (int fd, uint8_t *buf, uint32_t nbytes,
|
||||
|
||||
if (tx_bytes < 0)
|
||||
{
|
||||
errno_val = errno;
|
||||
perror ("ERROR in sock_test_write()");
|
||||
fprintf (stderr, "SOCK_TEST: ERROR: socket write failed "
|
||||
"(errno = %d)!\n", errno_val);
|
||||
vterr ("vpcom_session_write", -errno);
|
||||
}
|
||||
else if (stats)
|
||||
stats->tx_bytes += tx_bytes;
|
||||
|
@ -1511,6 +1511,7 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
|
||||
svm_msg_q_msg_t msg;
|
||||
session_event_t *e;
|
||||
svm_msg_q_t *mq;
|
||||
u8 is_ct;
|
||||
|
||||
if (PREDICT_FALSE (!buf))
|
||||
return VPPCOM_EINVAL;
|
||||
@ -1519,9 +1520,6 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
|
||||
if (PREDICT_FALSE (!s))
|
||||
return VPPCOM_EBADFD;
|
||||
|
||||
tx_fifo = s->tx_fifo;
|
||||
is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
|
||||
|
||||
if (PREDICT_FALSE (s->is_vep))
|
||||
{
|
||||
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
|
||||
@ -1531,18 +1529,20 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
|
||||
return VPPCOM_EBADFD;
|
||||
}
|
||||
|
||||
if (!(s->session_state & STATE_OPEN))
|
||||
if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
|
||||
{
|
||||
session_state_t state = s->session_state;
|
||||
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
|
||||
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
|
||||
"state 0x%x (%s)",
|
||||
getpid (), s->vpp_handle, session_handle,
|
||||
"state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
|
||||
state, vppcom_session_state_str (state));
|
||||
return rv;
|
||||
}
|
||||
|
||||
mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
|
||||
tx_fifo = s->tx_fifo;
|
||||
is_ct = vcl_session_is_ct (s);
|
||||
is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
|
||||
mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
|
||||
if (svm_fifo_is_full (tx_fifo))
|
||||
{
|
||||
if (is_nonblocking)
|
||||
@ -1551,15 +1551,15 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
|
||||
}
|
||||
while (svm_fifo_is_full (tx_fifo))
|
||||
{
|
||||
svm_fifo_set_want_tx_evt (tx_fifo, 1);
|
||||
svm_msg_q_lock (mq);
|
||||
while (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
|
||||
;
|
||||
svm_msg_q_wait (mq);
|
||||
|
||||
svm_msg_q_sub_w_lock (mq, &msg);
|
||||
e = svm_msg_q_msg_data (mq, &msg);
|
||||
svm_msg_q_unlock (mq);
|
||||
|
||||
if (!vcl_is_tx_evt_for_session (e, s->session_index,
|
||||
s->our_evt_q != 0))
|
||||
if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
|
||||
vcl_handle_mq_event (wrk, e);
|
||||
svm_msg_q_free_msg (mq, &msg);
|
||||
}
|
||||
@ -1576,17 +1576,9 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
|
||||
|
||||
ASSERT (n_write > 0);
|
||||
|
||||
if (VPPCOM_DEBUG > 2)
|
||||
{
|
||||
if (n_write <= 0)
|
||||
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
|
||||
"FIFO-FULL (%p)", getpid (), s->vpp_handle,
|
||||
session_handle, tx_fifo);
|
||||
else
|
||||
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
|
||||
"wrote %d bytes tx-fifo: (%p)", getpid (),
|
||||
s->vpp_handle, session_handle, n_write, tx_fifo);
|
||||
}
|
||||
VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
|
||||
s->vpp_handle, session_handle, n_write);
|
||||
|
||||
return n_write;
|
||||
}
|
||||
|
||||
|
@ -817,7 +817,7 @@ skip_dequeue:
|
||||
{
|
||||
stream_session_t *s; /* $$$ prefetch 1 ahead maybe */
|
||||
session_event_t *e;
|
||||
u8 is_full;
|
||||
u8 want_tx_evt;
|
||||
|
||||
e = &fifo_events[i];
|
||||
switch (e->event_type)
|
||||
@ -836,18 +836,19 @@ skip_dequeue:
|
||||
clib_warning ("It's dead, Jim!");
|
||||
continue;
|
||||
}
|
||||
is_full = svm_fifo_is_full (s->server_tx_fifo);
|
||||
|
||||
want_tx_evt = svm_fifo_want_tx_evt (s->server_tx_fifo);
|
||||
/* Spray packets in per session type frames, since they go to
|
||||
* different nodes */
|
||||
rv = (smm->session_tx_fns[s->session_type]) (vm, node, e, s,
|
||||
&n_tx_packets);
|
||||
if (PREDICT_TRUE (rv == SESSION_TX_OK))
|
||||
{
|
||||
/* Notify app there's tx space if not polling */
|
||||
if (PREDICT_FALSE (is_full
|
||||
&& !svm_fifo_has_event (s->server_tx_fifo)))
|
||||
session_dequeue_notify (s);
|
||||
if (PREDICT_FALSE (want_tx_evt))
|
||||
{
|
||||
svm_fifo_set_want_tx_evt (s->server_tx_fifo, 0);
|
||||
session_dequeue_notify (s);
|
||||
}
|
||||
}
|
||||
else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
|
||||
{
|
||||
|
@ -341,28 +341,29 @@ class VCLThruHostStackTestCase(VCLTestCase):
|
||||
# is fixed.
|
||||
|
||||
|
||||
class VCLThruHostStackExtendedATestCase(VCLTestCase):
|
||||
""" VCL Thru Host Stack Extended Tests """
|
||||
class VCLThruHostStackNSessionBidirTestCase(VCLTestCase):
|
||||
""" VCL Thru Host Stack NSession Bidir Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLThruHostStackExtendedATestCase, self).setUp()
|
||||
super(VCLThruHostStackNSessionBidirTestCase, self).setUp()
|
||||
|
||||
self.thru_host_stack_setup()
|
||||
if self.vppDebug:
|
||||
self.client_bi_dir_nsock_timeout = 120
|
||||
self.client_bi_dir_nsock_test_args = ["-B", "-X",
|
||||
self.client_bi_dir_nsock_test_args = ["-B", "-X", "-N 10000",
|
||||
self.loop0.local_ip4,
|
||||
self.server_port]
|
||||
else:
|
||||
self.client_bi_dir_nsock_timeout = 90
|
||||
self.client_bi_dir_nsock_test_args = ["-I", "2", "-B", "-X",
|
||||
"-N 1000",
|
||||
self.loop0.local_ip4,
|
||||
self.server_port]
|
||||
|
||||
def tearDown(self):
|
||||
self.thru_host_stack_tear_down()
|
||||
|
||||
super(VCLThruHostStackExtendedATestCase, self).tearDown()
|
||||
super(VCLThruHostStackNSessionBidirTestCase, self).tearDown()
|
||||
|
||||
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
|
||||
def test_vcl_thru_host_stack_bi_dir_nsock(self):
|
||||
@ -375,7 +376,7 @@ class VCLThruHostStackExtendedATestCase(VCLTestCase):
|
||||
|
||||
|
||||
class VCLThruHostStackExtendedBTestCase(VCLTestCase):
|
||||
""" VCL Thru Host Stack Extended Tests """
|
||||
""" VCL Thru Host Stack Extended B Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLThruHostStackExtendedBTestCase, self).setUp()
|
||||
@ -383,12 +384,13 @@ class VCLThruHostStackExtendedBTestCase(VCLTestCase):
|
||||
self.thru_host_stack_setup()
|
||||
if self.vppDebug:
|
||||
self.client_bi_dir_nsock_timeout = 120
|
||||
self.client_bi_dir_nsock_test_args = ["-B", "-X",
|
||||
self.client_bi_dir_nsock_test_args = ["-B", "-X", "-N 1000",
|
||||
self.loop0.local_ip4,
|
||||
self.server_port]
|
||||
else:
|
||||
self.client_bi_dir_nsock_timeout = 60
|
||||
self.client_bi_dir_nsock_test_args = ["-I", "2", "-B", "-X",
|
||||
"-N 1000",
|
||||
self.loop0.local_ip4,
|
||||
self.server_port]
|
||||
|
||||
@ -408,7 +410,7 @@ class VCLThruHostStackExtendedBTestCase(VCLTestCase):
|
||||
|
||||
|
||||
class VCLThruHostStackExtendedCTestCase(VCLTestCase):
|
||||
""" VCL Thru Host Stack Extended Tests """
|
||||
""" VCL Thru Host Stack Extended C Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLThruHostStackExtendedCTestCase, self).setUp()
|
||||
@ -422,7 +424,7 @@ class VCLThruHostStackExtendedCTestCase(VCLTestCase):
|
||||
self.numSockets = "5"
|
||||
|
||||
self.client_uni_dir_nsock_test_args = ["-I", self.numSockets,
|
||||
"-U", "-X",
|
||||
"-U", "-X", "-N 1000",
|
||||
self.loop0.local_ip4,
|
||||
self.server_port]
|
||||
|
||||
@ -442,7 +444,7 @@ class VCLThruHostStackExtendedCTestCase(VCLTestCase):
|
||||
|
||||
|
||||
class VCLThruHostStackExtendedDTestCase(VCLTestCase):
|
||||
""" VCL Thru Host Stack Extended Tests """
|
||||
""" VCL Thru Host Stack Extended D Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLThruHostStackExtendedDTestCase, self).setUp()
|
||||
@ -456,7 +458,7 @@ class VCLThruHostStackExtendedDTestCase(VCLTestCase):
|
||||
self.numSockets = "5"
|
||||
|
||||
self.client_uni_dir_nsock_test_args = ["-I", self.numSockets,
|
||||
"-U", "-X",
|
||||
"-U", "-X", "-N 1000",
|
||||
self.loop0.local_ip4,
|
||||
self.server_port]
|
||||
|
||||
@ -648,7 +650,7 @@ class VCLIpv6ThruHostStackTestCase(VCLTestCase):
|
||||
|
||||
|
||||
class VCLIpv6ThruHostStackExtendedATestCase(VCLTestCase):
|
||||
""" VCL IPv6 Thru Host Stack Extended Tests """
|
||||
""" VCL IPv6 Thru Host Stack Extended A Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLIpv6ThruHostStackExtendedATestCase, self).setUp()
|
||||
@ -682,7 +684,7 @@ class VCLIpv6ThruHostStackExtendedATestCase(VCLTestCase):
|
||||
|
||||
|
||||
class VCLIpv6ThruHostStackExtendedBTestCase(VCLTestCase):
|
||||
""" VCL IPv6 Thru Host Stack Extended Tests """
|
||||
""" VCL IPv6 Thru Host Stack Extended B Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLIpv6ThruHostStackExtendedBTestCase, self).setUp()
|
||||
@ -717,7 +719,7 @@ class VCLIpv6ThruHostStackExtendedBTestCase(VCLTestCase):
|
||||
|
||||
|
||||
class VCLIpv6ThruHostStackExtendedCTestCase(VCLTestCase):
|
||||
""" VCL IPv6 Thru Host Stack Extended Tests """
|
||||
""" VCL IPv6 Thru Host Stack Extended C Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLIpv6ThruHostStackExtendedCTestCase, self).setUp()
|
||||
@ -753,7 +755,7 @@ class VCLIpv6ThruHostStackExtendedCTestCase(VCLTestCase):
|
||||
|
||||
|
||||
class VCLIpv6ThruHostStackExtendedDTestCase(VCLTestCase):
|
||||
""" VCL IPv6 Thru Host Stack Extended Tests """
|
||||
""" VCL IPv6 Thru Host Stack Extended D Tests """
|
||||
|
||||
def setUp(self):
|
||||
super(VCLIpv6ThruHostStackExtendedDTestCase, self).setUp()
|
||||
|
Reference in New Issue
Block a user