session: support for cl port reuse
Adds support for connectionless (cl) listener port reuse. Until now, cl listeners had fifos allocated directly to them, so only one app worker could ever listen, since a session cannot have multiple fifo pairs. To work around this limitation, separate the fifos from the listener by allocating a new cl session for each app worker that reuses the app listener. Flows are hashed to app worker cl sessions, but for now the hash is not consistent/fixed.

Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ic6533cd47f2765903669f88c288bd592fb17a19e
commit 7428eaa4a1
parent 5afc13d594
committed by Dave Barach
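The new cl session plumbing in the diff below is spread over several files, so a compact model of the bookkeeping helps. The sketch that follows is a standalone illustration, not VPP code: fixed-size arrays and integer "fifo" handles stand in for app_listener_t, session_t, al->cl_listeners and the segment-manager fifo allocation, and all toy_* names are invented for the example. It only shows the relationship the patch introduces: every app worker that reuses a connectionless listener gets its own cl session with its own rx/tx fifo pair, recorded on the listener under the worker's map index.

#include <stdint.h>
#include <stdio.h>

#define MAX_WORKERS 8

typedef struct
{
  uint32_t session_index;
  uint32_t rx_fifo; /* stand-in for an svm_fifo_t pointer */
  uint32_t tx_fifo;
} toy_cl_session_t;

typedef struct
{
  uint8_t worker_is_set[MAX_WORKERS]; /* stand-in for the worker bitmap */
  uint32_t cl_listeners[MAX_WORKERS]; /* wrk_map_index -> cl session index */
} toy_app_listener_t;

static toy_cl_session_t sessions[MAX_WORKERS];
static uint32_t n_sessions;

/* Rough counterpart of app_worker_alloc_wrk_cl_session: give the worker its
 * own cl session and fifo pair and remember it on the listener */
static uint32_t
toy_alloc_wrk_cl_session (toy_app_listener_t *al, uint32_t wrk_map_index)
{
  toy_cl_session_t *s = &sessions[n_sessions];

  s->session_index = n_sessions++;
  s->rx_fifo = 100 + s->session_index; /* pretend fifo allocation */
  s->tx_fifo = 200 + s->session_index;

  al->worker_is_set[wrk_map_index] = 1;
  al->cl_listeners[wrk_map_index] = s->session_index;
  return s->session_index;
}

int
main (void)
{
  toy_app_listener_t al = { 0 };

  /* two app workers reuse the same cl listener; each gets its own fifos */
  for (uint32_t wrk = 0; wrk < 2; wrk++)
    {
      uint32_t si = toy_alloc_wrk_cl_session (&al, wrk);
      printf ("worker %u -> cl session %u (rx fifo %u, tx fifo %u)\n", wrk, si,
	      sessions[si].rx_fifo, sessions[si].tx_fifo);
    }
  return 0;
}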
@@ -1,4 +1,5 @@
-master_process off;
+master_process on;
+worker_processes 2;
 daemon off;
 
 events {
@@ -16,7 +17,7 @@ http {
     sendfile on;
     server {
         listen 0.0.0.0:8443 quic;
-        listen 0.0.0.0:8443 ssl;
+        #listen 0.0.0.0:8443 ssl;
         root /usr/share/nginx;
         ssl_certificate /etc/nginx/ssl/localhost.crt;
         ssl_certificate_key /etc/nginx/ssl/localhost.key;
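The nginx config change above enables the master process with two workers and comments out the TCP/ssl listener, presumably so that both workers exercise reuse of the same connectionless QUIC/UDP port 8443 through the host stack. For readers more familiar with kernel sockets, the snippet below shows the analogous SO_REUSEPORT behavior that this feature provides inside VPP's session layer; it is plain POSIX/Linux (3.9+) code for comparison only and is not part of the patch.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static int
open_reuseport_udp (uint16_t port)
{
  int one = 1;
  int fd = socket (AF_INET, SOCK_DGRAM, 0);
  struct sockaddr_in sa;

  if (fd < 0)
    return -1;
  /* allow several sockets to bind the same UDP port */
  setsockopt (fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof (one));

  memset (&sa, 0, sizeof (sa));
  sa.sin_family = AF_INET;
  sa.sin_addr.s_addr = htonl (INADDR_ANY);
  sa.sin_port = htons (port);
  if (bind (fd, (struct sockaddr *) &sa, sizeof (sa)) < 0)
    {
      close (fd);
      return -1;
    }
  return fd;
}

int
main (void)
{
  /* two "workers" sharing UDP port 8443, as the nginx config above does */
  int w0 = open_reuseport_udp (8443);
  int w1 = open_reuseport_udp (8443);

  printf ("worker sockets: %d %d\n", w0, w1);
  close (w0);
  close (w1);
  return 0;
}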
@@ -847,13 +847,14 @@ vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
 
   vls_shared_data_pool_runlock ();
 
-  if (s->rx_fifo)
-    {
-      vcl_session_share_fifos (s, s->rx_fifo, s->tx_fifo);
-    }
-  else if (s->session_state == VCL_STATE_LISTEN)
+  if (s->session_state == VCL_STATE_LISTEN)
     {
       s->session_state = VCL_STATE_LISTEN_NO_MQ;
+      s->rx_fifo = s->tx_fifo = 0;
+    }
+  else if (s->rx_fifo)
+    {
+      vcl_session_share_fifos (s, s->rx_fifo, s->tx_fifo);
     }
 }
 
@@ -52,6 +52,7 @@ static void
 app_listener_free (application_t * app, app_listener_t * app_listener)
 {
   clib_bitmap_free (app_listener->workers);
+  vec_free (app_listener->cl_listeners);
   if (CLIB_DEBUG)
     clib_memset (app_listener, 0xfa, sizeof (*app_listener));
   pool_put (app->listeners, app_listener);
@@ -321,6 +322,13 @@ app_listener_get_local_session (app_listener_t * al)
   return listen_session_get (al->local_index);
 }
 
+session_t *
+app_listener_get_wrk_cl_session (app_listener_t *al, u32 wrk_map_index)
+{
+  u32 si = vec_elt (al->cl_listeners, wrk_map_index);
+  return session_get (si, 0 /* listener thread */);
+}
+
 static app_worker_map_t *
 app_worker_map_alloc (application_t * app)
 {
@@ -1017,6 +1025,53 @@ application_listener_select_worker (session_t * ls)
   return app_listener_select_worker (app, al);
 }
 
+always_inline u32
+app_listener_cl_flow_hash (session_dgram_hdr_t *hdr)
+{
+  u32 hash = 0;
+
+  if (hdr->is_ip4)
+    {
+      hash = clib_crc32c_u32 (hash, hdr->rmt_ip.ip4.as_u32);
+      hash = clib_crc32c_u32 (hash, hdr->lcl_ip.ip4.as_u32);
+      hash = clib_crc32c_u16 (hash, hdr->rmt_port);
+      hash = clib_crc32c_u16 (hash, hdr->lcl_port);
+    }
+  else
+    {
+      hash = clib_crc32c_u64 (hash, hdr->rmt_ip.ip6.as_u64[0]);
+      hash = clib_crc32c_u64 (hash, hdr->rmt_ip.ip6.as_u64[1]);
+      hash = clib_crc32c_u64 (hash, hdr->lcl_ip.ip6.as_u64[0]);
+      hash = clib_crc32c_u64 (hash, hdr->lcl_ip.ip6.as_u64[1]);
+      hash = clib_crc32c_u16 (hash, hdr->rmt_port);
+      hash = clib_crc32c_u16 (hash, hdr->lcl_port);
+    }
+
+  return hash;
+}
+
+session_t *
+app_listener_select_wrk_cl_session (session_t *ls, session_dgram_hdr_t *hdr)
+{
+  u32 wrk_map_index = 0;
+  application_t *app;
+  app_listener_t *al;
+
+  app = application_get (ls->app_index);
+  al = app_listener_get (app, ls->al_index);
+  /* Crude test to check if only worker 0 is set */
+  if (al->workers[0] != 1)
+    {
+      u32 hash = app_listener_cl_flow_hash (hdr);
+      hash %= vec_len (al->workers) * sizeof (uword);
+      wrk_map_index = clib_bitmap_next_set (al->workers, hash);
+      if (wrk_map_index == ~0)
+        wrk_map_index = clib_bitmap_first_set (al->workers);
+    }
+
+  return app_listener_get_wrk_cl_session (al, wrk_map_index);
+}
+
 int
 application_alloc_worker_and_init (application_t * app, app_worker_t ** wrk)
 {
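app_listener_select_wrk_cl_session above reduces the flow hash to a position in the listener's worker bitmap and then takes the next set worker bit, wrapping around to the first set bit if none follows; the only-worker-0 case is special-cased so the common single-worker setup skips the hash entirely. The sketch below replays that lookup standalone, with a single 64-bit word standing in for the clib bitmap and hand-rolled next_set_bit/first_set_bit helpers in place of clib_bitmap_next_set and clib_bitmap_first_set; it is an illustration, not the patch code.

#include <stdint.h>
#include <stdio.h>

/* Stand-ins for clib_bitmap_next_set / clib_bitmap_first_set over one word */
static uint32_t
next_set_bit (uint64_t bm, uint32_t from)
{
  for (uint32_t i = from; i < 64; i++)
    if (bm & (1ULL << i))
      return i;
  return ~0u; /* nothing set at or after 'from' */
}

static uint32_t
first_set_bit (uint64_t bm)
{
  return next_set_bit (bm, 0);
}

/* Same shape as the selection above: hash -> bit position -> next set worker */
static uint32_t
select_wrk_map_index (uint64_t workers, uint32_t flow_hash)
{
  /* Single-worker fast path: only worker 0 registered, skip the hash */
  if (workers == 1)
    return 0;

  uint32_t pos = flow_hash % 64; /* modulo over the bitmap's bit capacity */
  uint32_t wrk = next_set_bit (workers, pos);
  if (wrk == ~0u)
    wrk = first_set_bit (workers); /* wrap around */
  return wrk;
}

int
main (void)
{
  /* workers 0, 1 and 3 reuse the listener */
  uint64_t workers = (1ULL << 0) | (1ULL << 1) | (1ULL << 3);

  for (uint32_t hash = 0; hash < 6; hash++)
    printf ("hash %u -> worker map index %u\n", hash,
	    select_wrk_map_index (workers, hash));
  return 0;
}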
@@ -106,6 +106,8 @@ typedef struct app_listener_
   session_handle_t ls_handle;	/**< session handle of the local or global
				     listening session that also identifies
				     the app listener */
+  u32 *cl_listeners;		/**< vector that maps app workers to their
+				     cl sessions with fifos */
 } app_listener_t;
 
 typedef enum app_rx_mq_flags_
@@ -254,6 +256,8 @@ void app_listener_cleanup (app_listener_t * app_listener);
 session_handle_t app_listener_handle (app_listener_t * app_listener);
 app_listener_t *app_listener_lookup (application_t * app,
				     session_endpoint_cfg_t * sep);
+session_t *app_listener_select_wrk_cl_session (session_t *ls,
+					       session_dgram_hdr_t *hdr);
 
 /**
  * Get app listener handle for listening session
@@ -280,6 +284,7 @@ app_listener_t *app_listener_get_w_handle (session_handle_t handle);
 app_listener_t *app_listener_get_w_session (session_t * ls);
 session_t *app_listener_get_session (app_listener_t * al);
 session_t *app_listener_get_local_session (app_listener_t * al);
+session_t *app_listener_get_wrk_cl_session (app_listener_t *al, u32 wrk_index);
 
 application_t *application_get (u32 index);
 application_t *application_get_if_valid (u32 index);
@@ -185,13 +185,68 @@ app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
   return 0;
 }
 
+int
+app_worker_alloc_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
+{
+  svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
+  segment_manager_t *sm;
+  session_handle_t lsh;
+  app_listener_t *al;
+  session_t *s;
+
+  al = app_listener_get_w_session (ls);
+  sm = app_worker_get_listen_segment_manager (app_wrk, ls);
+  lsh = session_handle (ls);
+
+  s = session_alloc (0 /* listener on main worker */);
+  session_set_state (s, SESSION_STATE_LISTENING);
+  s->flags |= SESSION_F_IS_CLESS;
+  s->app_wrk_index = app_wrk->wrk_index;
+  ls = session_get_from_handle (lsh);
+  s->session_type = ls->session_type;
+  s->connection_index = ls->connection_index;
+
+  segment_manager_alloc_session_fifos (sm, s->thread_index, &rx_fifo,
+                                       &tx_fifo);
+
+  rx_fifo->shr->master_session_index = s->session_index;
+  rx_fifo->master_thread_index = s->thread_index;
+
+  tx_fifo->shr->master_session_index = s->session_index;
+  tx_fifo->master_thread_index = s->thread_index;
+
+  s->rx_fifo = rx_fifo;
+  s->tx_fifo = tx_fifo;
+
+  vec_validate (al->cl_listeners, app_wrk->wrk_map_index);
+  al->cl_listeners[app_wrk->wrk_map_index] = s->session_index;
+
+  return 0;
+}
+
+void
+app_worker_free_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
+{
+  app_listener_t *al;
+  session_t *s;
+
+  al = app_listener_get_w_session (ls);
+
+  s = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index);
+  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+  session_free (s);
+
+  al->cl_listeners[app_wrk->wrk_map_index] = SESSION_INVALID_INDEX;
+}
+
 int
 app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
 {
   segment_manager_t *sm;
 
   /* Allocate segment manager. All sessions derived out of a listen session
-   * have fifos allocated by the same segment manager. */
+   * have fifos allocated by the same segment manager.
+   * TODO(fcoras): limit memory consumption by cless listeners */
   if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
     return SESSION_E_ALLOC;
@@ -202,12 +257,9 @@ app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
   hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
	     segment_manager_index (sm));
 
-  if (transport_connection_is_cless (session_get_transport (ls)))
-    {
-      if (ls->rx_fifo)
-	return SESSION_E_NOSUPPORT;
-      return app_worker_alloc_session_fifos (sm, ls);
-    }
+  if (ls->flags & SESSION_F_IS_CLESS)
+    return app_worker_alloc_wrk_cl_session (app_wrk, ls);
 
   return 0;
 }
@@ -276,12 +328,8 @@ app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
   if (PREDICT_FALSE (!sm_indexp))
     return;
 
-  /* Dealloc fifos, if any (dgram listeners) */
-  if (ls->rx_fifo)
-    {
-      segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo);
-      ls->tx_fifo = ls->rx_fifo = 0;
-    }
+  if (ls->flags & SESSION_F_IS_CLESS)
+    app_worker_free_wrk_cl_session (app_wrk, ls);
 
   /* Try to cleanup segment manager */
   sm = segment_manager_get (*sm_indexp);
@@ -866,12 +866,23 @@ session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
						  queue_event, 0 /* is_cl */);
 }
 
+int
+session_enqueue_dgram_connection2 (session_t *s, session_dgram_hdr_t *hdr,
+				   vlib_buffer_t *b, u8 proto, u8 queue_event)
+{
+  return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+						  queue_event, 1 /* is_cl */);
+}
+
 int
 session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
				     vlib_buffer_t *b, u8 proto,
				     u8 queue_event)
 {
-  return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+  session_t *awls;
+
+  awls = app_listener_select_wrk_cl_session (s, hdr);
+  return session_enqueue_dgram_connection_inline (awls, hdr, b, proto,
						   queue_event, 1 /* is_cl */);
 }
 
@@ -495,6 +495,9 @@ int session_enqueue_dgram_connection (session_t * s,
				      session_dgram_hdr_t * hdr,
				      vlib_buffer_t * b, u8 proto,
				      u8 queue_event);
+int session_enqueue_dgram_connection2 (session_t *s, session_dgram_hdr_t *hdr,
+				       vlib_buffer_t *b, u8 proto,
+				       u8 queue_event);
 int session_enqueue_dgram_connection_cl (session_t *s,
					 session_dgram_hdr_t *hdr,
					 vlib_buffer_t *b, u8 proto,
@@ -276,7 +276,7 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
			  session_handle_t handle, int rv)
 {
   session_bound_msg_t m = { 0 };
-  transport_endpoint_t tep;
+  transport_connection_t *ltc;
   fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   application_t *app;
@@ -298,23 +298,24 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
   else
     ls = app_listener_get_local_session (al);
 
-  session_get_endpoint (ls, &tep, 1 /* is_lcl */);
-  m.lcl_port = tep.port;
-  m.lcl_is_ip4 = tep.is_ip4;
-  clib_memcpy_fast (m.lcl_ip, &tep.ip, sizeof (tep.ip));
+  ltc = session_get_transport (ls);
+  m.lcl_port = ltc->lcl_port;
+  m.lcl_is_ip4 = ltc->is_ip4;
+  clib_memcpy_fast (m.lcl_ip, &ltc->lcl_ip, sizeof (m.lcl_ip));
   app = application_get (app_wrk->app_index);
   eq_seg = application_get_rx_mqs_segment (app);
   m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
   m.mq_index = ls->thread_index;
 
-  if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL &&
-      ls->rx_fifo)
+  if (transport_connection_is_cless (ltc))
     {
+      session_t *wrk_ls;
       m.mq_index = transport_cl_thread ();
       m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, m.mq_index);
-      m.rx_fifo = fifo_segment_fifo_offset (ls->rx_fifo);
-      m.tx_fifo = fifo_segment_fifo_offset (ls->tx_fifo);
-      m.segment_handle = session_segment_handle (ls);
+      wrk_ls = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index);
+      m.rx_fifo = fifo_segment_fifo_offset (wrk_ls->rx_fifo);
+      m.tx_fifo = fifo_segment_fifo_offset (wrk_ls->tx_fifo);
+      m.segment_handle = session_segment_handle (wrk_ls);
     }
 
 snd_msg:
@@ -136,39 +136,46 @@ udp_connection_enqueue (udp_connection_t * uc0, session_t * s0,
   int wrote0;
 
   if (!(uc0->flags & UDP_CONN_F_CONNECTED))
-    clib_spinlock_lock (&uc0->rx_lock);
+    {
+      clib_spinlock_lock (&uc0->rx_lock);
+
+      wrote0 = session_enqueue_dgram_connection_cl (
+        s0, hdr0, b, TRANSPORT_PROTO_UDP, queue_event);
+
+      clib_spinlock_unlock (&uc0->rx_lock);
+
+      /* Expect cl udp enqueue to fail because fifo enqueue */
+      if (PREDICT_FALSE (wrote0 == 0))
+        *error0 = UDP_ERROR_FIFO_FULL;
+
+      return;
+    }
 
   if (svm_fifo_max_enqueue_prod (s0->rx_fifo)
      < hdr0->data_length + sizeof (session_dgram_hdr_t))
    {
      *error0 = UDP_ERROR_FIFO_FULL;
-     goto unlock_rx_lock;
+     return;
    }
 
  /* If session is owned by another thread and rx event needed,
   * enqueue event now while we still have the peeker lock */
  if (s0->thread_index != thread_index)
    {
-      wrote0 = session_enqueue_dgram_connection_cl (
+      wrote0 = session_enqueue_dgram_connection2 (
        s0, hdr0, b, TRANSPORT_PROTO_UDP,
-       /* queue event */ queue_event && !svm_fifo_has_event (s0->rx_fifo));
+       queue_event && !svm_fifo_has_event (s0->rx_fifo));
    }
  else
    {
-      wrote0 = session_enqueue_dgram_connection (s0, hdr0, b,
-                                                 TRANSPORT_PROTO_UDP,
-                                                 queue_event);
+      wrote0 = session_enqueue_dgram_connection (
+        s0, hdr0, b, TRANSPORT_PROTO_UDP, queue_event);
    }
 
  /* In some rare cases, session_enqueue_dgram_connection can fail because a
   * chunk cannot be allocated in the RX FIFO */
  if (PREDICT_FALSE (wrote0 == 0))
    *error0 = UDP_ERROR_FIFO_NOMEM;
-
-unlock_rx_lock:
-
-  if (!(uc0->flags & UDP_CONN_F_CONNECTED))
-    clib_spinlock_unlock (&uc0->rx_lock);
 }
 
 always_inline session_t *