udp: refactor udp code

Change-Id: I44d5c9df7c49b8d4d5677c6d319033b2da3e6b80
Signed-off-by: Florin Coras <fcoras@cisco.com>
This commit is contained in:
Florin Coras
2017-10-02 00:18:51 -07:00
committed by Dave Barach
parent 0cb01bde49
commit 3cbc04bea0
40 changed files with 1972 additions and 1274 deletions

91
src/scripts/vnet/uri/dummy_app.py Normal file → Executable file
View File

@ -3,6 +3,7 @@
import socket
import sys
import time
import argparse
# action can be reflect or drop
action = "drop"
@ -32,37 +33,52 @@ def handle_connection (connection, client_address):
connection.sendall(data)
finally:
connection.close()
def run_server(ip, port):
print("Starting server {}:{}".format(repr(ip), repr(port)))
def run_tcp_server(ip, port):
print("Starting TCP server {}:{}".format(repr(ip), repr(port)))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = (ip, int(port))
sock.bind(server_address)
sock.listen(1)
while True:
connection, client_address = sock.accept()
handle_connection (connection, client_address)
def run_udp_server(ip, port):
print("Starting UDP server {}:{}".format(repr(ip), repr(port)))
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = (ip, int(port))
sock.bind(server_address)
while True:
data, addr = sock.recvfrom(4096)
if (action != "drop"):
#snd_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto (data, addr)
def prepare_data():
def run_server(ip, port, proto):
if (proto == "tcp"):
run_tcp_server(ip, port)
elif (proto == "udp"):
run_udp_server(ip, port)
def prepare_data(power):
buf = []
for i in range (0, pow(2, 16)):
for i in range (0, pow(2, power)):
buf.append(i & 0xff)
return bytearray(buf)
def run_client(ip, port):
print("Starting client {}:{}".format(repr(ip), repr(port)))
def run_tcp_client(ip, port):
print("Starting TCP client {}:{}".format(repr(ip), repr(port)))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (ip, port)
server_address = (ip, int(port))
sock.connect(server_address)
data = prepare_data()
data = prepare_data(16)
n_rcvd = 0
n_sent = len (data)
try:
sock.sendall(data)
timeout = time.time() + 2
while n_rcvd < n_sent and time.time() < timeout:
tmp = sock.recv(1500)
@ -73,28 +89,53 @@ def run_client(ip, port):
print("Difference at byte {}. Sent {} got {}"
.format(n_rcvd + i, data[n_rcvd + i], tmp[i]))
n_rcvd += n_read
if (n_rcvd < n_sent or n_rcvd > n_sent):
print("Sent {} and got back {}".format(n_sent, n_rcvd))
else:
print("Got back what we've sent!!");
finally:
sock.close()
def run(mode, ip, port):
def run_udp_client(ip, port):
print("Starting UDP client {}:{}".format(repr(ip), repr(port)))
n_packets = 100
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = (ip, int(port))
data = prepare_data(10)
try:
for i in range (0, n_packets):
sock.sendto(data, server_address)
finally:
sock.close()
def run_client(ip, port, proto):
if (proto == "tcp"):
run_tcp_client(ip, port)
elif (proto == "udp"):
run_udp_client(ip, port)
def run(mode, ip, port, proto):
if (mode == "server"):
run_server (ip, port)
run_server (ip, port, proto)
elif (mode == "client"):
run_client (ip, port)
run_client (ip, port, proto)
else:
raise Exception("Unknown mode. Only client and server supported")
if __name__ == "__main__":
if (len(sys.argv)) < 4:
raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action> <test>]")
if (len(sys.argv) == 6):
action = sys.argv[4]
test = int(sys.argv[5])
run (sys.argv[1], sys.argv[2], int(sys.argv[3]))
parser = argparse.ArgumentParser()
parser.add_argument('-m', action='store', dest='mode')
parser.add_argument('-i', action='store', dest='ip')
parser.add_argument('-p', action='store', dest='port')
parser.add_argument('-proto', action='store', dest='proto')
parser.add_argument('-a', action='store', dest='action')
parser.add_argument('-t', action='store', dest='test')
results = parser.parse_args()
action = results.action
test = results.test
run(results.mode, results.ip, results.port, results.proto)
#if (len(sys.argv)) < 4:
# raise Exception("Usage: ./dummy_app <mode> <ip> <port> [<action> <test>]")
#if (len(sys.argv) == 6):
# action = sys.argv[4]
# test = int(sys.argv[5])
#run (sys.argv[1], sys.argv[2], int(sys.argv[3]))

View File

