Add support for tcp/session buffer chains

Change-Id: I01c6e3dc3a1b2785df37bb66b19c4b5cbb8f3211
Signed-off-by: Florin Coras <fcoras@cisco.com>
This commit is contained in:
Florin Coras
2017-05-07 19:12:02 -07:00
committed by Damjan Marion
parent 1ea82dfe5d
commit f6d68ed2db
7 changed files with 270 additions and 89 deletions

View File

@ -6,14 +6,28 @@ import time
# action can be reflect or drop
action = "drop"
test = 0
def test_data (data, n_rcvd):
n_read = len (data);
for i in range(n_read):
expected = (n_rcvd + i) & 0xff
byte_got = ord (data[i])
if (byte_got != expected):
print("Difference at byte {}. Expected {} got {}"
.format(n_rcvd + i, expected, byte_got))
return n_read
def handle_connection (connection, client_address):
print("Received connection from {}".format(repr(client_address)))
n_rcvd = 0
try:
while True:
data = connection.recv(4096)
if not data:
break;
if (test == 1):
n_rcvd += test_data (data, n_rcvd)
if (action != "drop"):
connection.sendall(data)
finally:
@ -78,8 +92,9 @@ def run(mode, ip, port):
if __name__ == "__main__":
if (len(sys.argv)) < 4:
raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action>]")
if (len(sys.argv) == 5):
raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action> <test>]")
if (len(sys.argv) == 6):
action = sys.argv[4]
test = int(sys.argv[5])
run (sys.argv[1], sys.argv[2], int(sys.argv[3]))

View File

