Session layer improvements

Among others:
- Moved app event queue to shared memory segment
- Use private memory segment for builtin apps
- Remove pid from svm fifo
- Protect session fifo (de)allocation
- Use fifo event for session disconnects
- Have session queue node poll in all wk threads

Change-Id: I89dbf7fdfebef12f5ef2b34ba3ef3c2c07f49ff2
Signed-off-by: Florin Coras <fcoras@cisco.com>
This commit is contained in:
Florin Coras
2017-04-19 13:00:05 -07:00
committed by Dave Barach
parent bc66a9122f
commit a546481752
25 changed files with 605 additions and 417 deletions
+12 -18
View File
@@ -57,7 +57,7 @@ format_svm_fifo (u8 * s, va_list * args)
if (verbose > 1)
s = format
(s, "server session %d thread %d client session %d thread %d\n",
f->server_session_index, f->server_thread_index,
f->master_session_index, f->master_thread_index,
f->client_session_index, f->client_thread_index);
if (verbose)
@@ -353,8 +353,7 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued)
}
static int
svm_fifo_enqueue_internal (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_from_here)
svm_fifo_enqueue_internal (svm_fifo_t * f, u32 max_bytes, u8 * copy_from_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
u32 cursize, nitems;
@@ -411,10 +410,9 @@ svm_fifo_enqueue_internal (svm_fifo_t * f,
}
int
svm_fifo_enqueue_nowait (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_from_here)
svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_from_here)
{
return svm_fifo_enqueue_internal (f, pid, max_bytes, copy_from_here);
return svm_fifo_enqueue_internal (f, max_bytes, copy_from_here);
}
/**
@@ -426,7 +424,6 @@ svm_fifo_enqueue_nowait (svm_fifo_t * f,
*/
static int
svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
int pid,
u32 offset,
u32 required_bytes,
u8 * copy_from_here)
@@ -439,7 +436,7 @@ svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
/* Users would do well to avoid this */
if (PREDICT_FALSE (f->tail == (offset % f->nitems)))
{
rv = svm_fifo_enqueue_internal (f, pid, required_bytes, copy_from_here);
rv = svm_fifo_enqueue_internal (f, required_bytes, copy_from_here);
if (rv > 0)
return 0;
return -1;
@@ -484,18 +481,16 @@ svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
int
svm_fifo_enqueue_with_offset (svm_fifo_t * f,
int pid,
u32 offset,
u32 required_bytes, u8 * copy_from_here)
{
return svm_fifo_enqueue_with_offset_internal
(f, pid, offset, required_bytes, copy_from_here);
return svm_fifo_enqueue_with_offset_internal (f, offset, required_bytes,
copy_from_here);
}
static int
svm_fifo_dequeue_internal (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_here)
svm_fifo_dequeue_internal (svm_fifo_t * f, u32 max_bytes, u8 * copy_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
u32 cursize, nitems;
@@ -545,14 +540,13 @@ svm_fifo_dequeue_internal (svm_fifo_t * f,
}
int
svm_fifo_dequeue_nowait (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_here)
svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here)
{
return svm_fifo_dequeue_internal (f, pid, max_bytes, copy_here);
return svm_fifo_dequeue_internal (f, max_bytes, copy_here);
}
int
svm_fifo_peek (svm_fifo_t * f, int pid, u32 relative_offset, u32 max_bytes,
svm_fifo_peek (svm_fifo_t * f, u32 relative_offset, u32 max_bytes,
u8 * copy_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
@@ -590,7 +584,7 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 relative_offset, u32 max_bytes,
}
int
svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes)
svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes)
{
u32 total_drop_bytes, first_drop_bytes, second_drop_bytes;
u32 cursize, nitems;
+9 -22
View File
@@ -23,13 +23,6 @@
#include <vppinfra/format.h>
#include <pthread.h>
typedef enum
{
SVM_FIFO_TAG_NOT_HELD = 0,
SVM_FIFO_TAG_DEQUEUE,
SVM_FIFO_TAG_ENQUEUE,
} svm_lock_tag_t;
/** Out-of-order segment */
typedef struct
{
@@ -37,7 +30,7 @@ typedef struct
u32 prev; /**< Previous linked-list element pool index */
u32 start; /**< Start of segment, normalized*/
u32 length; /**< Length of segment */
u32 length; /**< Length of segment */
} ooo_segment_t;
format_function_t format_ooo_segment;
@@ -52,12 +45,11 @@ typedef struct
CLIB_CACHE_LINE_ALIGN_MARK (end_cursize);
volatile u8 has_event; /**< non-zero if deq event exists */
u32 owner_pid;
/* Backpointers */
u32 server_session_index;
u32 master_session_index;
u32 client_session_index;
u8 server_thread_index;
u8 master_thread_index;
u8 client_thread_index;
u32 segment_manager;
CLIB_CACHE_LINE_ALIGN_MARK (end_shared);
@@ -117,19 +109,14 @@ svm_fifo_unset_event (svm_fifo_t * f)
svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
void svm_fifo_free (svm_fifo_t * f);
int svm_fifo_enqueue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
int svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 max_bytes,
u8 * copy_from_here);
int svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset,
u32 required_bytes, u8 * copy_from_here);
int svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here);
int svm_fifo_enqueue_with_offset (svm_fifo_t * f, int pid,
u32 offset, u32 required_bytes,
u8 * copy_from_here);
int svm_fifo_dequeue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
u8 * copy_here);
int svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
u8 * copy_here);
int svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes);
int svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 max_bytes, u8 * copy_here);
int svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes);
u32 svm_fifo_number_ooo_segments (svm_fifo_t * f);
ooo_segment_t *svm_fifo_first_ooo_segment (svm_fifo_t * f);
+46 -4
View File
@@ -70,6 +70,44 @@ svm_fifo_segment_create (svm_fifo_segment_create_args_t * a)
return (0);
}
/** Create an svm fifo segment in process-private memory */
int
svm_fifo_segment_create_process_private (svm_fifo_segment_create_args_t * a)
{
svm_fifo_segment_private_t *s;
svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
ssvm_shared_header_t *sh;
svm_fifo_segment_header_t *fsh;
/* Allocate a fresh segment */
pool_get (sm->segments, s);
memset (s, 0, sizeof (*s));
s->ssvm.ssvm_size = ~0;
s->ssvm.i_am_master = 1;
s->ssvm.my_pid = getpid ();
s->ssvm.name = (u8 *) a->segment_name;
s->ssvm.requested_va = ~0;
/* Allocate a [sic] shared memory header, in process memory... */
sh = clib_mem_alloc_aligned (sizeof (*sh), CLIB_CACHE_LINE_BYTES);
s->ssvm.sh = sh;
memset (sh, 0, sizeof (*sh));
sh->heap = clib_mem_get_heap ();
/* Set up svm_fifo_segment shared header */
fsh = clib_mem_alloc (sizeof (*fsh));
memset (fsh, 0, sizeof (*fsh));
sh->opaque[0] = fsh;
s->h = fsh;
fsh->segment_name = format (0, "%s%c", a->segment_name, 0);
sh->ready = 1;
a->new_segment_index = s - sm->segments;
return (0);
}
/** (slave) attach to an svm fifo segment */
int
svm_fifo_segment_attach (svm_fifo_segment_create_args_t * a)
@@ -82,7 +120,6 @@ svm_fifo_segment_attach (svm_fifo_segment_create_args_t * a)
/* Allocate a fresh segment */
pool_get (sm->segments, s);
memset (s, 0, sizeof (*s));
s->ssvm.ssvm_size = a->segment_size;
@@ -126,19 +163,22 @@ svm_fifo_segment_alloc_fifo (svm_fifo_segment_private_t * s,
sh = s->ssvm.sh;
fsh = (svm_fifo_segment_header_t *) sh->opaque[0];
ssvm_lock (sh, 1, 0);
oldheap = ssvm_push_heap (sh);
/* Note: this can fail, in which case: create another segment */
f = svm_fifo_create (data_size_in_bytes);
if (f == 0)
if (PREDICT_FALSE (f == 0))
{
ssvm_pop_heap (oldheap);
ssvm_unlock (sh);
return (0);
}
vec_add1 (fsh->fifos, f);
ssvm_pop_heap (oldheap);
ssvm_unlock (sh);
return (f);
}
@@ -152,8 +192,9 @@ svm_fifo_segment_free_fifo (svm_fifo_segment_private_t * s, svm_fifo_t * f)
sh = s->ssvm.sh;
fsh = (svm_fifo_segment_header_t *) sh->opaque[0];
oldheap = ssvm_push_heap (sh);
ssvm_lock (sh, 1, 0);
oldheap = ssvm_push_heap (sh);
for (i = 0; i < vec_len (fsh->fifos); i++)
{
if (fsh->fifos[i] == f)
@@ -167,6 +208,7 @@ svm_fifo_segment_free_fifo (svm_fifo_segment_private_t * s, svm_fifo_t * f)
found:
clib_mem_free (f);
ssvm_pop_heap (oldheap);
ssvm_unlock (sh);
}
void
+5
View File
@@ -17,6 +17,7 @@
#include <svm/svm_fifo.h>
#include <svm/ssvm.h>
#include <vppinfra/lock.h>
typedef struct
{
@@ -32,6 +33,8 @@ typedef struct
typedef struct
{
volatile u32 lock;
/** pool of segments */
svm_fifo_segment_private_t *segments;
/* Where to put the next one */
@@ -78,6 +81,8 @@ typedef enum
} ssvm_fifo_segment_api_error_enum_t;
int svm_fifo_segment_create (svm_fifo_segment_create_args_t * a);
int svm_fifo_segment_create_process_private (svm_fifo_segment_create_args_t
* a);
int svm_fifo_segment_attach (svm_fifo_segment_create_args_t * a);
void svm_fifo_segment_delete (svm_fifo_segment_private_t * s);
+10 -17
View File
@@ -25,7 +25,6 @@ hello_world (int verbose)
u8 *test_data;
u8 *retrieved_data = 0;
clib_error_t *error = 0;
int pid = getpid ();
memset (a, 0, sizeof (*a));
@@ -48,18 +47,16 @@ hello_world (int verbose)
vec_validate (retrieved_data, vec_len (test_data) - 1);
while (svm_fifo_max_enqueue (f) >= vec_len (test_data))
svm_fifo_enqueue_nowait (f, pid, vec_len (test_data), test_data);
svm_fifo_enqueue_nowait (f, vec_len (test_data), test_data);
while (svm_fifo_max_dequeue (f) >= vec_len (test_data))
svm_fifo_dequeue_nowait (f, pid, vec_len (retrieved_data),
retrieved_data);
svm_fifo_dequeue_nowait (f, vec_len (retrieved_data), retrieved_data);
while (svm_fifo_max_enqueue (f) >= vec_len (test_data))
svm_fifo_enqueue_nowait (f, pid, vec_len (test_data), test_data);
svm_fifo_enqueue_nowait (f, vec_len (test_data), test_data);
while (svm_fifo_max_dequeue (f) >= vec_len (test_data))
svm_fifo_dequeue_nowait (f, pid, vec_len (retrieved_data),
retrieved_data);
svm_fifo_dequeue_nowait (f, vec_len (retrieved_data), retrieved_data);
if (!memcmp (retrieved_data, test_data, vec_len (test_data)))
error = clib_error_return (0, "data test OK, got '%s'", retrieved_data);
@@ -81,7 +78,6 @@ master (int verbose)
u8 *test_data;
u8 *retrieved_data = 0;
int i;
int pid = getpid ();
memset (a, 0, sizeof (*a));
@@ -104,7 +100,7 @@ master (int verbose)
vec_validate (retrieved_data, vec_len (test_data) - 1);
for (i = 0; i < 1000; i++)
svm_fifo_enqueue_nowait (f, pid, vec_len (test_data), test_data);
svm_fifo_enqueue_nowait (f, vec_len (test_data), test_data);
return clib_error_return (0, "master (enqueue) done");
}
@@ -176,7 +172,6 @@ offset (int verbose)
u32 *test_data = 0;
u32 *recovered_data = 0;
int i;
int pid = getpid ();
memset (a, 0, sizeof (*a));
@@ -199,19 +194,19 @@ offset (int verbose)
vec_add1 (test_data, i);
/* Enqueue the first 1024 u32's */
svm_fifo_enqueue_nowait (f, pid, 4096 /* bytes to enqueue */ ,
svm_fifo_enqueue_nowait (f, 4096 /* bytes to enqueue */ ,
(u8 *) test_data);
/* Enqueue the third 1024 u32's 2048 ahead of the current tail */
svm_fifo_enqueue_with_offset (f, pid, 4096, 4096, (u8 *) & test_data[2048]);
svm_fifo_enqueue_with_offset (f, 4096, 4096, (u8 *) & test_data[2048]);
/* Enqueue the second 1024 u32's at the current tail */
svm_fifo_enqueue_nowait (f, pid, 4096 /* bytes to enqueue */ ,
svm_fifo_enqueue_nowait (f, 4096 /* bytes to enqueue */ ,
(u8 *) & test_data[1024]);
vec_validate (recovered_data, (3 * 1024) - 1);
svm_fifo_dequeue_nowait (f, pid, 3 * 4096, (u8 *) recovered_data);
svm_fifo_dequeue_nowait (f, 3 * 4096, (u8 *) recovered_data);
for (i = 0; i < (3 * 1024); i++)
{
@@ -237,7 +232,6 @@ slave (int verbose)
int rv;
u8 *test_data;
u8 *retrieved_data = 0;
int pid = getpid ();
int i;
memset (a, 0, sizeof (*a));
@@ -262,8 +256,7 @@ slave (int verbose)
for (i = 0; i < 1000; i++)
{
svm_fifo_dequeue_nowait (f, pid, vec_len (retrieved_data),
retrieved_data);
svm_fifo_dequeue_nowait (f, vec_len (retrieved_data), retrieved_data);
if (memcmp (retrieved_data, test_data, vec_len (retrieved_data)))
return clib_error_return (0, "retrieved data incorrect, '%s'",
retrieved_data);
+125 -64
View File
File diff suppressed because it is too large Load Diff
+19 -21
View File
@@ -164,7 +164,7 @@ setup_signal_handlers (void)
}
void
application_attach (uri_udp_test_main_t * utm)
application_send_attach (uri_udp_test_main_t * utm)
{
vl_api_application_attach_t *bmp;
u32 fifo_size = 3 << 20;
@@ -174,8 +174,8 @@ application_attach (uri_udp_test_main_t * utm)
bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
bmp->client_index = utm->my_client_index;
bmp->context = ntohl (0xfeedface);
bmp->options[SESSION_OPTIONS_FLAGS] =
SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
bmp->options[APP_OPTIONS_FLAGS] =
APP_OPTIONS_FLAGS_USE_FIFO | APP_OPTIONS_FLAGS_ADD_SEGMENT;
bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
@@ -307,7 +307,7 @@ cut_through_thread_fn (void *arg)
/* We read from the tx fifo and write to the rx fifo */
do
{
actual_transfer = svm_fifo_dequeue_nowait (tx_fifo, 0,
actual_transfer = svm_fifo_dequeue_nowait (tx_fifo,
vec_len (my_copy_buffer),
my_copy_buffer);
}
@@ -318,7 +318,7 @@ cut_through_thread_fn (void *arg)
buffer_offset = 0;
while (actual_transfer > 0)
{
rv = svm_fifo_enqueue_nowait (rx_fifo, 0, actual_transfer,
rv = svm_fifo_enqueue_nowait (rx_fifo, actual_transfer,
my_copy_buffer + buffer_offset);
if (rv > 0)
{
@@ -357,7 +357,6 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
u64 bytes_received = 0, bytes_sent = 0;
i32 bytes_to_read;
int rv;
int mypid = getpid ();
f64 before, after, delta, bytes_per_second;
svm_fifo_t *rx_fifo, *tx_fifo;
int buffer_offset, bytes_to_send = 0;
@@ -382,8 +381,7 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
buffer_offset = 0;
while (bytes_to_send > 0)
{
rv = svm_fifo_enqueue_nowait (tx_fifo, mypid,
bytes_to_send,
rv = svm_fifo_enqueue_nowait (tx_fifo, bytes_to_send,
test_data + buffer_offset);
if (rv > 0)
@@ -402,7 +400,7 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
buffer_offset = 0;
while (bytes_to_read > 0)
{
rv = svm_fifo_dequeue_nowait (rx_fifo, mypid,
rv = svm_fifo_dequeue_nowait (rx_fifo,
bytes_to_read,
utm->rx_buf + buffer_offset);
if (rv > 0)
@@ -415,8 +413,8 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
}
while (bytes_received < bytes_sent)
{
rv = svm_fifo_dequeue_nowait (rx_fifo, mypid,
vec_len (utm->rx_buf), utm->rx_buf);
rv =
svm_fifo_dequeue_nowait (rx_fifo, vec_len (utm->rx_buf), utm->rx_buf);
if (rv > 0)
{
#if CLIB_DEBUG > 0
@@ -459,7 +457,7 @@ uri_udp_client_test (uri_udp_test_main_t * utm)
{
session_t *session;
application_attach (utm);
application_send_attach (utm);
udp_client_connect (utm);
if (wait_for_state_change (utm, STATE_READY))
@@ -559,8 +557,8 @@ vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
128 * 1024);
ASSERT (session->server_tx_fifo);
session->server_rx_fifo->server_session_index = session - utm->sessions;
session->server_tx_fifo->server_session_index = session - utm->sessions;
session->server_rx_fifo->master_session_index = session - utm->sessions;
session->server_tx_fifo->master_session_index = session - utm->sessions;
utm->cut_through_session_index = session - utm->sessions;
rv = pthread_create (&utm->cut_through_thread_handle,
@@ -805,19 +803,19 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
do
{
nbytes = svm_fifo_dequeue_nowait (rx_fifo, 0,
vec_len (utm->rx_buf), utm->rx_buf);
nbytes = svm_fifo_dequeue_nowait (rx_fifo, vec_len (utm->rx_buf),
utm->rx_buf);
}
while (nbytes <= 0);
do
{
rv = svm_fifo_enqueue_nowait (tx_fifo, 0, nbytes, utm->rx_buf);
rv = svm_fifo_enqueue_nowait (tx_fifo, nbytes, utm->rx_buf);
}
while (rv == -2);
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = e->event_id;
if (svm_fifo_set_event (tx_fifo))
@@ -839,11 +837,11 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
0 /* nowait */ );
switch (e->event_type)
{
case FIFO_EVENT_SERVER_RX:
case FIFO_EVENT_APP_RX:
server_handle_fifo_event_rx (utm, e);
break;
case FIFO_EVENT_SERVER_EXIT:
case FIFO_EVENT_DISCONNECT:
return;
default:
@@ -893,7 +891,7 @@ void
udp_server_test (uri_udp_test_main_t * utm)
{
application_attach (utm);
application_send_attach (utm);
/* Bind to uri */
server_listen (utm);
+16 -32
View File
@@ -87,14 +87,17 @@ application_new ()
void
application_del (application_t * app)
{
api_main_t *am = &api_main;
void *oldheap;
segment_manager_t *sm;
u64 handle;
u32 index, *handles = 0;
int i;
vnet_unbind_args_t _a, *a = &_a;
/*
* The app event queue allocated in first segment is cleared with
* the segment manager. No need to explicitly free it.
*/
/*
* Cleanup segment managers
*/
@@ -120,14 +123,6 @@ application_del (application_t * app)
vnet_unbind (a);
}
/*
* Free the event fifo in the /vpe-api shared-memory segment
*/
oldheap = svm_push_data_heap (am->vlib_rp);
if (app->event_queue)
unix_shared_memory_queue_free (app->event_queue);
svm_pop_heap (oldheap);
application_table_del (app);
pool_put (app_pool, app);
}
@@ -149,30 +144,14 @@ int
application_init (application_t * app, u32 api_client_index, u64 * options,
session_cb_vft_t * cb_fns)
{
api_main_t *am = &api_main;
segment_manager_t *sm;
segment_manager_properties_t *props;
void *oldheap;
u32 app_evt_queue_size;
u32 app_evt_queue_size, first_seg_size;
int rv;
app_evt_queue_size = options[APP_EVT_QUEUE_SIZE] > 0 ?
options[APP_EVT_QUEUE_SIZE] : default_app_evt_queue_size;
/* Allocate event fifo in the /vpe-api shared-memory segment */
oldheap = svm_push_data_heap (am->vlib_rp);
/* Allocate server event queue */
app->event_queue =
unix_shared_memory_queue_init (app_evt_queue_size,
sizeof (session_fifo_event_t),
0 /* consumer pid */ ,
0
/* (do not) signal when queue non-empty */
);
svm_pop_heap (oldheap);
/* Setup segment manager */
sm = segment_manager_new ();
sm->app_index = app->index;
@@ -181,16 +160,21 @@ application_init (application_t * app, u32 api_client_index, u64 * options,
props->rx_fifo_size = options[SESSION_OPTIONS_RX_FIFO_SIZE];
props->tx_fifo_size = options[SESSION_OPTIONS_TX_FIFO_SIZE];
props->add_segment = props->add_segment_size != 0;
props->use_private_segment = options[APP_OPTIONS_FLAGS]
& APP_OPTIONS_FLAGS_BUILTIN_APP;
if ((rv = segment_manager_init (sm, props,
options[SESSION_OPTIONS_SEGMENT_SIZE])))
first_seg_size = options[SESSION_OPTIONS_SEGMENT_SIZE];
if ((rv = segment_manager_init (sm, props, first_seg_size)))
return rv;
app->first_segment_manager = segment_manager_index (sm);
app->api_client_index = api_client_index;
app->flags = options[SESSION_OPTIONS_FLAGS];
app->flags = options[APP_OPTIONS_FLAGS];
app->cb_fns = *cb_fns;
/* Allocate app event queue in the first shared-memory segment */
app->event_queue = segment_manager_alloc_queue (sm, app_evt_queue_size);
/* Check that the obvious things are properly set up */
application_verify_cb_fns (cb_fns);
@@ -451,8 +435,8 @@ application_format_connects (application_t * app, int verbose)
continue;
fifo = fifos[i];
session_index = fifo->server_session_index;
thread_index = fifo->server_thread_index;
session_index = fifo->master_session_index;
thread_index = fifo->master_thread_index;
session = stream_session_get (session_index, thread_index);
str = format (0, "%U", format_stream_session, session, verbose);
-12
View File
@@ -61,18 +61,6 @@ typedef struct _application
/** Flags */
u32 flags;
/* Stream server mode: accept or connect
* TODO REMOVE*/
u8 mode;
/** Index of the listen session or connect session
* TODO REMOVE*/
u32 session_index;
/** Session thread index for client connect sessions
* TODO REMOVE */
u32 thread_index;
/*
* Binary API interface to external app
*/
+6 -20
View File
@@ -142,7 +142,7 @@ vnet_connect_i (u32 app_index, u32 api_context, session_type_t sst,
* Server is willing to have a direct fifo connection created
* instead of going through the state machine, etc.
*/
if (server->flags & SESSION_OPTIONS_FLAGS_USE_FIFO)
if (server->flags & APP_OPTIONS_FLAGS_USE_FIFO)
return server->cb_fns.
redirect_connect_callback (server->api_client_index, mp);
}
@@ -363,7 +363,11 @@ vnet_disconnect_session (vnet_disconnect_args_t * a)
if (!s || s->app_index != a->app_index)
return VNET_API_ERROR_INVALID_VALUE;
stream_session_disconnect (s);
/* We're peeking into another's thread pool. Make sure */
ASSERT (s->session_index == index);
session_send_session_evt_to_thread (a->handle, FIFO_EVENT_DISCONNECT,
thread_index);
return 0;
}
@@ -395,24 +399,6 @@ vnet_connect (vnet_connect_args_t * a)
return vnet_connect_i (a->app_index, a->api_context, sst, &a->tep, a->mp);
}
int
vnet_disconnect (vnet_disconnect_args_t * a)
{
stream_session_t *session;
u32 session_index, thread_index;
if (api_parse_session_handle (a->handle, &session_index, &thread_index))
{
clib_warning ("Invalid handle");
return -1;
}
session = stream_session_get (session_index, thread_index);
stream_session_disconnect (session);
return 0;
}
/*
* fd.io coding-style-patch-verification: ON
*
+32 -6
View File
@@ -30,10 +30,18 @@ typedef enum _session_api_proto
typedef struct _vnet_app_attach_args_t
{
/** Binary API client index */
u32 api_client_index;
/** Application and segment manager options */
u64 *options;
/** Session to application callback functions */
session_cb_vft_t *session_cb_vft;
/** Flag that indicates if app is builtin */
u8 builtin;
/*
* Results
*/
@@ -110,7 +118,7 @@ typedef struct _vnet_disconnect_args_t
typedef enum
{
APP_EVT_QUEUE_SIZE,
SESSION_OPTIONS_FLAGS,
APP_OPTIONS_FLAGS,
SESSION_OPTIONS_SEGMENT_SIZE,
SESSION_OPTIONS_ADD_SEGMENT_SIZE,
SESSION_OPTIONS_RX_FIFO_SIZE,
@@ -119,11 +127,30 @@ typedef enum
SESSION_OPTIONS_N_OPTIONS
} app_attach_options_index_t;
/** Server can handle delegated connect requests from local clients */
#define SESSION_OPTIONS_FLAGS_USE_FIFO (1<<0)
#define foreach_app_options_flags \
_(USE_FIFO, "Use FIFO with redirects") \
_(ADD_SEGMENT, "Add segment and signal app if needed") \
_(BUILTIN_APP, "Application is builtin") \
/** Server wants vpp to add segments when out of memory for fifos */
#define SESSION_OPTIONS_FLAGS_ADD_SEGMENT (1<<1)
typedef enum _app_options
{
#define _(sym, str) APP_OPTIONS_##sym,
foreach_app_options_flags
#undef _
} app_options_t;
typedef enum _app_options_flags
{
#define _(sym, str) APP_OPTIONS_FLAGS_##sym = 1 << APP_OPTIONS_##sym,
foreach_app_options_flags
#undef _
} app_options_flags_t;
///** Server can handle delegated connect requests from local clients */
//#define APP_OPTIONS_FLAGS_USE_FIFO (1<<0)
//
///** Server wants vpp to add segments when out of memory for fifos */
//#define APP_OPTIONS_FLAGS_ADD_SEGMENT (1<<1)
#define VNET_CONNECT_REDIRECTED 123
@@ -138,7 +165,6 @@ int vnet_disconnect_session (vnet_disconnect_args_t * a);
int vnet_bind (vnet_bind_args_t * a);
int vnet_connect (vnet_connect_args_t * a);
int vnet_unbind (vnet_unbind_args_t * a);
int vnet_disconnect (vnet_disconnect_args_t * a);
int
api_parse_session_handle (u64 handle, u32 * session_index,
+38 -27
View File
@@ -218,8 +218,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
* 2) buffer chains */
if (peek_data)
{
n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
rx_offset, len_to_deq0, data0);
n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
len_to_deq0, data0);
if (n_bytes_read <= 0)
goto dequeue_fail;
@@ -230,8 +230,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
else
{
n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
s0->pid, len_to_deq0,
data0);
len_to_deq0, data0);
if (n_bytes_read <= 0)
goto dequeue_fail;
}
@@ -301,6 +300,26 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
n_tx_pkts, 0);
}
stream_session_t *
session_event_get_session (session_fifo_event_t * e0, u8 thread_index)
{
svm_fifo_t *f0;
stream_session_t *s0;
u32 session_index0;
f0 = e0->fifo;
session_index0 = f0->master_session_index;
/* $$$ add multiple event queues, per vpp worker thread */
ASSERT (f0->master_thread_index == thread_index);
s0 = stream_session_get_if_valid (session_index0, thread_index);
ASSERT (s0->thread_index == thread_index);
return s0;
}
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
@@ -370,34 +389,24 @@ skip_dequeue:
n_events = vec_len (my_fifo_events);
for (i = 0; i < n_events; i++)
{
svm_fifo_t *f0; /* $$$ prefetch 1 ahead maybe */
stream_session_t *s0;
u32 session_index0;
stream_session_t *s0; /* $$$ prefetch 1 ahead maybe */
session_fifo_event_t *e0;
e0 = &my_fifo_events[i];
f0 = e0->fifo;
session_index0 = f0->server_session_index;
/* $$$ add multiple event queues, per vpp worker thread */
ASSERT (f0->server_thread_index == my_thread_index);
s0 = stream_session_get_if_valid (session_index0, my_thread_index);
if (CLIB_DEBUG && !s0)
{
clib_warning ("It's dead, Jim!");
continue;
}
if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
continue;
ASSERT (s0->thread_index == my_thread_index);
switch (e0->event_type)
{
case FIFO_EVENT_SERVER_TX:
case FIFO_EVENT_APP_TX:
s0 = session_event_get_session (e0, my_thread_index);
if (CLIB_DEBUG && !s0)
{
clib_warning ("It's dead, Jim!");
continue;
}
if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
continue;
/* Spray packets in per session type frames, since they go to
* different nodes */
rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
@@ -408,10 +417,12 @@ skip_dequeue:
goto done;
break;
case FIFO_EVENT_SERVER_EXIT:
case FIFO_EVENT_DISCONNECT:
s0 = stream_session_get_from_handle (e0->session_handle);
stream_session_disconnect (s0);
break;
case FIFO_EVENT_BUILTIN_RX:
s0 = session_event_get_session (e0, my_thread_index);
svm_fifo_unset_event (s0->server_rx_fifo);
/* Get session's server */
app = application_get (s0->app_index);
+113 -21
View File
@@ -27,6 +27,11 @@ u32 segment_name_counter = 0;
*/
segment_manager_t *segment_managers = 0;
/**
* Process private segment index
*/
u32 private_segment_index = ~0;
/**
* Default fifo and segment size. TODO config.
*/
@@ -100,6 +105,26 @@ session_manager_add_first_segment (segment_manager_t * sm, u32 segment_size)
return rv;
}
static void
segment_manager_alloc_process_private_segment ()
{
svm_fifo_segment_create_args_t _a, *a = &_a;
if (private_segment_index != ~0)
return;
memset (a, 0, sizeof (*a));
a->segment_name = "process-private-segment";
a->segment_size = ~0;
a->new_segment_index = ~0;
if (svm_fifo_segment_create_process_private (a))
clib_warning ("Failed to create process private segment");
private_segment_index = a->new_segment_index;
ASSERT (private_segment_index != ~0);
}
/**
* Initializes segment manager based on options provided.
* Returns error if svm segment allocation fails.
@@ -114,7 +139,9 @@ segment_manager_init (segment_manager_t * sm,
/* app allocates these */
sm->properties = properties;
if (first_seg_size > 0)
first_seg_size = first_seg_size > 0 ? first_seg_size : default_segment_size;
if (sm->properties->use_private_segment == 0)
{
rv = session_manager_add_first_segment (sm, first_seg_size);
if (rv)
@@ -123,7 +150,15 @@ segment_manager_init (segment_manager_t * sm,
return rv;
}
}
else
{
if (private_segment_index == ~0)
segment_manager_alloc_process_private_segment ();
ASSERT (private_segment_index != ~0);
vec_add1 (sm->segment_indices, private_segment_index);
}
clib_spinlock_init (&sm->lockp);
return 0;
}
@@ -162,8 +197,8 @@ segment_manager_del (segment_manager_t * sm)
stream_session_t *session;
fifo = fifos[i];
session_index = fifo->server_session_index;
thread_index = fifo->server_thread_index;
session_index = fifo->master_session_index;
thread_index = fifo->master_thread_index;
session = stream_session_get (session_index, thread_index);
@@ -183,7 +218,9 @@ segment_manager_del (segment_manager_t * sm)
deleted_thread_indices[i]);
/* Instead of directly removing the session call disconnect */
stream_session_disconnect (session);
session_send_session_evt_to_thread (stream_session_handle (session),
FIFO_EVENT_DISCONNECT,
deleted_thread_indices[i]);
/*
stream_session_table_del (smm, session);
@@ -200,6 +237,7 @@ segment_manager_del (segment_manager_t * sm)
/* svm_fifo_segment_delete (fifo_segment); */
}
clib_spinlock_free (&sm->lockp);
vec_free (deleted_sessions);
vec_free (deleted_thread_indices);
pool_put (segment_managers, sm);
@@ -232,9 +270,13 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
u8 added_a_segment = 0;
int i;
/* Allocate svm fifos */
ASSERT (vec_len (sm->segment_indices));
/* Make sure we don't have multiple threads trying to allocate segments
* at the same time. */
clib_spinlock_lock (&sm->lockp);
/* Allocate svm fifos */
again:
for (i = 0; i < vec_len (sm->segment_indices); i++)
{
@@ -283,7 +325,9 @@ again:
}
if (session_manager_add_segment (sm))
return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
{
return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
}
added_a_segment = 1;
goto again;
@@ -295,14 +339,16 @@ again:
}
}
if (added_a_segment)
return segment_manager_notify_app_seg_add (sm, *fifo_segment_index);
/* Backpointers to segment manager */
sm_index = segment_manager_index (sm);
(*server_tx_fifo)->segment_manager = sm_index;
(*server_rx_fifo)->segment_manager = sm_index;
clib_spinlock_unlock (&sm->lockp);
if (added_a_segment)
return segment_manager_notify_app_seg_add (sm, *fifo_segment_index);
return 0;
}
@@ -313,26 +359,72 @@ segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
segment_manager_t *sm;
svm_fifo_segment_private_t *fifo_segment;
sm = segment_manager_get_if_valid (rx_fifo->segment_manager);
/* It's possible to have no segment manager if the session was removed
* as result of a detach */
if (!sm)
return;
fifo_segment = svm_fifo_get_segment (svm_segment_index);
svm_fifo_segment_free_fifo (fifo_segment, rx_fifo);
svm_fifo_segment_free_fifo (fifo_segment, tx_fifo);
/* If we have segment manager, try doing some cleanup.
* It's possible to have no segment manager if the session was removed
* as result of a detach */
sm = segment_manager_get_if_valid (rx_fifo->segment_manager);
if (sm)
/* Remove segment only if it holds no fifos and not the first */
if (sm->segment_indices[0] != svm_segment_index
&& !svm_fifo_segment_has_fifos (fifo_segment))
{
/* Remove segment only if it holds no fifos and not the first */
if (sm->segment_indices[0] != svm_segment_index
&& !svm_fifo_segment_has_fifos (fifo_segment))
{
svm_fifo_segment_delete (fifo_segment);
vec_del1 (sm->segment_indices, svm_segment_index);
}
svm_fifo_segment_delete (fifo_segment);
vec_del1 (sm->segment_indices, svm_segment_index);
}
}
/**
* Allocates shm queue in the first segment
*/
unix_shared_memory_queue_t *
segment_manager_alloc_queue (segment_manager_t * sm, u32 queue_size)
{
ssvm_shared_header_t *sh;
svm_fifo_segment_private_t *segment;
unix_shared_memory_queue_t *q;
void *oldheap;
ASSERT (sm->segment_indices != 0);
segment = svm_fifo_get_segment (sm->segment_indices[0]);
sh = segment->ssvm.sh;
oldheap = ssvm_push_heap (sh);
q =
unix_shared_memory_queue_init (queue_size, sizeof (session_fifo_event_t),
0 /* consumer pid */ , 0
/* signal when queue non-empty */ );
ssvm_pop_heap (oldheap);
return q;
}
/**
* Frees shm queue allocated in the first segment
*/
void
segment_manager_dealloc_queue (segment_manager_t * sm,
unix_shared_memory_queue_t * q)
{
ssvm_shared_header_t *sh;
svm_fifo_segment_private_t *segment;
void *oldheap;
ASSERT (sm->segment_indices != 0);
segment = svm_fifo_get_segment (sm->segment_indices[0]);
sh = segment->ssvm.sh;
oldheap = ssvm_push_heap (sh);
unix_shared_memory_queue_free (q);
ssvm_pop_heap (oldheap);
}
/*
* fd.io coding-style-patch-verification: ON
*
+12
View File
@@ -18,6 +18,10 @@
#include <vnet/vnet.h>
#include <svm/svm_fifo_segment.h>
#include <vlibmemory/unix_shared_memory_queue.h>
#include <vlibmemory/api.h>
#include <vppinfra/lock.h>
typedef struct _segment_manager_properties
{
/** Session fifo sizes. */
@@ -30,10 +34,14 @@ typedef struct _segment_manager_properties
/** Flag that indicates if additional segments should be created */
u8 add_segment;
/** Use private memory segment instead of shared memory */
u8 use_private_segment;
} segment_manager_properties_t;
typedef struct _segment_manager
{
clib_spinlock_t lockp;
/** segments mapped by this manager */
u32 *segment_indices;
@@ -95,6 +103,10 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
void
segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
svm_fifo_t * tx_fifo);
unix_shared_memory_queue_t *segment_manager_alloc_queue (segment_manager_t *
sm, u32 queue_size);
void segment_manager_dealloc_queue (segment_manager_t * sm,
unix_shared_memory_queue_t * q);
#endif /* SRC_VNET_SESSION_SEGMENT_MANAGER_H_ */
/*
+78 -60
View File
@@ -377,33 +377,6 @@ stream_session_lookup_transport6 (ip6_address_t * lcl, ip6_address_t * rmt,
return 0;
}
/**
* Allocate vpp event queue (once) per worker thread
*/
void
session_vpp_event_queue_allocate (session_manager_main_t * smm,
u32 thread_index)
{
api_main_t *am = &api_main;
void *oldheap;
if (smm->vpp_event_queues[thread_index] == 0)
{
/* Allocate event fifo in the /vpe-api shared-memory segment */
oldheap = svm_push_data_heap (am->vlib_rp);
smm->vpp_event_queues[thread_index] =
unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
sizeof (session_fifo_event_t),
0 /* consumer pid */ ,
0
/* (do not) send signal when queue non-empty */
);
svm_pop_heap (oldheap);
}
}
int
stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
stream_session_t ** ret_s)
@@ -428,11 +401,11 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
/* Initialize backpointers */
pool_index = s - smm->sessions[thread_index];
server_rx_fifo->server_session_index = pool_index;
server_rx_fifo->server_thread_index = thread_index;
server_rx_fifo->master_session_index = pool_index;
server_rx_fifo->master_thread_index = thread_index;
server_tx_fifo->server_session_index = pool_index;
server_tx_fifo->server_thread_index = thread_index;
server_tx_fifo->master_session_index = pool_index;
server_tx_fifo->master_thread_index = thread_index;
s->server_rx_fifo = server_rx_fifo;
s->server_tx_fifo = server_tx_fifo;
@@ -485,7 +458,7 @@ stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
return -1;
enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, s->pid, len, data);
enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
if (queue_event)
{
@@ -527,14 +500,14 @@ stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
return svm_fifo_peek (s->server_tx_fifo, s->pid, offset, max_bytes, buffer);
return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
}
u32
stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
return svm_fifo_dequeue_drop (s->server_tx_fifo, s->pid, max_bytes);
return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
}
/**
@@ -568,7 +541,7 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
{
/* Fabricate event */
evt.fifo = s->server_rx_fifo;
evt.event_type = FIFO_EVENT_SERVER_RX;
evt.event_type = FIFO_EVENT_APP_RX;
evt.event_id = serial_number++;
/* Add event to server's event queue */
@@ -899,37 +872,45 @@ stream_session_stop_listen (stream_session_t * s)
return 0;
}
void
session_send_session_evt_to_thread (u64 session_handle,
fifo_event_type_t evt_type,
u32 thread_index)
{
static u16 serial_number = 0;
session_fifo_event_t evt;
unix_shared_memory_queue_t *q;
/* Fabricate event */
evt.session_handle = session_handle;
evt.event_type = evt_type;
evt.event_id = serial_number++;
q = session_manager_get_vpp_event_queue (thread_index);
/* Based on request block (or not) for lack of space */
if (PREDICT_TRUE (q->cursize < q->maxsize))
unix_shared_memory_queue_add (q, (u8 *) & evt,
0 /* do wait for mutex */ );
else
{
clib_warning ("queue full");
return;
}
}
/**
* Disconnect session and propagate to transport. This should eventually
* result in a delete notification that allows us to cleanup session state.
* Called for both active/passive disconnects.
*
* Should be called from the session's thread.
*/
void
stream_session_disconnect (stream_session_t * s)
{
// session_fifo_event_t evt;
s->session_state = SESSION_STATE_CLOSED;
/* RPC to vpp evt queue in the right thread */
tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
// {
// /* Fabricate event */
// evt.fifo = s->server_rx_fifo;
// evt.event_type = FIFO_EVENT_SERVER_RX;
// evt.event_id = serial_number++;
//
// /* Based on request block (or not) for lack of space */
// if (PREDICT_TRUE(q->cursize < q->maxsize))
// unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
// 0 /* do wait for mutex */);
// else
// {
// clib_warning("fifo full");
// return -1;
// }
// }
}
/**
@@ -976,6 +957,33 @@ session_get_transport_vft (u8 type)
return &tp_vfts[type];
}
/**
* Allocate vpp event queue (once) per worker thread
*/
void
session_vpp_event_queue_allocate (session_manager_main_t * smm,
u32 thread_index)
{
api_main_t *am = &api_main;
void *oldheap;
if (smm->vpp_event_queues[thread_index] == 0)
{
/* Allocate event fifo in the /vpe-api shared-memory segment */
oldheap = svm_push_data_heap (am->vlib_rp);
smm->vpp_event_queues[thread_index] =
unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
sizeof (session_fifo_event_t),
0 /* consumer pid */ ,
0
/* (do not) send signal when queue non-empty */
);
svm_pop_heap (oldheap);
}
}
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
@@ -1043,6 +1051,18 @@ session_manager_main_enable (vlib_main_t * vm)
return 0;
}
void
session_node_enable_disable (u8 is_en)
{
u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
/* *INDENT-OFF* */
foreach_vlib_main (({
vlib_node_set_state (this_vlib_main, session_queue_node.index,
state);
}));
/* *INDENT-ON* */
}
clib_error_t *
vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
{
@@ -1051,16 +1071,14 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
if (session_manager_main.is_enabled)
return 0;
vlib_node_set_state (vm, session_queue_node.index,
VLIB_NODE_STATE_POLLING);
session_node_enable_disable (is_en);
return session_manager_main_enable (vm);
}
else
{
session_manager_main.is_enabled = 0;
vlib_node_set_state (vm, session_queue_node.index,
VLIB_NODE_STATE_DISABLED);
session_node_enable_disable (is_en);
}
return 0;
+11 -8
View File
@@ -17,9 +17,6 @@
#include <vnet/session/transport.h>
#include <vlibmemory/unix_shared_memory_queue.h>
#include <vlibmemory/api.h>
#include <vppinfra/sparse_vec.h>
#include <svm/svm_fifo_segment.h>
#include <vnet/session/session_debug.h>
#include <vnet/session/segment_manager.h>
@@ -31,10 +28,10 @@
typedef enum
{
FIFO_EVENT_SERVER_RX,
FIFO_EVENT_SERVER_TX,
FIFO_EVENT_APP_RX,
FIFO_EVENT_APP_TX,
FIFO_EVENT_TIMEOUT,
FIFO_EVENT_SERVER_EXIT,
FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX
} fifo_event_type_t;
@@ -96,7 +93,11 @@ typedef enum
/* *INDENT-OFF* */
typedef CLIB_PACKED (struct {
svm_fifo_t * fifo;
union
{
svm_fifo_t * fifo;
u64 session_handle;
};
u8 event_type;
u16 event_id;
}) session_fifo_event_t;
@@ -370,7 +371,9 @@ int stream_session_listen (stream_session_t * s, transport_endpoint_t * tep);
int stream_session_stop_listen (stream_session_t * s);
void stream_session_disconnect (stream_session_t * s);
void stream_session_cleanup (stream_session_t * s);
void session_send_session_evt_to_thread (u64 session_handle,
fifo_event_type_t evt_type,
u32 thread_index);
u8 *format_stream_session (u8 * s, va_list * args);
void session_register_transport (u8 type, const transport_proto_vft_t * vft);
+25 -33
View File
@@ -96,7 +96,7 @@ send_session_accept_callback (stream_session_t * s)
memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
mp->context = server->index;
listener = listen_session_get (s->session_type, s->listener_index);
tp_vft = session_get_transport_vft (s->session_type);
tc = tp_vft->get_connection (s->connection_index, s->thread_index);
@@ -270,23 +270,6 @@ static session_cb_vft_t uri_session_cb_vft = {
.redirect_connect_callback = redirect_connect_callback
};
static int
api_session_not_valid (u32 session_index, u32 thread_index)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
stream_session_t *pool;
if (thread_index >= vec_len (smm->sessions))
return VNET_API_ERROR_INVALID_VALUE;
pool = smm->sessions[thread_index];
if (pool_is_free_index (pool, session_index))
return VNET_API_ERROR_INVALID_VALUE_2;
return 0;
}
static void
vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
{
@@ -324,9 +307,9 @@ vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
rv = vnet_application_attach (a);
done:
/* *INDENT-OFF* */
REPLY_MACRO2 (VL_API_APPLICATION_ATTACH_REPLY, ({
rmp->retval = rv;
if (!rv)
{
rmp->segment_name_length = 0;
@@ -558,24 +541,33 @@ static void
vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
{
stream_session_t *s;
int rv;
u32 session_index, thread_index;
session_index = stream_session_index_from_handle (mp->handle);
thread_index = stream_session_thread_from_handle (mp->handle);
if (api_session_not_valid (session_index, thread_index))
return;
vnet_disconnect_args_t _a, *a = &_a;
s = stream_session_get (session_index, thread_index);
rv = mp->retval;
if (rv)
/* Server isn't interested, kill the session */
if (mp->retval)
{
/* Server isn't interested, kill the session */
stream_session_disconnect (s);
return;
a->app_index = mp->context;
a->handle = mp->handle;
vnet_disconnect_session (a);
}
else
{
stream_session_parse_handle (mp->handle, &session_index, &thread_index);
s = stream_session_get_if_valid (session_index, thread_index);
if (!s)
{
clib_warning ("session doesn't exist");
return;
}
if (s->app_index != mp->context)
{
clib_warning ("app doesn't own session");
return;
}
/* XXX volatile? */
s->session_state = SESSION_STATE_READY;
}
s->session_state = SESSION_STATE_READY;
}
static void
+5 -4
View File
@@ -62,8 +62,7 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
bytes_this_chunk = bytes_this_chunk < s->bytes_to_send
? bytes_this_chunk : s->bytes_to_send;
rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, 0 /*pid */ ,
bytes_this_chunk,
rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, bytes_this_chunk,
test_data + test_buf_offset);
/* If we managed to enqueue data... */
@@ -95,7 +94,7 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
{
/* Fabricate TX event, send to vpp */
evt.fifo = s->server_tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (tm->vpp_event_queue, (u8 *) & evt,
@@ -113,7 +112,7 @@ receive_test_chunk (tclient_main_t * tm, session_t * s)
/* Allow enqueuing of new event */
// svm_fifo_unset_event (rx_fifo);
n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (tm->rx_buf),
n_read = svm_fifo_dequeue_nowait (rx_fifo, vec_len (tm->rx_buf),
tm->rx_buf);
if (n_read > 0)
{
@@ -457,6 +456,8 @@ attach_builtin_test_clients ()
options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
options[SESSION_OPTIONS_SEGMENT_SIZE] = (2 << 30); /*$$$$ config / arg */
options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
a->options = options;
return vnet_application_attach (a);
+4 -4
View File
@@ -180,7 +180,7 @@ builtin_server_rx_callback (stream_session_t * s)
vec_validate (bsm->rx_buf, max_transfer - 1);
_vec_len (bsm->rx_buf) = max_transfer;
actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, 0, max_transfer,
actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, max_transfer,
bsm->rx_buf);
ASSERT (actual_transfer == max_transfer);
@@ -190,8 +190,7 @@ builtin_server_rx_callback (stream_session_t * s)
* Echo back
*/
n_written =
svm_fifo_enqueue_nowait (tx_fifo, 0, actual_transfer, bsm->rx_buf);
n_written = svm_fifo_enqueue_nowait (tx_fifo, actual_transfer, bsm->rx_buf);
if (n_written != max_transfer)
clib_warning ("short trout!");
@@ -200,7 +199,7 @@ builtin_server_rx_callback (stream_session_t * s)
{
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
@@ -288,6 +287,7 @@ server_attach ()
a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 128 << 20;
a->options[SESSION_OPTIONS_RX_FIFO_SIZE] = 1 << 16;
a->options[SESSION_OPTIONS_TX_FIFO_SIZE] = 1 << 16;
a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
a->segment_name = segment_name;
a->segment_name_length = ARRAY_LEN (segment_name);
+8 -5
View File
@@ -487,7 +487,8 @@ u8 *
format_tcp_connection (u8 * s, va_list * args)
{
tcp_connection_t *tc = va_arg (*args, tcp_connection_t *);
if (!tc)
return s;
if (tc->c_is_ip4)
{
s = format (s, "[#%d][%s] %U:%d->%U:%d", tc->c_thread_index, "T",
@@ -747,12 +748,14 @@ void
tcp_initialize_timer_wheels (tcp_main_t * tm)
{
tw_timer_wheel_16t_2w_512sl_t *tw;
vec_foreach (tw, tm->timer_wheels)
{
/* *INDENT-OFF* */
foreach_vlib_main (({
tw = &tm->timer_wheels[ii];
tw_timer_wheel_init_16t_2w_512sl (tw, tcp_expired_timers_dispatch,
100e-3 /* timer period 100ms */ , ~0);
tw->last_run_time = vlib_time_now (tm->vlib_main);
}
tw->last_run_time = vlib_time_now (this_vlib_main);
}));
/* *INDENT-ON* */
}
clib_error_t *
+4 -4
View File
@@ -1011,8 +1011,8 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
clib_warning ("ooo: offset %d len %d", offset, data_len);
rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo, s0->pid, offset,
data_len, vlib_buffer_get_current (b));
rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo, offset, data_len,
vlib_buffer_get_current (b));
/* Nothing written */
if (rv)
@@ -2392,8 +2392,8 @@ tcp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
{
t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
clib_memcpy (&t0->tcp_header, tcp0, sizeof (t0->tcp_header));
clib_memcpy (&t0->tcp_connection, tc0,
sizeof (t0->tcp_connection));
if (tc0)
clib_memcpy (&t0->tcp_connection, tc0, sizeof (*tc0));
}
vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
-6
View File
@@ -1558,7 +1558,6 @@ tcp46_send_reset_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_buffer_t *b0;
tcp_tx_trace_t *t0;
tcp_header_t *th0;
tcp_connection_t *tc0;
u32 error0 = TCP_ERROR_RST_SENT, next0 = TCP_RESET_NEXT_IP_LOOKUP;
bi0 = from[0];
@@ -1592,13 +1591,8 @@ tcp46_send_reset_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
th0 = ip4_next_header ((ip4_header_t *) th0);
else
th0 = ip6_next_header ((ip6_header_t *) th0);
tc0 =
tcp_connection_get (vnet_buffer (b0)->tcp.connection_index,
my_thread_index);
t0 = vlib_add_trace (vm, node, b0, sizeof (*t0));
clib_memcpy (&t0->tcp_header, th0, sizeof (t0->tcp_header));
clib_memcpy (&t0->tcp_connection, tc0,
sizeof (t0->tcp_connection));
}
vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
+20 -23
View File
@@ -351,8 +351,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
/*
* Enqueue an initial (un-dequeued) chunk
*/
rv = svm_fifo_enqueue_nowait (f, 0 /* pid */ ,
sizeof (u32), (u8 *) test_data);
rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) test_data);
TCP_TEST ((rv == sizeof (u32)), "enqueued %d", rv);
TCP_TEST ((f->tail == 4), "fifo tail %u", f->tail);
@@ -364,7 +363,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
{
offset = (2 * i + 1) * sizeof (u32);
data = (u8 *) (test_data + (2 * i + 1));
rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
if (verbose)
vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i + 1, offset,
offset + sizeof (u32));
@@ -393,7 +392,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
{
offset = (2 * i + 0) * sizeof (u32);
data = (u8 *) (test_data + (2 * i + 0));
rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
if (verbose)
vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i, offset,
offset + sizeof (u32));
@@ -418,8 +417,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
/*
* Enqueue the missing u32
*/
rv = svm_fifo_enqueue_nowait (f, 0 /* pid */ , sizeof (u32),
(u8 *) (test_data + 2));
rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) (test_data + 2));
if (verbose)
vlib_cli_output (vm, "fifo after missing link: %U", format_svm_fifo, f,
1);
@@ -432,8 +430,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
*/
for (i = 0; i < 7; i++)
{
rv = svm_fifo_dequeue_nowait (f, 0 /* pid */ , sizeof (u32),
(u8 *) & data_word);
rv = svm_fifo_dequeue_nowait (f, sizeof (u32), (u8 *) & data_word);
if (rv != sizeof (u32))
{
clib_warning ("bytes dequeues %u", rv);
@@ -457,7 +454,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
{
offset = (2 * i + 1) * sizeof (u32);
data = (u8 *) (test_data + (2 * i + 1));
rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
if (verbose)
vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i + 1, offset,
offset + sizeof (u32));
@@ -468,13 +465,13 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
}
}
rv = svm_fifo_enqueue_with_offset (f, 0, 8, 21, data);
rv = svm_fifo_enqueue_with_offset (f, 8, 21, data);
TCP_TEST ((rv == 0), "ooo enqueued %u", rv);
TCP_TEST ((svm_fifo_number_ooo_segments (f) == 1),
"number of ooo segments %u", svm_fifo_number_ooo_segments (f));
vec_validate (data_buf, vec_len (data));
svm_fifo_peek (f, 0, 0, vec_len (data), data_buf);
svm_fifo_peek (f, 0, vec_len (data), data_buf);
if (compare_data (data_buf, data, 8, vec_len (data), &j))
{
TCP_TEST (0, "[%d] peeked %u expected %u", j, data_buf[j], data[j]);
@@ -491,7 +488,7 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
{
offset = (2 * i + 1) * sizeof (u32);
data = (u8 *) (test_data + (2 * i + 1));
rv = svm_fifo_enqueue_with_offset (f, 0, offset, sizeof (u32), data);
rv = svm_fifo_enqueue_with_offset (f, offset, sizeof (u32), data);
if (verbose)
vlib_cli_output (vm, "add [%d] [%d, %d]", 2 * i + 1, offset,
offset + sizeof (u32));
@@ -502,13 +499,13 @@ tcp_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
}
}
rv = svm_fifo_enqueue_nowait (f, 0, 29, data);
rv = svm_fifo_enqueue_nowait (f, 29, data);
TCP_TEST ((rv == 32), "ooo enqueued %u", rv);
TCP_TEST ((svm_fifo_number_ooo_segments (f) == 0),
"number of ooo segments %u", svm_fifo_number_ooo_segments (f));
vec_validate (data_buf, vec_len (data));
svm_fifo_peek (f, 0, 0, vec_len (data), data_buf);
svm_fifo_peek (f, 0, vec_len (data), data_buf);
if (compare_data (data_buf, data, 0, vec_len (data), &j))
{
TCP_TEST (0, "[%d] peeked %u expected %u", j, data_buf[j], data[j]);
@@ -551,7 +548,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
{
tp = vp + i;
data64 = tp->offset;
rv = svm_fifo_enqueue_with_offset (f, 0, tp->offset, tp->len,
rv = svm_fifo_enqueue_with_offset (f, tp->offset, tp->len,
(u8 *) & data64);
}
@@ -565,7 +562,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
"first ooo seg length %u", ooo_seg->length);
data64 = 0;
rv = svm_fifo_enqueue_nowait (f, 0, sizeof (u32), (u8 *) & data64);
rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) & data64);
TCP_TEST ((rv == 3000), "bytes to be enqueued %u", rv);
svm_fifo_free (f);
@@ -581,7 +578,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
{
tp = &test_data[i];
data64 = tp->offset;
rv = svm_fifo_enqueue_with_offset (f, 0, tp->offset, tp->len,
rv = svm_fifo_enqueue_with_offset (f, tp->offset, tp->len,
(u8 *) & data64);
if (rv)
{
@@ -599,7 +596,7 @@ tcp_test_fifo2 (vlib_main_t * vm)
"first ooo seg length %u", ooo_seg->length);
data64 = 0;
rv = svm_fifo_enqueue_nowait (f, 0, sizeof (u32), (u8 *) & data64);
rv = svm_fifo_enqueue_nowait (f, sizeof (u32), (u8 *) & data64);
TCP_TEST ((rv == 3000), "bytes to be enqueued %u", rv);
@@ -755,7 +752,7 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
for (i = 0; i < vec_len (generate); i++)
{
tp = generate + i;
rv = svm_fifo_enqueue_with_offset (f, 0, fifo_initial_offset
rv = svm_fifo_enqueue_with_offset (f, fifo_initial_offset
+ tp->offset, tp->len,
(u8 *) data_pattern + tp->offset);
}
@@ -776,7 +773,7 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
u32 bytes_to_enq = 1;
if (in_seq_all)
bytes_to_enq = total_size;
rv = svm_fifo_enqueue_nowait (f, 0, bytes_to_enq, data_pattern + 0);
rv = svm_fifo_enqueue_nowait (f, bytes_to_enq, data_pattern + 0);
if (verbose)
vlib_cli_output (vm, "in-order enqueue returned %d", rv);
@@ -793,7 +790,7 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
* Test if peeked data is the same as original data
*/
vec_validate (data_buf, vec_len (data_pattern));
svm_fifo_peek (f, 0, 0, vec_len (data_pattern), data_buf);
svm_fifo_peek (f, 0, vec_len (data_pattern), data_buf);
if (compare_data (data_buf, data_pattern, 0, vec_len (data_pattern), &j))
{
TCP_TEST (0, "[%d] peeked %u expected %u", j, data_buf[j],
@@ -806,11 +803,11 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
*/
if (drop)
{
svm_fifo_dequeue_drop (f, 0, vec_len (data_pattern));
svm_fifo_dequeue_drop (f, vec_len (data_pattern));
}
else
{
svm_fifo_dequeue_nowait (f, 0, vec_len (data_pattern), data_buf);
svm_fifo_dequeue_nowait (f, vec_len (data_pattern), data_buf);
if (compare_data
(data_buf, data_pattern, 0, vec_len (data_pattern), &j))
{
+5 -3
View File
@@ -59,10 +59,10 @@ builtin_server_rx_callback (stream_session_t * s)
vec_validate (my_copy_buffer, this_transfer - 1);
_vec_len (my_copy_buffer) = this_transfer;
actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, 0, this_transfer,
actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, this_transfer,
my_copy_buffer);
ASSERT (actual_transfer == this_transfer);
actual_transfer = svm_fifo_enqueue_nowait (tx_fifo, 0, this_transfer,
actual_transfer = svm_fifo_enqueue_nowait (tx_fifo, this_transfer,
my_copy_buffer);
ASSERT (actual_transfer == this_transfer);
@@ -72,7 +72,7 @@ builtin_server_rx_callback (stream_session_t * s)
{
/* Fabricate TX event, send to ourselves */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = 0;
q = session_manager_get_vpp_event_queue (s->thread_index);
unix_shared_memory_queue_add (q, (u8 *) & evt,
@@ -110,6 +110,8 @@ attach_builtin_uri_server ()
options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
options[SESSION_OPTIONS_SEGMENT_SIZE] = (2 << 30); /*$$$$ config / arg */
options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_BUILTIN_APP;
a->options = options;
return vnet_application_attach (a);
+2 -3
View File
@@ -145,8 +145,7 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
goto trace0;
}
svm_fifo_enqueue_nowait (f0, 0 /* pid */ ,
udp_len0 - sizeof (*udp0),
svm_fifo_enqueue_nowait (f0, udp_len0 - sizeof (*udp0),
(u8 *) (udp0 + 1));
b0->error = node->errors[SESSION_ERROR_ENQUEUED];
@@ -255,7 +254,7 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
{
/* Fabricate event */
evt.fifo = s0->server_rx_fifo;
evt.event_type = FIFO_EVENT_SERVER_RX;
evt.event_type = FIFO_EVENT_APP_RX;
evt.event_id = serial_number++;
/* Add event to server's event queue */