@ -1,6 +1,12 @@
loop create
set int ip address loop0 6.0.0.1/32
set int state loop0 up
set int state GigabitEthernet1b/0/0 up
set int ip address GigabitEthernet1b/0/0 192.168.1.1/24
create host-interface name vpp1
set int state host-vpp1 up
set int ip address host-vpp1 6.0.1.1/24
packet-generator new {
name udp

View File

@ -327,6 +327,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
svm_fifo_segment_create_args_t _a, *a = &_a;
int rv;
memset (a, 0, sizeof (*a));
a->segment_name = (char *) mp->segment_name;
a->segment_size = mp->segment_size;
/* Attach to the segment vpp created */
@ -590,7 +591,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
u32 bytes_to_snd;
u32 queue_max_chunk = 128 << 10, actual_write;
session_fifo_event_t evt;
static int serial_number = 0;
int rv;
bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
@ -615,7 +615,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (utm->vpp_event_queue,
(u8 *) & evt,
@ -918,6 +917,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
rmp->handle = mp->handle;
rmp->context = mp->context;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
session->bytes_received = 0;
@ -983,7 +983,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = e->event_id;
q = utm->vpp_event_queue;
unix_shared_memory_queue_add (q, (u8 *) & evt,
@ -997,7 +996,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
void
server_handle_event_queue (uri_tcp_test_main_t * utm)
{
session_fifo_event_t _e, *e = &_e;;
session_fifo_event_t _e, *e = &_e;
while (1)
{

File diff suppressed because it is too large Load Diff

View File

@ -136,7 +136,6 @@ typedef struct vppcom_main_t_
u8 init;
u32 *client_session_index_fifo;
volatile u32 bind_session_index;
u32 tx_event_id;
int main_cpu;
/* vpe input queue */
@ -2328,7 +2327,6 @@ vppcom_session_write (uint32_t session_index, void *buf, int n)
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = vcm->tx_event_id++;
rval = vppcom_session_at_index (session_index, &session);
if (PREDICT_FALSE (rval))

View File

@ -877,7 +877,7 @@ libvnet_la_SOURCES += \
vnet/session/session_table.c \
vnet/session/session_lookup.c \
vnet/session/session_node.c \
vnet/session/transport_interface.c \
vnet/session/transport.c \
vnet/session/application.c \
vnet/session/session_cli.c \
vnet/session/application_interface.c \

View File

@ -63,6 +63,24 @@ ip_is_local (u32 fib_index, ip46_address_t * ip46_address, u8 is_ip4)
return (flags & FIB_ENTRY_FLAG_LOCAL);
}
void
ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4)
{
if (is_ip4)
dst->ip4.as_u32 = src->ip4.as_u32;
else
clib_memcpy (&dst->ip6, &src->ip6, sizeof (ip6_address_t));
}
void
ip_set (ip46_address_t * dst, void *src, u8 is_ip4)
{
if (is_ip4)
dst->ip4.as_u32 = ((ip4_address_t *) src)->as_u32;
else
clib_memcpy (&dst->ip6, (ip6_address_t *) src, sizeof (ip6_address_t));
}
u8
ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4)
{
@ -97,22 +115,37 @@ ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4)
return 0;
}
void
ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4)
void *
ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4)
{
if (is_ip4)
dst->ip4.as_u32 = src->ip4.as_u32;
else
clib_memcpy (&dst->ip6, &src->ip6, sizeof (ip6_address_t));
}
ip_lookup_main_t *lm4 = &ip4_main.lookup_main;
ip_lookup_main_t *lm6 = &ip6_main.lookup_main;
ip_interface_address_t *ia = 0;
void
ip_set (ip46_address_t * dst, void *src, u8 is_ip4)
{
if (is_ip4)
dst->ip4.as_u32 = ((ip4_address_t *) src)->as_u32;
{
/* *INDENT-OFF* */
foreach_ip_interface_address (lm4, ia, sw_if_index, 1 /* unnumbered */ ,
({
return ip_interface_address_get_address (lm4, ia);
}));
/* *INDENT-ON* */
}
else
clib_memcpy (&dst->ip6, (ip6_address_t *) src, sizeof (ip6_address_t));
{
/* *INDENT-OFF* */
foreach_ip_interface_address (lm6, ia, sw_if_index, 1 /* unnumbered */ ,
({
ip6_address_t *rv;
rv = ip_interface_address_get_address (lm6, ia);
/* Trying to use a link-local ip6 src address is a fool's errand */
if (!ip6_address_is_link_local_unicast (rv))
return rv;
}));
/* *INDENT-ON* */
}
return 0;
}
/*

View File

@ -198,6 +198,7 @@ u8 ip_is_local (u32 fib_index, ip46_address_t * ip46_address, u8 is_ip4);
u8 ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4);
void ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4);
void ip_set (ip46_address_t * dst, void *src, u8 is_ip4);
void *ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4);
#endif /* included_ip_main_h */

View File

@ -415,7 +415,6 @@ application_open_session (application_t * app, session_endpoint_t * sep,
u32 api_context)
{
segment_manager_t *sm;
transport_connection_t *tc = 0;
int rv;
/* Make sure we have a segment manager for connects */
@ -427,13 +426,9 @@ application_open_session (application_t * app, session_endpoint_t * sep,
app->connects_seg_manager = segment_manager_index (sm);
}
if ((rv = stream_session_open (app->index, sep, &tc)))
if ((rv = session_open (app->index, sep, api_context)))
return rv;
/* Store api_context for when the reply comes. Not the nicest thing
* but better than allocating a separate half-open pool. */
tc->s_index = api_context;
return 0;
}

View File

@ -92,7 +92,8 @@ static int
vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
{
application_t *app;
u32 table_index, listener_index;
u32 table_index;
u64 listener;
int rv, have_local = 0;
app = application_get_if_valid (app_index);
@ -108,8 +109,8 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
table_index = application_session_table (app,
session_endpoint_fib_proto (sep));
listener_index = session_lookup_session_endpoint (table_index, sep);
if (listener_index != SESSION_INVALID_INDEX)
listener = session_lookup_session_endpoint (table_index, sep);
if (listener != SESSION_INVALID_HANDLE)
return VNET_API_ERROR_ADDRESS_IN_USE;
/*
@ -119,8 +120,8 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle)
if (application_has_local_scope (app) && session_endpoint_is_zero (sep))
{
table_index = application_local_session_table (app);
listener_index = session_lookup_session_endpoint (table_index, sep);
if (listener_index != SESSION_INVALID_INDEX)
listener = session_lookup_session_endpoint (table_index, sep);
if (listener != SESSION_INVALID_HANDLE)
return VNET_API_ERROR_ADDRESS_IN_USE;
session_lookup_add_session_endpoint (table_index, sep, app->index);
*handle = session_lookup_local_listener_make_handle (sep);
@ -206,6 +207,7 @@ vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep,
{
application_t *server, *app;
u32 table_index;
stream_session_t *listener;
if (session_endpoint_is_zero (sep))
return VNET_API_ERROR_INVALID_VALUE;
@ -243,10 +245,13 @@ vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep,
table_index = application_session_table (app,
session_endpoint_fib_proto (sep));
app_index = session_lookup_session_endpoint (table_index, sep);
server = application_get (app_index);
if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
return app_connect_redirect (server, mp);
listener = session_lookup_listener (table_index, sep);
if (listener)
{
server = application_get (listener->app_index);
if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT))
return app_connect_redirect (server, mp);
}
/*
* Not connecting to a local server, propagate to transport
@ -470,14 +475,15 @@ vnet_unbind_uri (vnet_unbind_args_t * a)
clib_error_t *
vnet_connect_uri (vnet_connect_args_t * a)
{
session_endpoint_t sep = SESSION_ENDPOINT_NULL;
session_endpoint_t sep_null = SESSION_ENDPOINT_NULL;
int rv;
/* Parse uri */
rv = parse_uri (a->uri, &sep);
a->sep = sep_null;
rv = parse_uri (a->uri, &a->sep);
if (rv)
return clib_error_return_code (0, rv, 0, "app init: %d", rv);
if ((rv = vnet_connect_i (a->app_index, a->api_context, &sep, a->mp)))
if ((rv = vnet_connect_i (a->app_index, a->api_context, &a->sep, a->mp)))
return clib_error_return_code (0, rv, 0, "connect failed");
return 0;
}
@ -489,7 +495,7 @@ vnet_disconnect_session (vnet_disconnect_args_t * a)
stream_session_t *s;
session_parse_handle (a->handle, &index, &thread_index);
s = stream_session_get_if_valid (index, thread_index);
s = session_get_if_valid (index, thread_index);
if (!s || s->app_index != a->app_index)
return VNET_API_ERROR_INVALID_VALUE;

