svm: add custom q implementation for mq
Add separate queue implementation for the message queue as it's custom tailored for fifo segments as opposed to binary api. Also move eventfds to the private data structures. Type: refactor Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: I6df0c824ecd94c7904516373f92a9fffc6b04736
This commit is contained in:
Florin Coras
committed by
Dave Barach
parent
15036ad0bc
commit
86f1232dde
@ -437,7 +437,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
|
||||
echo_segment_attach_mq (segment_handle, mp->app_mq, 0, &em->app_mq);
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
svm_msg_q_set_consumer_eventfd (em->app_mq, fds[n_fds++]);
|
||||
svm_msg_q_set_eventfd (em->app_mq, fds[n_fds++]);
|
||||
|
||||
vec_free (fds);
|
||||
}
|
||||
|
@ -1839,9 +1839,8 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
|
||||
mq = app_wrk->event_queue;
|
||||
if (use_eventfd)
|
||||
{
|
||||
svm_msg_q_alloc_producer_eventfd (mq);
|
||||
svm_msg_q_alloc_consumer_eventfd (mq);
|
||||
prod_fd = svm_msg_q_get_producer_eventfd (mq);
|
||||
svm_msg_q_alloc_eventfd (mq);
|
||||
prod_fd = svm_msg_q_get_eventfd (mq);
|
||||
SESSION_TEST (prod_fd != -1, "mq producer eventd valid %u", prod_fd);
|
||||
}
|
||||
|
||||
|
@ -1042,7 +1042,7 @@ fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset, u32 mq_index)
|
||||
|
||||
mq = vec_elt_at_index (fs->mqs, mq_index);
|
||||
|
||||
if (!mq->q)
|
||||
if (!mq->q.shr)
|
||||
{
|
||||
svm_msg_q_shared_t *smq;
|
||||
smq = (svm_msg_q_shared_t *) ((u8 *) fs->h + offset);
|
||||
@ -1059,10 +1059,11 @@ fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index)
|
||||
{
|
||||
svm_msg_q_t *mq = vec_elt_at_index (fs->mqs, mq_index);
|
||||
|
||||
if (mq->q == 0)
|
||||
if (mq->q.shr == 0)
|
||||
return ~0ULL;
|
||||
|
||||
return (uword) ((u8 *) mq->q - (u8 *) fs->h) - sizeof (svm_msg_q_shared_t);
|
||||
return (uword) ((u8 *) mq->q.shr - (u8 *) fs->h) -
|
||||
sizeof (svm_msg_q_shared_t);
|
||||
}
|
||||
|
||||
int
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -24,6 +24,25 @@
|
||||
#include <vppinfra/error.h>
|
||||
#include <svm/queue.h>
|
||||
|
||||
typedef struct svm_msg_q_shr_queue_
|
||||
{
|
||||
pthread_mutex_t mutex; /* 8 bytes */
|
||||
pthread_cond_t condvar; /* 8 bytes */
|
||||
u32 head;
|
||||
u32 tail;
|
||||
volatile u32 cursize;
|
||||
u32 maxsize;
|
||||
u32 elsize;
|
||||
u32 pad;
|
||||
u8 data[0];
|
||||
} svm_msg_q_shared_queue_t;
|
||||
|
||||
typedef struct svm_msg_q_queue_
|
||||
{
|
||||
svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
|
||||
int evtfd; /**< producer/consumer eventfd */
|
||||
} svm_msg_q_queue_t;
|
||||
|
||||
typedef struct svm_msg_q_ring_shared_
|
||||
{
|
||||
volatile u32 cursize; /**< current size of the ring */
|
||||
@ -43,14 +62,14 @@ typedef struct svm_msg_q_ring_
|
||||
|
||||
typedef struct svm_msg_q_shared_
|
||||
{
|
||||
u32 n_rings; /**< number of rings after q */
|
||||
u32 pad; /**< 8 byte alignment for q */
|
||||
svm_queue_t q[0]; /**< queue for exchanging messages */
|
||||
u32 n_rings; /**< number of rings after q */
|
||||
u32 pad; /**< 8 byte alignment for q */
|
||||
svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
|
||||
} __clib_packed svm_msg_q_shared_t;
|
||||
|
||||
typedef struct svm_msg_q_
|
||||
{
|
||||
svm_queue_t *q; /**< queue for exchanging messages */
|
||||
svm_msg_q_queue_t q; /**< queue for exchanging messages */
|
||||
svm_msg_q_ring_t *rings; /**< rings with message data*/
|
||||
} __clib_packed svm_msg_q_t;
|
||||
|
||||
@ -232,7 +251,7 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
|
||||
svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
|
||||
|
||||
/**
|
||||
* Set event fd for queue consumer
|
||||
* Set event fd for queue
|
||||
*
|
||||
* If set, queue will exclusively use eventfds for signaling. Moreover,
|
||||
* afterwards, the queue should only be used in non-blocking mode. Waiting
|
||||
@ -241,35 +260,26 @@ svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
|
||||
* @param mq message queue
|
||||
* @param fd consumer eventfd
|
||||
*/
|
||||
void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
|
||||
void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
|
||||
|
||||
/**
|
||||
* Set event fd for queue producer
|
||||
*
|
||||
* If set, queue will exclusively use eventfds for signaling. Moreover,
|
||||
* afterwards, the queue should only be used in non-blocking mode. Waiting
|
||||
* for events should be done externally using something like epoll.
|
||||
*
|
||||
* @param mq message queue
|
||||
* @param fd producer eventfd
|
||||
* Allocate event fd for queue
|
||||
*/
|
||||
void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
|
||||
|
||||
/**
|
||||
* Allocate event fd for queue consumer
|
||||
*/
|
||||
int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
|
||||
|
||||
/**
|
||||
* Allocate event fd for queue consumer
|
||||
*/
|
||||
int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
|
||||
|
||||
int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
|
||||
|
||||
/**
|
||||
* Format message queue, shows msg count for each ring
|
||||
*/
|
||||
u8 *format_svm_msg_q (u8 * s, va_list * args);
|
||||
u8 *format_svm_msg_q (u8 *s, va_list *args);
|
||||
|
||||
/**
|
||||
* Check length of message queue
|
||||
*/
|
||||
static inline u32
|
||||
svm_msg_q_size (svm_msg_q_t *mq)
|
||||
{
|
||||
return clib_atomic_load_relax_n (&mq->q.shr->cursize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if message queue is full
|
||||
@ -277,14 +287,14 @@ u8 *format_svm_msg_q (u8 * s, va_list * args);
|
||||
static inline u8
|
||||
svm_msg_q_is_full (svm_msg_q_t * mq)
|
||||
{
|
||||
return (mq->q->cursize == mq->q->maxsize);
|
||||
return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
|
||||
}
|
||||
|
||||
static inline u8
|
||||
svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
|
||||
{
|
||||
svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
|
||||
return (ring->shr->cursize >= ring->nitems);
|
||||
return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -293,16 +303,7 @@ svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
|
||||
static inline u8
|
||||
svm_msg_q_is_empty (svm_msg_q_t * mq)
|
||||
{
|
||||
return (mq->q->cursize == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check length of message queue
|
||||
*/
|
||||
static inline u32
|
||||
svm_msg_q_size (svm_msg_q_t * mq)
|
||||
{
|
||||
return mq->q->cursize;
|
||||
return (svm_msg_q_size (mq) == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -320,9 +321,9 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
|
||||
static inline int
|
||||
svm_msg_q_try_lock (svm_msg_q_t * mq)
|
||||
{
|
||||
int rv = pthread_mutex_trylock (&mq->q->mutex);
|
||||
int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
|
||||
if (PREDICT_FALSE (rv == EOWNERDEAD))
|
||||
rv = pthread_mutex_consistent (&mq->q->mutex);
|
||||
rv = pthread_mutex_consistent (&mq->q.shr->mutex);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -332,9 +333,9 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
|
||||
static inline int
|
||||
svm_msg_q_lock (svm_msg_q_t * mq)
|
||||
{
|
||||
int rv = pthread_mutex_lock (&mq->q->mutex);
|
||||
int rv = pthread_mutex_lock (&mq->q.shr->mutex);
|
||||
if (PREDICT_FALSE (rv == EOWNERDEAD))
|
||||
rv = pthread_mutex_consistent (&mq->q->mutex);
|
||||
rv = pthread_mutex_consistent (&mq->q.shr->mutex);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -344,7 +345,7 @@ svm_msg_q_lock (svm_msg_q_t * mq)
|
||||
static inline void
|
||||
svm_msg_q_unlock (svm_msg_q_t * mq)
|
||||
{
|
||||
pthread_mutex_unlock (&mq->q->mutex);
|
||||
pthread_mutex_unlock (&mq->q.shr->mutex);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -353,11 +354,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
|
||||
* Must be called with mutex held. The queue only works non-blocking
|
||||
* with eventfds, so handle blocking calls as an exception here.
|
||||
*/
|
||||
static inline void
|
||||
svm_msg_q_wait (svm_msg_q_t * mq)
|
||||
{
|
||||
svm_queue_wait (mq->q);
|
||||
}
|
||||
void svm_msg_q_wait (svm_msg_q_t *mq);
|
||||
|
||||
/**
|
||||
* Timed wait for message queue event
|
||||
@ -367,22 +364,12 @@ svm_msg_q_wait (svm_msg_q_t * mq)
|
||||
* @param mq message queue
|
||||
* @param timeout time in seconds
|
||||
*/
|
||||
static inline int
|
||||
svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
|
||||
{
|
||||
return svm_queue_timedwait (mq->q, timeout);
|
||||
}
|
||||
int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
|
||||
|
||||
static inline int
|
||||
svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
|
||||
svm_msg_q_get_eventfd (svm_msg_q_t *mq)
|
||||
{
|
||||
return mq->q->consumer_evtfd;
|
||||
}
|
||||
|
||||
static inline int
|
||||
svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
|
||||
{
|
||||
return mq->q->producer_evtfd;
|
||||
return mq->q.evtfd;
|
||||
}
|
||||
|
||||
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
|
||||
|
@ -121,7 +121,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
{
|
||||
svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
|
||||
svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
|
||||
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
|
||||
n_fds++;
|
||||
}
|
||||
@ -215,7 +215,7 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t *
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
{
|
||||
svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
|
||||
svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
|
||||
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
|
||||
n_fds++;
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ vcl_mq_epoll_add_evfd (vcl_worker_t * wrk, svm_msg_q_t * mq)
|
||||
u32 mqc_index;
|
||||
int mq_fd;
|
||||
|
||||
mq_fd = svm_msg_q_get_consumer_eventfd (mq);
|
||||
mq_fd = svm_msg_q_get_eventfd (mq);
|
||||
|
||||
if (wrk->mqs_epfd < 0 || mq_fd == -1)
|
||||
return -1;
|
||||
|
@ -89,8 +89,7 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
{
|
||||
svm_msg_q_set_consumer_eventfd (wrk->app_event_queue,
|
||||
fds[n_fds_used++]);
|
||||
svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds_used++]);
|
||||
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
|
||||
}
|
||||
|
||||
@ -236,7 +235,7 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t *
|
||||
|
||||
if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
|
||||
{
|
||||
svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
|
||||
svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
|
||||
vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
|
||||
n_fds++;
|
||||
}
|
||||
|
@ -870,7 +870,7 @@ segment_manager_alloc_queue (fifo_segment_t * segment,
|
||||
|
||||
if (props->use_mq_eventfd)
|
||||
{
|
||||
if (svm_msg_q_alloc_producer_eventfd (q))
|
||||
if (svm_msg_q_alloc_eventfd (q))
|
||||
clib_warning ("failed to alloc eventfd");
|
||||
}
|
||||
return q;
|
||||
|
@ -1548,9 +1548,6 @@ session_vpp_event_queues_allocate (session_main_t * smm)
|
||||
cfg->ring_cfgs = rc;
|
||||
|
||||
smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg);
|
||||
|
||||
if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
|
||||
clib_warning ("eventfd returned");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -667,7 +667,7 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
|
||||
if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
|
||||
{
|
||||
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
|
||||
fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
|
||||
fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q);
|
||||
n_fds += 1;
|
||||
}
|
||||
|
||||
@ -751,7 +751,7 @@ vl_api_app_worker_add_del_t_handler (vl_api_app_worker_add_del_t * mp)
|
||||
if (application_segment_manager_properties (app)->use_mq_eventfd)
|
||||
{
|
||||
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
|
||||
fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q);
|
||||
fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q);
|
||||
n_fds += 1;
|
||||
}
|
||||
|
||||
@ -1317,7 +1317,7 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
|
||||
if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
|
||||
{
|
||||
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
|
||||
fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
|
||||
fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q);
|
||||
n_fds += 1;
|
||||
}
|
||||
|
||||
@ -1426,7 +1426,7 @@ sapi_add_del_worker_handler (app_namespace_t * app_ns,
|
||||
if (application_segment_manager_properties (app)->use_mq_eventfd)
|
||||
{
|
||||
fd_flags |= SESSION_FD_F_MQ_EVENTFD;
|
||||
fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q);
|
||||
fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q);
|
||||
n_fds += 1;
|
||||
}
|
||||
|
||||
|
@ -123,6 +123,7 @@ dump_thread_0_event_queue (void)
|
||||
vlib_main_t *vm = &vlib_global_main;
|
||||
u32 my_thread_index = vm->thread_index;
|
||||
session_event_t _e, *e = &_e;
|
||||
svm_msg_q_shared_queue_t *sq;
|
||||
svm_msg_q_ring_t *ring;
|
||||
session_t *s0;
|
||||
svm_msg_q_msg_t *msg;
|
||||
@ -130,11 +131,12 @@ dump_thread_0_event_queue (void)
|
||||
int i, index;
|
||||
|
||||
mq = session_main_get_vpp_event_queue (my_thread_index);
|
||||
index = mq->q->head;
|
||||
sq = mq->q.shr;
|
||||
index = sq->head;
|
||||
|
||||
for (i = 0; i < mq->q->cursize; i++)
|
||||
for (i = 0; i < sq->cursize; i++)
|
||||
{
|
||||
msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
|
||||
msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index);
|
||||
ring = svm_msg_q_ring (mq, msg->ring_index);
|
||||
clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
|
||||
|
||||
@ -170,7 +172,7 @@ dump_thread_0_event_queue (void)
|
||||
|
||||
index++;
|
||||
|
||||
if (index == mq->q->maxsize)
|
||||
if (index == sq->maxsize)
|
||||
index = 0;
|
||||
}
|
||||
}
|
||||
@ -210,6 +212,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
|
||||
u8
|
||||
session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
|
||||
{
|
||||
svm_msg_q_shared_queue_t *sq;
|
||||
session_evt_elt_t *elt;
|
||||
session_worker_t *wrk;
|
||||
int i, index, found = 0;
|
||||
@ -226,16 +229,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
|
||||
* Search evt queue
|
||||
*/
|
||||
mq = wrk->vpp_event_queue;
|
||||
index = mq->q->head;
|
||||
for (i = 0; i < mq->q->cursize; i++)
|
||||
sq = mq->q.shr;
|
||||
index = sq->head;
|
||||
for (i = 0; i < sq->cursize; i++)
|
||||
{
|
||||
msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
|
||||
msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index);
|
||||
ring = svm_msg_q_ring (mq, msg->ring_index);
|
||||
clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
|
||||
found = session_node_cmp_event (e, f);
|
||||
if (found)
|
||||
return 1;
|
||||
index = (index + 1) % mq->q->maxsize;
|
||||
index = (index + 1) % sq->maxsize;
|
||||
}
|
||||
/*
|
||||
* Search pending events vector
|
||||
|
Reference in New Issue
Block a user