@ -17,6 +17,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <vppinfra/format.h>
#include <signal.h>
@ -72,32 +73,59 @@ setup_signal_handler (void)
int
main (int argc, char *argv[])
{
int sockfd, portno, n, sent, accfd;
int sockfd, portno, n, sent, accfd, reuse;
socklen_t client_addr_len;
struct sockaddr_in serv_addr;
struct sockaddr_in client;
struct hostent *server;
u8 *rx_buffer = 0;
if (0 && argc < 3)
if (argc > 1 && argc < 3)
{
fformat (stderr, "usage %s hostname port\n", argv[0]);
fformat (stderr, "usage %s host port\n", argv[0]);
exit (0);
}
if (argc >= 3)
{
portno = atoi (argv[2]);
server = gethostbyname (argv[1]);
if (server == NULL)
{
clib_unix_warning ("gethostbyname");
exit (1);
}
}
else
{
/* Defaults */
portno = 1234;
server = gethostbyname ("6.0.1.1");
if (server == NULL)
{
clib_unix_warning ("gethostbyname");
exit (1);
}
}
setup_signal_handler ();
portno = 1234; // atoi(argv[2]);
sockfd = socket (AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
clib_unix_error ("socket");
exit (1);
}
server = gethostbyname ("6.0.1.1");
if (server == NULL)
reuse = 1;
if (setsockopt (sockfd, SOL_SOCKET, SO_REUSEADDR, (const char *) &reuse,
sizeof (reuse)) < 0)
{
clib_unix_warning ("gethostbyname");
clib_unix_error ("setsockopt(SO_REUSEADDR) failed");
exit (1);
}
bzero ((char *) &serv_addr, sizeof (serv_addr));
serv_addr.sin_family = AF_INET;
bcopy ((char *) server->h_addr,
@ -123,12 +151,15 @@ main (int argc, char *argv[])
if (signal_received)
break;
accfd = accept (sockfd, 0 /* don't care */ , 0);
client_addr_len = sizeof (struct sockaddr);
accfd = accept (sockfd, (struct sockaddr *) &client, &client_addr_len);
if (accfd < 0)
{
clib_unix_warning ("accept");
continue;
}
fformat (stderr, "Accepted connection from: %s : %d\n",
inet_ntoa (client.sin_addr), client.sin_port);
while (1)
{
n = recv (accfd, rx_buffer, vec_len (rx_buffer), 0 /* flags */ );

View File

@ -70,6 +70,58 @@ static u32 session_type_to_next[] = {
SESSION_QUEUE_NEXT_IP6_LOOKUP,
};
always_inline void
session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
u8 thread_index, svm_fifo_t * fifo,
vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
u32 * left_to_snd0, u16 * n_bufs, u32 * rx_offset,
u16 deq_per_buf, u8 peek_data)
{
vlib_buffer_t *chain_b0, *prev_b0;
u32 chain_bi0;
u16 len_to_deq0, n_bytes_read;
u8 *data0, j;
chain_bi0 = bi0;
chain_b0 = b0;
for (j = 1; j < n_bufs_per_seg; j++)
{
prev_b0 = chain_b0;
len_to_deq0 = clib_min (*left_to_snd0, deq_per_buf);
*n_bufs -= 1;
chain_bi0 = smm->tx_buffers[thread_index][*n_bufs];
_vec_len (smm->tx_buffers[thread_index]) = *n_bufs;
chain_b0 = vlib_get_buffer (vm, chain_bi0);
chain_b0->current_data = 0;
data0 = vlib_buffer_get_current (chain_b0);
if (peek_data)
{
n_bytes_read = svm_fifo_peek (fifo, *rx_offset, len_to_deq0, data0);
*rx_offset += n_bytes_read;
}
else
{
n_bytes_read = svm_fifo_dequeue_nowait (fifo, len_to_deq0, data0);
}
ASSERT (n_bytes_read == len_to_deq0);
chain_b0->current_length = n_bytes_read;
b0->total_length_not_including_first_buffer += chain_b0->current_length;
/* update previous buffer */
prev_b0->next_buffer = chain_bi0;
prev_b0->flags |= VLIB_BUFFER_NEXT_PRESENT;
/* update current buffer */
chain_b0->next_buffer = 0;
*left_to_snd0 -= n_bytes_read;
if (*left_to_snd0 == 0)
break;
}
}
always_inline int
session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
session_manager_main_t * smm,
@ -78,16 +130,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
int *n_tx_packets, u8 peek_data)
{
u32 n_trace = vlib_get_trace_count (vm, node);
u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
u32 n_frame_bytes, n_frames_per_evt;
u32 left_to_snd0, max_len_to_snd0, len_to_deq0, snd_space0;
u32 n_bufs_per_evt, n_frames_per_evt;
transport_connection_t *tc0;
transport_proto_vft_t *transport_vft;
u32 next_index, next0, *to_next, n_left_to_next, bi0;
vlib_buffer_t *b0;
u32 rx_offset = 0, max_dequeue0;
u16 snd_mss0;
u32 rx_offset = 0, max_dequeue0, n_bytes_per_seg;
u16 snd_mss0, n_bufs_per_seg, n_bufs;
u8 *data0;
int i, n_bytes_read;
u32 n_bytes_per_buf, deq_per_buf;
next_index = next0 = session_type_to_next[s0->session_type];
@ -134,8 +187,15 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
max_len_to_snd0 = snd_space0;
}
n_frame_bytes = snd_mss0 * VLIB_FRAME_SIZE;
n_frames_per_evt = ceil ((double) max_len_to_snd0 / n_frame_bytes);
n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0;
n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf);
n_bufs_per_evt = (ceil ((double) max_len_to_snd0 / n_bytes_per_seg))
* n_bufs_per_seg;
n_frames_per_evt = ceil ((double) n_bufs_per_evt / VLIB_FRAME_SIZE);
deq_per_buf = clib_min (snd_mss0, n_bytes_per_buf);
n_bufs = vec_len (smm->tx_buffers[thread_index]);
left_to_snd0 = max_len_to_snd0;
@ -146,9 +206,9 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
{
vec_validate (smm->tx_buffers[thread_index],
n_bufs + VLIB_FRAME_SIZE - 1);
n_bufs +=
vlib_buffer_alloc (vm, &smm->tx_buffers[thread_index][n_bufs],
VLIB_FRAME_SIZE);
n_bufs += vlib_buffer_alloc (vm,
&smm->tx_buffers[thread_index][n_bufs],
VLIB_FRAME_SIZE);
/* buffer shortage
* XXX 0.9 because when debugging we might not get a full frame */
@ -165,11 +225,14 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
}
vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
while (left_to_snd0 && n_left_to_next)
while (left_to_snd0 && n_left_to_next >= n_bufs_per_seg)
{
/*
* Handle first buffer in chain separately
*/
/* Get free buffer */
n_bufs--;
bi0 = smm->tx_buffers[thread_index][n_bufs];
bi0 = smm->tx_buffers[thread_index][--n_bufs];
_vec_len (smm->tx_buffers[thread_index]) = n_bufs;
b0 = vlib_get_buffer (vm, bi0);
@ -177,11 +240,60 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
| VNET_BUFFER_LOCALLY_ORIGINATED;
b0->current_data = 0;
b0->total_length_not_including_first_buffer = 0;
/* RX on the local interface. tx in default fib */
vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;
len_to_deq0 = clib_min (left_to_snd0, deq_per_buf);
data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
if (peek_data)
{
n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
len_to_deq0, data0);
/* Keep track of progress locally, transport is also supposed to
* increment it independently when pushing the header */
rx_offset += n_bytes_read;
}
else
{
n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
len_to_deq0, data0);
}
if (n_bytes_read <= 0)
goto dequeue_fail;
b0->current_length = n_bytes_read;
left_to_snd0 -= n_bytes_read;
*n_tx_packets = *n_tx_packets + 1;
/*
* Fill in the remaining buffers in the chain, if any
*/
if (PREDICT_FALSE (n_bufs_per_seg > 1))
session_tx_fifo_chain_tail (smm, vm, thread_index,
s0->server_tx_fifo, b0, bi0,
n_bufs_per_seg, &left_to_snd0,
&n_bufs, &rx_offset, deq_per_buf,
peek_data);
/* Ask transport to push header after current_length and
* total_length_not_including_first_buffer are updated */
transport_vft->push_header (tc0, b0);
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
ed->data[0] = e0->event_id;
ed->data[1] = max_dequeue0;
ed->data[2] = len_to_deq0;
ed->data[3] = left_to_snd0;
}));
/* *INDENT-ON* */
/* usual speculation, or the enqueue_x1 macro will barf */
to_next[0] = bi0;
to_next += 1;
@ -199,50 +311,6 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
t0->server_thread_index = s0->thread_index;
}
len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
ed->data[0] = e0->event_id;
ed->data[1] = max_dequeue0;
ed->data[2] = len_to_deq0;
ed->data[3] = left_to_snd0;
}));
/* *INDENT-ON* */
/* Make room for headers */
data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
/* Dequeue the data
* TODO 1) peek instead of dequeue
* 2) buffer chains */
if (peek_data)
{
n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
len_to_deq0, data0);
if (n_bytes_read <= 0)
goto dequeue_fail;
/* Keep track of progress locally, transport is also supposed to
* increment it independently when pushing the header */
rx_offset += n_bytes_read;
}
else
{
n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
len_to_deq0, data0);
if (n_bytes_read <= 0)
goto dequeue_fail;
}
b0->current_length = n_bytes_read;
/* Ask transport to push header */
transport_vft->push_header (tc0, b0);
left_to_snd0 -= n_bytes_read;
*n_tx_packets = *n_tx_packets + 1;
vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
to_next, n_left_to_next,
bi0, next0);