View File

@ -56,11 +56,7 @@ typedef struct _vnet_bind_args_t
union
{
char *uri;
struct
{
session_endpoint_t sep;
transport_proto_t proto;
};
session_endpoint_t sep;
};
u32 app_index;
@ -86,23 +82,14 @@ typedef struct _vnet_unbind_args_t
typedef struct _vnet_connect_args
{
union
{
char *uri;
struct
{
session_endpoint_t sep;
transport_proto_t proto;
};
};
char *uri;
session_endpoint_t sep;
u32 app_index;
u32 api_context;
/* Used for redirects */
void *mp;
/* used for proxy connections */
u64 server_handle;
u64 session_handle;
} vnet_connect_args_t;
typedef struct _vnet_disconnect_args_t

View File

@ -273,7 +273,7 @@ segment_manager_del_sessions (segment_manager_t * sm)
if (session->session_state != SESSION_STATE_CLOSED)
{
session->session_state = SESSION_STATE_CLOSED;
session_send_session_evt_to_thread (stream_session_handle
session_send_session_evt_to_thread (session_handle
(session),
FIFO_EVENT_DISCONNECT,
thread_index);

File diff suppressed because it is too large Load Diff

View File

@ -105,7 +105,7 @@ typedef CLIB_PACKED (struct {
rpc_args_t rpc_args;
};
u8 event_type;
u16 event_id;
u8 postponed;
}) session_fifo_event_t;
/* *INDENT-ON* */
@ -128,17 +128,21 @@ struct _session_manager_main
/** Per worker thread session pools */
stream_session_t **sessions;
/** Per worker-thread count of threads peeking into the session pool */
u32 *session_peekers;
/** Per worker-thread rw peekers locks */
clib_spinlock_t *peekers_readers_locks;
clib_spinlock_t *peekers_write_locks;
/** Pool of listen sessions. Same type as stream sessions to ease lookups */
stream_session_t *listen_sessions[SESSION_N_TYPES];
/** Sparse vector to map dst port to stream server */
u16 *stream_server_by_dst_port[SESSION_N_TYPES];
/** Per-proto, per-worker enqueue epoch counters */
u8 *current_enqueue_epoch[TRANSPORT_N_PROTO];
/** per-worker enqueue epoch counters */
u8 *current_enqueue_epoch;
/** Per-worker thread vector of sessions to enqueue */
u32 **session_indices_to_enqueue_by_thread;
/** Per-proto, per-worker thread vector of sessions to enqueue */
u32 **session_to_enqueue[TRANSPORT_N_PROTO];
/** per-worker tx buffer free lists */
u32 **tx_buffers;
@ -149,6 +153,9 @@ struct _session_manager_main
/** per-worker active event vectors */
session_fifo_event_t **pending_event_vector;
/** per-worker postponed disconnects */
session_fifo_event_t **pending_disconnects;
/** vpp fifo event queue */
unix_shared_memory_queue_t **vpp_event_queues;
@ -213,6 +220,8 @@ stream_session_is_valid (u32 si, u8 thread_index)
return 1;
}
stream_session_t *session_alloc (u32 thread_index);
always_inline stream_session_t *
session_get (u32 si, u32 thread_index)
{
@ -221,7 +230,7 @@ session_get (u32 si, u32 thread_index)
}
always_inline stream_session_t *
stream_session_get_if_valid (u64 si, u32 thread_index)
session_get_if_valid (u64 si, u32 thread_index)
{
if (thread_index >= vec_len (session_manager_main.sessions))
return 0;
@ -234,7 +243,7 @@ stream_session_get_if_valid (u64 si, u32 thread_index)
}
always_inline u64
stream_session_handle (stream_session_t * s)
session_handle (stream_session_t * s)
{
return ((u64) s->thread_index << 32) | (u64) s->session_index;
}
@ -267,6 +276,66 @@ session_get_from_handle (u64 handle)
session_index_from_handle (handle));
}
/**
* Acquires a lock that blocks a session pool from expanding.
*
* This is typically used for safely peeking into other threads'
* pools in order to clone elements. Lock should be dropped as soon
* as possible by calling @ref session_pool_remove_peeker.
*
* NOTE: Avoid using pool_elt_at_index while the lock is held because
* it may lead to free elt bitmap expansion/contraction!
*/
always_inline void
session_pool_add_peeker (u32 thread_index)
{
session_manager_main_t *smm = &session_manager_main;
if (thread_index == vlib_get_thread_index ())
return;
clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]);
smm->session_peekers[thread_index] += 1;
if (smm->session_peekers[thread_index] == 1)
clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]);
clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]);
}
always_inline void
session_pool_remove_peeker (u32 thread_index)
{
session_manager_main_t *smm = &session_manager_main;
if (thread_index == vlib_get_thread_index ())
return;
ASSERT (session_manager_main.session_peekers[thread_index] > 0);
clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]);
smm->session_peekers[thread_index] -= 1;
if (smm->session_peekers[thread_index] == 0)
clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]);
clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]);
}
/**
* Get session from handle and 'lock' pool resize if not in same thread
*
* Caller should drop the peek 'lock' as soon as possible.
*/
always_inline stream_session_t *
session_get_from_handle_safe (u64 handle)
{
session_manager_main_t *smm = &session_manager_main;
u32 thread_index = session_thread_from_handle (handle);
if (thread_index == vlib_get_thread_index ())
{
return pool_elt_at_index (smm->sessions[thread_index],
session_index_from_handle (handle));
}
else
{
session_pool_add_peeker (thread_index);
/* Don't use pool_elt_at index. See @ref session_pool_add_peeker */
return smm->sessions[thread_index] + session_index_from_handle (handle);
}
}
always_inline stream_session_t *
stream_session_listener_get (u8 sst, u64 si)
{
@ -296,17 +365,52 @@ stream_session_rx_fifo_size (transport_connection_t * tc)
return s->server_rx_fifo->nitems;
}
always_inline u32
session_get_index (stream_session_t * s)
{
return (s - session_manager_main.sessions[s->thread_index]);
}
always_inline stream_session_t *
session_clone_safe (u32 session_index, u32 thread_index)
{
stream_session_t *old_s, *new_s;
u32 current_thread_index = vlib_get_thread_index ();
/* If during the memcpy pool is reallocated AND the memory allocator
* decides to give the old chunk of memory to somebody in a hurry to
* scribble something on it, we have a problem. So add this thread as
* a session pool peeker.
*/
session_pool_add_peeker (thread_index);
new_s = session_alloc (current_thread_index);
old_s = session_manager_main.sessions[thread_index] + session_index;
clib_memcpy (new_s, old_s, sizeof (*new_s));
session_pool_remove_peeker (thread_index);
new_s->thread_index = current_thread_index;
new_s->session_index = session_get_index (new_s);
return new_s;
}
transport_connection_t *session_get_transport (stream_session_t * s);
u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc);
stream_session_t *session_alloc (u32 thread_index);
int
stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
u32 offset, u8 queue_event, u8 is_in_order);
int
stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes);
session_enqueue_stream_connection (transport_connection_t * tc,
vlib_buffer_t * b, u32 offset,
u8 queue_event, u8 is_in_order);
int session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
u8 proto, u8 queue_event);
int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes);
u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes);
int stream_session_connect_notify (transport_connection_t * tc, u8 is_fail);
int session_stream_connect_notify (transport_connection_t * tc, u8 is_fail);
int session_dgram_connect_notify (transport_connection_t * tc,
u32 old_thread_index,
stream_session_t ** new_session);
void stream_session_init_fifos_pointers (transport_connection_t * tc,
u32 rx_pointer, u32 tx_pointer);
@ -314,12 +418,9 @@ void stream_session_accept_notify (transport_connection_t * tc);
void stream_session_disconnect_notify (transport_connection_t * tc);
void stream_session_delete_notify (transport_connection_t * tc);
void stream_session_reset_notify (transport_connection_t * tc);
int
stream_session_accept (transport_connection_t * tc, u32 listener_index,
u8 notify);
int
stream_session_open (u32 app_index, session_endpoint_t * tep,
transport_connection_t ** tc);
int stream_session_accept (transport_connection_t * tc, u32 listener_index,
u8 notify);
int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque);
int stream_session_listen (stream_session_t * s, session_endpoint_t * tep);
int stream_session_stop_listen (stream_session_t * s);
void stream_session_disconnect (stream_session_t * s);
@ -346,7 +447,7 @@ session_manager_get_vpp_event_queue (u32 thread_index)
return session_manager_main.vpp_event_queues[thread_index];
}
int session_manager_flush_enqueue_events (u32 thread_index);
int session_manager_flush_enqueue_events (u8 proto, u32 thread_index);
always_inline u64
listen_session_get_handle (stream_session_t * s)
@ -400,6 +501,8 @@ listen_session_del (stream_session_t * s)
pool_put (session_manager_main.listen_sessions[s->session_type], s);
}
transport_connection_t *listen_session_get_transport (stream_session_t * s);
int
listen_session_get_local_session_endpoint (stream_session_t * listener,
session_endpoint_t * sep);

