session: async rx event notifications

Move from synchronous flushing of io and ctrl events from transports to
applications to an async model via a new session_input input node that
runs in interrupt mode. Events are coalesced per application worker.

On the one hand, this helps by minimizing message queue locking churn.
And on the other, it opens the possibility for further optimizations of
event message generation, obviates the need for rx rescheduling rpcs and is
a first step towards a fully async data/io rx path.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id6bebcb65fc9feef8aa02ddf1af6d9ba6f6745ce
This commit is contained in:
Florin Coras
2022-12-22 15:03:44 -08:00
committed by Dave Barach
parent 6d733a93b2
commit 0242d30fc7
23 changed files with 950 additions and 630 deletions

View File

@ -706,10 +706,8 @@ ec_session_rx_callback (session_t *s)
receive_data_chunk (wrk, es);
if (svm_fifo_max_dequeue_cons (s->rx_fifo))
{
if (svm_fifo_set_event (s->rx_fifo))
session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
}
session_enqueue_notify (s);
return 0;
}

View File

@ -228,6 +228,8 @@ echo_server_rx_callback (session_t * s)
/* Program self-tap to retry */
if (svm_fifo_set_event (rx_fifo))
{
/* TODO should be session_enqueue_notify(s) but quic tests seem
* to fail if that's the case */
if (session_send_io_evt_to_thread (rx_fifo,
SESSION_IO_EVT_BUILTIN_RX))
clib_warning ("failed to enqueue self-tap");

View File

@ -503,7 +503,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
hc->http_state = HTTP_STATE_WAIT_APP;
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
@ -777,7 +777,7 @@ state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp)
}
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
}
@ -808,7 +808,7 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as)
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
ASSERT (app_wrk);
app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
app_worker_rx_notify (app_wrk, as);
return 1;
}
@ -864,8 +864,9 @@ maybe_reschedule:
if (hc->rx_buf_offset < vec_len (hc->rx_buf) ||
svm_fifo_max_dequeue_cons (ts->rx_fifo))
{
/* TODO is the flag really needed? */
if (svm_fifo_set_event (ts->rx_fifo))
session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
session_enqueue_notify (ts);
}
return HTTP_SM_CONTINUE;
}

View File

@ -830,7 +830,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
size_t len)
{
QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
u32 max_enq, rv;
u32 max_enq;
quic_ctx_t *sctx;
session_t *stream_session;
app_worker_t *app_wrk;
@ -895,10 +895,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
{
rv = app_worker_lock_and_send_event (app_wrk, stream_session,
SESSION_IO_EVT_RX);
if (rv)
QUIC_ERR ("Failed to ping app for RX");
app_worker_rx_notify (app_wrk, stream_session);
}
quic_ack_rx_data (stream_session);
}

View File

@ -309,8 +309,7 @@ done:
int
srtp_add_vpp_q_builtin_rx_evt (session_t *s)
{
if (svm_fifo_set_event (s->rx_fifo))
session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
session_enqueue_notify (s);
return 0;
}
@ -320,7 +319,7 @@ srtp_notify_app_enqueue (srtp_tc_t *ctx, session_t *app_session)
app_worker_t *app_wrk;
app_wrk = app_worker_get_if_valid (app_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX);
app_worker_rx_notify (app_wrk, app_session);
}
static inline int

View File

@ -1771,6 +1771,74 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
}
}
/* Used to be part of application_worker.c prior to adding support for
* async rx
*/
static int
test_mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
svm_msg_q_msg_t *msg)
{
int rv, n_try = 0;
while (n_try < 75)
{
rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
if (!rv)
return 0;
/*
* Break the loop if mq is full, usually this is because the
* app has crashed or is hanging on somewhere.
*/
if (rv != -1)
break;
n_try += 1;
usleep (1);
}
return -1;
}
/* Used to be part of application_worker.c prior to adding support for
* async rx and was used for delivering io events over mq
* NB: removed handling of mq congestion
*/
static inline int
test_app_send_io_evt_rx (app_worker_t *app_wrk, session_t *s)
{
svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
session_event_t *evt;
svm_msg_q_t *mq;
u32 app_session;
int rv;
if (app_worker_application_is_builtin (app_wrk))
return app_worker_rx_notify (app_wrk, s);
if (svm_fifo_has_event (s->rx_fifo))
return 0;
app_session = s->rx_fifo->shr->client_session_index;
mq = app_wrk->event_queue;
rv = test_mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
if (PREDICT_FALSE (rv))
{
clib_warning ("failed to alloc mq message");
return -1;
}
evt = svm_msg_q_msg_data (mq, mq_msg);
evt->event_type = SESSION_IO_EVT_RX;
evt->session_index = app_session;
(void) svm_fifo_set_event (s->rx_fifo);
svm_msg_q_add_and_unlock (mq, mq_msg);
return 0;
}
static int
session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
@ -1885,7 +1953,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
while (svm_fifo_has_event (rx_fifo))
;
app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX);
test_app_send_io_evt_rx (app_wrk, &s);
}
}