View File

@ -432,33 +432,97 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
return 0;
}
/** Enqueue buffer chain tail */
always_inline int
session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
u32 offset, u8 is_in_order)
{
vlib_buffer_t *chain_b;
u32 chain_bi = b->next_buffer;
vlib_main_t *vm = vlib_get_main ();
u8 *data, len;
u16 written = 0;
int rv = 0;
do
{
chain_b = vlib_get_buffer (vm, chain_bi);
data = vlib_buffer_get_current (chain_b);
len = chain_b->current_length;
if (is_in_order)
{
rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
if (rv < len)
{
return (rv > 0) ? (written + rv) : written;
}
written += rv;
}
else
{
rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
data);
if (rv)
return -1;
offset += len;
}
}
while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
? chain_b->next_buffer : 0));
if (is_in_order)
return written;
return 0;
}
/*
* Enqueue data for delivery to session peer. Does not notify peer of enqueue
* event but on request can queue notification events for later delivery by
* calling stream_server_flush_enqueue_events().
*
* @param tc Transport connection which is to be enqueued data
* @param data Data to be enqueued
* @param len Length of data to be enqueued
* @param b Buffer to be enqueued
* @param offset Offset at which to start enqueueing if out-of-order
* @param queue_event Flag to indicate if peer is to be notified or if event
* is to be queued. The former is useful when more data is
* enqueued and only one event is to be generated.
* @param is_in_order Flag to indicate if data is in order
* @return Number of bytes enqueued or a negative value if enqueueing failed.
*/
int
stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
u8 queue_event)
stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
u32 offset, u8 queue_event, u8 is_in_order)
{
stream_session_t *s;
int enqueued;
int enqueued = 0, rv;
s = stream_session_get (tc->s_index, tc->thread_index);
/* Make sure there's enough space left. We might've filled the pipes */
if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
return -1;
enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
if (is_in_order)
{
enqueued =
svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
vlib_buffer_get_current (b));
if (PREDICT_FALSE
((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
{
rv = session_enqueue_chain_tail (s, b, 0, 1);
if (rv <= 0)
return enqueued;
enqueued += rv;
}
}
else
{
rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset,
b->current_length,
vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
if (rv)
return -1;
}
if (queue_event)
{
@ -476,7 +540,10 @@ stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
}
}
return enqueued;
if (is_in_order)
return enqueued;
return 0;
}
/** Check if we have space in rx fifo to push more bytes */

View File

@ -345,8 +345,8 @@ stream_session_fifo_size (transport_connection_t * tc)
}
int
stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
u8 queue_event);
stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
u32 offset, u8 queue_event, u8 is_in_order);
u32
stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes);