View File

@ -99,10 +99,10 @@ send_session_accept_callback (stream_session_t * s)
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
mp->context = server->index;
listener = listen_session_get (s->session_type, s->listener_index);
tp_vft = session_get_transport_vft (s->session_type);
tp_vft = transport_protocol_get_vft (s->session_type);
tc = tp_vft->get_connection (s->connection_index, s->thread_index);
mp->listener_handle = listen_session_get_handle (listener);
mp->handle = stream_session_handle (s);
mp->handle = session_handle (s);
mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
@ -129,7 +129,7 @@ send_session_disconnect_callback (stream_session_t * s)
mp = vl_msg_api_alloc (sizeof (*mp));
memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
mp->handle = stream_session_handle (s);
mp->handle = session_handle (s);
vl_msg_api_send_shmem (q, (u8 *) & mp);
}
@ -148,7 +148,7 @@ send_session_reset_callback (stream_session_t * s)
mp = vl_msg_api_alloc (sizeof (*mp));
memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_RESET_SESSION);
mp->handle = stream_session_handle (s);
mp->handle = session_handle (s);
vl_msg_api_send_shmem (q, (u8 *) & mp);
}
@ -175,7 +175,7 @@ send_session_connected_callback (u32 app_index, u32 api_context,
vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo);
mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo);
mp->handle = stream_session_handle (s);
mp->handle = session_handle (s);
mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
mp->retval = 0;
}
@ -463,11 +463,14 @@ vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
}
/*
* Don't reply to stream (tcp) connects. The reply will come once
* the connection is established. In case of the redirects, the reply
* will come from the server app.
*/
if (rv == 0 || rv == VNET_API_ERROR_SESSION_REDIRECT)
return;
/* Got some error, relay it */
done:
/* *INDENT-OFF* */
REPLY_MACRO (VL_API_CONNECT_SESSION_REPLY);
@ -540,7 +543,7 @@ vl_api_reset_session_reply_t_handler (vl_api_reset_session_reply_t * mp)
return;
session_parse_handle (mp->handle, &index, &thread_index);
s = stream_session_get_if_valid (index, thread_index);
s = session_get_if_valid (index, thread_index);
if (s == 0 || app->index != s->app_index)
{
clib_warning ("Invalid session!");
@ -576,7 +579,7 @@ vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
else
{
session_parse_handle (mp->handle, &session_index, &thread_index);
s = stream_session_get_if_valid (session_index, thread_index);
s = session_get_if_valid (session_index, thread_index);
if (!s)
{
clib_warning ("session doesn't exist");
@ -623,8 +626,8 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
a->sep.port = mp->port;
a->sep.fib_index = mp->vrf;
a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
a->sep.transport_proto = mp->proto;
a->app_index = app->index;
a->proto = mp->proto;
if ((error = vnet_bind (a)))
{

View File

@ -55,7 +55,7 @@ format_stream_session (u8 * s, va_list * args)
int verbose = va_arg (*args, int);
transport_proto_vft_t *tp_vft;
u8 *str = 0;
tp_vft = session_get_transport_vft (ss->session_type);
tp_vft = transport_protocol_get_vft (ss->session_type);
if (verbose == 1 && ss->session_state >= SESSION_STATE_ACCEPTING)
str = format (0, "%-10u%-10u%-10lld",
@ -63,9 +63,7 @@ format_stream_session (u8 * s, va_list * args)
svm_fifo_max_enqueue (ss->server_tx_fifo),
stream_session_get_index (ss));
if (ss->session_state == SESSION_STATE_READY
|| ss->session_state == SESSION_STATE_ACCEPTING
|| ss->session_state == SESSION_STATE_CLOSED)
if (ss->session_state >= SESSION_STATE_ACCEPTING)
{
s = format (s, "%U", tp_vft->format_connection, ss->connection_index,
ss->thread_index, verbose);
@ -146,16 +144,17 @@ unformat_stream_session (unformat_input_t * input, va_list * args)
return 0;
if (is_ip4)
s = session_lookup4 (fib_index, &lcl.ip4, &rmt.ip4,
clib_host_to_net_u16 (lcl_port),
clib_host_to_net_u16 (rmt_port), proto);
s = session_lookup_safe4 (fib_index, &lcl.ip4, &rmt.ip4,
clib_host_to_net_u16 (lcl_port),
clib_host_to_net_u16 (rmt_port), proto);
else
s = session_lookup6 (fib_index, &lcl.ip6, &rmt.ip6,
clib_host_to_net_u16 (lcl_port),
clib_host_to_net_u16 (rmt_port), proto);
s = session_lookup_safe6 (fib_index, &lcl.ip6, &rmt.ip6,
clib_host_to_net_u16 (lcl_port),
clib_host_to_net_u16 (rmt_port), proto);
if (s)
{
*result = s;
session_pool_remove_peeker (s->thread_index);
return 1;
}
return 0;
@ -324,7 +323,7 @@ clear_session_command_fn (vlib_main_t * vm, unformat_input_t * input,
if (session_index != ~0)
{
session = stream_session_get_if_valid (session_index, thread_index);
session = session_get_if_valid (session_index, thread_index);
if (!session)
return clib_error_return (0, "no session %d on thread %d",
session_index, thread_index);

View File

@ -116,7 +116,7 @@ always_inline void
make_v4_ss_kv_from_tc (session_kv4_t * kv, transport_connection_t * t)
{
make_v4_ss_kv (kv, &t->lcl_ip.ip4, &t->rmt_ip.ip4, t->lcl_port, t->rmt_port,
session_type_from_proto_and_ip (t->transport_proto, 1));
session_type_from_proto_and_ip (t->proto, 1));
}
always_inline void
@ -159,7 +159,7 @@ always_inline void
make_v6_ss_kv_from_tc (session_kv6_t * kv, transport_connection_t * t)
{
make_v6_ss_kv (kv, &t->lcl_ip.ip6, &t->rmt_ip.ip6, t->lcl_port, t->rmt_port,
session_type_from_proto_and_ip (t->transport_proto, 0));
session_type_from_proto_and_ip (t->proto, 0));
}
@ -339,7 +339,7 @@ session_lookup_del_session (stream_session_t * s)
return session_lookup_del_connection (ts);
}
u32
u64
session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep)
{
session_table_t *st;
@ -349,14 +349,14 @@ session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep)
st = session_table_get (table_index);
if (!st)
return SESSION_INVALID_INDEX;
return SESSION_INVALID_HANDLE;
if (sep->is_ip4)
{
make_v4_listener_kv (&kv4, &sep->ip.ip4, sep->port,
sep->transport_proto);
rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
if (rv == 0)
return (u32) kv4.value;
return kv4.value;
}
else
{
@ -364,9 +364,43 @@ session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep)
sep->transport_proto);
rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
if (rv == 0)
return (u32) kv6.value;
return kv6.value;
}
return SESSION_INVALID_INDEX;
return SESSION_INVALID_HANDLE;
}
stream_session_t *
session_lookup_global_session_endpoint (session_endpoint_t * sep)
{
session_table_t *st;
session_kv4_t kv4;
session_kv6_t kv6;
u8 fib_proto;
u32 table_index;
int rv;
fib_proto = session_endpoint_fib_proto (sep);
table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
st = session_table_get (table_index);
if (!st)
return 0;
if (sep->is_ip4)
{
make_v4_listener_kv (&kv4, &sep->ip.ip4, sep->port,
sep->transport_proto);
rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
if (rv == 0)
return session_get_from_handle (kv4.value);
}
else
{
make_v6_listener_kv (&kv6, &sep->ip.ip6, sep->port,
sep->transport_proto);
rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
if (rv == 0)
return session_get_from_handle (kv6.value);
}
return 0;
}
u32
@ -562,7 +596,7 @@ session_lookup_half_open_handle (transport_connection_t * tc)
if (tc->is_ip4)
{
make_v4_ss_kv (&kv4, &tc->lcl_ip.ip4, &tc->rmt_ip.ip4, tc->lcl_port,
tc->rmt_port, tc->transport_proto);
tc->rmt_port, tc->proto);
rv = clib_bihash_search_inline_16_8 (&st->v4_half_open_hash, &kv4);
if (rv == 0)
return kv4.value;
@ -570,7 +604,7 @@ session_lookup_half_open_handle (transport_connection_t * tc)
else
{
make_v6_ss_kv (&kv6, &tc->lcl_ip.ip6, &tc->rmt_ip.ip6, tc->lcl_port,
tc->rmt_port, tc->transport_proto);
tc->rmt_port, tc->proto);
rv = clib_bihash_search_inline_48_8 (&st->v6_half_open_hash, &kv6);
if (rv == 0)
return kv6.value;
@ -713,12 +747,19 @@ session_lookup_connection4 (u32 fib_index, ip4_address_t * lcl,
/**
* Lookup session with ip4 and transport layer information
*
* Lookup logic is identical to that of @ref session_lookup_connection_wt4 but
* this returns a session as opposed to a transport connection;
* Important note: this may look into another thread's pool table and
* register as 'peeker'. Caller should call @ref session_pool_remove_peeker as
* if needed as soon as possible.
*
* Lookup logic is similar to that of @ref session_lookup_connection_wt4 but
* this returns a session as opposed to a transport connection and it does not
* try to lookup half-open sessions.
*
* Typically used by dgram connections
*/
stream_session_t *
session_lookup4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt,
u16 lcl_port, u16 rmt_port, u8 proto)
session_lookup_safe4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt,
u16 lcl_port, u16 rmt_port, u8 proto)
{
session_table_t *st;
session_kv4_t kv4;
@ -733,16 +774,11 @@ session_lookup4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt,
make_v4_ss_kv (&kv4, lcl, rmt, lcl_port, rmt_port, proto);
rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4);
if (rv == 0)
return session_get_from_handle (kv4.value);
return session_get_from_handle_safe (kv4.value);
/* If nothing is found, check if any listener is available */
if ((s = session_lookup_listener4_i (st, lcl, lcl_port, proto)))
return s;
/* Finally, try half-open connections */
rv = clib_bihash_search_inline_16_8 (&st->v4_half_open_hash, &kv4);
if (rv == 0)
return session_get_from_handle (kv4.value);
return 0;
}
@ -868,12 +904,19 @@ session_lookup_connection6 (u32 fib_index, ip6_address_t * lcl,
/**
* Lookup session with ip6 and transport layer information
*
* Lookup logic is identical to that of @ref session_lookup_connection_wt6 but
* this returns a session as opposed to a transport connection;
* Important note: this may look into another thread's pool table and
* register as 'peeker'. Caller should call @ref session_pool_remove_peeker as
* if needed as soon as possible.
*
* Lookup logic is similar to that of @ref session_lookup_connection_wt6 but
* this returns a session as opposed to a transport connection and it does not
* try to lookup half-open sessions.
*
* Typically used by dgram connections
*/
stream_session_t *
session_lookup6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
u16 lcl_port, u16 rmt_port, u8 proto)
session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
u16 lcl_port, u16 rmt_port, u8 proto)
{
session_table_t *st;
session_kv6_t kv6;
@ -887,16 +930,11 @@ session_lookup6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt,
make_v6_ss_kv (&kv6, lcl, rmt, lcl_port, rmt_port, proto);
rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6);
if (rv == 0)
return session_get_from_handle (kv6.value);
return session_get_from_handle_safe (kv6.value);
/* If nothing is found, check if any listener is available */
if ((s = session_lookup_listener6_i (st, lcl, lcl_port, proto)))
return s;
/* Finally, try half-open connections */
rv = clib_bihash_search_inline_48_8 (&st->v6_half_open_hash, &kv6);
if (rv == 0)
return session_get_from_handle (kv6.value);
return 0;
}