View File

@ -340,15 +340,15 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
return (dist1 < dist2);
}
static void
svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
void
svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
{
svm_msg_q_shared_queue_t *sq = mq->q.shr;
i8 *tailp;
u32 sz;
tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
clib_memcpy_fast (tailp, elem, sq->elsize);
clib_memcpy_fast (tailp, msg, sq->elsize);
sq->tail = (sq->tail + 1) % sq->maxsize;
@ -381,7 +381,7 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
svm_msg_q_wait_prod (mq);
}
svm_msg_q_add_raw (mq, (u8 *) msg);
svm_msg_q_add_raw (mq, msg);
svm_msg_q_unlock (mq);
@ -392,7 +392,7 @@ void
svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
svm_msg_q_add_raw (mq, (u8 *) msg);
svm_msg_q_add_raw (mq, msg);
svm_msg_q_unlock (mq);
}

View File

@ -190,6 +190,17 @@ int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
*/
void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
/**
* Producer enqueue one message to queue
*
* Must be called with mq locked. Prior to calling this, the producer should've
* obtained a message buffer from one of the rings.
*
* @param mq message queue
* @param msg message to be enqueued
*/
void svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg);
/**
* Producer enqueue one message to queue
*

View File

@ -1018,6 +1018,7 @@ list(APPEND VNET_SOURCES
session/session_rules_table.c
session/session_lookup.c
session/session_node.c
session/session_input.c
session/transport.c
session/application.c
session/application_worker.c

View File

@ -725,6 +725,12 @@ application_get_if_valid (u32 app_index)
return pool_elt_at_index (app_main.app_pool, app_index);
}
static int
_null_app_tx_callback (session_t *s)
{
return 0;
}
static void
application_verify_cb_fns (session_cb_vft_t * cb_fns)
{
@ -736,6 +742,8 @@ application_verify_cb_fns (session_cb_vft_t * cb_fns)
clib_warning ("No session disconnect callback function provided");
if (cb_fns->session_reset_callback == 0)
clib_warning ("No session reset callback function provided");
if (!cb_fns->builtin_app_tx_callback)
cb_fns->builtin_app_tx_callback = _null_app_tx_callback;
}
/**

View File

@ -77,17 +77,17 @@ typedef struct app_worker_
/** Pool of half-open session handles. Tracked in case worker detaches */
session_handle_t *half_open_table;
/* Per vpp worker fifos of events for app worker */
session_event_t **wrk_evts;
/* Vector of vpp workers mq congestion flags */
u8 *wrk_mq_congested;
/** Protects detached seg managers */
clib_spinlock_t detached_seg_managers_lock;
/** Vector of detached listener segment managers */
u32 *detached_seg_managers;
/** Fifo of messages postponed because of mq congestion */
app_wrk_postponed_msg_t *postponed_mq_msgs;
/** Lock to add/sub message from ref @postponed_mq_msgs */
clib_spinlock_t postponed_mq_msgs_lock;
} app_worker_t;
typedef struct app_worker_map_
@ -317,6 +317,12 @@ void application_enable_rx_mqs_nodes (u8 is_en);
* App worker
*/
always_inline u8
app_worker_mq_is_congested (app_worker_t *app_wrk)
{
return app_wrk->mq_congested > 0;
}
app_worker_t *app_worker_alloc (application_t * app);
int application_alloc_worker_and_init (application_t * app,
app_worker_t ** wrk);
@ -331,6 +337,10 @@ session_error_t app_worker_start_listen (app_worker_t *app_wrk,
app_listener_t *lstnr);
int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al);
int app_worker_init_accepted (session_t * s);
int app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
u32 opaque, int err);
int app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
u32 opaque, session_error_t err);
int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s);
int app_worker_init_connected (app_worker_t * app_wrk, session_t * s);
int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
@ -343,13 +353,21 @@ int app_worker_transport_closed_notify (app_worker_t * app_wrk,
int app_worker_reset_notify (app_worker_t * app_wrk, session_t * s);
int app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
session_cleanup_ntf_t ntf);
int app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
session_cleanup_ntf_t ntf,
void (*cleanup_cb) (session_t *s));
int app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
session_handle_t new_sh);
int app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s);
int app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s);
int app_worker_rx_notify (app_worker_t *app_wrk, session_t *s);
int app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
svm_fifo_t * f,
session_ft_action_t act, u32 len);
void app_worker_add_event (app_worker_t *app_wrk, session_t *s,
session_evt_type_t evt_type);
void app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
session_event_t *evt);
int app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index);
void app_worker_del_all_events (app_worker_t *app_wrk);
segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
session_t *);
segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
@ -364,9 +382,10 @@ void app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
u32 msg_len, int fd);
void app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
u32 msg_len);
int app_worker_send_event (app_worker_t * app, session_t * s, u8 evt);
int app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
u8 evt_type);
u8 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index);
void app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index);
void app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk,
u32 thread_index);
session_t *app_worker_proxy_listener (app_worker_t * app, u8 fib_proto,
u8 transport_proto);
void app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index);
@ -395,6 +414,12 @@ void sapi_socket_close_w_handle (u32 api_handle);
crypto_engine_type_t app_crypto_engine_type_add (void);
u8 app_crypto_engine_n_types (void);
static inline u8
app_worker_application_is_builtin (app_worker_t *app_wrk)
{
return app_wrk->app_is_builtin;
}
#endif /* SRC_VNET_SESSION_APPLICATION_H_ */
/*

View File

@ -1027,6 +1027,17 @@ ct_close_is_reset (ct_connection_t *ct, session_t *s)
return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
}
static void
ct_session_cleanup_server_session (session_t *s)
{
ct_connection_t *ct;
ct = (ct_connection_t *) session_get_transport (s);
ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
session_free (s);
ct_connection_free (ct);
}
static void
ct_session_postponed_cleanup (ct_connection_t *ct)
{
@ -1047,33 +1058,38 @@ ct_session_postponed_cleanup (ct_connection_t *ct)
}
session_transport_closed_notify (&ct->connection);
/* It would be cleaner to call session_transport_delete_notify
* but then we can't control session cleanup lower */
session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
if (app_wrk)
app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
if (ct->flags & CT_CONN_F_CLIENT)
{
if (app_wrk)
app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
/* Normal free for client session as the fifos are allocated through
* the connects segment manager in a segment that's not shared with
* the server */
ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
session_free_w_fifos (s);
session_program_cleanup (s);
ct_connection_free (ct);
}
else
{
/* Manual session and fifo segment cleanup to avoid implicit
* segment manager cleanups and notifications */
app_wrk = app_worker_get_if_valid (s->app_wrk_index);
if (app_wrk)
{
app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
/* Remove custom cleanup notify infra when/if switching to normal
* session cleanup. Note that ct is freed in the cb function */
app_worker_cleanup_notify_custom (app_wrk, s,
SESSION_CLEANUP_SESSION,
ct_session_cleanup_server_session);
}
else
{
ct_connection_free (ct);
}
ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
session_free (s);
}
ct_connection_free (ct);
}
static void