View File

@ -993,9 +993,8 @@ tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b,
return TCP_ERROR_PURE_ACK;
}
written = stream_session_enqueue_data (&tc->connection,
vlib_buffer_get_current (b),
data_len, 1 /* queue event */ );
written = stream_session_enqueue_data (&tc->connection, b, 0,
1 /* queue event */ , 1);
TCP_EVT_DBG (TCP_EVT_INPUT, tc, 0, data_len, written);
@ -1053,12 +1052,10 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
return TCP_ERROR_PURE_ACK;
}
s0 = stream_session_get (tc->c_s_index, tc->c_thread_index);
/* Enqueue out-of-order data with absolute offset */
rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo,
vnet_buffer (b)->tcp.seq_number,
data_len, vlib_buffer_get_current (b));
rv = stream_session_enqueue_data (&tc->connection, b,
vnet_buffer (b)->tcp.seq_number,
0 /* queue event */ , 0);
/* Nothing written */
if (rv)
@ -1075,6 +1072,8 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
ooo_segment_t *newest;
u32 start, end;
s0 = stream_session_get (tc->c_s_index, tc->c_thread_index);
/* Get the newest segment from the fifo */
newest = svm_fifo_newest_ooo_segment (s0->server_rx_fifo);
start = ooo_segment_offset (s0->server_rx_fifo, newest);
@ -2543,6 +2542,7 @@ do { \
_(FIN_WAIT_1, TCP_FLAG_FIN, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);
/* FIN confirming that the peer (app) has closed */
_(FIN_WAIT_2, TCP_FLAG_FIN, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);
_(FIN_WAIT_2, TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);
_(FIN_WAIT_2, TCP_FLAG_FIN | TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS,
TCP_ERROR_NONE);
_(LAST_ACK, TCP_FLAG_ACK, TCP_INPUT_NEXT_RCV_PROCESS, TCP_ERROR_NONE);

View File

@ -46,7 +46,7 @@ typedef struct
tcp_connection_t tcp_connection;
} tcp_tx_trace_t;
u16 dummy_mtu = 400;
u16 dummy_mtu = 1460;
u8 *
format_tcp_tx_trace (u8 * s, va_list * args)
@ -923,7 +923,7 @@ tcp_push_hdr_i (tcp_connection_t * tc, vlib_buffer_t * b,
u8 tcp_hdr_opts_len, opts_write_len, flags;
tcp_header_t *th;
data_len = b->current_length;
data_len = b->current_length + b->total_length_not_including_first_buffer;
vnet_buffer (b)->tcp.flags = 0;
if (compute_opts)