View File

@ -20,12 +20,12 @@
#include <vnet/session/stream_session.h>
#include <vnet/session/transport.h>
stream_session_t *session_lookup4 (u32 fib_index, ip4_address_t * lcl,
ip4_address_t * rmt, u16 lcl_port,
u16 rmt_port, u8 proto);
stream_session_t *session_lookup6 (u32 fib_index, ip6_address_t * lcl,
ip6_address_t * rmt, u16 lcl_port,
u16 rmt_port, u8 proto);
stream_session_t *session_lookup_safe4 (u32 fib_index, ip4_address_t * lcl,
ip4_address_t * rmt, u16 lcl_port,
u16 rmt_port, u8 proto);
stream_session_t *session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl,
ip6_address_t * rmt, u16 lcl_port,
u16 rmt_port, u8 proto);
transport_connection_t *session_lookup_connection_wt4 (u32 fib_index,
ip4_address_t * lcl,
ip4_address_t * rmt,
@ -58,10 +58,12 @@ stream_session_t *session_lookup_listener (u32 table_index,
session_endpoint_t * sep);
int session_lookup_add_connection (transport_connection_t * tc, u64 value);
int session_lookup_del_connection (transport_connection_t * tc);
u32 session_lookup_session_endpoint (u32 table_index,
u64 session_lookup_session_endpoint (u32 table_index,
session_endpoint_t * sep);
u32 session_lookup_local_session_endpoint (u32 table_index,
session_endpoint_t * sep);
stream_session_t *session_lookup_global_session_endpoint (session_endpoint_t
*);
int session_lookup_add_session_endpoint (u32 table_index,
session_endpoint_t * sep, u64 value);
int session_lookup_del_session_endpoint (u32 table_index,

View File

@ -154,7 +154,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
next_index = next0 = session_type_to_next[s0->session_type];
transport_vft = session_get_transport_vft (s0->session_type);
transport_vft = transport_protocol_get_vft (s0->session_type);
tc0 = transport_vft->get_connection (s0->connection_index, thread_index);
/* Make sure we have space to send and there's something to dequeue */
@ -401,8 +401,7 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
always_inline stream_session_t *
session_event_get_session (session_fifo_event_t * e, u8 thread_index)
{
return stream_session_get_if_valid (e->fifo->master_session_index,
thread_index);
return session_get_if_valid (e->fifo->master_session_index, thread_index);
}
void
@ -540,7 +539,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
session_fifo_event_t *my_pending_event_vector, *e;
session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e;
session_fifo_event_t *my_fifo_events;
u32 n_to_dequeue, n_events;
unix_shared_memory_queue_t *q;
@ -570,8 +569,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
/* min number of events we can dequeue without blocking */
n_to_dequeue = q->cursize;
my_pending_event_vector = smm->pending_event_vector[my_thread_index];
pending_disconnects = smm->pending_disconnects[my_thread_index];
if (n_to_dequeue == 0 && vec_len (my_pending_event_vector) == 0)
if (!n_to_dequeue && !vec_len (my_pending_event_vector)
&& !vec_len (pending_disconnects))
return 0;
SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
@ -603,9 +604,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
pthread_mutex_unlock (&q->mutex);
vec_append (my_fifo_events, my_pending_event_vector);
vec_append (my_fifo_events, smm->pending_disconnects[my_thread_index]);
_vec_len (my_pending_event_vector) = 0;
smm->pending_event_vector[my_thread_index] = my_pending_event_vector;
_vec_len (smm->pending_disconnects[my_thread_index]) = 0;
skip_dequeue:
n_events = vec_len (my_fifo_events);
@ -644,6 +647,13 @@ skip_dequeue:
}
break;
case FIFO_EVENT_DISCONNECT:
/* Make sure disconnects run after the pending list is drained */
if (!e0->postponed)
{
e0->postponed = 1;
vec_add1 (smm->pending_disconnects[my_thread_index], *e0);
continue;
}
s0 = session_get_from_handle (e0->session_handle);
stream_session_disconnect (s0);
break;

View File

@ -37,6 +37,7 @@ typedef struct _session_lookup_table
#define SESSION_TABLE_INVALID_INDEX ((u32)~0)
#define SESSION_LOCAL_TABLE_PREFIX ((u32)~0)
#define SESSION_INVALID_INDEX ((u32)~0)
#define SESSION_INVALID_HANDLE ((u64)~0)
typedef int (*ip4_session_table_walk_fn_t) (clib_bihash_kv_16_8_t * kvp,
void *ctx);

View File

@ -260,8 +260,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
"the server");
server_local_st_index = application_local_session_table (server);
local_listener = session_lookup_session_endpoint (server_local_st_index,
&server_sep);
local_listener =
session_lookup_local_session_endpoint (server_local_st_index,
&server_sep);
SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
"listener should exist in local table");
@ -312,8 +313,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
s = session_lookup_listener (server_st_index, &server_sep);
SESSION_TEST ((s == 0), "listener should not exist in global table");
local_listener = session_lookup_session_endpoint (server_local_st_index,
&server_sep);
local_listener =
session_lookup_local_session_endpoint (server_local_st_index,
&server_sep);
SESSION_TEST ((s == 0), "listener should not exist in local table");
detach_args.app_index = server_index;
@ -337,8 +339,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
s = session_lookup_listener (server_st_index, &server_sep);
SESSION_TEST ((s == 0), "listener should not exist in global table");
server_local_st_index = application_local_session_table (server);
local_listener = session_lookup_session_endpoint (server_local_st_index,
&server_sep);
local_listener =
session_lookup_local_session_endpoint (server_local_st_index,
&server_sep);
SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
"listener should exist in local table");
@ -346,8 +349,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
error = vnet_unbind (&unbind_args);
SESSION_TEST ((error == 0), "unbind should work");
local_listener = session_lookup_session_endpoint (server_local_st_index,
&server_sep);
local_listener =
session_lookup_local_session_endpoint (server_local_st_index,
&server_sep);
SESSION_TEST ((local_listener == SESSION_INVALID_INDEX),
"listener should not exist in local table");
@ -417,8 +421,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input)
SESSION_TEST ((s->app_index == server_index), "app_index should be that of "
"the server");
server_local_st_index = application_local_session_table (server);
local_listener = session_lookup_session_endpoint (server_local_st_index,
&server_sep);
local_listener =
session_lookup_local_session_endpoint (server_local_st_index,
&server_sep);
SESSION_TEST ((local_listener != SESSION_INVALID_INDEX),
"zero listener should exist in local table");
detach_args.app_index = server_index;