File diff suppressed because it is too large Load Diff

View File

@ -866,7 +866,7 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
/* Thread that allocated the fifos must be the one to clean them up */
ASSERT (rx_fifo->master_thread_index == vlib_get_thread_index () ||
rx_fifo->refcnt > 1);
rx_fifo->refcnt > 1 || vlib_thread_is_main_w_barrier ());
/* It's possible to have no segment manager if the session was removed
* as result of a detach. */

File diff suppressed because it is too large Load Diff

View File

@ -100,8 +100,8 @@ typedef struct session_worker_
/** Convenience pointer to this worker's vlib_main */
vlib_main_t *vm;
/** Per-proto vector of sessions to enqueue */
u32 **session_to_enqueue;
/** Per-proto vector of session handles to enqueue */
session_handle_t **session_to_enqueue;
/** Timerfd used to periodically signal wrk session queue node */
int timerfd;
@ -148,6 +148,9 @@ typedef struct session_worker_
/** List head for first worker evts pending handling on main */
clib_llist_index_t evts_pending_main;
/** Per-app-worker bitmap of pending notifications */
uword *app_wrks_pending_ntf;
int config_index;
u8 dma_enabled;
session_dma_transfer *dma_trans;
@ -275,6 +278,7 @@ typedef struct session_main_
extern session_main_t session_main;
extern vlib_node_registration_t session_queue_node;
extern vlib_node_registration_t session_input_node;
extern vlib_node_registration_t session_queue_process_node;
extern vlib_node_registration_t session_queue_pre_input_node;
@ -358,7 +362,8 @@ int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq);
session_t *session_alloc (u32 thread_index);
void session_free (session_t * s);
void session_free_w_fifos (session_t * s);
void session_cleanup (session_t *s);
void session_program_cleanup (session_t *s);
void session_cleanup_half_open (session_handle_t ho_handle);
u8 session_is_valid (u32 si, u8 thread_index);
@ -452,8 +457,9 @@ void session_transport_reset (session_t * s);
void session_transport_cleanup (session_t * s);
int session_send_io_evt_to_thread (svm_fifo_t * f,
session_evt_type_t evt_type);
int session_enqueue_notify (session_t * s);
int session_enqueue_notify (session_t *s);
int session_dequeue_notify (session_t * s);
int session_enqueue_notify_cl (session_t *s);
int session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
session_evt_type_t evt_type);
void session_send_rpc_evt_to_thread (u32 thread_index, void *fp,
@ -485,6 +491,10 @@ int session_enqueue_dgram_connection (session_t * s,
session_dgram_hdr_t * hdr,
vlib_buffer_t * b, u8 proto,
u8 queue_event);
int session_enqueue_dgram_connection_cl (session_t *s,
session_dgram_hdr_t *hdr,
vlib_buffer_t *b, u8 proto,
u8 queue_event);
int session_stream_connect_notify (transport_connection_t * tc,
session_error_t err);
int session_dgram_connect_notify (transport_connection_t * tc,
@ -502,6 +512,7 @@ int session_stream_accept (transport_connection_t * tc, u32 listener_index,
u32 thread_index, u8 notify);
int session_dgram_accept (transport_connection_t * tc, u32 listener_index,
u32 thread_index);
/**
* Initialize session layer for given transport proto and ip version
*
@ -765,8 +776,8 @@ do { \
return clib_error_return (0, "session layer is not enabled"); \
} while (0)
int session_main_flush_enqueue_events (u8 proto, u32 thread_index);
int session_main_flush_all_enqueue_events (u8 transport_proto);
void session_main_flush_enqueue_events (transport_proto_t transport_proto,
u32 thread_index);
void session_queue_run_on_main_thread (vlib_main_t * vm);
/**
@ -799,6 +810,8 @@ fifo_segment_t *session_main_get_wrk_mqs_segment (void);
void session_node_enable_disable (u8 is_en);
clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
void session_wrk_handle_evts_main_rpc (void *);
void session_wrk_program_app_wrk_evts (session_worker_t *wrk,
u32 app_wrk_index);
session_t *session_alloc_for_connection (transport_connection_t * tc);
session_t *session_alloc_for_half_open (transport_connection_t *tc);

View File

@ -460,6 +460,52 @@ mq_send_session_cleanup_cb (session_t * s, session_cleanup_ntf_t ntf)
app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_CLEANUP, &m, sizeof (m));
}
static int
mq_send_io_rx_event (session_t *s)
{
session_event_t *mq_evt;
svm_msg_q_msg_t mq_msg;
app_worker_t *app_wrk;
svm_msg_q_t *mq;
if (svm_fifo_has_event (s->rx_fifo))
return 0;
app_wrk = app_worker_get (s->app_wrk_index);
mq = app_wrk->event_queue;
mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
mq_evt = svm_msg_q_msg_data (mq, &mq_msg);
mq_evt->event_type = SESSION_IO_EVT_RX;
mq_evt->session_index = s->rx_fifo->shr->client_session_index;
(void) svm_fifo_set_event (s->rx_fifo);
svm_msg_q_add_raw (mq, &mq_msg);
return 0;
}
static int
mq_send_io_tx_event (session_t *s)
{
app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
svm_msg_q_t *mq = app_wrk->event_queue;
session_event_t *mq_evt;
svm_msg_q_msg_t mq_msg;
mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
mq_evt = svm_msg_q_msg_data (mq, &mq_msg);
mq_evt->event_type = SESSION_IO_EVT_TX;
mq_evt->session_index = s->tx_fifo->shr->client_session_index;
svm_msg_q_add_raw (mq, &mq_msg);
return 0;
}
static session_cb_vft_t session_mq_cb_vft = {
.session_accept_callback = mq_send_session_accepted_cb,
.session_disconnect_callback = mq_send_session_disconnected_cb,
@ -469,6 +515,8 @@ static session_cb_vft_t session_mq_cb_vft = {
.session_cleanup_callback = mq_send_session_cleanup_cb,
.add_segment_callback = mq_send_add_segment_cb,
.del_segment_callback = mq_send_del_segment_cb,
.builtin_app_rx_callback = mq_send_io_rx_event,
.builtin_app_tx_callback = mq_send_io_tx_event,
};
static void
@ -1246,6 +1294,8 @@ static session_cb_vft_t session_mq_sapi_cb_vft = {
.session_cleanup_callback = mq_send_session_cleanup_cb,
.add_segment_callback = mq_send_add_segment_sapi_cb,
.del_segment_callback = mq_send_del_segment_sapi_cb,
.builtin_app_rx_callback = mq_send_io_rx_event,
.builtin_app_tx_callback = mq_send_io_tx_event,
};
static void

View File

@ -0,0 +1,296 @@
/* SPDX-License-Identifier: Apache-2.0
* Copyright(c) 2023 Cisco Systems, Inc.
*/
#include <vnet/session/session.h>
#include <vnet/session/application.h>
static inline int
mq_try_lock (svm_msg_q_t *mq)
{
int rv, n_try = 0;
while (n_try < 100)
{
rv = svm_msg_q_try_lock (mq);
if (!rv)
return 0;
n_try += 1;
usleep (1);
}
return -1;
}
always_inline u8
mq_event_ring_index (session_evt_type_t et)
{
return (et >= SESSION_CTRL_EVT_RPC ? SESSION_MQ_CTRL_EVT_RING :
SESSION_MQ_IO_EVT_RING);
}
void
app_worker_del_all_events (app_worker_t *app_wrk)
{
session_worker_t *wrk;
session_event_t *evt;
u32 thread_index;
session_t *s;
for (thread_index = 0; thread_index < vec_len (app_wrk->wrk_evts);
thread_index++)
{
while (clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
{
clib_fifo_sub2 (app_wrk->wrk_evts[thread_index], evt);
switch (evt->event_type)
{
case SESSION_CTRL_EVT_MIGRATED:
s = session_get (evt->session_index, thread_index);
transport_cleanup (session_get_transport_proto (s),
s->connection_index, s->thread_index);
session_free (s);
break;
case SESSION_CTRL_EVT_CLEANUP:
s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
break;
uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
break;
case SESSION_CTRL_EVT_HALF_CLEANUP:
s = ho_session_get (evt->session_index);
pool_put_index (app_wrk->half_open_table, s->ho_index);
session_free (s);
break;
default:
break;
}
}
wrk = session_main_get_worker (thread_index);
clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
}
}
always_inline int
app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index,
u8 is_builtin)
{
application_t *app = application_get (app_wrk->app_index);
svm_msg_q_t *mq = app_wrk->event_queue;
session_event_t *evt;
u32 n_evts = 128, i;
u8 ring_index, mq_is_cong;
session_t *s;
n_evts = clib_min (n_evts, clib_fifo_elts (app_wrk->wrk_evts[thread_index]));
if (!is_builtin)
{
mq_is_cong = app_worker_mq_is_congested (app_wrk);
if (mq_try_lock (mq))
{
app_worker_set_mq_wrk_congested (app_wrk, thread_index);
return 0;
}
}
for (i = 0; i < n_evts; i++)
{
evt = clib_fifo_head (app_wrk->wrk_evts[thread_index]);
if (!is_builtin)
{
ring_index = mq_event_ring_index (evt->event_type);
if (svm_msg_q_or_ring_is_full (mq, ring_index))
{
app_worker_set_mq_wrk_congested (app_wrk, thread_index);
break;
}
}
switch (evt->event_type)
{
case SESSION_IO_EVT_RX:
s = session_get (evt->session_index, thread_index);
s->flags &= ~SESSION_F_RX_EVT;
/* Application didn't confirm accept yet */
if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
break;
app->cb_fns.builtin_app_rx_callback (s);
break;
/* Handle sessions that might not be on current thread */
case SESSION_IO_EVT_BUILTIN_RX:
s = session_get_from_handle_if_valid (evt->session_handle);
if (!s || s->session_state == SESSION_STATE_ACCEPTING)
break;
app->cb_fns.builtin_app_rx_callback (s);
break;
case SESSION_IO_EVT_TX:
s = session_get (evt->session_index, thread_index);
app->cb_fns.builtin_app_tx_callback (s);
break;
case SESSION_IO_EVT_TX_MAIN:
s = session_get_from_handle_if_valid (evt->session_handle);
if (!s)
break;
app->cb_fns.builtin_app_tx_callback (s);
break;
case SESSION_CTRL_EVT_BOUND:
/* No app cb function currently */
if (is_builtin)
break;
mq_send_session_bound_cb (app_wrk->wrk_index, evt->as_u64[1] >> 32,
evt->session_handle,
evt->as_u64[1] & 0xffffffff);
break;
case SESSION_CTRL_EVT_ACCEPTED:
s = session_get (evt->session_index, thread_index);
app->cb_fns.session_accept_callback (s);
break;
case SESSION_CTRL_EVT_CONNECTED:
if (!(evt->as_u64[1] & 0xffffffff))
s = session_get (evt->session_index, thread_index);
else
s = 0;
app->cb_fns.session_connected_callback (app_wrk->wrk_index,
evt->as_u64[1] >> 32, s,
evt->as_u64[1] & 0xffffffff);
break;
case SESSION_CTRL_EVT_DISCONNECTED:
s = session_get (evt->session_index, thread_index);
app->cb_fns.session_disconnect_callback (s);
break;
case SESSION_CTRL_EVT_RESET:
s = session_get (evt->session_index, thread_index);
app->cb_fns.session_reset_callback (s);
break;
case SESSION_CTRL_EVT_UNLISTEN_REPLY:
if (is_builtin)
break;
mq_send_unlisten_reply (app_wrk, evt->session_handle,
evt->as_u64[1] >> 32,
evt->as_u64[1] & 0xffffffff);
break;
case SESSION_CTRL_EVT_MIGRATED:
s = session_get (evt->session_index, thread_index);
app->cb_fns.session_migrate_callback (s, evt->as_u64[1]);
transport_cleanup (session_get_transport_proto (s),
s->connection_index, s->thread_index);
session_free (s);
/* Notify app that it has data on the new session */
s = session_get_from_handle (evt->as_u64[1]);
session_send_io_evt_to_thread (s->rx_fifo,
SESSION_IO_EVT_BUILTIN_RX);
break;
case SESSION_CTRL_EVT_TRANSPORT_CLOSED:
s = session_get (evt->session_index, thread_index);
if (app->cb_fns.session_transport_closed_callback)
app->cb_fns.session_transport_closed_callback (s);
break;
case SESSION_CTRL_EVT_CLEANUP:
s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
if (app->cb_fns.session_cleanup_callback)
app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32);
if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
break;
uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
break;
case SESSION_CTRL_EVT_HALF_CLEANUP:
s = ho_session_get (evt->session_index);
ASSERT (session_vlib_thread_is_cl_thread ());
if (app->cb_fns.half_open_cleanup_callback)
app->cb_fns.half_open_cleanup_callback (s);
pool_put_index (app_wrk->half_open_table, s->ho_index);
session_free (s);
break;
case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
app->cb_fns.add_segment_callback (app_wrk->wrk_index,
evt->as_u64[1]);
break;
case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
app->cb_fns.del_segment_callback (app_wrk->wrk_index,
evt->as_u64[1]);
break;
default:
clib_warning ("unexpected event: %u", evt->event_type);
ASSERT (0);
break;
}
clib_fifo_advance_head (app_wrk->wrk_evts[thread_index], 1);
}
if (!is_builtin)
{
svm_msg_q_unlock (mq);
if (mq_is_cong && i == n_evts)
app_worker_unset_wrk_mq_congested (app_wrk, thread_index);
}
return 0;
}
int
app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index)
{
if (app_worker_application_is_builtin (app_wrk))
return app_worker_flush_events_inline (app_wrk, thread_index,
1 /* is_builtin */);
else
return app_worker_flush_events_inline (app_wrk, thread_index,
0 /* is_builtin */);
}
static inline int
session_wrk_flush_events (session_worker_t *wrk)
{
app_worker_t *app_wrk;
uword app_wrk_index;
u32 thread_index;
thread_index = wrk->vm->thread_index;
app_wrk_index = clib_bitmap_first_set (wrk->app_wrks_pending_ntf);
while (app_wrk_index != ~0)
{
app_wrk = app_worker_get_if_valid (app_wrk_index);
/* app_wrk events are flushed on free, so should be valid here */
ASSERT (app_wrk != 0);
app_wrk_flush_wrk_events (app_wrk, thread_index);
if (!clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
app_wrk_index =
clib_bitmap_next_set (wrk->app_wrks_pending_ntf, app_wrk_index + 1);
}
if (!clib_bitmap_is_zero (wrk->app_wrks_pending_ntf))
vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
return 0;
}
VLIB_NODE_FN (session_input_node)
(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
{
u32 thread_index = vm->thread_index;
session_worker_t *wrk;
wrk = session_main_get_worker (thread_index);
session_wrk_flush_events (wrk);
return 0;
}
VLIB_REGISTER_NODE (session_input_node) = {
.name = "session-input",
.type = VLIB_NODE_TYPE_INPUT,
.state = VLIB_NODE_STATE_DISABLED,
};
/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/

View File

@ -142,10 +142,14 @@ session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
session_worker_stat_error_inc (wrk, rv, 1);
app_wrk = application_get_worker (app, mp->wrk_index);
mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
if (mp->ext_config)
session_mq_free_ext_config (app, mp->ext_config);
/* Make sure events are flushed before releasing barrier, to avoid
* potential race with accept. */
app_wrk_flush_wrk_events (app_wrk, 0);
}
static void
@ -170,7 +174,8 @@ session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
rv = vnet_bind_uri (a);
app_wrk = application_get_worker (app, 0);
mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
app_wrk_flush_wrk_events (app_wrk, 0);
}
static void
@ -215,7 +220,7 @@ session_mq_connect_one (session_connect_msg_t *mp)
wrk = session_main_get_worker (vlib_get_thread_index ());
session_worker_stat_error_inc (wrk, rv, 1);
app_wrk = application_get_worker (app, mp->wrk_index);
mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
app_worker_connect_notify (app_wrk, 0, rv, mp->context);
}
if (mp->ext_config)
@ -324,7 +329,7 @@ session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
session_worker_stat_error_inc (wrk, rv, 1);
app_wrk = application_get_worker (app, 0 /* default wrk only */ );
mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
app_worker_connect_notify (app_wrk, 0, rv, mp->context);
}
}
@ -410,7 +415,7 @@ session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
if (!app_wrk)
return;
mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
app_worker_unlisten_reply (app_wrk, sh, mp->context, rv);
}
static void
@ -466,7 +471,7 @@ session_mq_accepted_reply_handler (session_worker_t *wrk,
session_set_state (s, SESSION_STATE_READY);
if (!svm_fifo_is_empty_prod (s->rx_fifo))
app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
app_worker_rx_notify (app_wrk, s);
/* Closed while waiting for app to reply. Resend disconnect */
if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
@ -669,7 +674,7 @@ session_mq_worker_update_handler (void *data)
session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
app_worker_rx_notify (app_wrk, s);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
app_worker_close_notify (app_wrk, s);
@ -1790,7 +1795,7 @@ session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
break;
svm_fifo_unset_event (s->rx_fifo);
app_wrk = app_worker_get (s->app_wrk_index);
app_worker_builtin_rx (app_wrk, s);
app_worker_rx_notify (app_wrk, s);
break;
case SESSION_IO_EVT_TX_MAIN:
s = session_get_if_valid (e->session_index, 0 /* main thread */);

View File

@ -379,6 +379,8 @@ typedef enum
SESSION_CTRL_EVT_APP_WRK_RPC,
SESSION_CTRL_EVT_TRANSPORT_ATTR,
SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY,
SESSION_CTRL_EVT_TRANSPORT_CLOSED,
SESSION_CTRL_EVT_HALF_CLEANUP,
} session_evt_type_t;
#define foreach_session_ctrl_evt \
@ -437,6 +439,7 @@ typedef struct
session_handle_t session_handle;
session_rpc_args_t rpc_args;
u32 ctrl_data_index;
u64 as_u64[2];
struct
{
u8 data[0];

View File

@ -1394,11 +1394,10 @@ always_inline uword
tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame, int is_ip4)
{
u32 thread_index = vm->thread_index, errors = 0;
u32 thread_index = vm->thread_index, n_left_from, *from;
tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
u16 err_counters[TCP_N_ERROR] = { 0 };
u32 n_left_from, *from;
if (node->flags & VLIB_NODE_FLAG_TRACE)
tcp_established_trace_frame (vm, node, frame, is_ip4);
@ -1462,9 +1461,7 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
b += 1;
}
errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
thread_index);
err_counters[TCP_ERROR_MSG_QUEUE_FULL] = errors;
session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
tcp_store_err_counters (established, err_counters);
tcp_handle_postponed_dequeues (wrk);
tcp_handle_disconnects (wrk);
@ -1746,7 +1743,7 @@ always_inline uword
tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
vlib_frame_t *frame, int is_ip4)
{
u32 n_left_from, *from, thread_index = vm->thread_index, errors = 0;
u32 n_left_from, *from, thread_index = vm->thread_index;
tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
@ -1981,9 +1978,7 @@ tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
tcp_inc_counter (syn_sent, error, 1);
}
errors =
session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
tcp_inc_counter (syn_sent, TCP_ERROR_MSG_QUEUE_FULL, errors);
session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
vlib_buffer_free (vm, from, frame->n_vectors);
tcp_handle_disconnects (wrk);
@ -2058,7 +2053,7 @@ always_inline uword
tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
vlib_frame_t *frame, int is_ip4)
{
u32 thread_index = vm->thread_index, errors, n_left_from, *from, max_deq;
u32 thread_index = vm->thread_index, n_left_from, *from, max_deq;
tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
@ -2431,9 +2426,7 @@ tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
tcp_inc_counter (rcv_process, error, 1);
}
errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
thread_index);
tcp_inc_counter (rcv_process, TCP_ERROR_MSG_QUEUE_FULL, errors);
session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
tcp_handle_postponed_dequeues (wrk);
tcp_handle_disconnects (wrk);
vlib_buffer_free (vm, from, frame->n_vectors);

