vcl/session: add api for changing session app worker

In case of multi process apps, after forking, the parent may decide to
close part or all of the sessions it shares with the child. Because the
sessions have fifos allocated in the parent's segment manager, they must
be moved to the child's segment manager.

Change-Id: I85b4c8c8545005724023ee14043647719cef61dd
Signed-off-by: Florin Coras <fcoras@cisco.com>
This commit is contained in:
Florin Coras
2019-01-02 19:31:22 -08:00
committed by Dave Barach
parent 3c1cf2c171
commit 30e79c2e38
11 changed files with 544 additions and 183 deletions

File diff suppressed because it is too large Load Diff

View File

@ -609,21 +609,6 @@ vppcom_send_unbind_sock (u64 vpp_handle)
vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & ump);
}
void
vppcom_send_accept_session_reply (u64 handle, u32 context, int retval)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
vl_api_accept_session_reply_t *rmp;
rmp = vl_msg_api_alloc (sizeof (*rmp));
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
rmp->retval = htonl (retval);
rmp->context = context;
rmp->handle = handle;
vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & rmp);
}
void
vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
u32 cert_len)

View File

@ -375,21 +375,23 @@ vcl_worker_share_session (vcl_worker_t * parent, vcl_worker_t * wrk,
vcl_session_t * new_s)
{
vcl_shared_session_t *ss;
vcl_session_t *s;
vcl_session_t *old_s;
s = vcl_session_get (parent, new_s->session_index);
if (s->shared_index == ~0)
if (new_s->shared_index == ~0)
{
ss = vcl_shared_session_alloc ();
ss->session_index = new_s->session_index;
vec_add1 (ss->workers, parent->wrk_index);
s->shared_index = ss->ss_index;
vec_add1 (ss->workers, wrk->wrk_index);
new_s->shared_index = ss->ss_index;
old_s = vcl_session_get (parent, new_s->session_index);
old_s->shared_index = ss->ss_index;
}
else
{
ss = vcl_shared_session_get (s->shared_index);
ss = vcl_shared_session_get (new_s->shared_index);
vec_add1 (ss->workers, wrk->wrk_index);
}
new_s->shared_index = ss->ss_index;
vec_add1 (ss->workers, wrk->wrk_index);
}
int
@ -414,6 +416,12 @@ vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s)
return 1;
}
/* If the first removed and not last, start session worker change.
* First request goes to vpp and vpp reflects it back to the right
* worker */
if (i == 0)
vcl_send_session_worker_update (wrk, s, ss->workers[0]);
return 0;
}

View File

@ -69,7 +69,8 @@ typedef enum
STATE_ACCEPT = 0x08,
STATE_VPP_CLOSING = 0x10,
STATE_DISCONNECT = 0x20,
STATE_FAILED = 0x40
STATE_FAILED = 0x40,
STATE_UPDATED = 0x80,
} session_state_t;
#define SERVER_STATE_OPEN (STATE_ACCEPT|STATE_VPP_CLOSING)
@ -144,6 +145,7 @@ typedef struct vcl_shared_session_
{
u32 ss_index;
u32 *workers;
u32 session_index;
} vcl_shared_session_t;
typedef struct
@ -287,6 +289,8 @@ typedef struct vcl_worker_
/** Vector of unhandled events */
session_event_t *unhandled_evts_vector;
u32 *pending_session_wrk_updates;
/** Used also as a thread stop key buffer */
pthread_t thread_id;
@ -517,6 +521,7 @@ int vcl_worker_register_with_vpp (void);
int vcl_worker_set_bapi (void);
void vcl_worker_share_sessions (vcl_worker_t * parent_wrk);
int vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s);
vcl_shared_session_t *vcl_shared_session_get (u32 ss_index);
int vcl_session_get_refcnt (vcl_session_t * s);
void vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index);
@ -543,6 +548,17 @@ vcl_worker_get_current (void)
return vcl_worker_get (vcl_get_worker_index ());
}
static inline svm_msg_q_t *
vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
{
if (vcl_session_is_ct (s))
return wrk->vpp_event_queues[0];
else
return wrk->vpp_event_queues[s->vpp_thread_index];
}
void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
u32 wrk_index);
/*
* VCL Binary API
*/
@ -556,12 +572,10 @@ void vppcom_send_disconnect_session (u64 vpp_handle);
void vppcom_send_bind_sock (vcl_session_t * session);
void vppcom_send_unbind_sock (u64 vpp_handle);
void vppcom_api_hookup (void);
void vppcom_send_accept_session_reply (u64 vpp_handle, u32 context, int rv);
void vppcom_send_application_tls_cert_add (vcl_session_t * session,
char *cert, u32 cert_len);
void
vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
u32 key_len);
void vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
u32 key_len);
void vcl_send_app_worker_add_del (u8 is_add);
void vcl_send_child_worker_del (vcl_worker_t * wrk);