View File

@ -43,6 +43,7 @@ typedef enum
SESSION_STATE_CONNECTING,
SESSION_STATE_ACCEPTING,
SESSION_STATE_READY,
SESSION_STATE_CONNECTING_READY,
SESSION_STATE_CLOSED,
SESSION_STATE_N_STATES,
} stream_session_state_t;

File diff suppressed because it is too large Load Diff

View File

@ -29,7 +29,7 @@ typedef struct _transport_connection
ip46_address_t lcl_ip; /**< Local IP */
u16 lcl_port; /**< Local port */
u16 rmt_port; /**< Remote port */
u8 transport_proto; /**< Protocol id */
u8 proto; /**< Protocol id */
u8 is_ip4; /**< Flag if IP4 connection */
u32 fib_index; /**< Network namespace */
@ -54,7 +54,7 @@ typedef struct _transport_connection
#define c_rmt_ip6 connection.rmt_ip.ip6
#define c_lcl_port connection.lcl_port
#define c_rmt_port connection.rmt_port
#define c_transport_proto connection.transport_proto
#define c_proto connection.proto
#define c_fib_index connection.fib_index
#define c_s_index connection.s_index
#define c_c_index connection.c_index
@ -69,7 +69,8 @@ typedef struct _transport_connection
typedef enum _transport_proto
{
TRANSPORT_PROTO_TCP,
TRANSPORT_PROTO_UDP
TRANSPORT_PROTO_UDP,
TRANSPORT_N_PROTO
} transport_proto_t;
#define foreach_transport_connection_fields \
@ -86,6 +87,8 @@ typedef struct _transport_endpoint
#undef _
} transport_endpoint_t;
typedef clib_bihash_24_8_t transport_endpoint_table_t;
#define ENDPOINT_INVALID_INDEX ((u32)~0)
always_inline u8
@ -94,6 +97,31 @@ transport_connection_fib_proto (transport_connection_t * tc)
return tc->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
}
always_inline u8
transport_endpoint_fib_proto (transport_endpoint_t * tep)
{
return tep->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6;
}
always_inline u8
transport_is_stream (u8 proto)
{
return (proto == TRANSPORT_PROTO_TCP);
}
always_inline u8
transport_is_dgram (u8 proto)
{
return (proto == TRANSPORT_PROTO_UDP);
}
int transport_alloc_local_port (u8 proto, ip46_address_t * ip);
int transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt,
ip46_address_t * lcl_addr,
u16 * lcl_port);
void transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port);
void transport_init (void);
#endif /* VNET_VNET_URI_TRANSPORT_H_ */
/*

View File

@ -1,109 +0,0 @@
/*
* Copyright (c) 2017 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vnet/session/transport_interface.h>
#include <vnet/session/session.h>
/**
* Per-type vector of transport protocol virtual function tables
*/
transport_proto_vft_t *tp_vfts;
u32
transport_endpoint_lookup (transport_endpoint_table_t * ht,
ip46_address_t * ip, u16 port)
{
clib_bihash_kv_24_8_t kv;
int rv;
kv.key[0] = ip->as_u64[0];
kv.key[1] = ip->as_u64[1];
kv.key[2] = port;
rv = clib_bihash_search_inline_24_8 (ht, &kv);
if (rv == 0)
return kv.value;
return TRANSPORT_ENDPOINT_INVALID_INDEX;
}
void
transport_endpoint_table_add (transport_endpoint_table_t * ht,
transport_endpoint_t * te, u32 value)
{
clib_bihash_kv_24_8_t kv;
kv.key[0] = te->ip.as_u64[0];
kv.key[1] = te->ip.as_u64[1];
kv.key[2] = te->port;
kv.value = value;
clib_bihash_add_del_24_8 (ht, &kv, 1);
}
void
transport_endpoint_table_del (transport_endpoint_table_t * ht,
transport_endpoint_t * te)
{
clib_bihash_kv_24_8_t kv;
kv.key[0] = te->ip.as_u64[0];
kv.key[1] = te->ip.as_u64[1];
kv.key[2] = te->port;
clib_bihash_add_del_24_8 (ht, &kv, 0);
}
/**
* Register transport virtual function table.
*
* @param type - session type (not protocol type)
* @param vft - virtual function table
*/
void
session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
const transport_proto_vft_t * vft)
{
u8 session_type;
session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
vec_validate (tp_vfts, session_type);
tp_vfts[session_type] = *vft;
/* If an offset function is provided, then peek instead of dequeue */
session_manager_set_transport_rx_fn (session_type,
vft->tx_fifo_offset != 0);
}
/**
* Get transport virtual function table
*
* @param type - session type (not protocol type)
*/
transport_proto_vft_t *
session_get_transport_vft (u8 session_type)
{
if (session_type >= vec_len (tp_vfts))
return 0;
return &tp_vfts[session_type];
}
/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/