View File

@ -61,8 +61,7 @@ tls_add_vpp_q_rx_evt (session_t * s)
int
tls_add_vpp_q_builtin_rx_evt (session_t * s)
{
if (svm_fifo_set_event (s->rx_fifo))
session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
session_enqueue_notify (s);
return 0;
}
@ -75,9 +74,10 @@ tls_add_vpp_q_tx_evt (session_t * s)
}
static inline int
tls_add_app_q_evt (app_worker_t * app, session_t * app_session)
tls_add_app_q_evt (app_worker_t *app_wrk, session_t *app_session)
{
return app_worker_lock_and_send_event (app, app_session, SESSION_IO_EVT_RX);
app_worker_add_event (app_wrk, app_session, SESSION_IO_EVT_RX);
return 0;
}
u32

View File

@ -149,11 +149,9 @@ udp_connection_enqueue (udp_connection_t * uc0, session_t * s0,
* enqueue event now while we still have the peeker lock */
if (s0->thread_index != thread_index)
{
wrote0 = session_enqueue_dgram_connection (s0, hdr0, b,
TRANSPORT_PROTO_UDP,
/* queue event */ 0);
if (queue_event && !svm_fifo_has_event (s0->rx_fifo))
session_enqueue_notify (s0);
wrote0 = session_enqueue_dgram_connection_cl (
s0, hdr0, b, TRANSPORT_PROTO_UDP,
/* queue event */ queue_event && !svm_fifo_has_event (s0->rx_fifo));
}
else
{
@ -232,10 +230,9 @@ always_inline uword
udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame, u8 is_ip4)
{
u32 n_left_from, *from, errors, *first_buffer;
u32 thread_index = vm->thread_index, n_left_from, *from, *first_buffer;
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
u16 err_counters[UDP_N_ERROR] = { 0 };
u32 thread_index = vm->thread_index;
from = first_buffer = vlib_frame_vector_args (frame);
n_left_from = frame->n_vectors;
@ -327,9 +324,7 @@ udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
}
vlib_buffer_free (vm, first_buffer, frame->n_vectors);
errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP,
thread_index);
err_counters[UDP_ERROR_MQ_FULL] = errors;
session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP, thread_index);
udp_store_err_counters (vm, is_ip4, err_counters);
return frame->n_vectors;
}