File diff suppressed because it is too large Load Diff

View File

@ -97,6 +97,8 @@ typedef struct vppcom_endpt_t_
uint16_t port;
} vppcom_endpt_t;
typedef uint32_t vcl_session_handle_t;
typedef enum
{
VPPCOM_OK = 0,
@ -277,7 +279,8 @@ extern int vppcom_session_sendto (uint32_t session_handle, void *buffer,
extern int vppcom_poll (vcl_poll_t * vp, uint32_t n_sids,
double wait_for_time);
extern int vppcom_mq_epoll_fd (void);
extern int vppcom_session_index (uint32_t session_handle);
extern int vppcom_session_index (vcl_session_handle_t session_handle);
extern int vppcom_session_worker (vcl_session_handle_t session_handle);
extern int vppcom_session_handle (uint32_t session_index);
extern int vppcom_session_read_segments (uint32_t session_handle,

View File

@ -724,6 +724,48 @@ app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle)
return 0;
}
int
app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s)
{
segment_manager_t *sm;
svm_fifo_t *rxf, *txf;
s->app_wrk_index = app_wrk->wrk_index;
rxf = s->server_rx_fifo;
txf = s->server_tx_fifo;
if (!rxf || !txf)
return 0;
s->server_rx_fifo = 0;
s->server_tx_fifo = 0;
sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
if (session_alloc_fifos (sm, s))
return -1;
if (!svm_fifo_is_empty (rxf))
{
clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems);
s->server_rx_fifo->head = rxf->head;
s->server_rx_fifo->tail = rxf->tail;
s->server_rx_fifo->cursize = rxf->cursize;
}
if (!svm_fifo_is_empty (txf))
{
clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems);
s->server_tx_fifo->head = txf->head;
s->server_tx_fifo->tail = txf->tail;
s->server_tx_fifo->cursize = txf->cursize;
}
segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);
return 0;
}
/**
* Start listening local transport endpoint for requested transport.
*
@ -889,6 +931,14 @@ app_worker_get_connect_segment_manager (app_worker_t * app)
return segment_manager_get (app->connects_seg_manager);
}
segment_manager_t *
app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
{
if (app_wrk->connects_seg_manager == (u32) ~ 0)
app_worker_alloc_connects_segment_manager (app_wrk);
return segment_manager_get (app_wrk->connects_seg_manager);
}
segment_manager_t *
app_worker_get_listen_segment_manager (app_worker_t * app,
stream_session_t * listener)

View File

@ -225,12 +225,15 @@ int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk);
app_worker_t *app_worker_get (u32 wrk_index);
app_worker_t *app_worker_get_if_valid (u32 wrk_index);
application_t *app_worker_get_app (u32 wrk_index);
int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s);
void app_worker_free (app_worker_t * app_wrk);
int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep,
u32 api_context);
segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
stream_session_t *);
segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
segment_manager_t
* app_worker_get_or_alloc_connect_segment_manager (app_worker_t *);
int app_worker_alloc_connects_segment_manager (app_worker_t * app);
int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle);
u32 app_worker_n_listeners (app_worker_t * app);

View File

@ -220,9 +220,9 @@ typedef struct session_bound_msg_
u8 lcl_is_ip4;
u8 lcl_ip[16];
u16 lcl_port;
u64 rx_fifo;
u64 tx_fifo;
u64 vpp_evt_q;
uword rx_fifo;
uword tx_fifo;
uword vpp_evt_q;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[128];
@ -233,12 +233,12 @@ typedef struct session_accepted_msg_
u32 context;
u64 listener_handle;
u64 handle;
u64 server_rx_fifo;
u64 server_tx_fifo;
uword server_rx_fifo;
uword server_tx_fifo;
u64 segment_handle;
u64 vpp_event_queue_address;
u64 server_event_queue_address;
u64 client_event_queue_address;
uword vpp_event_queue_address;
uword server_event_queue_address;
uword client_event_queue_address;
u16 port;
u8 is_ip4;
u8 ip[16];
@ -260,12 +260,12 @@ typedef struct session_connected_msg_
u32 context;
i32 retval;
u64 handle;
u64 server_rx_fifo;
u64 server_tx_fifo;
uword server_rx_fifo;
uword server_tx_fifo;
u64 segment_handle;
u64 vpp_event_queue_address;
u64 client_event_queue_address;
u64 server_event_queue_address;
uword vpp_event_queue_address;
uword client_event_queue_address;
uword server_event_queue_address;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[64];
@ -302,6 +302,28 @@ typedef struct session_reset_reply_msg_
u64 handle;
} __clib_packed session_reset_reply_msg_t;
typedef struct session_req_worker_update_msg_
{
u64 session_handle;
} __clib_packed session_req_worker_update_msg_t;
/* NOTE: using u16 for wrk indices because message needs to fit in 18B */
typedef struct session_worker_update_msg_
{
u32 client_index;
u16 wrk_index;
u16 req_wrk_index;
u64 handle;
} __clib_packed session_worker_update_msg_t;
typedef struct session_worker_update_reply_msg_
{
u64 handle;
uword rx_fifo;
uword tx_fifo;
u64 segment_handle;
} __clib_packed session_worker_update_reply_msg_t;
typedef struct app_session_event_
{
svm_msg_q_msg_t msg;

View File

@ -49,7 +49,10 @@ typedef enum
SESSION_CTRL_EVT_DISCONNECTED,
SESSION_CTRL_EVT_DISCONNECTED_REPLY,
SESSION_CTRL_EVT_RESET,
SESSION_CTRL_EVT_RESET_REPLY
SESSION_CTRL_EVT_RESET_REPLY,
SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE,
SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
} session_evt_type_t;
static inline const char *