View File

@ -56,20 +56,10 @@ typedef struct _transport_proto_vft
u8 *(*format_half_open) (u8 * s, va_list * args);
} transport_proto_vft_t;
typedef clib_bihash_24_8_t transport_endpoint_table_t;
#define TRANSPORT_ENDPOINT_INVALID_INDEX ((u32)~0)
u32 transport_endpoint_lookup (transport_endpoint_table_t * ht,
ip46_address_t * ip, u16 port);
void transport_endpoint_table_add (transport_endpoint_table_t * ht,
transport_endpoint_t * te, u32 value);
void transport_endpoint_table_del (transport_endpoint_table_t * ht,
transport_endpoint_t * te);
void session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
const transport_proto_vft_t * vft);
transport_proto_vft_t *session_get_transport_vft (u8 session_type);
void transport_register_protocol (transport_proto_t transport_proto,
u8 is_ip4,
const transport_proto_vft_t * vft);
transport_proto_vft_t *transport_protocol_get_vft (u8 session_type);
#endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */

View File

@ -50,7 +50,6 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
int test_buf_offset;
u32 bytes_this_chunk;
session_fifo_event_t evt;
static int serial_number = 0;
svm_fifo_t *txf;
int rv;
@ -98,7 +97,6 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
/* Fabricate TX event, send to vpp */
evt.fifo = txf;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = serial_number++;
if (unix_shared_memory_queue_add
(tm->vpp_event_queue[txf->master_thread_index], (u8 *) & evt,
@ -248,12 +246,12 @@ builtin_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
session_parse_handle (sp->vpp_session_handle,
&index, &thread_index);
s = stream_session_get_if_valid (index, thread_index);
s = session_get_if_valid (index, thread_index);
if (s)
{
vnet_disconnect_args_t _a, *a = &_a;
a->handle = stream_session_handle (s);
a->handle = session_handle (s);
a->app_index = tm->app_index;
vnet_disconnect_session (a);
@ -369,7 +367,7 @@ builtin_session_connected_callback (u32 app_index, u32 api_context,
session->server_rx_fifo->client_session_index = session_index;
session->server_tx_fifo = s->server_tx_fifo;
session->server_tx_fifo->client_session_index = session_index;
session->vpp_session_handle = stream_session_handle (s);
session->vpp_session_handle = session_handle (s);
vec_add1 (tm->connection_index_by_thread[thread_index], session_index);
__sync_fetch_and_add (&tm->ready_connections, 1);
@ -403,7 +401,7 @@ builtin_session_disconnect_callback (stream_session_t * s)
{
tclient_main_t *tm = &tclient_main;
vnet_disconnect_args_t _a, *a = &_a;
a->handle = stream_session_handle (s);
a->handle = session_handle (s);
a->app_index = tm->app_index;
vnet_disconnect_session (a);
return;

View File

@ -166,7 +166,6 @@ send_data (builtin_http_server_args * args, u8 * data)
/* Fabricate TX event, send to vpp */
evt.fifo = s->server_tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = 0;
unix_shared_memory_queue_add (hsm->vpp_queue[s->thread_index],
(u8 *) & evt,
@ -346,7 +345,7 @@ http_server_rx_callback (stream_session_t * s)
/* send the command to a new/recycled vlib process */
args = clib_mem_alloc (sizeof (*args));
args->data = vec_dup (hsm->rx_buf);
args->session_handle = stream_session_handle (s);
args->session_handle = session_handle (s);
/* Send an RPC request via the thread-0 input node */
if (vlib_get_thread_index () != 0)
@ -382,7 +381,7 @@ builtin_session_disconnect_callback (stream_session_t * s)
http_server_main_t *bsm = &http_server_main;
vnet_disconnect_args_t _a, *a = &_a;
a->handle = stream_session_handle (s);
a->handle = session_handle (s);
a->app_index = bsm->app_index;
vnet_disconnect_session (a);
}

View File

@ -32,7 +32,7 @@ delete_proxy_session (stream_session_t * s, int is_active_open)
uword *p;
u64 handle;
handle = stream_session_handle (s);
handle = session_handle (s);
clib_spinlock_lock_if_init (&bpm->sessions_lock);
if (is_active_open)
@ -88,19 +88,19 @@ delete_proxy_session (stream_session_t * s, int is_active_open)
if (active_open_session)
{
a->handle = stream_session_handle (active_open_session);
a->handle = session_handle (active_open_session);
a->app_index = bpm->active_open_app_index;
hash_unset (bpm->proxy_session_by_active_open_handle,
stream_session_handle (active_open_session));
session_handle (active_open_session));
vnet_disconnect_session (a);
}
if (server_session)
{
a->handle = stream_session_handle (server_session);
a->handle = session_handle (server_session);
a->app_index = bpm->server_app_index;
hash_unset (bpm->proxy_session_by_server_handle,
stream_session_handle (server_session));
session_handle (server_session));
vnet_disconnect_session (a);
}
}
@ -171,8 +171,7 @@ server_rx_callback (stream_session_t * s)
ASSERT (s->thread_index == thread_index);
clib_spinlock_lock_if_init (&bpm->sessions_lock);
p =
hash_get (bpm->proxy_session_by_server_handle, stream_session_handle (s));
p = hash_get (bpm->proxy_session_by_server_handle, session_handle (s));
if (PREDICT_TRUE (p != 0))
{
@ -218,7 +217,7 @@ server_rx_callback (stream_session_t * s)
memset (ps, 0, sizeof (*ps));
ps->server_rx_fifo = rx_fifo;
ps->server_tx_fifo = tx_fifo;
ps->vpp_server_handle = stream_session_handle (s);
ps->vpp_server_handle = session_handle (s);
proxy_index = ps - bpm->sessions;
@ -268,7 +267,7 @@ active_open_connected_callback (u32 app_index, u32 opaque,
clib_spinlock_lock_if_init (&bpm->sessions_lock);
ps = pool_elt_at_index (bpm->sessions, opaque);
ps->vpp_active_open_handle = stream_session_handle (s);
ps->vpp_active_open_handle = session_handle (s);
s->server_tx_fifo = ps->server_rx_fifo;
s->server_rx_fifo = ps->server_tx_fifo;

View File

@ -73,7 +73,7 @@ builtin_session_disconnect_callback (stream_session_t * s)
builtin_server_main_t *bsm = &builtin_server_main;
vnet_disconnect_args_t _a, *a = &_a;
a->handle = stream_session_handle (s);
a->handle = session_handle (s);
a->app_index = bsm->app_index;
vnet_disconnect_session (a);
}
@ -158,7 +158,6 @@ builtin_server_rx_callback (stream_session_t * s)
svm_fifo_t *tx_fifo, *rx_fifo;
builtin_server_main_t *bsm = &builtin_server_main;
session_fifo_event_t evt;
static int serial_number = 0;
u32 thread_index = vlib_get_thread_index ();
ASSERT (s->thread_index == thread_index);
@ -190,7 +189,6 @@ builtin_server_rx_callback (stream_session_t * s)
unix_shared_memory_queue_t *q;
evt.fifo = rx_fifo;
evt.event_type = FIFO_EVENT_BUILTIN_RX;
evt.event_id = 0;
q = bsm->vpp_queue[thread_index];
if (PREDICT_FALSE (q->cursize == q->maxsize))
@ -232,7 +230,6 @@ builtin_server_rx_callback (stream_session_t * s)
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = serial_number++;
if (unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
(u8 *) & evt,

Some files were not shown because too many files have changed in this diff Show More