TCP/session improvements

- Added svm fifo flag for tracking fifo dequeue events (replaces event
  length). Updated all code to switch to the new scheme.
- More session debugging
- Fix peek index wrap
- Add a trivial socket test client
- Fast retransmit/cc fixes
- tx and rx SACK fixes and unit testing
- SRTT computation fix
- remove dupack/ack burst filters
- improve ack rx
- improved segment rx
- builtin client test code

Change-Id: Ic4eb2d5ca446eb2260ccd3ccbcdaa73c64e7f4e1
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Barach <dbarach@cisco.com>
This commit is contained in:
Florin Coras
2017-03-13 03:49:51 -07:00
parent 98ab09159a
commit 6792ec0596
28 changed files with 2207 additions and 569 deletions
+19 -16
View File
@@ -13,7 +13,7 @@
* limitations under the License.
*/
#include "svm_fifo.h"
#include <svm/svm_fifo.h>
/** create an svm fifo, in the current heap. Fails vs blow up the process */
svm_fifo_t *
@@ -362,18 +362,19 @@ svm_fifo_enqueue_nowait (svm_fifo_t * f,
return svm_fifo_enqueue_internal (f, pid, max_bytes, copy_from_here);
}
/** Enqueue a future segment.
/**
* Enqueue a future segment.
*
* Two choices: either copies the entire segment, or copies nothing
* Returns 0 if the entire segment was copied
* Returns -1 if none of the segment was copied due to lack of space
*/
static int
svm_fifo_enqueue_with_offset_internal2 (svm_fifo_t * f,
int pid,
u32 offset,
u32 required_bytes,
u8 * copy_from_here)
svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
int pid,
u32 offset,
u32 required_bytes,
u8 * copy_from_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
u32 cursize, nitems;
@@ -424,14 +425,14 @@ svm_fifo_enqueue_with_offset (svm_fifo_t * f,
u32 offset,
u32 required_bytes, u8 * copy_from_here)
{
return svm_fifo_enqueue_with_offset_internal2
return svm_fifo_enqueue_with_offset_internal
(f, pid, offset, required_bytes, copy_from_here);
}
static int
svm_fifo_dequeue_internal2 (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_here)
svm_fifo_dequeue_internal (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
u32 cursize, nitems;
@@ -484,7 +485,7 @@ int
svm_fifo_dequeue_nowait (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_here)
{
return svm_fifo_dequeue_internal2 (f, pid, max_bytes, copy_here);
return svm_fifo_dequeue_internal (f, pid, max_bytes, copy_here);
}
int
@@ -492,7 +493,7 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
u8 * copy_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
u32 cursize, nitems;
u32 cursize, nitems, real_head;
if (PREDICT_FALSE (f->cursize == 0))
return -2; /* nothing in the fifo */
@@ -500,6 +501,8 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
/* read cursize, which can only increase while we're working */
cursize = f->cursize;
nitems = f->nitems;
real_head = f->head + offset;
real_head = real_head >= nitems ? real_head - nitems : real_head;
/* Number of bytes we're going to copy */
total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
@@ -508,9 +511,9 @@ svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
{
/* Number of bytes in first copy segment */
first_copy_bytes =
((nitems - f->head + offset) < total_copy_bytes) ?
(nitems - f->head + offset) : total_copy_bytes;
clib_memcpy (copy_here, &f->data[f->head + offset], first_copy_bytes);
((nitems - real_head) < total_copy_bytes) ?
(nitems - real_head) : total_copy_bytes;
clib_memcpy (copy_here, &f->data[real_head], first_copy_bytes);
/* Number of bytes in second copy segment, if any */
second_copy_bytes = total_copy_bytes - first_copy_bytes;
+26 -2
View File
@@ -46,9 +46,11 @@ typedef struct
{
pthread_mutex_t mutex; /* 8 bytes */
pthread_cond_t condvar; /* 8 bytes */
u32 owner_pid;
svm_lock_tag_t tag;
volatile u32 cursize;
volatile u32 cursize; /**< current fifo size */
volatile u8 has_event; /**< non-zero if deq event exists */
u32 owner_pid;
u32 nitems;
/* Backpointers */
@@ -112,6 +114,28 @@ svm_fifo_has_ooo_data (svm_fifo_t * f)
return f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX;
}
/**
 * Sets the fifo dequeue-event flag.
 *
 * Atomically transitions has_event from 0 to 1 via
 * __sync_lock_test_and_set, so exactly one concurrent caller observes
 * the 0 -> 1 transition.
 *
 * @return 1 if the flag was previously unset — i.e., the caller is the
 *         one responsible for enqueuing a new event notification;
 *         0 if an event was already pending.
 */
always_inline u8
svm_fifo_set_event (svm_fifo_t * f)
{
  /* Probably doesn't need to be atomic. Still, better avoid surprises */
  return __sync_lock_test_and_set (&f->has_event, 1) == 0;
}
/**
 * Unsets the fifo dequeue-event flag.
 *
 * Clears has_event so the next producer's svm_fifo_set_event () call
 * returns 1 and generates a fresh event. Consumers call this before
 * draining the fifo to allow new-event notifications.
 */
always_inline void
svm_fifo_unset_event (svm_fifo_t * f)
{
  /* Probably doesn't need to be atomic. Still, better avoid surprises */
  __sync_lock_test_and_set (&f->has_event, 0);
}
svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
int svm_fifo_enqueue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
+2 -2
View File
@@ -15,8 +15,8 @@
#ifndef __included_ssvm_fifo_segment_h__
#define __included_ssvm_fifo_segment_h__
#include "svm_fifo.h"
#include "ssvm.h"
#include <svm/svm_fifo.h>
#include <svm/ssvm.h>
typedef struct
{
+4 -1
View File
@@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
noinst_PROGRAMS += uri_udp_test uri_tcp_test
noinst_PROGRAMS += uri_udp_test uri_tcp_test uri_socket_test
uri_udp_test_SOURCES = uri/uri_udp_test.c
uri_udp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
@@ -20,3 +20,6 @@ uri_udp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
uri_tcp_test_SOURCES = uri/uri_tcp_test.c
uri_tcp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
libvppinfra.la -lpthread -lm -lrt
uri_socket_test_SOURCES = uri/uri_socket_test.c
uri_socket_test_LDADD = libvppinfra.la -lpthread -lm -lrt
+126
View File
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2017 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <vppinfra/format.h>
/**
 * Trivial blocking socket test client.
 *
 * Connects to a hard-coded server (6.0.1.1:1234), sends 1400-byte
 * pattern buffers in a loop and reads back equal-sized buffers,
 * verifying the echoed contents byte-for-byte.
 *
 * NOTE(review): argument parsing is deliberately disabled
 * ("if (0 && ...)") and host/port are hard-coded — presumably early
 * test scaffolding; confirm before shipping.
 */
int
main (int argc, char *argv[])
{
  int sockfd, portno, n;
  struct sockaddr_in serv_addr;
  struct hostent *server;
  u8 *rx_buffer = 0, *tx_buffer = 0;
  u32 offset;
  int iter, i;

  if (0 && argc < 3)
    {
      fformat (stderr, "usage %s hostname port\n", argv[0]);
      exit (0);
    }

  portno = 1234;		// atoi(argv[2]);

  sockfd = socket (AF_INET, SOCK_STREAM, 0);
  if (sockfd < 0)
    {
      clib_unix_error ("socket");
      exit (1);
    }

  server = gethostbyname ("6.0.1.1" /* argv[1] */ );
  if (server == NULL)
    {
      clib_unix_warning ("gethostbyname");
      exit (1);
    }

  bzero ((char *) &serv_addr, sizeof (serv_addr));
  serv_addr.sin_family = AF_INET;
  bcopy ((char *) server->h_addr,
	 (char *) &serv_addr.sin_addr.s_addr, server->h_length);
  serv_addr.sin_port = htons (portno);

  if (connect (sockfd, (const void *) &serv_addr, sizeof (serv_addr)) < 0)
    {
      clib_unix_warning ("connect");
      exit (1);
    }

  vec_validate (rx_buffer, 1400);
  vec_validate (tx_buffer, 1400);

  /* Fill tx buffer with a repeating 1..254 byte pattern */
  for (i = 0; i < vec_len (tx_buffer); i++)
    tx_buffer[i] = (i + 1) % 0xff;

  /*
   * Send one packet to warm up the RX pipeline
   */
  n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
  if (n != vec_len (tx_buffer))
    {
      clib_unix_warning ("write");
      exit (0);
    }

  for (iter = 0; iter < 100000; iter++)
    {
      /* Stop sending one iteration early so the final reads drain
       * the warm-up packet already in flight. */
      if (iter < 99999)
	{
	  n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
	  if (n != vec_len (tx_buffer))
	    {
	      clib_unix_warning ("write");
	      exit (0);
	    }
	}

      /* Read exactly one buffer's worth back */
      offset = 0;
      do
	{
	  n = recv (sockfd, rx_buffer + offset,
		    vec_len (rx_buffer) - offset, 0 /* flags */ );
	  /* BUGFIX: also treat n == 0 (orderly shutdown by the peer) as
	   * a failure; the original "n < 0" check would spin forever
	   * once the peer closed the connection. */
	  if (n <= 0)
	    {
	      clib_unix_warning ("read");
	      exit (0);
	    }
	  offset += n;
	}
      while (offset < vec_len (rx_buffer));

      /* Verify the echoed data matches what we sent */
      for (i = 0; i < vec_len (rx_buffer); i++)
	{
	  if (rx_buffer[i] != tx_buffer[i])
	    {
	      /* BUGFIX: the format string has three conversions but the
	       * original call passed only two arguments (the index "i"
	       * was missing), which is undefined behavior. */
	      clib_warning ("[%d] read 0x%x not 0x%x",
			    i, rx_buffer[i], tx_buffer[i]);
	      exit (1);
	    }
	}
    }
  close (sockfd);
  return 0;
}
/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/
+109 -50
View File
@@ -116,6 +116,7 @@ typedef struct
pthread_t client_rx_thread_handle;
u32 client_bytes_received;
u8 test_return_packets;
u32 bytes_to_send;
/* convenience */
svm_fifo_segment_main_t *segment_main;
@@ -313,11 +314,16 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
rx_fifo = e->fifo;
bytes = e->enqueue_length;
bytes = svm_fifo_max_dequeue (rx_fifo);
/* Allow enqueuing of new event */
svm_fifo_unset_event (rx_fifo);
/* Read the bytes */
do
{
n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
utm->rx_buf);
n_read = svm_fifo_dequeue_nowait (rx_fifo, 0,
clib_min (vec_len (utm->rx_buf),
bytes), utm->rx_buf);
if (n_read > 0)
{
bytes -= n_read;
@@ -333,9 +339,17 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
}
utm->client_bytes_received += n_read;
}
else
{
if (n_read == -2)
{
clib_warning ("weird!");
break;
}
}
}
while (n_read < 0 || bytes > 0);
while (bytes > 0);
}
void
@@ -479,47 +493,41 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
}
}
void
client_send_data (uri_tcp_test_main_t * utm)
static void
send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
u32 bytes)
{
u8 *test_data = utm->connect_test_data;
u64 bytes_sent = 0;
int rv;
int mypid = getpid ();
session_t *session;
svm_fifo_t *tx_fifo;
int buffer_offset, bytes_to_send = 0;
int test_buf_offset = 0;
u32 bytes_to_snd;
u32 queue_max_chunk = 64 << 10, actual_write;
session_fifo_event_t evt;
static int serial_number = 0;
int i;
u32 max_chunk = 64 << 10, write;
int rv;
session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
tx_fifo = session->server_tx_fifo;
bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
if (bytes_to_snd > vec_len (test_data))
bytes_to_snd = vec_len (test_data);
vec_validate (utm->rx_buf, vec_len (test_data) - 1);
for (i = 0; i < 1; i++)
while (bytes_to_snd > 0)
{
bytes_to_send = vec_len (test_data);
buffer_offset = 0;
while (bytes_to_send > 0)
actual_write =
bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd;
rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write,
test_data + test_buf_offset);
if (rv > 0)
{
write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send;
rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write,
test_data + buffer_offset);
bytes_to_snd -= rv;
test_buf_offset += rv;
bytes_sent += rv;
if (rv > 0)
if (svm_fifo_set_event (tx_fifo))
{
bytes_to_send -= rv;
buffer_offset += rv;
bytes_sent += rv;
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
/* $$$$ for event logging */
evt.enqueue_length = rv;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (utm->vpp_event_queue,
@@ -528,13 +536,40 @@ client_send_data (uri_tcp_test_main_t * utm)
}
}
}
}
void
client_send_data (uri_tcp_test_main_t * utm)
{
u8 *test_data = utm->connect_test_data;
int mypid = getpid ();
session_t *session;
svm_fifo_t *tx_fifo;
u32 n_iterations, leftover;
int i;
session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
tx_fifo = session->server_tx_fifo;
vec_validate (utm->rx_buf, vec_len (test_data) - 1);
n_iterations = utm->bytes_to_send / vec_len (test_data);
for (i = 0; i < n_iterations; i++)
{
send_test_chunk (utm, tx_fifo, mypid, 0);
}
leftover = utm->bytes_to_send % vec_len (test_data);
if (leftover)
send_test_chunk (utm, tx_fifo, mypid, leftover);
if (utm->test_return_packets)
{
f64 timeout = clib_time_now (&utm->clib_time) + 2;
/* Wait for the outstanding packets */
while (utm->client_bytes_received < vec_len (test_data))
while (utm->client_bytes_received <
vec_len (test_data) * n_iterations + leftover)
{
if (clib_time_now (&utm->clib_time) > timeout)
{
@@ -542,9 +577,8 @@ client_send_data (uri_tcp_test_main_t * utm)
break;
}
}
utm->time_to_stop = 1;
}
utm->time_to_stop = 1;
}
void
@@ -599,6 +633,11 @@ client_test (uri_tcp_test_main_t * utm)
/* Disconnect */
client_disconnect (utm);
if (wait_for_state_change (utm, STATE_START))
{
return;
}
}
static void
@@ -714,7 +753,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
{
svm_fifo_t *rx_fifo, *tx_fifo;
int n_read;
session_fifo_event_t evt;
unix_shared_memory_queue_t *q;
int rv, bytes;
@@ -722,34 +760,46 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
rx_fifo = e->fifo;
tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
bytes = e->enqueue_length;
bytes = svm_fifo_max_dequeue (rx_fifo);
/* Allow enqueuing of a new event */
svm_fifo_unset_event (rx_fifo);
if (bytes == 0)
return;
/* Read the bytes */
do
{
n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
utm->rx_buf);
if (n_read > 0)
bytes -= n_read;
if (utm->drop_packets)
continue;
/* Reflect if a non-drop session */
if (!utm->drop_packets && n_read > 0)
if (n_read > 0)
{
do
{
rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
}
while (rv == -2 && !utm->time_to_stop);
while (rv <= 0 && !utm->time_to_stop);
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
/* $$$$ for event logging */
evt.enqueue_length = n_read;
evt.event_id = e->event_id;
q = utm->vpp_event_queue;
unix_shared_memory_queue_add (q, (u8 *) & evt,
0 /* do wait for mutex */ );
/* If event wasn't set, add one */
if (svm_fifo_set_event (tx_fifo))
{
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.event_id = e->event_id;
q = utm->vpp_event_queue;
unix_shared_memory_queue_add (q, (u8 *) & evt,
0 /* do wait for mutex */ );
}
}
if (n_read > 0)
bytes -= n_read;
}
while ((n_read < 0 || bytes > 0) && !utm->time_to_stop);
}
@@ -852,7 +902,10 @@ static void
vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
mp)
{
uri_tcp_test_main_t *utm = &uri_tcp_test_main;
clib_warning ("retval %d", ntohl (mp->retval));
utm->state = STATE_START;
}
#define foreach_uri_msg \
@@ -888,6 +941,7 @@ main (int argc, char **argv)
u8 *heap, *uri = 0;
u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
u32 bytes_to_send = 64 << 10, mbytes;
u32 tmp;
mheap_t *h;
session_t *session;
@@ -934,6 +988,10 @@ main (int argc, char **argv)
drop_packets = 1;
else if (unformat (a, "test"))
test_return_packets = 1;
else if (unformat (a, "mbytes %d", &mbytes))
{
bytes_to_send = mbytes << 20;
}
else
{
fformat (stderr, "%s: usage [master|slave]\n");
@@ -956,6 +1014,7 @@ main (int argc, char **argv)
utm->segment_main = &svm_fifo_segment_main;
utm->drop_packets = drop_packets;
utm->test_return_packets = test_return_packets;
utm->bytes_to_send = bytes_to_send;
setup_signal_handlers ();
uri_api_hookup (utm);
+8 -5
View File
@@ -742,17 +742,20 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
/* $$$$ for event logging */
evt.enqueue_length = nbytes;
evt.event_id = e->event_id;
q = utm->vpp_event_queue;
unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
if (svm_fifo_set_event (tx_fifo))
{
q = utm->vpp_event_queue;
unix_shared_memory_queue_add (q, (u8 *) & evt,
0 /* do wait for mutex */ );
}
}
void
server_handle_event_queue (uri_udp_test_main_t * utm)
{
session_fifo_event_t _e, *e = &_e;;
session_fifo_event_t _e, *e = &_e;
while (1)
{
+2
View File
@@ -462,7 +462,9 @@ libvnet_la_SOURCES += \
vnet/tcp/tcp_output.c \
vnet/tcp/tcp_input.c \
vnet/tcp/tcp_newreno.c \
vnet/tcp/builtin_client.c \
vnet/tcp/builtin_server.c \
vnet/tcp/tcp_test.c \
vnet/tcp/tcp.c
nobase_include_HEADERS += \
+1 -2
View File
@@ -45,8 +45,7 @@ typedef struct _stream_session_cb_vft
void (*session_reset_callback) (stream_session_t * s);
/* Direct RX callback, for built-in servers */
int (*builtin_server_rx_callback) (stream_session_t * session,
session_fifo_event_t * ep);
int (*builtin_server_rx_callback) (stream_session_t * session);
/* Redirect connection to local server */
int (*redirect_connect_callback) (u32 api_client_index, void *mp);
+74 -53
View File
@@ -13,21 +13,14 @@
* limitations under the License.
*/
#include <math.h>
#include <vlib/vlib.h>
#include <vnet/vnet.h>
#include <vnet/pg/pg.h>
#include <vnet/ip/ip.h>
#include <vnet/tcp/tcp.h>
#include <vppinfra/hash.h>
#include <vppinfra/error.h>
#include <vppinfra/elog.h>
#include <vlibmemory/unix_shared_memory_queue.h>
#include <vnet/udp/udp_packet.h>
#include <math.h>
#include <vnet/session/application.h>
#include <vnet/session/session_debug.h>
#include <vlibmemory/unix_shared_memory_queue.h>
vlib_node_registration_t session_queue_node;
@@ -52,8 +45,8 @@ format_session_queue_trace (u8 * s, va_list * args)
vlib_node_registration_t session_queue_node;
#define foreach_session_queue_error \
_(TX, "Packets transmitted") \
#define foreach_session_queue_error \
_(TX, "Packets transmitted") \
_(TIMER, "Timer events")
typedef enum
@@ -91,10 +84,10 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
transport_proto_vft_t *transport_vft;
u32 next_index, next0, *to_next, n_left_to_next, bi0;
vlib_buffer_t *b0;
u32 rx_offset;
u32 rx_offset = 0, max_dequeue0;
u16 snd_mss0;
u8 *data0;
int i;
int i, n_bytes_read;
next_index = next0 = session_type_to_next[s0->session_type];
@@ -106,24 +99,33 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
snd_mss0 = transport_vft->send_mss (tc0);
/* Can't make any progress */
if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
|| snd_mss0 == 0)
if (snd_space0 == 0 || snd_mss0 == 0)
{
vec_add1 (smm->evts_partially_read[thread_index], *e0);
return 0;
}
ASSERT (e0->enqueue_length > 0);
/* Ensure we're not writing more than transport window allows */
max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
rx_offset = transport_vft->tx_fifo_offset (tc0);
}
/* Check how much we can pull. If buffering, subtract the offset */
max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - rx_offset;
/* Allow enqueuing of a new event */
svm_fifo_unset_event (s0->server_tx_fifo);
/* Nothing to read return */
if (max_dequeue0 == 0)
{
return 0;
}
/* Ensure we're not writing more than transport window allows */
max_len_to_snd0 = clib_min (max_dequeue0, snd_space0);
/* TODO check if transport is willing to send len_to_snd0
* bytes (Nagle) */
@@ -147,13 +149,10 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
* XXX 0.9 because when debugging we might not get a full frame */
if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
{
/* Keep track of how much we've dequeued and exit */
if (left_to_snd0 != max_len_to_snd0)
if (svm_fifo_set_event (s0->server_tx_fifo))
{
e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
vec_add1 (smm->evts_partially_read[thread_index], *e0);
}
return -1;
}
@@ -198,9 +197,9 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
/* *INDENT-OFF* */
SESSION_EVT_DBG(s0, SESSION_EVT_DEQ, ({
SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
ed->data[0] = e0->event_id;
ed->data[1] = e0->enqueue_length;
ed->data[1] = max_dequeue0;
ed->data[2] = len_to_deq0;
ed->data[3] = left_to_snd0;
}));
@@ -214,29 +213,30 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
* 2) buffer chains */
if (peek_data)
{
int n_bytes_read;
n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
rx_offset, len_to_deq0, data0);
if (n_bytes_read < 0)
if (n_bytes_read <= 0)
goto dequeue_fail;
/* Keep track of progress locally, transport is also supposed to
* increment it independently when pushing header */
* increment it independently when pushing the header */
rx_offset += n_bytes_read;
}
else
{
if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
len_to_deq0, data0) < 0)
n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
s0->pid, len_to_deq0,
data0);
if (n_bytes_read <= 0)
goto dequeue_fail;
}
b0->current_length = len_to_deq0;
b0->current_length = n_bytes_read;
/* Ask transport to push header */
transport_vft->push_header (tc0, b0);
left_to_snd0 -= len_to_deq0;
left_to_snd0 -= n_bytes_read;
*n_tx_packets = *n_tx_packets + 1;
vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
@@ -246,25 +246,31 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_put_next_frame (vm, node, next_index, n_left_to_next);
}
/* If we couldn't dequeue all bytes store progress */
if (max_len_to_snd0 < e0->enqueue_length)
/* If we couldn't dequeue all bytes mark as partially read */
if (max_len_to_snd0 < max_dequeue0)
{
e0->enqueue_length -= max_len_to_snd0;
vec_add1 (smm->evts_partially_read[thread_index], *e0);
/* If we don't already have new event */
if (svm_fifo_set_event (s0->server_tx_fifo))
{
vec_add1 (smm->evts_partially_read[thread_index], *e0);
}
}
return 0;
dequeue_fail:
/* Can't read from fifo. Store event rx progress, save as partially read,
* return buff to free list and return */
e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
vec_add1 (smm->evts_partially_read[thread_index], *e0);
/*
* Can't read from fifo. If we don't already have an event, save as partially
* read, return buff to free list and return
*/
clib_warning ("dequeue fail");
to_next -= 1;
n_left_to_next += 1;
if (svm_fifo_set_event (s0->server_tx_fifo))
{
vec_add1 (smm->evts_partially_read[thread_index], *e0);
}
vlib_put_next_frame (vm, node, next_index, n_left_to_next + 1);
_vec_len (smm->tx_buffers[thread_index]) += 1;
clib_warning ("dequeue fail");
return 0;
}
@@ -298,6 +304,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
session_fifo_event_t *my_fifo_events, *e;
u32 n_to_dequeue, n_events;
unix_shared_memory_queue_t *q;
application_t *app;
int n_tx_packets = 0;
u32 my_thread_index = vm->cpu_index;
int i, rv;
@@ -321,13 +328,18 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
return 0;
SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
/*
* If we didn't manage to process previous events try going
* over them again without dequeuing new ones.
*/
/* XXX: Block senders to sessions that can't keep up */
if (vec_len (my_fifo_events) >= 100)
goto skip_dequeue;
{
clib_warning ("too many fifo events unsolved");
goto skip_dequeue;
}
/* See you in the next life, don't be late */
if (pthread_mutex_trylock (&q->mutex))
@@ -352,19 +364,17 @@ skip_dequeue:
{
svm_fifo_t *f0; /* $$$ prefetch 1 ahead maybe */
stream_session_t *s0;
u32 server_session_index0, server_thread_index0;
u32 session_index0;
session_fifo_event_t *e0;
e0 = &my_fifo_events[i];
f0 = e0->fifo;
server_session_index0 = f0->server_session_index;
server_thread_index0 = f0->server_thread_index;
session_index0 = f0->server_session_index;
/* $$$ add multiple event queues, per vpp worker thread */
ASSERT (server_thread_index0 == my_thread_index);
ASSERT (f0->server_thread_index == my_thread_index);
s0 = stream_session_get_if_valid (server_session_index0,
my_thread_index);
s0 = stream_session_get_if_valid (session_index0, my_thread_index);
if (CLIB_DEBUG && !s0)
{
@@ -385,11 +395,20 @@ skip_dequeue:
rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
my_thread_index,
&n_tx_packets);
/* Out of buffers */
if (rv < 0)
goto done;
break;
case FIFO_EVENT_SERVER_EXIT:
stream_session_disconnect (s0);
break;
case FIFO_EVENT_BUILTIN_RX:
svm_fifo_unset_event (s0->server_rx_fifo);
/* Get session's server */
app = application_get (s0->app_index);
app->cb_fns.builtin_server_rx_callback (s0);
break;
default:
clib_warning ("unhandled event type %d", e0->event_type);
}
@@ -418,6 +437,8 @@ done:
vlib_node_increment_counter (vm, session_queue_node.index,
SESSION_QUEUE_ERROR_TX, n_tx_packets);
SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 1);
return n_tx_packets;
}
+44 -17
View File
@@ -804,30 +804,36 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
/* Get session's server */
app = application_get (s->app_index);
/* Fabricate event */
evt.fifo = s->server_rx_fifo;
evt.event_type = FIFO_EVENT_SERVER_RX;
evt.event_id = serial_number++;
evt.enqueue_length = svm_fifo_max_dequeue (s->server_rx_fifo);
/* Built-in server? Hand event to the callback... */
if (app->cb_fns.builtin_server_rx_callback)
return app->cb_fns.builtin_server_rx_callback (s, &evt);
return app->cb_fns.builtin_server_rx_callback (s);
/* Add event to server's event queue */
q = app->event_queue;
/* If no event, send one */
if (svm_fifo_set_event (s->server_rx_fifo))
{
/* Fabricate event */
evt.fifo = s->server_rx_fifo;
evt.event_type = FIFO_EVENT_SERVER_RX;
evt.event_id = serial_number++;
/* Based on request block (or not) for lack of space */
if (block || PREDICT_TRUE (q->cursize < q->maxsize))
unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
0 /* do wait for mutex */ );
else
return -1;
/* Add event to server's event queue */
q = app->event_queue;
/* Based on request block (or not) for lack of space */
if (block || PREDICT_TRUE (q->cursize < q->maxsize))
unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
0 /* do wait for mutex */ );
else
{
clib_warning ("fifo full");
return -1;
}
}
/* *INDENT-OFF* */
SESSION_EVT_DBG(s, SESSION_EVT_ENQ, ({
SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
ed->data[0] = evt.event_id;
ed->data[1] = evt.enqueue_length;
ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
}));
/* *INDENT-ON* */
@@ -1192,8 +1198,29 @@ stream_session_open (u8 sst, ip46_address_t * addr, u16 port_host_byte_order,
void
stream_session_disconnect (stream_session_t * s)
{
// session_fifo_event_t evt;
s->session_state = SESSION_STATE_CLOSED;
/* RPC to vpp evt queue in the right thread */
tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
// {
// /* Fabricate event */
// evt.fifo = s->server_rx_fifo;
// evt.event_type = FIFO_EVENT_SERVER_RX;
// evt.event_id = serial_number++;
//
// /* Based on request block (or not) for lack of space */
// if (PREDICT_TRUE(q->cursize < q->maxsize))
// unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
// 0 /* do wait for mutex */);
// else
// {
// clib_warning("fifo full");
// return -1;
// }
// }
}
/**
+9 -10
View File
@@ -33,6 +33,7 @@ typedef enum
FIFO_EVENT_SERVER_TX,
FIFO_EVENT_TIMEOUT,
FIFO_EVENT_SERVER_EXIT,
FIFO_EVENT_BUILTIN_RX
} fifo_event_type_t;
#define foreach_session_input_error \
@@ -91,14 +92,13 @@ typedef enum
SESSION_STATE_N_STATES,
} stream_session_state_t;
typedef CLIB_PACKED (struct
{
svm_fifo_t * fifo;
u8 event_type;
/* $$$$ for event logging */
u16 event_id;
u32 enqueue_length;
}) session_fifo_event_t;
/* *INDENT-OFF* */
typedef CLIB_PACKED (struct {
svm_fifo_t * fifo;
u8 event_type;
u16 event_id;
}) session_fifo_event_t;
/* *INDENT-ON* */
typedef struct _stream_session_t
{
@@ -333,7 +333,7 @@ stream_session_get_index (stream_session_t * s)
}
always_inline u32
stream_session_max_enqueue (transport_connection_t * tc)
stream_session_max_rx_enqueue (transport_connection_t * tc)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
return svm_fifo_max_enqueue (s->server_rx_fifo);
@@ -346,7 +346,6 @@ stream_session_fifo_size (transport_connection_t * tc)
return s->server_rx_fifo->nitems;
}
int
stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
u8 queue_event);
+1 -1
View File
@@ -107,7 +107,7 @@ show_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
{
if (once_per_pool)
{
str = format (str, "%-40s%-20s%-20s%-15s",
str = format (str, "%-50s%-20s%-20s%-15s",
"Connection", "Rx fifo", "Tx fifo",
"Session Index");
vlib_cli_output (vm, "%v", str);
+34 -4
View File
@@ -21,7 +21,8 @@
#define foreach_session_dbg_evt \
_(ENQ, "enqueue") \
_(DEQ, "dequeue")
_(DEQ, "dequeue") \
_(DEQ_NODE, "dequeue")
typedef enum _session_evt_dbg
{
@@ -30,7 +31,10 @@ typedef enum _session_evt_dbg
#undef _
} session_evt_dbg_e;
#if TRANSPORT_DEBUG
#define SESSION_DBG (0)
#define SESSION_DEQ_NODE_EVTS (0)
#if TRANSPORT_DEBUG && SESSION_DBG
#define DEC_SESSION_ETD(_s, _e, _size) \
struct \
@@ -44,6 +48,12 @@ typedef enum _session_evt_dbg
ed = ELOG_TRACK_DATA (&vlib_global_main.elog_main, \
_e, _tc->elog_track)
#define DEC_SESSION_ED(_e, _size) \
struct \
{ \
u32 data[_size]; \
} * ed; \
ed = ELOG_DATA (&vlib_global_main.elog_main, _e)
#define SESSION_EVT_DEQ_HANDLER(_s, _body) \
{ \
@@ -67,13 +77,33 @@ typedef enum _session_evt_dbg
do { _body; } while (0); \
}
#if SESSION_DEQ_NODE_EVTS
#define SESSION_EVT_DEQ_NODE_HANDLER(_node_evt) \
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
.format = "deq-node: %s", \
.format_args = "t4", \
.n_enum_strings = 2, \
.enum_strings = { \
"start", \
"end", \
}, \
}; \
DEC_SESSION_ED(_e, 1); \
ed->data[0] = _node_evt; \
}
#else
#define SESSION_EVT_DEQ_NODE_HANDLER(_node_evt)
#endif
#define CONCAT_HELPER(_a, _b) _a##_b
#define CC(_a, _b) CONCAT_HELPER(_a, _b)
#define SESSION_EVT_DBG(_s, _evt, _body) CC(_evt, _HANDLER)(_s, _body)
#define SESSION_EVT_DBG(_evt, _args...) CC(_evt, _HANDLER)(_args)
#else
#define SESSION_EVT_DBG(_s, _evt, _body)
#define SESSION_EVT_DBG(_evt, _args...)
#endif
#endif /* SRC_VNET_SESSION_SESSION_DEBUG_H_ */
+1 -1
View File
@@ -38,7 +38,7 @@ typedef struct _transport_connection
u32 thread_index; /**< Worker-thread index */
#if TRANSPORT_DEBUG
elog_track_t elog_track; /**< Debug purposes */
elog_track_t elog_track; /**< Event logging */
#endif
/** Macros for 'derived classes' where base is named "connection" */
File diff suppressed because it is too large Load Diff
+131
View File
@@ -0,0 +1,131 @@
/*
* tclient.h - built-in vpp tcp test client header file
*
* Copyright (c) 2017 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __included_tclient_h__
#define __included_tclient_h__
#include <vnet/vnet.h>
#include <vnet/ip/ip.h>
#include <vnet/ethernet/ethernet.h>
#include <vppinfra/hash.h>
#include <vppinfra/error.h>
#include <vlibmemory/unix_shared_memory_queue.h>
#include <svm/svm_fifo_segment.h>
#include <vnet/session/session.h>
#include <vnet/session/application_interface.h>
/** Per-session state for the built-in test client. */
typedef struct
{
  u32 bytes_to_send;		/**< target number of bytes to transmit */
  u32 bytes_sent;		/**< bytes transmitted so far */
  u32 bytes_to_receive;		/**< target number of bytes to receive */
  u32 bytes_received;		/**< bytes received so far */

  svm_fifo_t *server_rx_fifo;	/**< rx fifo shared with vpp */
  svm_fifo_t *server_tx_fifo;	/**< tx fifo shared with vpp */

  u32 vpp_session_index;	/**< vpp's index for this session */
  u32 vpp_session_thread;	/**< vpp thread the session lives on */
} session_t;
/** Main state for the built-in test client. */
typedef struct
{
  /* API message ID base */
  u16 msg_id_base;

  /* vpe input queue */
  unix_shared_memory_queue_t *vl_input_queue;

  /* API client handle */
  u32 my_client_index;

  /* The URI we're playing with */
  u8 *uri;

  /* Session pool */
  session_t *sessions;

  /* Hash table for disconnect processing */
  uword *session_index_by_vpp_handles;

  /* intermediate rx buffer */
  u8 *rx_buf;

  /* URI for slave's connect */
  u8 *connect_uri;

  /* Pool index of the currently connected session */
  u32 connected_session_index;

  /* Nonzero when acting as master — presumably listener side; confirm
   * against the .c */
  int i_am_master;

  /* drop all packets */
  int drop_packets;

  /* Our event queue */
  unix_shared_memory_queue_t *our_event_queue;

  /* $$$ single thread only for the moment */
  unix_shared_memory_queue_t *vpp_event_queue;

  /* Cached pid of this process */
  pid_t my_pid;

  /* For deadman timers */
  clib_time_t clib_time;

  /* Connection counts */
  u32 expected_connections;
  volatile u32 ready_connections;

  /* Signal variables */
  volatile int run_test;

  /* Number of iterations */
  int n_iterations;

  /* Bytes to send */
  u32 bytes_to_send;

  /* Configured fifo segment size, in bytes */
  u32 configured_segment_size;

  /* VNET_API_ERROR_FOO -> "Foo" hash table */
  uword *error_string_by_error_number;

  /* Test data buffer — presumably the tx pattern used for verification */
  u8 *connect_test_data;

  /* Handle of the client worker pthread */
  pthread_t client_thread_handle;

  /* Running count of bytes received back by the client */
  u32 client_bytes_received;

  /* When set, verify contents of returned packets */
  u8 test_return_packets;

  /* convenience */
  vlib_main_t *vlib_main;
  vnet_main_t *vnet_main;
  ethernet_main_t *ethernet_main;
} tclient_main_t;
/* NOTE(review): these are tentative definitions, not 'extern' declarations;
 * every .c including this header emits a definition and the build relies on
 * common-symbol merging (-fcommon). Prefer 'extern' here plus one definition
 * in a single .c file -- TODO confirm which translation unit should own the
 * definition before changing, as none is visible from this header. */
tclient_main_t tclient_main;
vlib_node_registration_t tclient_node;
#endif /* __included_tclient_h__ */
/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/
+66 -27
View File
@@ -22,6 +22,7 @@ typedef struct
{
u8 *rx_buf;
unix_shared_memory_queue_t **vpp_queue;
u32 byte_index;
vlib_main_t *vlib_main;
} builtin_server_main_t;
@@ -37,6 +38,7 @@ builtin_session_accept_callback (stream_session_t * s)
bsm->vpp_queue[s->thread_index] =
session_manager_get_vpp_event_queue (s->thread_index);
s->session_state = SESSION_STATE_READY;
bsm->byte_index = 0;
return 0;
}
@@ -80,57 +82,94 @@ builtin_redirect_connect_callback (u32 client_index, void *mp)
return -1;
}
int
builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * e)
void
test_bytes (builtin_server_main_t * bsm, int actual_transfer)
{
int n_written, bytes, total_copy_bytes;
int n_read;
svm_fifo_t *tx_fifo;
int i;
for (i = 0; i < actual_transfer; i++)
{
if (bsm->rx_buf[i] != ((bsm->byte_index + i) & 0xff))
{
clib_warning ("at %d expected %d got %d", bsm->byte_index + i,
(bsm->byte_index + i) & 0xff, bsm->rx_buf[i]);
}
}
bsm->byte_index += actual_transfer;
}
int
builtin_server_rx_callback (stream_session_t * s)
{
u32 n_written, max_dequeue, max_enqueue, max_transfer;
int actual_transfer;
svm_fifo_t *tx_fifo, *rx_fifo;
builtin_server_main_t *bsm = &builtin_server_main;
session_fifo_event_t evt;
static int serial_number = 0;
bytes = e->enqueue_length;
if (PREDICT_FALSE (bytes <= 0))
max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
max_enqueue = svm_fifo_max_enqueue (s->server_tx_fifo);
if (PREDICT_FALSE (max_dequeue == 0))
{
clib_warning ("bizarre rx callback: bytes %d", bytes);
return 0;
}
tx_fifo = s->server_tx_fifo;
rx_fifo = s->server_rx_fifo;
/* Number of bytes we're going to copy */
total_copy_bytes = (bytes < (tx_fifo->nitems - tx_fifo->cursize)) ? bytes :
tx_fifo->nitems - tx_fifo->cursize;
max_transfer = (max_dequeue < max_enqueue) ? max_dequeue : max_enqueue;
if (PREDICT_FALSE (total_copy_bytes <= 0))
/* No space in tx fifo */
if (PREDICT_FALSE (max_transfer == 0))
{
clib_warning ("no space in tx fifo, event had %d bytes", bytes);
/* XXX timeout for session that are stuck */
/* Program self-tap to retry */
if (svm_fifo_set_event (rx_fifo))
{
evt.fifo = rx_fifo;
evt.event_type = FIFO_EVENT_BUILTIN_RX;
evt.event_id = 0;
unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
(u8 *) & evt,
0 /* do wait for mutex */ );
}
return 0;
}
vec_validate (bsm->rx_buf, total_copy_bytes - 1);
_vec_len (bsm->rx_buf) = total_copy_bytes;
svm_fifo_unset_event (rx_fifo);
n_read = svm_fifo_dequeue_nowait (s->server_rx_fifo, 0, total_copy_bytes,
bsm->rx_buf);
ASSERT (n_read == total_copy_bytes);
vec_validate (bsm->rx_buf, max_transfer - 1);
_vec_len (bsm->rx_buf) = max_transfer;
actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, 0, max_transfer,
bsm->rx_buf);
ASSERT (actual_transfer == max_transfer);
// test_bytes (bsm, actual_transfer);
/*
* Echo back
*/
n_written = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, bsm->rx_buf);
ASSERT (n_written == total_copy_bytes);
n_written =
svm_fifo_enqueue_nowait (tx_fifo, 0, actual_transfer, bsm->rx_buf);
ASSERT (n_written == max_transfer);
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.enqueue_length = total_copy_bytes;
evt.event_id = serial_number++;
if (svm_fifo_set_event (tx_fifo))
{
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index], (u8 *) & evt,
0 /* do wait for mutex */ );
unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
(u8 *) & evt, 0 /* do wait for mutex */ );
}
return 0;
}
@@ -164,7 +203,7 @@ server_create (vlib_main_t * vm)
a->api_client_index = ~0;
a->session_cb_vft = &builtin_session_cb_vft;
a->options = options;
a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 10;
a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 128 << 20;
a->options[SESSION_OPTIONS_RX_FIFO_SIZE] = 64 << 10;
a->options[SESSION_OPTIONS_TX_FIFO_SIZE] = 64 << 10;
a->segment_name = segment_name;
+34 -3
View File
@@ -328,7 +328,7 @@ tcp_connection_init_vars (tcp_connection_t * tc)
{
tcp_connection_timers_init (tc);
tcp_set_snd_mss (tc);
tc->sack_sb.head = TCP_INVALID_SACK_HOLE_INDEX;
scoreboard_init (&tc->sack_sb);
tcp_cc_init (tc);
}
@@ -558,17 +558,48 @@ tcp_session_send_mss (transport_connection_t * trans_conn)
return tc->snd_mss;
}
/**
* Compute tx window session is allowed to fill.
*/
u32
tcp_session_send_space (transport_connection_t * trans_conn)
{
u32 snd_space;
tcp_connection_t *tc = (tcp_connection_t *) trans_conn;
return tcp_available_snd_space (tc);
/* If we haven't gotten dupacks or if we did and have gotten sacked bytes
* then we can still send */
if (PREDICT_TRUE (tcp_in_fastrecovery (tc) == 0
&& (tc->rcv_dupacks == 0
|| tc->sack_sb.last_sacked_bytes)))
{
snd_space = tcp_available_snd_space (tc);
/* If we can't write at least a segment, don't try at all */
if (snd_space < tc->snd_mss)
return 0;
return snd_space;
}
/* If in fast recovery, send 1 SMSS if wnd allows */
if (tcp_in_fastrecovery (tc) && tcp_available_snd_space (tc)
&& tcp_fastrecovery_sent_1_smss (tc))
{
tcp_fastrecovery_1_smss_on (tc);
return tc->snd_mss;
}
return 0;
}
u32
tcp_session_tx_fifo_offset (transport_connection_t * trans_conn)
{
tcp_connection_t *tc = (tcp_connection_t *) trans_conn;
ASSERT (seq_geq (tc->snd_nxt, tc->snd_una));
/* This still works if fast retransmit is on */
return (tc->snd_nxt - tc->snd_una);
}
@@ -762,7 +793,7 @@ tcp_main_enable (vlib_main_t * vm)
vec_validate (tm->timer_wheels, num_threads - 1);
tcp_initialize_timer_wheels (tm);
vec_validate (tm->delack_connections, num_threads - 1);
// vec_validate (tm->delack_connections, num_threads - 1);
/* Initialize clocks per tick for TCP timestamp. Used to compute
* monotonically increasing timestamps. */
+89 -22
View File
@@ -30,9 +30,10 @@
#define TCP_PAWS_IDLE 24 * 24 * 60 * 60 * THZ /**< 24 days */
#define TCP_MAX_OPTION_SPACE 40
#define TCP_DUPACK_THRESHOLD 3
#define TCP_MAX_RX_FIFO_SIZE 2 << 20
#define TCP_IW_N_SEGMENTS 10
#define TCP_DUPACK_THRESHOLD 3
#define TCP_MAX_RX_FIFO_SIZE 2 << 20
#define TCP_IW_N_SEGMENTS 10
#define TCP_ALWAYS_ACK 0 /**< If on, we always ack */
/** TCP FSM state definitions as per RFC793. */
#define foreach_tcp_fsm_state \
@@ -102,13 +103,12 @@ void tcp_update_time (f64 now, u32 thread_index);
/** TCP connection flags */
#define foreach_tcp_connection_flag \
_(DELACK, "Delay ACK") \
_(SNDACK, "Send ACK") \
_(BURSTACK, "Burst ACK set") \
_(FINSNT, "FIN sent") \
_(SENT_RCV_WND0, "Sent 0 receive window") \
_(RECOVERY, "Recovery on") \
_(FAST_RECOVERY, "Fast Recovery on")
_(FAST_RECOVERY, "Fast Recovery on") \
_(FR_1_SMSS, "Sent 1 SMSS")
typedef enum _tcp_connection_flag_bits
{
@@ -160,8 +160,12 @@ typedef struct _sack_scoreboard_hole
typedef struct _sack_scoreboard
{
sack_scoreboard_hole_t *holes; /**< Pool of holes */
u32 head; /**< Index to first entry */
u32 head; /**< Index of first entry */
u32 tail; /**< Index of last entry */
u32 sacked_bytes; /**< Number of bytes sacked in sb */
u32 last_sacked_bytes; /**< Number of bytes last sacked */
u32 snd_una_adv; /**< Bytes to add to snd_una */
u32 max_byte_sacked; /**< Highest byte acked */
} sack_scoreboard_t;
typedef enum _tcp_cc_algorithm_type
@@ -214,7 +218,7 @@ typedef struct _tcp_connection
sack_block_t *snd_sacks; /**< Vector of SACKs to send. XXX Fixed size? */
sack_scoreboard_t sack_sb; /**< SACK "scoreboard" that tracks holes */
u8 rcv_dupacks; /**< Number of DUPACKs received */
u16 rcv_dupacks; /**< Number of DUPACKs received */
u8 snt_dupacks; /**< Number of DUPACKs sent in a burst */
/* Congestion control */
@@ -224,6 +228,7 @@ typedef struct _tcp_connection
u32 bytes_acked; /**< Bytes acknowledged by current segment */
u32 rtx_bytes; /**< Retransmitted bytes */
u32 tsecr_last_ack; /**< Timestamp echoed to us in last healthy ACK */
u32 snd_congestion; /**< snd_una_max when congestion is detected */
tcp_cc_algorithm_t *cc_algo; /**< Congestion control algorithm */
/* RTT and RTO */
@@ -250,8 +255,10 @@ struct _tcp_cc_algorithm
#define tcp_fastrecovery_off(tc) (tc)->flags &= ~TCP_CONN_FAST_RECOVERY
#define tcp_in_fastrecovery(tc) ((tc)->flags & TCP_CONN_FAST_RECOVERY)
#define tcp_in_recovery(tc) ((tc)->flags & (TCP_CONN_FAST_RECOVERY | TCP_CONN_RECOVERY))
#define tcp_recovery_off(tc) ((tc)->flags &= ~(TCP_CONN_FAST_RECOVERY | TCP_CONN_RECOVERY))
#define tcp_in_slowstart(tc) (tc->cwnd < tc->ssthresh)
#define tcp_fastrecovery_sent_1_smss(tc) ((tc)->flags & TCP_CONN_FR_1_SMSS)
#define tcp_fastrecovery_1_smss_on(tc) ((tc)->flags |= TCP_CONN_FR_1_SMSS)
#define tcp_fastrecovery_1_smss_off(tc) ((tc)->flags &= ~TCP_CONN_FR_1_SMSS)
typedef enum
{
@@ -293,8 +300,8 @@ typedef struct _tcp_main
/* Per worker-thread timer wheel for connections timers */
tw_timer_wheel_16t_2w_512sl_t *timer_wheels;
/* Convenience per worker-thread vector of connections to DELACK */
u32 **delack_connections;
// /* Convenience per worker-thread vector of connections to DELACK */
// u32 **delack_connections;
/* Pool of half-open connections on which we've sent a SYN */
tcp_connection_t *half_open_connections;
@@ -397,8 +404,16 @@ tcp_end_seq (tcp_header_t * th, u32 len)
always_inline u32
tcp_flight_size (const tcp_connection_t * tc)
{
return tc->snd_una_max - tc->snd_una - tc->sack_sb.sacked_bytes
+ tc->rtx_bytes;
int flight_size;
flight_size = (int) ((tc->snd_una_max - tc->snd_una) + tc->rtx_bytes)
- (tc->rcv_dupacks * tc->snd_mss) /* - tc->sack_sb.sacked_bytes */ ;
/* Happens if we don't clear sacked bytes */
if (flight_size < 0)
return 0;
return flight_size;
}
/**
@@ -439,9 +454,13 @@ tcp_available_snd_space (const tcp_connection_t * tc)
return available_wnd - flight_size;
}
void tcp_update_rcv_wnd (tcp_connection_t * tc);
void tcp_retransmit_first_unacked (tcp_connection_t * tc);
void tcp_fast_retransmit (tcp_connection_t * tc);
void tcp_cc_congestion (tcp_connection_t * tc);
void tcp_cc_recover (tcp_connection_t * tc);
always_inline u32
tcp_time_now (void)
@@ -453,7 +472,7 @@ u32 tcp_push_header (transport_connection_t * tconn, vlib_buffer_t * b);
u32
tcp_prepare_retransmit_segment (tcp_connection_t * tc, vlib_buffer_t * b,
u32 max_bytes);
u32 offset, u32 max_bytes);
void tcp_connection_timers_init (tcp_connection_t * tc);
void tcp_connection_timers_reset (tcp_connection_t * tc);
@@ -476,14 +495,6 @@ tcp_timer_set (tcp_connection_t * tc, u8 timer_id, u32 interval)
tc->c_c_index, timer_id, interval);
}
always_inline void
tcp_retransmit_timer_set (tcp_connection_t * tc)
{
/* XXX Switch to faster TW */
tcp_timer_set (tc, TCP_TIMER_RETRANSMIT,
clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
}
always_inline void
tcp_timer_reset (tcp_connection_t * tc, u8 timer_id)
{
@@ -506,6 +517,27 @@ tcp_timer_update (tcp_connection_t * tc, u8 timer_id, u32 interval)
tc->c_c_index, timer_id, interval);
}
/* XXX Switch retransmit to faster TW */
always_inline void
tcp_retransmit_timer_set (tcp_connection_t * tc)
{
tcp_timer_set (tc, TCP_TIMER_RETRANSMIT,
clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
}
always_inline void
tcp_retransmit_timer_update (tcp_connection_t * tc)
{
tcp_timer_update (tc, TCP_TIMER_RETRANSMIT,
clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
}
always_inline void
tcp_retransmit_timer_reset (tcp_connection_t * tc)
{
tcp_timer_reset (tc, TCP_TIMER_RETRANSMIT);
}
always_inline u8
tcp_timer_is_active (tcp_connection_t * tc, tcp_timers_e timer)
{
@@ -516,6 +548,14 @@ void
scoreboard_remove_hole (sack_scoreboard_t * sb,
sack_scoreboard_hole_t * hole);
always_inline sack_scoreboard_hole_t *
scoreboard_get_hole (sack_scoreboard_t * sb, u32 index)
{
if (index != TCP_INVALID_SACK_HOLE_INDEX)
return pool_elt_at_index (sb->holes, index);
return 0;
}
always_inline sack_scoreboard_hole_t *
scoreboard_next_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * hole)
{
@@ -532,6 +572,14 @@ scoreboard_first_hole (sack_scoreboard_t * sb)
return 0;
}
always_inline sack_scoreboard_hole_t *
scoreboard_last_hole (sack_scoreboard_t * sb)
{
if (sb->tail != TCP_INVALID_SACK_HOLE_INDEX)
return pool_elt_at_index (sb->holes, sb->tail);
return 0;
}
always_inline void
scoreboard_clear (sack_scoreboard_t * sb)
{
@@ -540,6 +588,10 @@ scoreboard_clear (sack_scoreboard_t * sb)
{
scoreboard_remove_hole (sb, hole);
}
sb->sacked_bytes = 0;
sb->last_sacked_bytes = 0;
sb->snd_una_adv = 0;
sb->max_byte_sacked = 0;
}
always_inline u32
@@ -548,6 +600,21 @@ scoreboard_hole_bytes (sack_scoreboard_hole_t * hole)
return hole->end - hole->start;
}
always_inline u32
scoreboard_hole_index (sack_scoreboard_t * sb, sack_scoreboard_hole_t * hole)
{
return hole - sb->holes;
}
always_inline void
scoreboard_init (sack_scoreboard_t * sb)
{
sb->head = TCP_INVALID_SACK_HOLE_INDEX;
sb->tail = TCP_INVALID_SACK_HOLE_INDEX;
}
void tcp_rcv_sacks (tcp_connection_t * tc, u32 ack);
always_inline void
tcp_cc_algo_register (tcp_cc_algorithm_type_e type,
const tcp_cc_algorithm_t * vft)
+217 -41
View File
File diff suppressed because it is too large Load Diff
+5 -2
View File
@@ -12,12 +12,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
tcp_error (NONE, "no error")
tcp_error (NO_LISTENER, "no listener for dst port")
tcp_error (LOOKUP_DROPS, "lookup drops")
tcp_error (DISPATCH, "Dispatch error")
tcp_error (ENQUEUED, "Packets pushed into rx fifo")
tcp_error (PARTIALLY_ENQUEUED, "Packets partially pushed into rx fifo")
tcp_error (PURE_ACK, "Pure acks")
tcp_error (SYNS_RCVD, "SYNs received")
tcp_error (SYN_ACKS_RCVD, "SYN-ACKs received")
@@ -26,11 +26,14 @@ tcp_error (FIFO_FULL, "Packets dropped for lack of rx fifo space")
tcp_error (EVENT_FIFO_FULL, "Events not sent for lack of event fifo space")
tcp_error (API_QUEUE_FULL, "Sessions not created for lack of API queue space")
tcp_error (CREATE_SESSION_FAIL, "Sessions couldn't be allocated")
tcp_error (SEGMENT_INVALID, "Invalid segment")
tcp_error (SEGMENT_INVALID, "Invalid segments")
tcp_error (SEGMENT_OLD, "Old segment")
tcp_error (ACK_INVALID, "Invalid ACK")
tcp_error (ACK_DUP, "Duplicate ACK")
tcp_error (ACK_OLD, "Old ACK")
tcp_error (ACK_FUTURE, "Future ACK")
tcp_error (PKTS_SENT, "Packets sent")
tcp_error (FILTERED_DUPACKS, "Filtered duplicate ACKs")
tcp_error (RST_SENT, "Resets sent")
tcp_error (INVALID_CONNECTION, "Invalid connection")
tcp_error (NO_WND, "No window")
+329 -182
View File
File diff suppressed because it is too large Load Diff
+204 -95
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -137,7 +137,7 @@ enum
typedef struct _sack_block
{
u32 start; /**< Start sequence number */
u32 end; /**< End sequence number */
u32 end; /**< End sequence number (first outside) */
} sack_block_t;
typedef struct
+216
View File
@@ -0,0 +1,216 @@
/*
* Copyright (c) 2017 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vnet/tcp/tcp.h>
/* Evaluate _cond once, print PASS/FAIL with the source line and the
 * printf-style _comment, and yield the truth value. Implemented as a GCC
 * statement expression so it can be used inside a condition. */
#define TCP_TEST_I(_cond, _comment, _args...) \
({ \
int _evald = (_cond); \
if (!(_evald)) { \
fformat(stderr, "FAIL:%d: " _comment "\n", \
__LINE__, ##_args); \
} else { \
fformat(stderr, "PASS:%d: " _comment "\n", \
__LINE__, ##_args); \
} \
_evald; \
})
/* Like TCP_TEST_I, but on failure returns 1 from the enclosing test
 * function, aborting the remaining checks. */
#define TCP_TEST(_cond, _comment, _args...) \
{ \
if (!TCP_TEST_I(_cond, _comment, ##_args)) { \
return 1; \
} \
}
/**
 * SACK scoreboard unit test.
 *
 * Feeds synthetic SACK blocks into tcp_rcv_sacks() against a zeroed
 * connection and checks hole bookkeeping: even/odd block injection and
 * merging, cumulative-ack advancement (snd_una_adv), blocks beyond
 * snd_una, and scoreboard clearing.
 *
 * @return 0 if all checks pass, 1 on the first failing TCP_TEST.
 */
static int
tcp_test_sack ()
{
  tcp_connection_t _tc, *tc = &_tc;
  sack_scoreboard_t *sb = &tc->sack_sb;
  sack_block_t *sacks = 0, block;
  sack_scoreboard_hole_t *hole;
  int i;

  memset (tc, 0, sizeof (*tc));

  tc->snd_una = 0;
  tc->snd_una_max = 1000;
  tc->snd_nxt = 1000;
  tc->opt.flags |= TCP_OPTS_FLAG_SACK;
  scoreboard_init (&tc->sack_sb);

  /* Pre-build 10 contiguous 100-byte blocks covering [0, 1000) */
  for (i = 0; i < 1000 / 100; i++)
    {
      block.start = i * 100;
      block.end = (i + 1) * 100;
      vec_add1 (sacks, block);
    }

  /*
   * Inject even blocks
   */
  for (i = 0; i < 1000 / 200; i++)
    {
      vec_add1 (tc->opt.sacks, sacks[i * 2]);
    }
  tc->opt.n_sack_blocks = vec_len (tc->opt.sacks);
  tcp_rcv_sacks (tc, 0);

  TCP_TEST ((pool_elts (sb->holes) == 5),
	    "scoreboard has %d elements", pool_elts (sb->holes));

  /* First SACK block should be rejected */
  hole = scoreboard_first_hole (sb);
  TCP_TEST ((hole->start == 0 && hole->end == 200),
	    "first hole start %u end %u", hole->start, hole->end);
  hole = scoreboard_last_hole (sb);
  TCP_TEST ((hole->start == 900 && hole->end == 1000),
	    "last hole start %u end %u", hole->start, hole->end);
  TCP_TEST ((sb->sacked_bytes == 400), "sacked bytes %d", sb->sacked_bytes);
  TCP_TEST ((sb->snd_una_adv == 0), "snd_una_adv %u", sb->snd_una_adv);
  TCP_TEST ((sb->last_sacked_bytes == 400),
	    "last sacked bytes %d", sb->last_sacked_bytes);

  /*
   * Inject odd blocks
   */
  vec_reset_length (tc->opt.sacks);
  for (i = 0; i < 1000 / 200; i++)
    {
      vec_add1 (tc->opt.sacks, sacks[i * 2 + 1]);
    }
  tc->opt.n_sack_blocks = vec_len (tc->opt.sacks);
  tcp_rcv_sacks (tc, 0);

  hole = scoreboard_first_hole (sb);
  TCP_TEST ((pool_elts (sb->holes) == 1),
	    "scoreboard has %d holes", pool_elts (sb->holes));
  TCP_TEST ((hole->start == 0 && hole->end == 100),
	    "first hole start %u end %u", hole->start, hole->end);
  TCP_TEST ((sb->sacked_bytes == 900), "sacked bytes %d", sb->sacked_bytes);
  TCP_TEST ((sb->snd_una_adv == 0), "snd_una_adv %u", sb->snd_una_adv);
  TCP_TEST ((sb->max_byte_sacked == 1000),
	    "max sacked byte %u", sb->max_byte_sacked);
  TCP_TEST ((sb->last_sacked_bytes == 500),
	    "last sacked bytes %d", sb->last_sacked_bytes);

  /*
   * Ack until byte 100, all bytes are now acked + sacked
   */
  tcp_rcv_sacks (tc, 100);

  TCP_TEST ((pool_elts (sb->holes) == 0),
	    "scoreboard has %d elements", pool_elts (sb->holes));
  TCP_TEST ((sb->snd_una_adv == 900),
	    "snd_una_adv after ack %u", sb->snd_una_adv);
  TCP_TEST ((sb->max_byte_sacked == 1000),
	    "max sacked byte %u", sb->max_byte_sacked);
  TCP_TEST ((sb->sacked_bytes == 0), "sacked bytes %d", sb->sacked_bytes);
  TCP_TEST ((sb->last_sacked_bytes == 0),
	    "last sacked bytes %d", sb->last_sacked_bytes);

  /*
   * Add new block
   */
  vec_reset_length (tc->opt.sacks);

  block.start = 1200;
  block.end = 1300;
  vec_add1 (tc->opt.sacks, block);

  tc->snd_una_max = 1500;
  tc->snd_una = 1000;
  tc->snd_nxt = 1500;
  tcp_rcv_sacks (tc, 1000);

  TCP_TEST ((sb->snd_una_adv == 0),
	    "snd_una_adv after ack %u", sb->snd_una_adv);
  TCP_TEST ((pool_elts (sb->holes) == 2),
	    "scoreboard has %d holes", pool_elts (sb->holes));
  hole = scoreboard_first_hole (sb);
  TCP_TEST ((hole->start == 1000 && hole->end == 1200),
	    "first hole start %u end %u", hole->start, hole->end);
  hole = scoreboard_last_hole (sb);
  TCP_TEST ((hole->start == 1300 && hole->end == 1500),
	    "last hole start %u end %u", hole->start, hole->end);
  TCP_TEST ((sb->sacked_bytes == 100), "sacked bytes %d", sb->sacked_bytes);

  /*
   * Ack first hole
   */
  vec_reset_length (tc->opt.sacks);
  tcp_rcv_sacks (tc, 1200);

  TCP_TEST ((sb->snd_una_adv == 100),
	    "snd_una_adv after ack %u", sb->snd_una_adv);
  TCP_TEST ((sb->sacked_bytes == 0), "sacked bytes %d", sb->sacked_bytes);
  TCP_TEST ((pool_elts (sb->holes) == 1),
	    "scoreboard has %d elements", pool_elts (sb->holes));

  /*
   * Remove all
   */
  scoreboard_clear (sb);
  TCP_TEST ((pool_elts (sb->holes) == 0),
	    "number of holes %d", pool_elts (sb->holes));

  /* Fix: free the test vectors so repeated CLI runs don't leak. Early
   * TCP_TEST returns above still leak, which is acceptable for a failing
   * unit test. */
  vec_free (sacks);
  vec_free (tc->opt.sacks);

  return 0;
}
/**
 * CLI dispatcher for the internal TCP unit tests ("test tcp ...").
 *
 * Consumes sub-test keywords from the CLI input and runs the matching
 * test; unknown keywords produce an error, a failing test reports
 * "TCP unit test failed".
 */
static clib_error_t *
tcp_test (vlib_main_t * vm,
	  unformat_input_t * input, vlib_cli_command_t * cmd_arg)
{
  int rv = 0;

  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "sack"))
	rv = tcp_test_sack ();
      else
	return clib_error_return (0, "unknown input `%U'",
				  format_unformat_error, input);
    }

  return rv ? clib_error_return (0, "TCP unit test failed") : 0;
}
/* Register "test tcp" with the vlib CLI. */
/* *INDENT-OFF* */
VLIB_CLI_COMMAND (tcp_test_command, static) =
{
  .path = "test tcp",
  .short_help = "internal tcp unit tests",
  .function = tcp_test,
};
/* *INDENT-ON* */
/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/
+16 -13
View File
@@ -39,10 +39,10 @@ builtin_session_disconnect_callback (stream_session_t * s)
}
static int
builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * ep)
builtin_server_rx_callback (stream_session_t * s)
{
svm_fifo_t *rx_fifo, *tx_fifo;
u32 this_transfer;
u32 this_transfer, max_deq, max_enq;
int actual_transfer;
u8 *my_copy_buffer;
session_fifo_event_t evt;
@@ -52,9 +52,9 @@ builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * ep)
rx_fifo = s->server_rx_fifo;
tx_fifo = s->server_tx_fifo;
this_transfer = svm_fifo_max_enqueue (tx_fifo)
< svm_fifo_max_dequeue (rx_fifo) ?
svm_fifo_max_enqueue (tx_fifo) : svm_fifo_max_dequeue (rx_fifo);
max_deq = svm_fifo_max_dequeue (rx_fifo);
max_enq = svm_fifo_max_enqueue (tx_fifo);
this_transfer = max_enq < max_deq ? max_enq : max_deq;
vec_validate (my_copy_buffer, this_transfer - 1);
_vec_len (my_copy_buffer) = this_transfer;
@@ -64,17 +64,20 @@ builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * ep)
ASSERT (actual_transfer == this_transfer);
actual_transfer = svm_fifo_enqueue_nowait (tx_fifo, 0, this_transfer,
my_copy_buffer);
ASSERT (actual_transfer == this_transfer);
copy_buffers[s->thread_index] = my_copy_buffer;
/* Fabricate TX event, send to ourselves */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
/* $$$$ for event logging */
evt.enqueue_length = actual_transfer;
evt.event_id = 0;
q = session_manager_get_vpp_event_queue (s->thread_index);
unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
if (svm_fifo_set_event (tx_fifo))
{
/* Fabricate TX event, send to ourselves */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
evt.event_id = 0;
q = session_manager_get_vpp_event_queue (s->thread_index);
unix_shared_memory_queue_add (q, (u8 *) & evt,
0 /* do wait for mutex */ );
}
return 0;
}
+28 -19
View File
@@ -244,44 +244,53 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
/* Get session's server */
server0 = application_get (s0->app_index);
/* Fabricate event */
evt.fifo = s0->server_rx_fifo;
evt.event_type = FIFO_EVENT_SERVER_RX;
evt.event_id = serial_number++;
evt.enqueue_length = svm_fifo_max_dequeue (s0->server_rx_fifo);
/* Built-in server? Deliver the goods... */
if (server0->cb_fns.builtin_server_rx_callback)
{
server0->cb_fns.builtin_server_rx_callback (s0, &evt);
server0->cb_fns.builtin_server_rx_callback (s0);
continue;
}
/* Add event to server's event queue */
q = server0->event_queue;
/* Don't block for lack of space */
if (PREDICT_TRUE (q->cursize < q->maxsize))
unix_shared_memory_queue_add (server0->event_queue, (u8 *) & evt,
0 /* do wait for mutex */ );
else
if (svm_fifo_set_event (s0->server_rx_fifo))
{
vlib_node_increment_counter (vm, udp4_uri_input_node.index,
SESSION_ERROR_FIFO_FULL, 1);
/* Fabricate event */
evt.fifo = s0->server_rx_fifo;
evt.event_type = FIFO_EVENT_SERVER_RX;
evt.event_id = serial_number++;
/* Add event to server's event queue */
q = server0->event_queue;
/* Don't block for lack of space */
if (PREDICT_TRUE (q->cursize < q->maxsize))
{
unix_shared_memory_queue_add (server0->event_queue,
(u8 *) & evt,
0 /* do wait for mutex */ );
}
else
{
vlib_node_increment_counter (vm, udp4_uri_input_node.index,
SESSION_ERROR_FIFO_FULL, 1);
}
}
/* *INDENT-OFF* */
if (1)
{
ELOG_TYPE_DECLARE (e) =
{
.format = "evt-enqueue: id %d length %d",.format_args = "i4i4",};
.format = "evt-enqueue: id %d length %d",
.format_args = "i4i4",};
struct
{
u32 data[2];
} *ed;
ed = ELOG_DATA (&vlib_global_main.elog_main, e);
ed->data[0] = evt.event_id;
ed->data[1] = evt.enqueue_length;
ed->data[1] = svm_fifo_max_dequeue (s0->server_rx_fifo);
}
/* *INDENT-ON* */
}
vec_reset_length (session_indices_to_enqueue);