View File

@ -173,7 +173,7 @@ session_mq_disconnected_handler (void *data)
svm_msg_q_unlock (app_wrk->event_queue);
evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
clib_memset (evt, 0, sizeof (*evt));
evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
rmp = (session_disconnected_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
rmp->context = mp->context;
@ -207,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data)
}
}
static void
session_mq_worker_update_handler (void *data)
{
session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
session_worker_update_reply_msg_t *rmp;
svm_msg_q_msg_t _msg, *msg = &_msg;
app_worker_t *app_wrk;
u32 owner_app_wrk_map;
session_event_t *evt;
stream_session_t *s;
application_t *app;
app = application_lookup (mp->client_index);
if (!app)
return;
if (!(s = session_get_from_handle_if_valid (mp->handle)))
{
clib_warning ("invalid handle %llu", mp->handle);
return;
}
app_wrk = app_worker_get (s->app_wrk_index);
if (app_wrk->app_index != app->app_index)
{
clib_warning ("app %u does not own session %llu", app->app_index,
mp->handle);
return;
}
owner_app_wrk_map = app_wrk->wrk_map_index;
app_wrk = application_get_worker (app, mp->wrk_index);
/* This needs to come from the new owner */
if (mp->req_wrk_index == owner_app_wrk_map)
{
session_req_worker_update_msg_t *wump;
svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
SESSION_MQ_CTRL_EVT_RING,
SVM_Q_WAIT, msg);
svm_msg_q_unlock (app_wrk->event_queue);
evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
clib_memset (evt, 0, sizeof (*evt));
evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
wump = (session_req_worker_update_msg_t *) evt->data;
wump->session_handle = mp->handle;
svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
return;
}
app_worker_own_session (app_wrk, s);
/*
* Send reply
*/
svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
SESSION_MQ_CTRL_EVT_RING,
SVM_Q_WAIT, msg);
svm_msg_q_unlock (app_wrk->event_queue);
evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
clib_memset (evt, 0, sizeof (*evt));
evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
rmp = (session_worker_update_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
rmp->segment_handle = session_segment_handle (s);
svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
/*
* Retransmit messages that may have been lost
*/
if (!svm_fifo_is_empty (s->server_tx_fifo))
session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
if (!svm_fifo_is_empty (s->server_rx_fifo))
app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
app->cb_fns.session_disconnect_callback (s);
}
vlib_node_registration_t session_queue_node;
typedef struct
@ -936,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
case SESSION_CTRL_EVT_RESET_REPLY:
session_mq_reset_reply_handler (e->data);
break;
case SESSION_CTRL_EVT_WORKER_UPDATE:
session_mq_worker_update_handler (e->data);
break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}