session: add per worker ct context

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ie20dc1e369735965bd780f04cd8703c099065fcc
This commit is contained in:
Florin Coras
2021-11-15 14:01:02 -08:00
committed by Dave Barach
parent a25ce96ba3
commit 7fe006965e

View File

@ -46,11 +46,16 @@ typedef struct ct_cleanup_req_
u32 ct_index;
} ct_cleanup_req_t;
typedef struct ct_worker_
{
ct_connection_t *connections; /**< Per-worker connection pools */
ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
u8 have_cleanups; /**< Set if cleanup rpc pending */
} ct_worker_t;
typedef struct ct_main_
{
ct_connection_t **connections; /**< Per-worker connection pools */
ct_cleanup_req_t **pending_cleanups;
u8 *heave_cleanups;
ct_worker_t *wrk; /**< Per-worker state */
u32 n_workers; /**< Number of vpp workers */
u32 n_sessions; /**< Cumulative sessions counter */
u32 *ho_reusable; /**< Vector of reusable ho indices */
@ -62,13 +67,20 @@ typedef struct ct_main_
static ct_main_t ct_main;
static inline ct_worker_t *
ct_worker_get (u32 thread_index)
{
return &ct_main.wrk[thread_index];
}
static ct_connection_t *
ct_connection_alloc (u32 thread_index)
{
ct_worker_t *wrk = ct_worker_get (thread_index);
ct_connection_t *ct;
pool_get_zero (ct_main.connections[thread_index], ct);
ct->c_c_index = ct - ct_main.connections[thread_index];
pool_get_zero (wrk->connections, ct);
ct->c_c_index = ct - wrk->connections;
ct->c_thread_index = thread_index;
ct->client_wrk = ~0;
ct->server_wrk = ~0;
@ -80,22 +92,25 @@ ct_connection_alloc (u32 thread_index)
static ct_connection_t *
ct_connection_get (u32 ct_index, u32 thread_index)
{
if (pool_is_free_index (ct_main.connections[thread_index], ct_index))
ct_worker_t *wrk = ct_worker_get (thread_index);
if (pool_is_free_index (wrk->connections, ct_index))
return 0;
return pool_elt_at_index (ct_main.connections[thread_index], ct_index);
return pool_elt_at_index (wrk->connections, ct_index);
}
static void
ct_connection_free (ct_connection_t * ct)
{
ct_worker_t *wrk = ct_worker_get (ct->c_thread_index);
if (CLIB_DEBUG)
{
u32 thread_index = ct->c_thread_index;
memset (ct, 0xfc, sizeof (*ct));
pool_put (ct_main.connections[thread_index], ct);
clib_memset (ct, 0xfc, sizeof (*ct));
pool_put (wrk->connections, ct);
return;
}
pool_put (ct_main.connections[ct->c_thread_index], ct);
pool_put (wrk->connections, ct);
}
static ct_connection_t *
@ -106,7 +121,7 @@ ct_half_open_alloc (void)
clib_spinlock_lock (&cm->ho_reuseable_lock);
vec_foreach (hip, cm->ho_reusable)
pool_put_index (cm->connections[0], *hip);
pool_put_index (cm->wrk[0].connections, *hip);
vec_reset_length (cm->ho_reusable);
clib_spinlock_unlock (&cm->ho_reuseable_lock);
@ -936,31 +951,32 @@ ct_handle_cleanups (void *args)
{
uword thread_index = pointer_to_uword (args);
const u32 max_cleanups = 100;
ct_main_t *cm = &ct_main;
ct_cleanup_req_t *req;
ct_connection_t *ct;
u32 n_to_handle = 0;
ct_worker_t *wrk;
session_t *s;
cm->heave_cleanups[thread_index] = 0;
n_to_handle = clib_fifo_elts (cm->pending_cleanups[thread_index]);
wrk = ct_worker_get (thread_index);
wrk->have_cleanups = 0;
n_to_handle = clib_fifo_elts (wrk->pending_cleanups);
n_to_handle = clib_min (n_to_handle, max_cleanups);
while (n_to_handle)
{
clib_fifo_sub2 (cm->pending_cleanups[thread_index], req);
clib_fifo_sub2 (wrk->pending_cleanups, req);
ct = ct_connection_get (req->ct_index, thread_index);
s = session_get (ct->c_s_index, ct->c_thread_index);
if (!svm_fifo_has_event (s->tx_fifo))
ct_session_postponed_cleanup (ct);
else
clib_fifo_add1 (cm->pending_cleanups[thread_index], *req);
clib_fifo_add1 (wrk->pending_cleanups, *req);
n_to_handle -= 1;
}
if (clib_fifo_elts (cm->pending_cleanups[thread_index]))
if (clib_fifo_elts (wrk->pending_cleanups))
{
cm->heave_cleanups[thread_index] = 1;
wrk->have_cleanups = 1;
session_send_rpc_evt_to_thread_force (
thread_index, ct_handle_cleanups,
uword_to_pointer (thread_index, void *));
@ -970,18 +986,20 @@ ct_handle_cleanups (void *args)
static void
ct_program_cleanup (ct_connection_t *ct)
{
ct_main_t *cm = &ct_main;
ct_cleanup_req_t *req;
uword thread_index;
ct_worker_t *wrk;
thread_index = ct->c_thread_index;
clib_fifo_add2 (cm->pending_cleanups[thread_index], req);
wrk = ct_worker_get (ct->c_thread_index);
clib_fifo_add2 (wrk->pending_cleanups, req);
req->ct_index = ct->c_c_index;
if (cm->heave_cleanups[thread_index])
if (wrk->have_cleanups)
return;
cm->heave_cleanups[thread_index] = 1;
wrk->have_cleanups = 1;
session_send_rpc_evt_to_thread_force (
thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
}
@ -1164,12 +1182,11 @@ format_ct_session (u8 * s, va_list * args)
clib_error_t *
ct_enable_disable (vlib_main_t * vm, u8 is_en)
{
vlib_thread_main_t *vtm = &vlib_thread_main;
ct_main_t *cm = &ct_main;
cm->n_workers = vlib_num_workers ();
vec_validate (cm->connections, cm->n_workers);
vec_validate (cm->pending_cleanups, cm->n_workers);
vec_validate (cm->heave_cleanups, cm->n_workers);
vec_validate (cm->wrk, vtm->n_vlib_mains);
clib_spinlock_init (&cm->ho_reuseable_lock);
clib_rwlock_init (&cm->app_segs_lock);
return 0;