vcl: support for eventfd mq signaling
- support eventfd based mq signaling. Based on configuration, vcl epoll/select can use either condvars or epoll on mq eventfds. - add vcl support for memfd segments - vpp explicitly registers cut-through segments with apps/vcl - if using eventfd, make ldp allow one call to libc_epoll_create. Needed for the message queue epfd - update svm_queue_t to allow blocking calls with eventfd signaling. Change-Id: I064151ac370bbe29bb16c968bf4e3659c8286bea Signed-off-by: Florin Coras <fcoras@cisco.com>
This commit is contained in:

committed by
Dave Barach

parent
f46663c65b
commit
9936831502
@ -15,6 +15,7 @@
|
||||
|
||||
#include <svm/message_queue.h>
|
||||
#include <vppinfra/mem.h>
|
||||
#include <sys/eventfd.h>
|
||||
|
||||
static inline svm_msg_q_ring_t *
|
||||
svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
|
||||
@ -235,6 +236,38 @@ svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
|
||||
svm_queue_sub_raw (mq->q, (u8 *) msg);
|
||||
}
|
||||
|
||||
void
|
||||
svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
|
||||
{
|
||||
mq->q->consumer_evtfd = fd;
|
||||
}
|
||||
|
||||
void
|
||||
svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
|
||||
{
|
||||
mq->q->producer_evtfd = fd;
|
||||
}
|
||||
|
||||
int
|
||||
svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
|
||||
{
|
||||
int fd;
|
||||
if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
|
||||
return -1;
|
||||
svm_msg_q_set_consumer_eventfd (mq, fd);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
|
||||
{
|
||||
int fd;
|
||||
if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
|
||||
return -1;
|
||||
svm_msg_q_set_producer_eventfd (mq, fd);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* fd.io coding-style-patch-verification: ON
|
||||
*
|
||||
|
@ -22,7 +22,6 @@
|
||||
|
||||
#include <vppinfra/clib.h>
|
||||
#include <vppinfra/error.h>
|
||||
#include <vppinfra/time.h>
|
||||
#include <svm/queue.h>
|
||||
|
||||
typedef struct svm_msg_q_ring_
|
||||
@ -214,6 +213,40 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
|
||||
*/
|
||||
svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
|
||||
|
||||
/**
|
||||
* Set event fd for queue consumer
|
||||
*
|
||||
* If set, queue will exclusively use eventfds for signaling. Moreover,
|
||||
* afterwards, the queue should only be used in non-blocking mode. Waiting
|
||||
* for events should be done externally using something like epoll.
|
||||
*
|
||||
* @param mq message queue
|
||||
* @param fd consumer eventfd
|
||||
*/
|
||||
void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
|
||||
|
||||
/**
|
||||
* Set event fd for queue producer
|
||||
*
|
||||
* If set, queue will exclusively use eventfds for signaling. Moreover,
|
||||
* afterwards, the queue should only be used in non-blocking mode. Waiting
|
||||
* for events should be done externally using something like epoll.
|
||||
*
|
||||
* @param mq message queue
|
||||
* @param fd producer eventfd
|
||||
*/
|
||||
void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
|
||||
|
||||
/**
|
||||
* Allocate event fd for queue consumer
|
||||
*/
|
||||
int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
|
||||
|
||||
/**
|
||||
* Allocate event fd for queue consumer
|
||||
*/
|
||||
int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
|
||||
|
||||
/**
|
||||
* Check if message queue is full
|
||||
*/
|
||||
@ -290,12 +323,13 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
|
||||
/**
|
||||
* Wait for message queue event
|
||||
*
|
||||
* Must be called with mutex held
|
||||
* Must be called with mutex held. The queue only works non-blocking
|
||||
* with eventfds, so handle blocking calls as an exception here.
|
||||
*/
|
||||
static inline void
|
||||
svm_msg_q_wait (svm_msg_q_t * mq)
|
||||
{
|
||||
pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
|
||||
svm_queue_wait (mq->q);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -309,13 +343,19 @@ svm_msg_q_wait (svm_msg_q_t * mq)
|
||||
static inline int
|
||||
svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
|
||||
{
|
||||
struct timespec ts;
|
||||
return svm_queue_timedwait (mq->q, timeout);
|
||||
}
|
||||
|
||||
ts.tv_sec = unix_time_now () + (u32) timeout;
|
||||
ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
|
||||
if (pthread_cond_timedwait (&mq->q->condvar, &mq->q->mutex, &ts))
|
||||
return -1;
|
||||
return 0;
|
||||
static inline int
|
||||
svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
|
||||
{
|
||||
return mq->q->consumer_evtfd;
|
||||
}
|
||||
|
||||
static inline int
|
||||
svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
|
||||
{
|
||||
return mq->q->producer_evtfd;
|
||||
}
|
||||
|
||||
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <vppinfra/cache.h>
|
||||
#include <svm/queue.h>
|
||||
#include <vppinfra/time.h>
|
||||
#include <vppinfra/lock.h>
|
||||
|
||||
svm_queue_t *
|
||||
svm_queue_init (void *base, int nels, int elsize)
|
||||
@ -127,6 +128,63 @@ svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
|
||||
}
|
||||
}
|
||||
|
||||
static inline void
|
||||
svm_queue_wait_inline (svm_queue_t * q)
|
||||
{
|
||||
if (q->producer_evtfd == -1)
|
||||
{
|
||||
pthread_cond_wait (&q->condvar, &q->mutex);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Fake a wait for event. We could use epoll but that would mean
|
||||
* using yet another fd. Should do for now */
|
||||
u32 cursize = q->cursize;
|
||||
pthread_mutex_unlock (&q->mutex);
|
||||
while (q->cursize == cursize)
|
||||
CLIB_PAUSE ();
|
||||
pthread_mutex_lock (&q->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
svm_queue_wait (svm_queue_t * q)
|
||||
{
|
||||
svm_queue_wait_inline (q);
|
||||
}
|
||||
|
||||
static inline int
|
||||
svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
|
||||
{
|
||||
struct timespec ts;
|
||||
ts.tv_sec = unix_time_now () + (u32) timeout;
|
||||
ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
|
||||
|
||||
if (q->producer_evtfd == -1)
|
||||
{
|
||||
return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
|
||||
}
|
||||
else
|
||||
{
|
||||
double max_time = unix_time_now () + timeout;
|
||||
u32 cursize = q->cursize;
|
||||
int rv;
|
||||
|
||||
pthread_mutex_unlock (&q->mutex);
|
||||
while (q->cursize == cursize && unix_time_now () < max_time)
|
||||
CLIB_PAUSE ();
|
||||
rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
|
||||
pthread_mutex_lock (&q->mutex);
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
|
||||
int
|
||||
svm_queue_timedwait (svm_queue_t * q, double timeout)
|
||||
{
|
||||
return svm_queue_timedwait_inline (q, timeout);
|
||||
}
|
||||
|
||||
/*
|
||||
* svm_queue_add_nolock
|
||||
*/
|
||||
@ -139,9 +197,7 @@ svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
|
||||
if (PREDICT_FALSE (q->cursize == q->maxsize))
|
||||
{
|
||||
while (q->cursize == q->maxsize)
|
||||
{
|
||||
(void) pthread_cond_wait (&q->condvar, &q->mutex);
|
||||
}
|
||||
svm_queue_wait_inline (q);
|
||||
}
|
||||
|
||||
tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
|
||||
@ -170,6 +226,9 @@ svm_queue_add_raw (svm_queue_t * q, u8 * elem)
|
||||
|
||||
q->tail = (q->tail + 1) % q->maxsize;
|
||||
q->cursize++;
|
||||
|
||||
if (q->cursize == 1)
|
||||
svm_queue_send_signal (q, 1);
|
||||
}
|
||||
|
||||
|
||||
@ -201,9 +260,7 @@ svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
|
||||
return (-2);
|
||||
}
|
||||
while (q->cursize == q->maxsize)
|
||||
{
|
||||
(void) pthread_cond_wait (&q->condvar, &q->mutex);
|
||||
}
|
||||
svm_queue_wait_inline (q);
|
||||
}
|
||||
|
||||
tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
|
||||
@ -253,9 +310,7 @@ svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
|
||||
return (-2);
|
||||
}
|
||||
while (q->cursize + 1 == q->maxsize)
|
||||
{
|
||||
(void) pthread_cond_wait (&q->condvar, &q->mutex);
|
||||
}
|
||||
svm_queue_wait_inline (q);
|
||||
}
|
||||
|
||||
tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
|
||||
@ -317,13 +372,9 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
|
||||
}
|
||||
else if (cond == SVM_Q_TIMEDWAIT)
|
||||
{
|
||||
struct timespec ts;
|
||||
ts.tv_sec = unix_time_now () + time;
|
||||
ts.tv_nsec = 0;
|
||||
while (q->cursize == 0 && rc == 0)
|
||||
{
|
||||
rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
|
||||
}
|
||||
rc = svm_queue_timedwait_inline (q, time);
|
||||
|
||||
if (rc == ETIMEDOUT)
|
||||
{
|
||||
pthread_mutex_unlock (&q->mutex);
|
||||
@ -333,9 +384,7 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
|
||||
else
|
||||
{
|
||||
while (q->cursize == 0)
|
||||
{
|
||||
(void) pthread_cond_wait (&q->condvar, &q->mutex);
|
||||
}
|
||||
svm_queue_wait_inline (q);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,6 +81,24 @@ int svm_queue_is_full (svm_queue_t * q);
|
||||
int svm_queue_add_nolock (svm_queue_t * q, u8 * elem);
|
||||
int svm_queue_sub_raw (svm_queue_t * q, u8 * elem);
|
||||
|
||||
/**
|
||||
* Wait for queue event
|
||||
*
|
||||
* Must be called with mutex held.
|
||||
*/
|
||||
void svm_queue_wait (svm_queue_t * q);
|
||||
|
||||
/**
|
||||
* Timed wait for queue event
|
||||
*
|
||||
* Must be called with mutex held.
|
||||
*
|
||||
* @param q svm queue
|
||||
* @param timeout time in seconds
|
||||
* @return 0 on success, ETIMEDOUT on timeout or an error
|
||||
*/
|
||||
int svm_queue_timedwait (svm_queue_t * q, double timeout);
|
||||
|
||||
/**
|
||||
* Add element to queue with mutex held
|
||||
* @param q queue
|
||||
|
@ -263,7 +263,7 @@ wait_for_state_change (echo_main_t * em, connection_state_t state)
|
||||
return -1;
|
||||
if (em->time_to_stop == 1)
|
||||
return 0;
|
||||
if (!em->our_event_queue)
|
||||
if (!em->our_event_queue || em->state < STATE_ATTACHED)
|
||||
continue;
|
||||
|
||||
if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_NOWAIT, 0))
|
||||
@ -347,44 +347,17 @@ application_detach (echo_main_t * em)
|
||||
}
|
||||
|
||||
static int
|
||||
memfd_segment_attach (void)
|
||||
{
|
||||
ssvm_private_t _ssvm = { 0 }, *ssvm = &_ssvm;
|
||||
clib_error_t *error;
|
||||
int rv;
|
||||
|
||||
if ((error = vl_socket_client_recv_fd_msg (&ssvm->fd, 1, 5)))
|
||||
{
|
||||
clib_error_report (error);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((rv = ssvm_slave_init_memfd (ssvm)))
|
||||
return rv;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
fifo_segment_attach (char *name, u32 size, ssvm_segment_type_t type)
|
||||
ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd)
|
||||
{
|
||||
svm_fifo_segment_create_args_t _a, *a = &_a;
|
||||
clib_error_t *error;
|
||||
int rv;
|
||||
|
||||
memset (a, 0, sizeof (*a));
|
||||
a->segment_name = (char *) name;
|
||||
a->segment_size = size;
|
||||
a->segment_type = type;
|
||||
|
||||
if (type == SSVM_SEGMENT_MEMFD)
|
||||
{
|
||||
if ((error = vl_socket_client_recv_fd_msg (&a->memfd_fd, 1, 5)))
|
||||
{
|
||||
clib_error_report (error);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
a->memfd_fd = fd;
|
||||
|
||||
if ((rv = svm_fifo_segment_attach (a)))
|
||||
{
|
||||
@ -392,6 +365,7 @@ fifo_segment_attach (char *name, u32 size, ssvm_segment_type_t type)
|
||||
return rv;
|
||||
}
|
||||
|
||||
vec_reset_length (a->new_segment_indices);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -400,47 +374,57 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
|
||||
mp)
|
||||
{
|
||||
echo_main_t *em = &echo_main;
|
||||
ssvm_segment_type_t seg_type;
|
||||
int *fds = 0;
|
||||
u32 n_fds = 0;
|
||||
|
||||
if (mp->retval)
|
||||
{
|
||||
clib_warning ("attach failed: %U", format_api_error,
|
||||
clib_net_to_host_u32 (mp->retval));
|
||||
em->state = STATE_FAILED;
|
||||
return;
|
||||
goto failed;
|
||||
}
|
||||
|
||||
if (mp->segment_name_length == 0)
|
||||
{
|
||||
clib_warning ("segment_name_length zero");
|
||||
return;
|
||||
}
|
||||
|
||||
seg_type = em->use_sock_api ? SSVM_SEGMENT_MEMFD : SSVM_SEGMENT_SHM;
|
||||
|
||||
/* Attach to fifo segment */
|
||||
if (fifo_segment_attach ((char *) mp->segment_name, mp->segment_size,
|
||||
seg_type))
|
||||
{
|
||||
em->state = STATE_FAILED;
|
||||
return;
|
||||
}
|
||||
|
||||
/* If we're using memfd segments, read and attach to event qs segment */
|
||||
if (seg_type == SSVM_SEGMENT_MEMFD)
|
||||
{
|
||||
if (memfd_segment_attach ())
|
||||
{
|
||||
clib_warning ("failed to attach to evt q segment");
|
||||
em->state = STATE_FAILED;
|
||||
return;
|
||||
}
|
||||
goto failed;
|
||||
}
|
||||
|
||||
ASSERT (mp->app_event_queue_address);
|
||||
em->our_event_queue = uword_to_pointer (mp->app_event_queue_address,
|
||||
svm_msg_q_t *);
|
||||
|
||||
if (mp->n_fds)
|
||||
{
|
||||
vec_validate (fds, mp->n_fds);
|
||||
vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5);
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT)
|
||||
if (ssvm_segment_attach (0, SSVM_SEGMENT_MEMFD, fds[n_fds++]))
|
||||
goto failed;
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
|
||||
if (ssvm_segment_attach ((char *) mp->segment_name,
|
||||
SSVM_SEGMENT_MEMFD, fds[n_fds++]))
|
||||
goto failed;
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
svm_msg_q_set_consumer_eventfd (em->our_event_queue, fds[n_fds++]);
|
||||
|
||||
vec_free (fds);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (ssvm_segment_attach ((char *) mp->segment_name, SSVM_SEGMENT_SHM,
|
||||
-1))
|
||||
goto failed;
|
||||
}
|
||||
|
||||
em->state = STATE_ATTACHED;
|
||||
return;
|
||||
failed:
|
||||
em->state = STATE_FAILED;
|
||||
return;
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -28,6 +28,7 @@ libvppcom_la_SOURCES += \
|
||||
vcl/vcl_debug.h \
|
||||
vcl/vcl_event.c \
|
||||
vcl/vcl_private.h \
|
||||
vcl/vcl_private.c \
|
||||
$(libvppinfra_la_SOURCES) \
|
||||
$(libsvm_la_SOURCES) \
|
||||
$(libvlibmemoryclient_la_SOURCES)
|
||||
|
436
src/vcl/ldp.c
436
src/vcl/ldp.c
File diff suppressed because it is too large
Load Diff
@ -61,15 +61,35 @@ static void
|
||||
vcm->app_state = STATE_APP_ENABLED;
|
||||
}
|
||||
|
||||
static int
|
||||
ssvm_segment_attach (char *name, ssvm_segment_type_t type, int fd)
|
||||
{
|
||||
svm_fifo_segment_create_args_t _a, *a = &_a;
|
||||
int rv;
|
||||
|
||||
memset (a, 0, sizeof (*a));
|
||||
a->segment_name = (char *) name;
|
||||
a->segment_type = type;
|
||||
|
||||
if (type == SSVM_SEGMENT_MEMFD)
|
||||
a->memfd_fd = fd;
|
||||
|
||||
if ((rv = svm_fifo_segment_attach (a)))
|
||||
{
|
||||
clib_warning ("svm_fifo_segment_attach ('%s') failed", name);
|
||||
return rv;
|
||||
}
|
||||
vec_reset_length (a->new_segment_indices);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
|
||||
mp)
|
||||
{
|
||||
static svm_fifo_segment_create_args_t _a;
|
||||
svm_fifo_segment_create_args_t *a = &_a;
|
||||
int rv;
|
||||
u32 n_fds = 0;
|
||||
int *fds = 0;
|
||||
|
||||
memset (a, 0, sizeof (*a));
|
||||
if (mp->retval)
|
||||
{
|
||||
clib_warning ("VCL<%d>: attach failed: %U", getpid (),
|
||||
@ -77,30 +97,41 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
|
||||
return;
|
||||
}
|
||||
|
||||
if (mp->segment_name_length == 0)
|
||||
vcm->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
|
||||
svm_msg_q_t *);
|
||||
if (mp->n_fds)
|
||||
{
|
||||
clib_warning ("VCL<%d>: segment_name_length zero", getpid ());
|
||||
return;
|
||||
vec_validate (fds, mp->n_fds);
|
||||
vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5);
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT)
|
||||
if (ssvm_segment_attach ("vpp-mq-seg", SSVM_SEGMENT_MEMFD,
|
||||
fds[n_fds++]))
|
||||
return;
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
|
||||
if (ssvm_segment_attach ((char *) mp->segment_name,
|
||||
SSVM_SEGMENT_MEMFD, fds[n_fds++]))
|
||||
return;
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
{
|
||||
svm_msg_q_set_consumer_eventfd (vcm->app_event_queue, fds[n_fds]);
|
||||
if (vcm->mqs_epfd < 0)
|
||||
clib_unix_warning ("epoll_create() returned");
|
||||
vcl_mq_epoll_add_evfd (vcm->app_event_queue);
|
||||
n_fds++;
|
||||
}
|
||||
|
||||
vec_free (fds);
|
||||
}
|
||||
|
||||
a->segment_name = (char *) mp->segment_name;
|
||||
a->segment_size = mp->segment_size;
|
||||
|
||||
ASSERT (mp->app_event_queue_address);
|
||||
|
||||
/* Attach to the segment vpp created */
|
||||
rv = svm_fifo_segment_attach (a);
|
||||
vec_reset_length (a->new_segment_indices);
|
||||
if (PREDICT_FALSE (rv))
|
||||
else
|
||||
{
|
||||
clib_warning ("VCL<%d>: svm_fifo_segment_attach ('%s') failed",
|
||||
getpid (), mp->segment_name);
|
||||
return;
|
||||
if (ssvm_segment_attach ((char *) mp->segment_name, SSVM_SEGMENT_SHM,
|
||||
-1))
|
||||
return;
|
||||
}
|
||||
|
||||
vcm->app_event_queue =
|
||||
uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *);
|
||||
|
||||
vcm->app_state = STATE_APP_ATTACHED;
|
||||
}
|
||||
|
||||
@ -128,18 +159,19 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
|
||||
static void
|
||||
vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
|
||||
{
|
||||
static svm_fifo_segment_create_args_t _a;
|
||||
svm_fifo_segment_create_args_t *a = &_a;
|
||||
int rv;
|
||||
ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
|
||||
int fd = -1;
|
||||
|
||||
vcm->mounting_segment = 1;
|
||||
memset (a, 0, sizeof (*a));
|
||||
a->segment_name = (char *) mp->segment_name;
|
||||
a->segment_size = mp->segment_size;
|
||||
/* Attach to the segment vpp created */
|
||||
rv = svm_fifo_segment_attach (a);
|
||||
vec_reset_length (a->new_segment_indices);
|
||||
if (PREDICT_FALSE (rv))
|
||||
|
||||
if (mp->fd_flags)
|
||||
{
|
||||
vl_socket_client_recv_fd_msg (&fd, 1, 5);
|
||||
seg_type = SSVM_SEGMENT_MEMFD;
|
||||
}
|
||||
|
||||
if (PREDICT_FALSE (ssvm_segment_attach ((char *) mp->segment_name,
|
||||
seg_type, fd)))
|
||||
{
|
||||
clib_warning ("VCL<%d>: svm_fifo_segment_attach ('%s') failed",
|
||||
getpid (), mp->segment_name);
|
||||
@ -163,6 +195,39 @@ vl_api_unmap_segment_t_handler (vl_api_unmap_segment_t * mp)
|
||||
VDBG (1, "Unmapped segment '%s'", mp->segment_name);
|
||||
}
|
||||
|
||||
static void
|
||||
vl_api_app_cut_through_registration_add_t_handler
|
||||
(vl_api_app_cut_through_registration_add_t * mp)
|
||||
{
|
||||
vcl_cut_through_registration_t *ctr;
|
||||
u32 mqc_index = ~0;
|
||||
int *fds = 0;
|
||||
|
||||
if (mp->n_fds)
|
||||
{
|
||||
ASSERT (mp->n_fds == 2);
|
||||
vec_validate (fds, mp->n_fds);
|
||||
vl_socket_client_recv_fd_msg (fds, mp->n_fds, 5);
|
||||
}
|
||||
|
||||
ctr = vcl_ct_registration_lock_and_alloc ();
|
||||
ctr->mq = uword_to_pointer (mp->evt_q_address, svm_msg_q_t *);
|
||||
ctr->peer_mq = uword_to_pointer (mp->peer_evt_q_address, svm_msg_q_t *);
|
||||
VDBG (0, "Adding ct registration %u", vcl_ct_registration_index (ctr));
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
{
|
||||
svm_msg_q_set_consumer_eventfd (ctr->mq, fds[0]);
|
||||
svm_msg_q_set_producer_eventfd (ctr->peer_mq, fds[1]);
|
||||
mqc_index = vcl_mq_epoll_add_evfd (ctr->mq);
|
||||
ctr->epoll_evt_conn_index = mqc_index;
|
||||
vec_free (fds);
|
||||
}
|
||||
vcl_ct_registration_lookup_add (mp->evt_q_address,
|
||||
vcl_ct_registration_index (ctr));
|
||||
vcl_ct_registration_unlock ();
|
||||
}
|
||||
|
||||
static void
|
||||
vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
|
||||
{
|
||||
@ -483,19 +548,20 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
|
||||
VCL_SESSION_UNLOCK ();
|
||||
}
|
||||
|
||||
#define foreach_sock_msg \
|
||||
_(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \
|
||||
_(BIND_SOCK_REPLY, bind_sock_reply) \
|
||||
_(UNBIND_SOCK_REPLY, unbind_sock_reply) \
|
||||
_(ACCEPT_SESSION, accept_session) \
|
||||
_(CONNECT_SESSION_REPLY, connect_session_reply) \
|
||||
_(DISCONNECT_SESSION, disconnect_session) \
|
||||
_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
|
||||
_(RESET_SESSION, reset_session) \
|
||||
_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
|
||||
_(APPLICATION_DETACH_REPLY, application_detach_reply) \
|
||||
_(MAP_ANOTHER_SEGMENT, map_another_segment) \
|
||||
_(UNMAP_SEGMENT, unmap_segment)
|
||||
#define foreach_sock_msg \
|
||||
_(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \
|
||||
_(BIND_SOCK_REPLY, bind_sock_reply) \
|
||||
_(UNBIND_SOCK_REPLY, unbind_sock_reply) \
|
||||
_(ACCEPT_SESSION, accept_session) \
|
||||
_(CONNECT_SESSION_REPLY, connect_session_reply) \
|
||||
_(DISCONNECT_SESSION, disconnect_session) \
|
||||
_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
|
||||
_(RESET_SESSION, reset_session) \
|
||||
_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
|
||||
_(APPLICATION_DETACH_REPLY, application_detach_reply) \
|
||||
_(MAP_ANOTHER_SEGMENT, map_another_segment) \
|
||||
_(UNMAP_SEGMENT, unmap_segment) \
|
||||
_(APP_CUT_THROUGH_REGISTRATION_ADD, app_cut_through_registration_add) \
|
||||
|
||||
void
|
||||
vppcom_api_hookup (void)
|
||||
@ -547,7 +613,8 @@ vppcom_app_send_attach (void)
|
||||
(vcm->cfg.app_scope_local ? APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE : 0) |
|
||||
(vcm->cfg.app_scope_global ? APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE : 0) |
|
||||
(app_is_proxy ? APP_OPTIONS_FLAGS_IS_PROXY : 0) |
|
||||
APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
|
||||
APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS |
|
||||
(vcm->cfg.use_mq_eventfd ? APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD : 0);
|
||||
bmp->options[APP_OPTIONS_PROXY_TRANSPORT] =
|
||||
(u64) ((vcm->cfg.app_proxy_transport_tcp ? 1 << TRANSPORT_PROTO_TCP : 0) |
|
||||
(vcm->cfg.app_proxy_transport_udp ? 1 << TRANSPORT_PROTO_UDP : 0));
|
||||
@ -709,31 +776,50 @@ vppcom_connect_to_vpp (char *app_name)
|
||||
{
|
||||
api_main_t *am = &api_main;
|
||||
vppcom_cfg_t *vcl_cfg = &vcm->cfg;
|
||||
int rv = VPPCOM_OK;
|
||||
|
||||
if (!vcl_cfg->vpp_api_filename)
|
||||
vcl_cfg->vpp_api_filename = format (0, "/vpe-api%c", 0);
|
||||
|
||||
VDBG (0, "VCL<%d>: app (%s) connecting to VPP api (%s)...",
|
||||
getpid (), app_name, vcl_cfg->vpp_api_filename);
|
||||
|
||||
if (vl_client_connect_to_vlib ((char *) vcl_cfg->vpp_api_filename, app_name,
|
||||
vcm->cfg.vpp_api_q_length) < 0)
|
||||
if (vcl_cfg->vpp_api_socket_name)
|
||||
{
|
||||
clib_warning ("VCL<%d>: app (%s) connect failed!", getpid (), app_name);
|
||||
rv = VPPCOM_ECONNREFUSED;
|
||||
if (vl_socket_client_connect ((char *) vcl_cfg->vpp_api_socket_name,
|
||||
app_name, 0 /* default rx/tx buffer */ ))
|
||||
{
|
||||
clib_warning ("VCL<%d>: app (%s) socket connect failed!",
|
||||
getpid (), app_name);
|
||||
return VPPCOM_ECONNREFUSED;
|
||||
}
|
||||
|
||||
if (vl_socket_client_init_shm (0))
|
||||
{
|
||||
clib_warning ("VCL<%d>: app (%s) init shm failed!",
|
||||
getpid (), app_name);
|
||||
return VPPCOM_ECONNREFUSED;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
vcm->vl_input_queue = am->shmem_hdr->vl_input_queue;
|
||||
vcm->my_client_index = (u32) am->my_client_index;
|
||||
vcm->app_state = STATE_APP_CONN_VPP;
|
||||
if (!vcl_cfg->vpp_api_filename)
|
||||
vcl_cfg->vpp_api_filename = format (0, "/vpe-api%c", 0);
|
||||
|
||||
VDBG (0, "VCL<%d>: app (%s) connecting to VPP api (%s)...", getpid (),
|
||||
app_name, vcl_cfg->vpp_api_filename);
|
||||
|
||||
if (vl_client_connect_to_vlib ((char *) vcl_cfg->vpp_api_filename,
|
||||
app_name, vcm->cfg.vpp_api_q_length) < 0)
|
||||
{
|
||||
clib_warning ("VCL<%d>: app (%s) connect failed!", getpid (),
|
||||
app_name);
|
||||
return VPPCOM_ECONNREFUSED;
|
||||
}
|
||||
|
||||
VDBG (0, "VCL<%d>: app (%s) is connected to VPP!", getpid (), app_name);
|
||||
}
|
||||
|
||||
vcm->vl_input_queue = am->shmem_hdr->vl_input_queue;
|
||||
vcm->my_client_index = (u32) am->my_client_index;
|
||||
vcm->app_state = STATE_APP_CONN_VPP;
|
||||
|
||||
VDBG (0, "VCL<%d>: app (%s) is connected to VPP!", getpid (), app_name);
|
||||
|
||||
vcl_evt (VCL_EVT_INIT, vcm);
|
||||
return rv;
|
||||
return VPPCOM_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
|
File diff suppressed because it is too large
Load Diff
151
src/vcl/vcl_private.c
Normal file
151
src/vcl/vcl_private.c
Normal file
@ -0,0 +1,151 @@
|
||||
/*
|
||||
* Copyright (c) 2018 Cisco and/or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this
|
||||
* You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <vcl/vcl_private.h>
|
||||
|
||||
vcl_cut_through_registration_t *
|
||||
vcl_ct_registration_lock_and_alloc (void)
|
||||
{
|
||||
vcl_cut_through_registration_t *cr;
|
||||
pool_get (vcm->cut_through_registrations, cr);
|
||||
clib_spinlock_lock (&vcm->ct_registration_lock);
|
||||
memset (cr, 0, sizeof (*cr));
|
||||
cr->epoll_evt_conn_index = -1;
|
||||
return cr;
|
||||
}
|
||||
|
||||
u32
|
||||
vcl_ct_registration_index (vcl_cut_through_registration_t * ctr)
|
||||
{
|
||||
return (ctr - vcm->cut_through_registrations);
|
||||
}
|
||||
|
||||
void
|
||||
vcl_ct_registration_unlock (void)
|
||||
{
|
||||
clib_spinlock_unlock (&vcm->ct_registration_lock);
|
||||
}
|
||||
|
||||
vcl_cut_through_registration_t *
|
||||
vcl_ct_registration_get (u32 ctr_index)
|
||||
{
|
||||
if (pool_is_free_index (vcm->cut_through_registrations, ctr_index))
|
||||
return 0;
|
||||
return pool_elt_at_index (vcm->cut_through_registrations, ctr_index);
|
||||
}
|
||||
|
||||
vcl_cut_through_registration_t *
|
||||
vcl_ct_registration_lock_and_lookup (uword mq_addr)
|
||||
{
|
||||
uword *p;
|
||||
clib_spinlock_lock (&vcm->ct_registration_lock);
|
||||
p = hash_get (vcm->ct_registration_by_mq, mq_addr);
|
||||
if (!p)
|
||||
return 0;
|
||||
return vcl_ct_registration_get (p[0]);
|
||||
}
|
||||
|
||||
void
|
||||
vcl_ct_registration_lookup_add (uword mq_addr, u32 ctr_index)
|
||||
{
|
||||
hash_set (vcm->ct_registration_by_mq, mq_addr, ctr_index);
|
||||
}
|
||||
|
||||
void
|
||||
vcl_ct_registration_lookup_del (uword mq_addr)
|
||||
{
|
||||
hash_unset (vcm->ct_registration_by_mq, mq_addr);
|
||||
}
|
||||
|
||||
void
|
||||
vcl_ct_registration_del (vcl_cut_through_registration_t * ctr)
|
||||
{
|
||||
pool_put (vcm->cut_through_registrations, ctr);
|
||||
}
|
||||
|
||||
vcl_mq_evt_conn_t *
|
||||
vcl_mq_evt_conn_alloc (void)
|
||||
{
|
||||
vcl_mq_evt_conn_t *mqc;
|
||||
pool_get (vcm->mq_evt_conns, mqc);
|
||||
memset (mqc, 0, sizeof (*mqc));
|
||||
return mqc;
|
||||
}
|
||||
|
||||
u32
|
||||
vcl_mq_evt_conn_index (vcl_mq_evt_conn_t * mqc)
|
||||
{
|
||||
return (mqc - vcm->mq_evt_conns);
|
||||
}
|
||||
|
||||
vcl_mq_evt_conn_t *
|
||||
vcl_mq_evt_conn_get (u32 mq_conn_idx)
|
||||
{
|
||||
return pool_elt_at_index (vcm->mq_evt_conns, mq_conn_idx);
|
||||
}
|
||||
|
||||
int
|
||||
vcl_mq_epoll_add_evfd (svm_msg_q_t * mq)
|
||||
{
|
||||
struct epoll_event e = { 0 };
|
||||
vcl_mq_evt_conn_t *mqc;
|
||||
u32 mqc_index;
|
||||
int mq_fd;
|
||||
|
||||
mq_fd = svm_msg_q_get_consumer_eventfd (mq);
|
||||
|
||||
if (vcm->mqs_epfd < 0 || mq_fd == -1)
|
||||
return -1;
|
||||
|
||||
mqc = vcl_mq_evt_conn_alloc ();
|
||||
mqc_index = vcl_mq_evt_conn_index (mqc);
|
||||
mqc->mq_fd = mq_fd;
|
||||
mqc->mq = mq;
|
||||
|
||||
e.events = EPOLLIN;
|
||||
e.data.u32 = mqc_index;
|
||||
if (epoll_ctl (vcm->mqs_epfd, EPOLL_CTL_ADD, mq_fd, &e) < 0)
|
||||
{
|
||||
clib_warning ("failed to add mq eventfd to mq epoll fd");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return mqc_index;
|
||||
}
|
||||
|
||||
int
|
||||
vcl_mq_epoll_del_evfd (u32 mqc_index)
|
||||
{
|
||||
vcl_mq_evt_conn_t *mqc;
|
||||
|
||||
if (vcm->mqs_epfd || mqc_index == ~0)
|
||||
return -1;
|
||||
|
||||
mqc = vcl_mq_evt_conn_get (mqc_index);
|
||||
if (epoll_ctl (vcm->mqs_epfd, EPOLL_CTL_DEL, mqc->mq_fd, 0) < 0)
|
||||
{
|
||||
clib_warning ("failed to del mq eventfd to mq epoll fd");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* fd.io coding-style-patch-verification: ON
|
||||
*
|
||||
* Local Variables:
|
||||
* eval: (c-set-style "gnu")
|
||||
* End:
|
||||
*/
|
@ -141,7 +141,6 @@ typedef struct
|
||||
vppcom_epoll_t vep;
|
||||
int libc_epfd;
|
||||
svm_msg_q_t *our_evt_q;
|
||||
u32 ct_registration;
|
||||
u64 options[16];
|
||||
vce_event_handler_reg_t *poll_reg;
|
||||
vcl_session_msg_t *accept_evts_fifo;
|
||||
@ -168,12 +167,14 @@ typedef struct vppcom_cfg_t_
|
||||
u8 app_scope_global;
|
||||
u8 *namespace_id;
|
||||
u64 namespace_secret;
|
||||
u8 use_mq_eventfd;
|
||||
f64 app_timeout;
|
||||
f64 session_timeout;
|
||||
f64 accept_timeout;
|
||||
u32 event_ring_size;
|
||||
char *event_log_path;
|
||||
u8 *vpp_api_filename;
|
||||
u8 *vpp_api_socket_name;
|
||||
} vppcom_cfg_t;
|
||||
|
||||
void vppcom_cfg (vppcom_cfg_t * vcl_cfg);
|
||||
@ -181,9 +182,18 @@ void vppcom_cfg (vppcom_cfg_t * vcl_cfg);
|
||||
typedef struct vcl_cut_through_registration_
|
||||
{
|
||||
svm_msg_q_t *mq;
|
||||
svm_msg_q_t *peer_mq;
|
||||
u32 sid;
|
||||
u32 epoll_evt_conn_index; /*< mq evt connection index part of
|
||||
the mqs evtfd epoll (if used) */
|
||||
} vcl_cut_through_registration_t;
|
||||
|
||||
typedef struct vcl_mq_evt_conn_
|
||||
{
|
||||
svm_msg_q_t *mq;
|
||||
int mq_fd;
|
||||
} vcl_mq_evt_conn_t;
|
||||
|
||||
typedef struct vppcom_main_t_
|
||||
{
|
||||
u8 init;
|
||||
@ -203,6 +213,15 @@ typedef struct vppcom_main_t_
|
||||
clib_spinlock_t sessions_lockp;
|
||||
vcl_session_t *sessions;
|
||||
|
||||
/** Message queues epoll fd. Initialized only if using mqs with eventfds */
|
||||
int mqs_epfd;
|
||||
|
||||
/** Pool of event message queue event connections */
|
||||
vcl_mq_evt_conn_t *mq_evt_conns;
|
||||
|
||||
/** Per worker buffer for receiving mq epoll events */
|
||||
struct epoll_event *mq_events;
|
||||
|
||||
/* Hash table for disconnect processing */
|
||||
uword *session_index_by_vpp_handles;
|
||||
|
||||
@ -214,6 +233,8 @@ typedef struct vppcom_main_t_
|
||||
/* Our event queue */
|
||||
svm_msg_q_t *app_event_queue;
|
||||
|
||||
svm_msg_q_t **vpp_event_queues;
|
||||
|
||||
/* unique segment name counter */
|
||||
u32 unique_segment_index;
|
||||
|
||||
@ -237,6 +258,14 @@ typedef struct vppcom_main_t_
|
||||
/** Pool of cut through registrations */
|
||||
vcl_cut_through_registration_t *cut_through_registrations;
|
||||
|
||||
/** Lock for accessing ct registration pool */
|
||||
clib_spinlock_t ct_registration_lock;
|
||||
|
||||
/** Cut-through registration by mq address hash table */
|
||||
uword *ct_registration_by_mq;
|
||||
|
||||
svm_msg_q_msg_t *mq_msg_vector;
|
||||
|
||||
/** Flag indicating that a new segment is being mounted */
|
||||
volatile u32 mounting_segment;
|
||||
|
||||
@ -284,6 +313,21 @@ do { \
|
||||
|
||||
#define VCL_INVALID_SESSION_INDEX ((u32)~0)
|
||||
|
||||
static inline vcl_session_t *
|
||||
vcl_session_alloc (void)
|
||||
{
|
||||
vcl_session_t *s;
|
||||
pool_get (vcm->sessions, s);
|
||||
memset (s, 0, sizeof (*s));
|
||||
return s;
|
||||
}
|
||||
|
||||
static inline void
|
||||
vcl_session_free (vcl_session_t * s)
|
||||
{
|
||||
pool_put (vcm->sessions, s);
|
||||
}
|
||||
|
||||
static inline vcl_session_t *
|
||||
vcl_session_get (u32 session_index)
|
||||
{
|
||||
@ -374,6 +418,23 @@ vppcom_session_table_lookup_listener (u64 listener_handle)
|
||||
|
||||
const char *vppcom_session_state_str (session_state_t state);
|
||||
|
||||
/*
|
||||
* Helpers
|
||||
*/
|
||||
vcl_cut_through_registration_t *vcl_ct_registration_lock_and_alloc (void);
|
||||
void vcl_ct_registration_del (vcl_cut_through_registration_t * ctr);
|
||||
u32 vcl_ct_registration_index (vcl_cut_through_registration_t * ctr);
|
||||
void vcl_ct_registration_unlock (void);
|
||||
vcl_cut_through_registration_t *vcl_ct_registration_get (u32 ctr_index);
|
||||
vcl_cut_through_registration_t *vcl_ct_registration_lock_and_lookup (uword);
|
||||
void vcl_ct_registration_lookup_add (uword mq_addr, u32 ctr_index);
|
||||
void vcl_ct_registration_lookup_del (uword mq_addr);
|
||||
vcl_mq_evt_conn_t *vcl_mq_evt_conn_alloc (void);
|
||||
u32 vcl_mq_evt_conn_index (vcl_mq_evt_conn_t * mqc);
|
||||
vcl_mq_evt_conn_t *vcl_mq_evt_conn_get (u32 mq_conn_idx);
|
||||
int vcl_mq_epoll_add_evfd (svm_msg_q_t * mq);
|
||||
int vcl_mq_epoll_del_evfd (u32 mqc_index);
|
||||
|
||||
/*
|
||||
* VCL Binary API
|
||||
*/
|
||||
|
@ -629,7 +629,6 @@ main (int argc, char **argv)
|
||||
printf ("SERVER (fd %d): TX (%d bytes) - '%s'\n",
|
||||
conn->fd, tx_bytes, conn->buf);
|
||||
}
|
||||
|
||||
else // Extraneous read data from non-echo tests???
|
||||
{
|
||||
xtra++;
|
||||
|
479
src/vcl/vppcom.c
479
src/vcl/vppcom.c
File diff suppressed because it is too large
Load Diff
@ -31,17 +31,18 @@ extern "C"
|
||||
/*
|
||||
* VPPCOM Public API Definitions, Enums, and Data Structures
|
||||
*/
|
||||
#define INVALID_SESSION_ID (~0)
|
||||
#define VPPCOM_CONF_DEFAULT "/etc/vpp/vcl.conf"
|
||||
#define VPPCOM_ENV_CONF "VCL_CONFIG"
|
||||
#define VPPCOM_ENV_DEBUG "VCL_DEBUG"
|
||||
#define VPPCOM_ENV_API_PREFIX "VCL_API_PREFIX"
|
||||
#define VPPCOM_ENV_APP_PROXY_TRANSPORT_TCP "VCL_APP_PROXY_TRANSPORT_TCP"
|
||||
#define VPPCOM_ENV_APP_PROXY_TRANSPORT_UDP "VCL_APP_PROXY_TRANSPORT_UDP"
|
||||
#define VPPCOM_ENV_APP_NAMESPACE_ID "VCL_APP_NAMESPACE_ID"
|
||||
#define VPPCOM_ENV_APP_NAMESPACE_SECRET "VCL_APP_NAMESPACE_SECRET"
|
||||
#define VPPCOM_ENV_APP_SCOPE_LOCAL "VCL_APP_SCOPE_LOCAL"
|
||||
#define VPPCOM_ENV_APP_SCOPE_GLOBAL "VCL_APP_SCOPE_GLOBAL"
|
||||
#define INVALID_SESSION_ID (~0)
|
||||
#define VPPCOM_CONF_DEFAULT "/etc/vpp/vcl.conf"
|
||||
#define VPPCOM_ENV_CONF "VCL_CONFIG"
|
||||
#define VPPCOM_ENV_DEBUG "VCL_DEBUG"
|
||||
#define VPPCOM_ENV_API_PREFIX "VCL_API_PREFIX"
|
||||
#define VPPCOM_ENV_APP_PROXY_TRANSPORT_TCP "VCL_APP_PROXY_TRANSPORT_TCP"
|
||||
#define VPPCOM_ENV_APP_PROXY_TRANSPORT_UDP "VCL_APP_PROXY_TRANSPORT_UDP"
|
||||
#define VPPCOM_ENV_APP_NAMESPACE_ID "VCL_APP_NAMESPACE_ID"
|
||||
#define VPPCOM_ENV_APP_NAMESPACE_SECRET "VCL_APP_NAMESPACE_SECRET"
|
||||
#define VPPCOM_ENV_APP_SCOPE_LOCAL "VCL_APP_SCOPE_LOCAL"
|
||||
#define VPPCOM_ENV_APP_SCOPE_GLOBAL "VCL_APP_SCOPE_GLOBAL"
|
||||
#define VPPCOM_ENV_VPP_API_SOCKET "VCL_VPP_API_SOCKET"
|
||||
|
||||
typedef enum
|
||||
{
|
||||
@ -250,6 +251,7 @@ extern int vppcom_session_sendto (uint32_t session_index, void *buffer,
|
||||
vppcom_endpt_t * ep);
|
||||
extern int vppcom_poll (vcl_poll_t * vp, uint32_t n_sids,
|
||||
double wait_for_time);
|
||||
extern int vppcom_mq_epoll_fd (void);
|
||||
|
||||
/*
|
||||
* VPPCOM Event Functions
|
||||
|
@ -453,7 +453,7 @@ vl_sock_api_send_fd_msg (int socket_fd, int fds[], int n_fds)
|
||||
{
|
||||
struct msghdr mh = { 0 };
|
||||
struct iovec iov[1];
|
||||
char ctl[CMSG_SPACE (sizeof (int)) * n_fds];
|
||||
char ctl[CMSG_SPACE (sizeof (int) * n_fds)];
|
||||
struct cmsghdr *cmsg;
|
||||
char *msg = "fdmsg";
|
||||
int rv;
|
||||
@ -470,7 +470,7 @@ vl_sock_api_send_fd_msg (int socket_fd, int fds[], int n_fds)
|
||||
cmsg->cmsg_len = CMSG_LEN (sizeof (int) * n_fds);
|
||||
cmsg->cmsg_level = SOL_SOCKET;
|
||||
cmsg->cmsg_type = SCM_RIGHTS;
|
||||
memcpy (CMSG_DATA (cmsg), fds, sizeof (int) * n_fds);
|
||||
clib_memcpy (CMSG_DATA (cmsg), fds, sizeof (int) * n_fds);
|
||||
|
||||
rv = sendmsg (socket_fd, &mh, 0);
|
||||
if (rv < 0)
|
||||
|
@ -312,6 +312,12 @@ application_init (application_t * app, u32 api_client_index, u8 * app_name,
|
||||
}
|
||||
else
|
||||
{
|
||||
if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
|
||||
{
|
||||
clib_warning ("mq eventfds can only be used if socket transport is "
|
||||
"used for api");
|
||||
return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
|
||||
}
|
||||
seg_type = SSVM_SEGMENT_PRIVATE;
|
||||
}
|
||||
|
||||
@ -336,6 +342,8 @@ application_init (application_t * app, u32 api_client_index, u8 * app_name,
|
||||
props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
|
||||
if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
|
||||
props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
|
||||
if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
|
||||
props->use_mq_eventfd = 1;
|
||||
if (options[APP_OPTIONS_TLS_ENGINE])
|
||||
app->tls_engine = options[APP_OPTIONS_TLS_ENGINE];
|
||||
props->segment_type = seg_type;
|
||||
@ -970,6 +978,8 @@ application_free_local_session (application_t * app, local_session_t * s)
|
||||
local_session_t *
|
||||
application_get_local_session (application_t * app, u32 session_index)
|
||||
{
|
||||
if (pool_is_free_index (app->local_sessions, session_index))
|
||||
return 0;
|
||||
return pool_elt_at_index (app->local_sessions, session_index);
|
||||
}
|
||||
|
||||
@ -1078,6 +1088,23 @@ application_stop_local_listen (application_t * server, session_handle_t lh)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
|
||||
{
|
||||
int fd;
|
||||
|
||||
/*
|
||||
* segment manager initializes only the producer eventds, since vpp is
|
||||
* typically the producer. But for local sessions, we also pass to the
|
||||
* apps the mqs they listen on for events from peer apps, so they are also
|
||||
* consumer fds.
|
||||
*/
|
||||
fd = svm_msg_q_get_producer_eventfd (sq);
|
||||
svm_msg_q_set_consumer_eventfd (sq, fd);
|
||||
fd = svm_msg_q_get_producer_eventfd (cq);
|
||||
svm_msg_q_set_consumer_eventfd (cq, fd);
|
||||
}
|
||||
|
||||
int
|
||||
application_local_session_connect (u32 table_index, application_t * client,
|
||||
application_t * server,
|
||||
@ -1125,8 +1152,12 @@ application_local_session_connect (u32 table_index, application_t * client,
|
||||
return seg_index;
|
||||
}
|
||||
seg = segment_manager_get_segment_w_lock (sm, seg_index);
|
||||
sq = segment_manager_alloc_queue (seg, props->evt_q_size);
|
||||
cq = segment_manager_alloc_queue (seg, cprops->evt_q_size);
|
||||
sq = segment_manager_alloc_queue (seg, props);
|
||||
cq = segment_manager_alloc_queue (seg, cprops);
|
||||
|
||||
if (props->use_mq_eventfd)
|
||||
application_local_session_fix_eventds (sq, cq);
|
||||
|
||||
ls->server_evt_q = pointer_to_uword (sq);
|
||||
ls->client_evt_q = pointer_to_uword (cq);
|
||||
rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
|
||||
@ -1273,7 +1304,7 @@ application_local_session_disconnect (u32 app_index, local_session_t * ls)
|
||||
|
||||
if (app_index == ls->client_index)
|
||||
{
|
||||
send_local_session_disconnect_callback (ls->app_index, ls);
|
||||
mq_send_local_session_disconnected_cb (ls->app_index, ls);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1292,7 +1323,7 @@ application_local_session_disconnect (u32 app_index, local_session_t * ls)
|
||||
}
|
||||
else
|
||||
{
|
||||
send_local_session_disconnect_callback (client->index, ls);
|
||||
mq_send_local_session_disconnected_cb (client->index, ls);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,8 +267,8 @@ application_local_session_listener_has_transport (local_session_t * ls)
|
||||
return (tp != TRANSPORT_PROTO_NONE);
|
||||
}
|
||||
|
||||
void send_local_session_disconnect_callback (u32 app_index,
|
||||
local_session_t * ls);
|
||||
void mq_send_local_session_disconnected_cb (u32 app_index,
|
||||
local_session_t * ls);
|
||||
|
||||
int application_connect (u32 client_index, u32 api_context,
|
||||
session_endpoint_t * sep);
|
||||
|
@ -475,7 +475,7 @@ vnet_application_attach (vnet_app_attach_args_t * a)
|
||||
a->session_cb_vft)))
|
||||
return clib_error_return_code (0, rv, 0, "app init: %d", rv);
|
||||
|
||||
a->app_event_queue_address = pointer_to_uword (app->event_queue);
|
||||
a->app_evt_q = app->event_queue;
|
||||
sm = segment_manager_get (app->first_segment_manager);
|
||||
fs = segment_manager_get_segment_w_lock (sm, 0);
|
||||
|
||||
@ -569,7 +569,18 @@ vnet_disconnect_session (vnet_disconnect_args_t * a)
|
||||
if (session_handle_is_local (a->handle))
|
||||
{
|
||||
local_session_t *ls;
|
||||
ls = application_get_local_session_from_handle (a->handle);
|
||||
|
||||
/* Disconnect reply came to worker 1 not main thread */
|
||||
if (vlib_get_thread_index () == 1)
|
||||
{
|
||||
vlib_rpc_call_main_thread (vnet_disconnect_session, (u8 *) a,
|
||||
sizeof (*a));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!(ls = application_get_local_session_from_handle (a->handle)))
|
||||
return 0;
|
||||
|
||||
if (ls->app_index != a->app_index && ls->client_index != a->app_index)
|
||||
{
|
||||
clib_warning ("app %u is neither client nor server for session %u",
|
||||
|
@ -42,7 +42,7 @@ typedef struct _vnet_app_attach_args_t
|
||||
* Results
|
||||
*/
|
||||
ssvm_private_t *segment;
|
||||
u64 app_event_queue_address;
|
||||
svm_msg_q_t *app_evt_q;
|
||||
u32 app_index;
|
||||
} vnet_app_attach_args_t;
|
||||
|
||||
@ -138,6 +138,7 @@ typedef enum
|
||||
_(USE_GLOBAL_SCOPE, "App can use global session scope") \
|
||||
_(USE_LOCAL_SCOPE, "App can use local session scope") \
|
||||
_(USE_MQ_FOR_CTRL_MSGS, "Use message queue for ctr msgs") \
|
||||
_(EVT_MQ_USE_EVENTFD, "Use eventfds for signaling") \
|
||||
|
||||
typedef enum _app_options
|
||||
{
|
||||
@ -153,6 +154,27 @@ typedef enum _app_options_flags
|
||||
#undef _
|
||||
} app_options_flags_t;
|
||||
|
||||
#define foreach_fd_type \
|
||||
_(VPP_MQ_SEGMENT, "Fd for vpp's event mq segment") \
|
||||
_(MEMFD_SEGMENT, "Fd for memfd segment") \
|
||||
_(MQ_EVENTFD, "Event fd used by message queue") \
|
||||
_(VPP_MQ_EVENTFD, "Event fd used by vpp's message queue") \
|
||||
|
||||
typedef enum session_fd_type_
|
||||
{
|
||||
#define _(sym, str) SESSION_FD_##sym,
|
||||
foreach_fd_type
|
||||
#undef _
|
||||
SESSION_N_FD_TYPE
|
||||
} session_fd_type_t;
|
||||
|
||||
typedef enum session_fd_flag_
|
||||
{
|
||||
#define _(sym, str) SESSION_FD_F_##sym = 1 << SESSION_FD_##sym,
|
||||
foreach_fd_type
|
||||
#undef _
|
||||
} session_fd_flag_t;
|
||||
|
||||
int vnet_bind_uri (vnet_bind_args_t *);
|
||||
int vnet_unbind_uri (vnet_unbind_args_t * a);
|
||||
clib_error_t *vnet_connect_uri (vnet_connect_args_t * a);
|
||||
|
@ -88,7 +88,7 @@ segment_manager_del_segment (segment_manager_t * sm,
|
||||
/**
|
||||
* Removes segment after acquiring writer lock
|
||||
*/
|
||||
always_inline void
|
||||
static inline void
|
||||
segment_manager_lock_and_del_segment (segment_manager_t * sm, u32 fs_index)
|
||||
{
|
||||
svm_fifo_segment_private_t *fs;
|
||||
@ -290,8 +290,7 @@ segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
|
||||
|
||||
segment = segment_manager_get_segment (sm, seg_index);
|
||||
if (i == 0)
|
||||
sm->event_queue = segment_manager_alloc_queue (segment,
|
||||
props->evt_q_size);
|
||||
sm->event_queue = segment_manager_alloc_queue (segment, props);
|
||||
|
||||
svm_fifo_segment_preallocate_fifo_pairs (segment,
|
||||
props->rx_fifo_size,
|
||||
@ -311,8 +310,7 @@ segment_manager_init (segment_manager_t * sm, u32 first_seg_size,
|
||||
return seg_index;
|
||||
}
|
||||
segment = segment_manager_get_segment (sm, seg_index);
|
||||
sm->event_queue = segment_manager_alloc_queue (segment,
|
||||
props->evt_q_size);
|
||||
sm->event_queue = segment_manager_alloc_queue (segment, props);
|
||||
}
|
||||
|
||||
return 0;
|
||||
@ -623,7 +621,7 @@ segment_manager_evt_q_expected_size (u32 q_len)
|
||||
*/
|
||||
svm_msg_q_t *
|
||||
segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
|
||||
u32 queue_size)
|
||||
segment_manager_properties_t * props)
|
||||
{
|
||||
u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
|
||||
svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
|
||||
@ -631,21 +629,27 @@ segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
|
||||
void *oldheap;
|
||||
|
||||
fifo_evt_size = sizeof (session_event_t);
|
||||
notif_q_size = clib_max (16, queue_size >> 4);
|
||||
notif_q_size = clib_max (16, props->evt_q_size >> 4);
|
||||
/* *INDENT-OFF* */
|
||||
svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
|
||||
{queue_size, fifo_evt_size, 0},
|
||||
{props->evt_q_size, fifo_evt_size, 0},
|
||||
{notif_q_size, session_evt_size, 0}
|
||||
};
|
||||
/* *INDENT-ON* */
|
||||
cfg->consumer_pid = 0;
|
||||
cfg->n_rings = 2;
|
||||
cfg->q_nitems = queue_size;
|
||||
cfg->q_nitems = props->evt_q_size;
|
||||
cfg->ring_cfgs = rc;
|
||||
|
||||
oldheap = ssvm_push_heap (segment->ssvm.sh);
|
||||
q = svm_msg_q_alloc (cfg);
|
||||
ssvm_pop_heap (oldheap);
|
||||
|
||||
if (props->use_mq_eventfd)
|
||||
{
|
||||
if (svm_msg_q_alloc_producer_eventfd (q))
|
||||
clib_warning ("failed to alloc eventfd");
|
||||
}
|
||||
return q;
|
||||
}
|
||||
|
||||
|
@ -32,8 +32,10 @@ typedef struct _segment_manager_properties
|
||||
/** Configured additional segment size */
|
||||
u32 add_segment_size;
|
||||
|
||||
/** Flag that indicates if additional segments should be created */
|
||||
u8 add_segment;
|
||||
/** Flags */
|
||||
u8 add_segment:1; /**< can add new segments */
|
||||
u8 use_mq_eventfd:1; /**< use eventfds for mqs */
|
||||
u8 reserved:6;
|
||||
|
||||
/** Segment type: if set to SSVM_N_TYPES, private segments are used */
|
||||
ssvm_segment_type_t segment_type;
|
||||
@ -154,7 +156,8 @@ void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
|
||||
svm_fifo_t * tx_fifo);
|
||||
u32 segment_manager_evt_q_expected_size (u32 q_size);
|
||||
svm_msg_q_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
|
||||
u32 queue_size);
|
||||
segment_manager_properties_t *
|
||||
props);
|
||||
void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q);
|
||||
void segment_manager_app_detach (segment_manager_t * sm);
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
option version = "1.0.3";
|
||||
option version = "1.1.0";
|
||||
|
||||
/** \brief client->vpp, attach application to session layer
|
||||
@param client_index - opaque cookie to identify the sender
|
||||
@ -38,6 +38,9 @@ option version = "1.0.3";
|
||||
@param retval - return code for the request
|
||||
@param app_event_queue_address - vpp event queue address or 0 if this
|
||||
connection shouldn't send events
|
||||
@param n_fds - number of fds exchanged
|
||||
@param fd_flags - set of flags that indicate which fds are to be expected
|
||||
over the socket (set only if socket transport available)
|
||||
@param segment_size - size of first shm segment
|
||||
@param segment_name_length - length of segment name
|
||||
@param segment_name - name of segment client needs to attach to
|
||||
@ -46,6 +49,8 @@ define application_attach_reply {
|
||||
u32 context;
|
||||
i32 retval;
|
||||
u64 app_event_queue_address;
|
||||
u8 n_fds;
|
||||
u8 fd_flags;
|
||||
u32 segment_size;
|
||||
u8 segment_name_length;
|
||||
u8 segment_name[128];
|
||||
@ -91,11 +96,16 @@ autoreply define application_detach {
|
||||
/** \brief vpp->client, please map an additional shared memory segment
|
||||
@param client_index - opaque cookie to identify the sender
|
||||
@param context - sender context, to match reply w/ request
|
||||
@param segment_name -
|
||||
@param fd_flags - set of flags that indicate which, if any, fds are
|
||||
to be expected over the socket. This is set only if
|
||||
socket transport available
|
||||
@param segment_size - size of the segment to be mapped
|
||||
@param segment_name - name of the segment to be mapped
|
||||
*/
|
||||
autoreply define map_another_segment {
|
||||
u32 client_index;
|
||||
u32 context;
|
||||
u8 fd_flags;
|
||||
u32 segment_size;
|
||||
u8 segment_name[128];
|
||||
};
|
||||
@ -391,6 +401,26 @@ define connect_session_reply {
|
||||
u16 lcl_port;
|
||||
};
|
||||
|
||||
/** \brief ask app to add a new cut-through registration
|
||||
@param client_index - opaque cookie to identify the sender
|
||||
client to vpp direction only
|
||||
@param context - sender context, to match reply w/ request
|
||||
@param evt_q_address - address of the mq in ssvm segment
|
||||
@param peer_evt_q_address - address of peer's mq in ssvm segment
|
||||
@param n_fds - number of fds exchanged
|
||||
@param fd_flags - flag indicating the fds that will be exchanged over
|
||||
api socket
|
||||
*/
|
||||
autoreply define app_cut_through_registration_add
|
||||
{
|
||||
u32 client_index;
|
||||
u32 context;
|
||||
u64 evt_q_address;
|
||||
u64 peer_evt_q_address;
|
||||
u8 n_fds;
|
||||
u8 fd_flags;
|
||||
};
|
||||
|
||||
/** \brief enable/disable session layer
|
||||
@param client_index - opaque cookie to identify the sender
|
||||
client to vpp direction only
|
||||
|
@ -1245,6 +1245,11 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
|
||||
cfg->q_nitems = evt_q_length;
|
||||
cfg->ring_cfgs = rc;
|
||||
smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
|
||||
if (smm->evt_qs_use_memfd_seg)
|
||||
{
|
||||
if (svm_msg_q_alloc_consumer_eventfd (smm->vpp_event_queues[i]))
|
||||
clib_warning ("eventfd returned");
|
||||
}
|
||||
}
|
||||
|
||||
if (smm->evt_qs_use_memfd_seg)
|
||||
|
@ -60,16 +60,15 @@ _(APPLICATION_TLS_CERT_ADD, application_tls_cert_add) \
|
||||
_(APPLICATION_TLS_KEY_ADD, application_tls_key_add) \
|
||||
|
||||
static int
|
||||
session_send_memfd_fd (vl_api_registration_t * reg, const ssvm_private_t * sp)
|
||||
session_send_fds (vl_api_registration_t * reg, int fds[], int n_fds)
|
||||
{
|
||||
clib_error_t *error;
|
||||
int fd = sp->fd;
|
||||
if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
|
||||
{
|
||||
clib_warning ("can't send memfd fd");
|
||||
return -1;
|
||||
}
|
||||
error = vl_api_send_fd_msg (reg, &fd, 1);
|
||||
error = vl_api_send_fd_msg (reg, fds, n_fds);
|
||||
if (error)
|
||||
{
|
||||
clib_error_report (error);
|
||||
@ -81,8 +80,10 @@ session_send_memfd_fd (vl_api_registration_t * reg, const ssvm_private_t * sp)
|
||||
static int
|
||||
send_add_segment_callback (u32 api_client_index, const ssvm_private_t * sp)
|
||||
{
|
||||
int fds[SESSION_N_FD_TYPE], n_fds = 0;
|
||||
vl_api_map_another_segment_t *mp;
|
||||
vl_api_registration_t *reg;
|
||||
u8 fd_flags = 0;
|
||||
|
||||
reg = vl_mem_api_client_index_to_registration (api_client_index);
|
||||
if (!reg)
|
||||
@ -91,24 +92,31 @@ send_add_segment_callback (u32 api_client_index, const ssvm_private_t * sp)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ssvm_type (sp) == SSVM_SEGMENT_MEMFD
|
||||
&& vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
|
||||
if (ssvm_type (sp) == SSVM_SEGMENT_MEMFD)
|
||||
{
|
||||
clib_warning ("can't send memfd fd");
|
||||
return -1;
|
||||
if (vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
|
||||
{
|
||||
clib_warning ("can't send memfd fd");
|
||||
return -1;
|
||||
}
|
||||
|
||||
fd_flags |= SESSION_FD_F_MEMFD_SEGMENT;
|
||||
fds[n_fds] = sp->fd;
|
||||
n_fds += 1;
|
||||
}
|
||||
|
||||
mp = vl_msg_api_alloc_as_if_client (sizeof (*mp));
|
||||
mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp));
|
||||
memset (mp, 0, sizeof (*mp));
|
||||
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_MAP_ANOTHER_SEGMENT);
|
||||
mp->segment_size = sp->ssvm_size;
|
||||
mp->fd_flags = fd_flags;
|
||||
strncpy ((char *) mp->segment_name, (char *) sp->name,
|
||||
sizeof (mp->segment_name) - 1);
|
||||
|
||||
vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
|
||||
|
||||
if (ssvm_type (sp) == SSVM_SEGMENT_MEMFD)
|
||||
return session_send_memfd_fd (reg, sp);
|
||||
if (n_fds)
|
||||
return session_send_fds (reg, fds, n_fds);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -126,23 +134,58 @@ send_del_segment_callback (u32 api_client_index, const ssvm_private_t * fs)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ssvm_type (fs) == SSVM_SEGMENT_MEMFD
|
||||
&& vl_api_registration_file_index (reg) == VL_API_INVALID_FI)
|
||||
{
|
||||
clib_warning ("can't send memfd fd");
|
||||
return -1;
|
||||
}
|
||||
|
||||
mp = vl_msg_api_alloc_as_if_client (sizeof (*mp));
|
||||
mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp));
|
||||
memset (mp, 0, sizeof (*mp));
|
||||
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_UNMAP_SEGMENT);
|
||||
strncpy ((char *) mp->segment_name, (char *) fs->name,
|
||||
sizeof (mp->segment_name) - 1);
|
||||
strcpy ((char *) mp->segment_name, (char *) fs->name);
|
||||
|
||||
vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
|
||||
|
||||
if (ssvm_type (fs) == SSVM_SEGMENT_MEMFD)
|
||||
return session_send_memfd_fd (reg, fs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
send_app_cut_through_registration_add (u32 api_client_index, u64 mq_addr,
|
||||
u64 peer_mq_addr)
|
||||
{
|
||||
vl_api_app_cut_through_registration_add_t *mp;
|
||||
vl_api_registration_t *reg;
|
||||
svm_msg_q_t *mq, *peer_mq;
|
||||
int fds[2];
|
||||
|
||||
reg = vl_mem_api_client_index_to_registration (api_client_index);
|
||||
if (!reg)
|
||||
{
|
||||
clib_warning ("no registration: %u", api_client_index);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp));
|
||||
memset (mp, 0, sizeof (*mp));
|
||||
mp->_vl_msg_id =
|
||||
clib_host_to_net_u16 (VL_API_APP_CUT_THROUGH_REGISTRATION_ADD);
|
||||
|
||||
mp->evt_q_address = mq_addr;
|
||||
mp->peer_evt_q_address = peer_mq_addr;
|
||||
|
||||
mq = uword_to_pointer (mq_addr, svm_msg_q_t *);
|
||||
peer_mq = uword_to_pointer (peer_mq_addr, svm_msg_q_t *);
|
||||
|
||||
if (svm_msg_q_get_producer_eventfd (mq) != -1)
|
||||
{
|
||||
mp->fd_flags |= SESSION_FD_F_MQ_EVENTFD;
|
||||
mp->n_fds = 2;
|
||||
/* app will overwrite exactly the fds we pass here. So
|
||||
* when we swap mq with peer_mq (accept vs connect) the
|
||||
* fds will still be valid */
|
||||
fds[0] = svm_msg_q_get_consumer_eventfd (mq);
|
||||
fds[1] = svm_msg_q_get_producer_eventfd (peer_mq);
|
||||
}
|
||||
|
||||
vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
|
||||
|
||||
if (mp->n_fds != 0)
|
||||
session_send_fds (reg, fds, mp->n_fds);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -230,25 +273,25 @@ send_session_accept_callback (stream_session_t * s)
|
||||
}
|
||||
|
||||
void
|
||||
send_local_session_disconnect_callback (u32 app_index, local_session_t * ls)
|
||||
mq_send_local_session_disconnected_cb (u32 app_index, local_session_t * ls)
|
||||
{
|
||||
application_t *app = application_get (app_index);
|
||||
vl_api_disconnect_session_t *mp;
|
||||
vl_api_registration_t *reg;
|
||||
svm_msg_q_msg_t _msg, *msg = &_msg;
|
||||
session_disconnected_msg_t *mp;
|
||||
svm_msg_q_t *app_mq;
|
||||
session_event_t *evt;
|
||||
|
||||
reg = vl_mem_api_client_index_to_registration (app->api_client_index);
|
||||
if (!reg)
|
||||
{
|
||||
clib_warning ("no registration: %u", app->api_client_index);
|
||||
return;
|
||||
}
|
||||
|
||||
mp = vl_mem_api_alloc_as_if_client_w_reg (reg, sizeof (*mp));
|
||||
memset (mp, 0, sizeof (*mp));
|
||||
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
|
||||
app_mq = app->event_queue;
|
||||
svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
|
||||
SVM_Q_WAIT, msg);
|
||||
svm_msg_q_unlock (app_mq);
|
||||
evt = svm_msg_q_msg_data (app_mq, msg);
|
||||
memset (evt, 0, sizeof (*evt));
|
||||
evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
|
||||
mp = (session_disconnected_msg_t *) evt->data;
|
||||
mp->handle = application_local_session_handle (ls);
|
||||
mp->context = app->api_client_index;
|
||||
vl_msg_api_send_shmem (reg->vl_input_queue, (u8 *) & mp);
|
||||
svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
|
||||
}
|
||||
|
||||
static void
|
||||
@ -414,6 +457,12 @@ mq_send_session_accepted_cb (stream_session_t * s)
|
||||
{
|
||||
local_session_t *ls = (local_session_t *) s;
|
||||
local_session_t *ll;
|
||||
u8 main_thread = vlib_num_workers ()? 1 : 0;
|
||||
|
||||
send_app_cut_through_registration_add (app->api_client_index,
|
||||
ls->server_evt_q,
|
||||
ls->client_evt_q);
|
||||
|
||||
if (application_local_session_listener_has_transport (ls))
|
||||
{
|
||||
listener = listen_session_get (ls->listener_index);
|
||||
@ -436,7 +485,7 @@ mq_send_session_accepted_cb (stream_session_t * s)
|
||||
}
|
||||
mp->handle = application_local_session_handle (ls);
|
||||
mp->port = ls->port;
|
||||
vpp_queue = session_manager_get_vpp_event_queue (0);
|
||||
vpp_queue = session_manager_get_vpp_event_queue (main_thread);
|
||||
mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
|
||||
mp->client_event_queue_address = ls->client_evt_q;
|
||||
mp->server_event_queue_address = ls->server_evt_q;
|
||||
@ -542,9 +591,15 @@ mq_send_session_connected_cb (u32 app_index, u32 api_context,
|
||||
else
|
||||
{
|
||||
local_session_t *ls = (local_session_t *) s;
|
||||
u8 main_thread = vlib_num_workers ()? 1 : 0;
|
||||
|
||||
send_app_cut_through_registration_add (app->api_client_index,
|
||||
ls->client_evt_q,
|
||||
ls->server_evt_q);
|
||||
|
||||
mp->handle = application_local_session_handle (ls);
|
||||
mp->lcl_port = ls->port;
|
||||
vpp_mq = session_manager_get_vpp_event_queue (0);
|
||||
vpp_mq = session_manager_get_vpp_event_queue (main_thread);
|
||||
mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
|
||||
mp->client_event_queue_address = ls->client_evt_q;
|
||||
mp->server_event_queue_address = ls->server_evt_q;
|
||||
@ -583,12 +638,13 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
|
||||
static void
|
||||
vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
|
||||
{
|
||||
int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
|
||||
vl_api_application_attach_reply_t *rmp;
|
||||
ssvm_private_t *segp, *evt_q_segment;
|
||||
vnet_app_attach_args_t _a, *a = &_a;
|
||||
vl_api_registration_t *reg;
|
||||
clib_error_t *error = 0;
|
||||
int rv = 0;
|
||||
u8 fd_flags = 0;
|
||||
|
||||
reg = vl_api_client_index_to_registration (mp->client_index);
|
||||
if (!reg)
|
||||
@ -629,9 +685,32 @@ vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
|
||||
{
|
||||
rv = clib_error_get_code (error);
|
||||
clib_error_report (error);
|
||||
vec_free (a->namespace_id);
|
||||
goto done;
|
||||
}
|
||||
vec_free (a->namespace_id);
|
||||
|
||||
/* Send event queues segment */
|
||||
if ((evt_q_segment = session_manager_get_evt_q_segment ()))
|
||||
{
|
||||
fd_flags |= SESSION_FD_F_VPP_MQ_SEGMENT;
|
||||
fds[n_fds] = evt_q_segment->fd;
|
||||
n_fds += 1;
|
||||
}
|
||||
/* Send fifo segment fd if needed */
|
||||
if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
|
||||
{
|
||||
fd_flags |= SESSION_FD_F_MEMFD_SEGMENT;
|
||||
fds[n_fds] = a->segment->fd;
|
||||
n_fds += 1;
|
||||
}
|
||||
if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
|
||||
{
|
||||
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
|
||||
fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
|
||||
n_fds += 1;
|
||||
}
|
||||
|
||||
done:
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
@ -646,20 +725,15 @@ done:
|
||||
memcpy (rmp->segment_name, segp->name, vec_len (segp->name));
|
||||
rmp->segment_name_length = vec_len (segp->name);
|
||||
}
|
||||
rmp->app_event_queue_address = a->app_event_queue_address;
|
||||
rmp->app_event_queue_address = pointer_to_uword (a->app_evt_q);
|
||||
rmp->n_fds = n_fds;
|
||||
rmp->fd_flags = fd_flags;
|
||||
}
|
||||
}));
|
||||
/* *INDENT-ON* */
|
||||
|
||||
if (rv)
|
||||
return;
|
||||
|
||||
/* Send fifo segment fd if needed */
|
||||
if (ssvm_type (a->segment) == SSVM_SEGMENT_MEMFD)
|
||||
session_send_memfd_fd (reg, a->segment);
|
||||
/* Send event queues segment */
|
||||
if ((evt_q_segment = session_manager_get_evt_q_segment ()))
|
||||
session_send_memfd_fd (reg, evt_q_segment);
|
||||
if (n_fds)
|
||||
session_send_fds (reg, fds, n_fds);
|
||||
}
|
||||
|
||||
static void
|
||||
|
Reference in New Issue
Block a user