Support kube-proxy data plane

This plugin provides kube-proxy data plane on user space,
which is used to replace linux kernal's kube-proxy based on iptables.
The idea is largely inspired from VPP LB plugin.

Currently, kube-proxy plugin supports three service types:
1) Cluster IP plus Port: support any protocols, including TCP, UDP.
2) Node IP plus Node Port: currently only support UDP.
3) External Load Balancer.

Please refer to kp_plugin_doc.md for details.

Change-Id: I36690e417dd26ad5ec1bd77c7ea4b8100416cac6
Signed-off-by: Hongjun Ni <hongjun.ni@intel.com>
This commit is contained in:
Hongjun Ni
2017-08-29 01:00:42 +08:00
committed by Dave Wallace
parent 42998828c9
commit c91f50242f
13 changed files with 3798 additions and 0 deletions

View File

@ -215,6 +215,7 @@ PLUGIN_ENABLED(gtpu)
PLUGIN_ENABLED(ila)
PLUGIN_ENABLED(ioam)
PLUGIN_ENABLED(ixge)
PLUGIN_ENABLED(kubeproxy)
PLUGIN_ENABLED(lb)
PLUGIN_ENABLED(memif)
PLUGIN_ENABLED(pppoe)

View File

@ -59,6 +59,10 @@ if ENABLE_IXGE_PLUGIN
include ixge.am
endif
if ENABLE_KUBEPROXY_PLUGIN
include kubeproxy.am
endif
if ENABLE_LB_PLUGIN
include lb.am
endif

38
src/plugins/kubeproxy.am Normal file
View File

@ -0,0 +1,38 @@
# Copyright (c) 2017 Intel Corporation, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
vppapitestplugins_LTLIBRARIES += kubeproxy_test_plugin.la
vppplugins_LTLIBRARIES += kubeproxy_plugin.la
kubeproxy_plugin_la_SOURCES = \
kubeproxy/kp.c \
kubeproxy/kp_node.c \
kubeproxy/kp_cli.c \
kubeproxy/kp_api.c
BUILT_SOURCES += \
kubeproxy/kp.api.h \
kubeproxy/kp.api.json
API_FILES += kubeproxy/kp.api
noinst_HEADERS += \
kubeproxy/kp.h \
kubeproxy/kphash.h \
kubeproxy/kp.api.h
kubeproxy_test_plugin_la_SOURCES = \
kubeproxy/kp_test.c \
kubeproxy/kp_plugin.api.h
# vi:syntax=automake

View File

@ -0,0 +1,81 @@
/*
* Copyright (c) 2017 Intel and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
vl_api_version 1.0.0
/** \brief Configure Kube-proxy global parameters
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param sticky_buckets_per_core - Number of buckets *per worker thread* in the
established flow table (must be power of 2).
@param flow_timeout - Time in seconds after which, if no packet is received
for a given flow, the flow is removed from the established flow table.
*/
autoreply define kp_conf
{
u32 client_index;
u32 context;
u32 sticky_buckets_per_core;
u32 flow_timeout;
};
/** \brief Add a virtual address (or prefix)
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param ip_prefix - IP address (IPv4 in lower order 32 bits).
@param prefix_length - IP prefix length (96 + 'IPv4 prefix length' for IPv4).
@param is_ipv6 - Is IPv6 addresss.
@param port - service port;
@param target_port - Pod's port corresponding to specific service.
@param node_port - Node's port.
@param is_nat4 - DNAT is NAT44 (NAT64 otherwise).
@param new_flows_table_length - Size of the new connections flow table used
for this VIP (must be power of 2).
@param is_del - The VIP should be removed.
*/
autoreply define kp_add_del_vip {
u32 client_index;
u32 context;
u8 ip_prefix[16];
u8 prefix_length;
u8 is_ipv6;
u16 port;
u16 target_port;
u16 node_port;
u8 is_nat4;
u32 new_flows_table_length;
u8 is_del;
};
/** \brief Add a pod for a given VIP
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@param vip_ip_prefix - VIP IP address (IPv4 in lower order 32 bits).
@param vip_ip_prefix - VIP IP prefix length (96 + 'IPv4 prefix length' for IPv4).
@param vip_is_ipv6 - VIP is IPv6 addresss.
@param pod_address - The pod's IP address (IPv4 in lower order 32 bits).
@param pod_is_ipv6 - Pod is IPv6 addresss.
@param is_del - The Pod should be removed.
*/
autoreply define kp_add_del_pod {
u32 client_index;
u32 context;
u8 vip_ip_prefix[16];
u8 vip_prefix_length;
u8 vip_is_ipv6;
u8 pod_address[16];
u8 pod_is_ipv6;
u8 is_del;
};

974
src/plugins/kubeproxy/kp.c Normal file

File diff suppressed because it is too large Load Diff

473
src/plugins/kubeproxy/kp.h Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,249 @@
/*
* Copyright (c) 2016 Intel and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "POD IS" BPODIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <kubeproxy/kp.h>
#include <vppinfra/byte_order.h>
#include <vlibapi/api.h>
#include <vlibmemory/api.h>
#define vl_msg_id(n,h) n,
typedef enum {
#include <kubeproxy/kp.api.h>
/* We'll want to know how many messages IDs we need... */
VL_MSG_FIRST_AVAILABLE,
} vl_msg_id_t;
#undef vl_msg_id
/* define message structures */
#define vl_typedefs
#include <kubeproxy/kp.api.h>
#undef vl_typedefs
/* define generated endian-swappers */
#define vl_endianfun
#include <kubeproxy/kp.api.h>
#undef vl_endianfun
#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__)
/* Get the API version number */
#define vl_api_version(n,v) static u32 api_version=(v);
#include <kubeproxy/kp.api.h>
#undef vl_api_version
#define vl_msg_name_crc_list
#include <kubeproxy/kp.api.h>
#undef vl_msg_name_crc_list
#define REPLY_MSG_ID_BASE kpm->msg_id_base
#include <vlibapi/api_helper_macros.h>
static void
setup_message_id_table (kp_main_t * kpm, api_main_t * am)
{
#define _(id,n,crc) \
vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id + kpm->msg_id_base);
foreach_vl_msg_name_crc_kp;
#undef _
}
/* Macro to finish up custom dump fns */
#define FINISH \
vec_add1 (s, 0); \
vl_print (handle, (char *)s); \
vec_free (s); \
return handle;
static void
vl_api_kp_conf_t_handler
(vl_api_kp_conf_t * mp)
{
kp_main_t *kpm = &kp_main;
vl_api_kp_conf_reply_t * rmp;
int rv = 0;
rv = kp_conf(mp->sticky_buckets_per_core,
mp->flow_timeout);
REPLY_MACRO (VL_API_KP_CONF_REPLY);
}
static void *vl_api_kp_conf_t_print
(vl_api_kp_conf_t *mp, void * handle)
{
u8 * s;
s = format (0, "SCRIPT: kp_conf ");
s = format (s, "%u ", mp->sticky_buckets_per_core);
s = format (s, "%u ", mp->flow_timeout);
FINISH;
}
static void
vl_api_kp_add_del_vip_t_handler
(vl_api_kp_add_del_vip_t * mp)
{
kp_main_t *kpm = &kp_main;
vl_api_kp_conf_reply_t * rmp;
int rv = 0;
ip46_address_t prefix;
u8 prefix_length = mp->prefix_length;
if (mp->is_ipv6 == 0)
{
prefix_length += 96;
memcpy(&prefix.ip4, mp->ip_prefix, sizeof(prefix.ip4));
prefix.pad[0] = prefix.pad[1] = prefix.pad[2] = 0;
}
else
{
memcpy(&prefix.ip6, mp->ip_prefix, sizeof(prefix.ip6));
}
if (mp->is_del) {
u32 vip_index;
if (!(rv = kp_vip_find_index(&prefix, prefix_length, &vip_index)))
rv = kp_vip_del(vip_index);
} else {
u32 vip_index;
kp_vip_type_t type;
if (mp->is_ipv6 == 0) {
type = mp->is_nat4?KP_VIP_TYPE_IP4_NAT44:KP_VIP_TYPE_IP4_NAT46;
} else {
type = mp->is_nat4?KP_VIP_TYPE_IP6_NAT64:KP_VIP_TYPE_IP6_NAT66;
}
rv = kp_vip_add(&prefix, prefix_length, type,
ntohl(mp->new_flows_table_length), &vip_index,
ntohs(mp->port), ntohs(mp->target_port),
ntohs(mp->node_port));
}
REPLY_MACRO (VL_API_KP_CONF_REPLY);
}
static void *vl_api_kp_add_del_vip_t_print
(vl_api_kp_add_del_vip_t *mp, void * handle)
{
u8 * s;
s = format (0, "SCRIPT: kp_add_del_vip ");
s = format (s, "%U ", format_ip46_prefix,
(ip46_address_t *)mp->ip_prefix, mp->prefix_length, IP46_TYPE_ANY);
s = format (s, "port %u ", mp->port);
s = format (s, "target_port %u ", mp->target_port);
s = format (s, "node_port %u ", mp->node_port);
s = format (s, "%s ", mp->is_nat4?"nat4":"nat6");
s = format (s, "%u ", mp->new_flows_table_length);
s = format (s, "%s ", mp->is_del?"del":"add");
FINISH;
}
static void
vl_api_kp_add_del_pod_t_handler
(vl_api_kp_add_del_pod_t * mp)
{
kp_main_t *kpm = &kp_main;
vl_api_kp_conf_reply_t * rmp;
int rv = 0;
u32 vip_index;
ip46_address_t vip_ip_prefix;
u8 vip_prefix_length = mp->vip_prefix_length;
if (mp->vip_is_ipv6 == 0)
{
vip_prefix_length += 96;
memcpy(&vip_ip_prefix.ip4, mp->vip_ip_prefix,
sizeof(vip_ip_prefix.ip4));
vip_ip_prefix.pad[0] = vip_ip_prefix.pad[1] = vip_ip_prefix.pad[2] = 0;
}
else
{
memcpy(&vip_ip_prefix.ip6, mp->vip_ip_prefix,
sizeof(vip_ip_prefix.ip6));
}
ip46_address_t pod_address;
if (mp->pod_is_ipv6 == 0)
{
memcpy(&pod_address.ip4, mp->pod_address,
sizeof(pod_address.ip4));
pod_address.pad[0] = pod_address.pad[1] = pod_address.pad[2] = 0;
}
else
{
memcpy(&pod_address.ip6, mp->pod_address,
sizeof(pod_address.ip6));
}
if ((rv = kp_vip_find_index(&vip_ip_prefix, vip_prefix_length, &vip_index)))
goto done;
if (mp->is_del)
rv = kp_vip_del_pods(vip_index, &pod_address, 1);
else
rv = kp_vip_add_pods(vip_index, &pod_address, 1);
done:
REPLY_MACRO (VL_API_KP_CONF_REPLY);
}
static void *vl_api_kp_add_del_pod_t_print
(vl_api_kp_add_del_pod_t *mp, void * handle)
{
u8 * s;
s = format (0, "SCRIPT: kp_add_del_pod ");
s = format (s, "%U ", format_ip46_prefix,
(ip46_address_t *)mp->vip_ip_prefix, mp->vip_prefix_length, IP46_TYPE_ANY);
s = format (s, "%U ", format_ip46_address,
(ip46_address_t *)mp->pod_address, IP46_TYPE_ANY);
s = format (s, "%s ", mp->is_del?"del":"add");
FINISH;
}
/* List of message types that this plugin understands */
#define foreach_kp_plugin_api_msg \
_(KP_CONF, kp_conf) \
_(KP_ADD_DEL_VIP, kp_add_del_vip) \
_(KP_ADD_DEL_POD, kp_add_del_pod)
static clib_error_t * kp_api_init (vlib_main_t * vm)
{
kp_main_t *kpm = &kp_main;
u8 *name = format (0, "kp_%08x%c", api_version, 0);
kpm->msg_id_base = vl_msg_api_get_msg_ids
((char *) name, VL_MSG_FIRST_AVAILABLE);
#define _(N,n) \
vl_msg_api_set_handlers((VL_API_##N + kpm->msg_id_base), \
#n, \
vl_api_##n##_t_handler, \
vl_noop_handler, \
vl_api_##n##_t_endian, \
vl_api_##n##_t_print, \
sizeof(vl_api_##n##_t), 1);
foreach_kp_plugin_api_msg;
#undef _
/* Add our API messages to the global name_crc hash table */
setup_message_id_table (kpm, &api_main);
return 0;
}
VLIB_INIT_FUNCTION (kp_api_init);

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,105 @@
# Kube-proxy plugin for VPP
## Overview
This plugin provides kube-proxy data plane on user space,
which is used to replace linux kernal's kube-proxy based on iptables.
The idea is largely inspired from VPP LB plugin.
Currently, kube-proxy plugin supports three service types:
1) Cluster IP plus Port: support any protocols, including TCP, UDP.
2) Node IP plus Node Port: currently only support UDP.
3) External Load Balancer.
For Cluster IP plus Port case:
kube-proxy is configured with a set of Virtual IPs (VIP, which can be
prefixes), and for each VIP, with a set of POD addresses (PODs).
For a specific session received for a given VIP (or VIP prefix),
first packet selects a Pod according to internal load balancing algorithm,
then does DNAT operation and sent to chosen Pod.
At the same time, will create a session entry to store Pod chosen result.
Following packets for that session will look up session table first,
which ensures that a given session will always be routed to the same Pod.
For returned packet from Pod, it will do SNAT operation and sent out.
Please refer to below for details:
https://schd.ws/hosted_files/ossna2017/1e/VPP_K8S_GTPU_OSSNA.pdf
## Configuration
### Global KP parameters
The kube-proxy needs to be configured with some parameters:
ku conf [buckets <n>] [timeout <s>]
buckets: the *per-thread* established-connections-table number of buckets.
timeout: the number of seconds a connection will remain in the
established-connections-table while no packet for this flow
is received.
### Configure VIPs and Ports
ku vip <prefix> port <n> target_port <n> node_port <n> \
[nat4|nat6)] [new_len <n>] [del]
new_len is the size of the new-connection-table. It should be 1 or 2 orders of
magnitude bigger than the number of PODs for the VIP in order to ensure a good
load balancing.
Examples:
ku vip 90.0.0.0/8 nat44 new_len 2048
ku vip 2003::/16 nat66 new_len 2048
### Configure PODs (for each VIP)
ku pod <vip-prefix> [<address> [<address> [...]]] [del]
You can add (or delete) as many PODs at a time (for a single VIP).
Examples:
ku pod 90.0.0.0/8 10.0.0.1
ku pod 2002::/16 2001::2 2001::3 2001::4
### Configure SNAT
ku set interface nat4 in <intfc> [del]
Set SNAT feature in a specific interface.
## Monitoring
The plugin provides quite a bunch of counters and information.
show ku
show ku vip verbose
show node counters
## Design notes
### Multi-Threading
This implementation implement parallelism by using
one established-connections table per thread. This is equivalent to assuming
that RSS will make a job similar to ECMP, and is pretty useful as threads don't
need to get a lock in order to write in the table.
### Hash Table
A kube-proxy requires an efficient read and write Hash table. The Hash table
used by ip6-forward is very read-efficient, but not so much for writing. In
addition, it is not a big deal if writing into the Hash table fails.
The plugin therefore uses a very specific Hash table.
- Fixed (and power of 2) number of buckets (configured at runtime)
- Fixed (and power of 2) elements per buckets (configured at compilation time)

View File

@ -0,0 +1,268 @@
/*
* Copyright (c) 2016 Intel and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "POD IS" BPODIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vat/vat.h>
#include <vlibapi/api.h>
#include <vlibmemory/api.h>
#include <vppinfra/error.h>
#include <kubeproxy/kp.h>
#define __plugin_msg_base kp_test_main.msg_id_base
#include <vlibapi/vat_helper_macros.h>
//TODO: Move that to vat/plugin_api.c
//////////////////////////
uword unformat_ip46_address (unformat_input_t * input, va_list * args)
{
ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
ip46_type_t type = va_arg (*args, ip46_type_t);
if ((type != IP46_TYPE_IP6) &&
unformat(input, "%U", unformat_ip4_address, &ip46->ip4)) {
ip46_address_mask_ip4(ip46);
return 1;
} else if ((type != IP46_TYPE_IP4) &&
unformat(input, "%U", unformat_ip6_address, &ip46->ip6)) {
return 1;
}
return 0;
}
uword unformat_ip46_prefix (unformat_input_t * input, va_list * args)
{
ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
u8 *len = va_arg (*args, u8 *);
ip46_type_t type = va_arg (*args, ip46_type_t);
u32 l;
if ((type != IP46_TYPE_IP6) && unformat(input, "%U/%u", unformat_ip4_address, &ip46->ip4, &l)) {
if (l > 32)
return 0;
*len = l + 96;
ip46->pad[0] = ip46->pad[1] = ip46->pad[2] = 0;
} else if ((type != IP46_TYPE_IP4) && unformat(input, "%U/%u", unformat_ip6_address, &ip46->ip6, &l)) {
if (l > 128)
return 0;
*len = l;
} else {
return 0;
}
return 1;
}
/////////////////////////
#define vl_msg_id(n,h) n,
typedef enum {
#include <kubeproxy/kp.api.h>
/* We'll want to know how many messages IDs we need... */
VL_MSG_FIRST_AVAILABLE,
} vl_msg_id_t;
#undef vl_msg_id
/* define message structures */
#define vl_typedefs
#include <kubeproxy/kp.api.h>
#undef vl_typedefs
/* declare message handlers for each api */
#define vl_endianfun /* define message structures */
#include <kubeproxy/kp.api.h>
#undef vl_endianfun
/* instantiate all the print functions we know about */
#define vl_print(handle, ...)
#define vl_printfun
#include <kubeproxy/kp.api.h>
#undef vl_printfun
/* Get the API version number. */
#define vl_api_version(n,v) static u32 api_version=(v);
#include <kubeproxy/kp.api.h>
#undef vl_api_version
typedef struct {
/* API message ID base */
u16 msg_id_base;
vat_main_t *vat_main;
} kp_test_main_t;
kp_test_main_t kp_test_main;
#define foreach_standard_reply_retval_handler \
_(kp_conf_reply) \
_(kp_add_del_vip_reply) \
_(kp_add_del_pod_reply)
#define _(n) \
static void vl_api_##n##_t_handler \
(vl_api_##n##_t * mp) \
{ \
vat_main_t * vam = kp_test_main.vat_main; \
i32 retval = ntohl(mp->retval); \
if (vam->async_mode) { \
vam->async_errors += (retval < 0); \
} else { \
vam->retval = retval; \
vam->result_ready = 1; \
} \
}
foreach_standard_reply_retval_handler;
#undef _
/*
* Table of message reply handlers, must include boilerplate handlers
* we just generated
*/
#define foreach_vpe_api_reply_msg \
_(KP_CONF_REPLY, kp_conf_reply) \
_(KP_ADD_DEL_VIP_REPLY, kp_add_del_vip_reply) \
_(KP_ADD_DEL_POD_REPLY, kp_add_del_pod_reply)
static int api_kp_conf (vat_main_t * vam)
{
unformat_input_t *i = vam->input;
vl_api_kp_conf_t mps, *mp;
int ret;
if (!unformat(i, "%u %u",
&mps.sticky_buckets_per_core,
&mps.flow_timeout)) {
errmsg ("invalid arguments\n");
return -99;
}
M(KP_CONF, mp);
S(mp);
W (ret);
return ret;
}
static int api_kp_add_del_vip (vat_main_t * vam)
{
unformat_input_t * i = vam->input;
vl_api_kp_add_del_vip_t mps, *mp;
int ret;
mps.is_del = 0;
mps.is_nat4 = 0;
if (!unformat(i, "%U",
unformat_ip46_prefix, mps.ip_prefix, &mps.prefix_length, IP46_TYPE_ANY)) {
errmsg ("invalid prefix\n");
return -99;
}
if (unformat(i, "nat4")) {
mps.is_nat4 = 1;
} else if (unformat(i, "nat6")) {
mps.is_nat4 = 0;
} else {
errmsg ("no nat\n");
return -99;
}
if (!unformat(i, "%d", &mps.new_flows_table_length)) {
errmsg ("no table lentgh\n");
return -99;
}
if (unformat(i, "del")) {
mps.is_del = 1;
}
M(KP_ADD_DEL_VIP, mp);
S(mp);
W (ret);
return ret;
}
static int api_kp_add_del_pod (vat_main_t * vam)
{
unformat_input_t * i = vam->input;
vl_api_kp_add_del_pod_t mps, *mp;
int ret;
mps.is_del = 0;
if (!unformat(i, "%U %U",
unformat_ip46_prefix, mps.vip_ip_prefix, &mps.vip_prefix_length, IP46_TYPE_ANY,
unformat_ip46_address, mps.pod_address)) {
errmsg ("invalid prefix or address\n");
return -99;
}
if (unformat(i, "del")) {
mps.is_del = 1;
}
M(KP_ADD_DEL_POD, mp);
S(mp);
W (ret);
return ret;
}
/*
* List of messages that the api test plugin sends,
* and that the data plane plugin processes
*/
#define foreach_vpe_api_msg \
_(kp_conf, "<sticky_buckets_per_core> <flow_timeout>") \
_(kp_add_del_vip, "<ip-prefix> <port> <target_port> <node_port> " \
"[nat4|nat6] <new_table_len> [del]") \
_(kp_add_del_pod, "<vip-ip-prefix> <address> [del]")
static void
kp_vat_api_hookup (vat_main_t *vam)
{
kp_test_main_t * kptm = &kp_test_main;
/* Hook up handlers for replies from the data plane plug-in */
#define _(N,n) \
vl_msg_api_set_handlers((VL_API_##N + kptm->msg_id_base), \
#n, \
vl_api_##n##_t_handler, \
vl_noop_handler, \
vl_api_##n##_t_endian, \
vl_api_##n##_t_print, \
sizeof(vl_api_##n##_t), 1);
foreach_vpe_api_reply_msg;
#undef _
/* API messages we can send */
#define _(n,h) hash_set_mem (vam->function_by_name, #n, api_##n);
foreach_vpe_api_msg;
#undef _
/* Help strings */
#define _(n,h) hash_set_mem (vam->help_by_name, #n, h);
foreach_vpe_api_msg;
#undef _
}
clib_error_t * vat_plugin_register (vat_main_t *vam)
{
kp_test_main_t * kptm = &kp_test_main;
u8 * name;
kptm->vat_main = vam;
/* Ask the vpp engine for the first assigned message-id */
name = format (0, "kp_%08x%c", api_version, 0);
kptm->msg_id_base = vl_client_get_first_plugin_msg_id ((char *) name);
if (kptm->msg_id_base != (u16) ~0)
kp_vat_api_hookup (vam);
vec_free(name);
return 0;
}

View File

@ -0,0 +1,216 @@
/*
* Copyright (c) 2017 Intel and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* vppinfra already includes tons of different hash tables.
* MagLev flow table is a bit different. It has to be very efficient
* for both writing and reading operations. But it does not need to
* be 100% reliable (write can fail). It also needs to recycle
* old entries in a lazy way.
*
* This hash table is the most dummy hash table you can do.
* Fixed total size, fixed bucket size.
* Advantage is that it could be very efficient (maybe).
*
*/
#ifndef KP_PLUGIN_KP_KPHASH_H_
#define KP_PLUGIN_KP_KPHASH_H_
#include <vnet/vnet.h>
#include <vppinfra/xxhash.h>
#include <vppinfra/crc32.h>
/*
* @brief Number of entries per bucket.
*/
#define KPHASH_ENTRY_PER_BUCKET 4
#define KP_HASH_DO_NOT_USE_SSE_BUCKETS 0
/**
* 32 bits integer comparison for running values.
* 1 > 0 is true. But 1 > 0xffffffff also is.
*/
#define clib_u32_loop_gt(a, b) (((u32)(a)) - ((u32)(b)) < 0x7fffffff)
/*
* @brief One bucket contains 4 entries.
* Each bucket takes one 64B cache line in memory.
*/
typedef struct {
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
u32 hash[KPHASH_ENTRY_PER_BUCKET];
u32 timeout[KPHASH_ENTRY_PER_BUCKET];
u32 vip[KPHASH_ENTRY_PER_BUCKET];
u32 value[KPHASH_ENTRY_PER_BUCKET];
} kp_hash_bucket_t;
typedef struct {
u32 buckets_mask;
u32 timeout;
kp_hash_bucket_t buckets[];
} kp_hash_t;
#define kp_hash_nbuckets(h) (((h)->buckets_mask) + 1)
#define kp_hash_size(h) ((h)->buckets_mask + KPHASH_ENTRY_PER_BUCKET)
#define kp_hash_foreach_bucket(h, bucket) \
for (bucket = (h)->buckets; \
bucket < (h)->buckets + kp_hash_nbuckets(h); \
bucket++)
#define kp_hash_foreach_entry(h, bucket, i) \
kp_hash_foreach_bucket(h, bucket) \
for (i = 0; i < KPHASH_ENTRY_PER_BUCKET; i++)
#define kp_hash_foreach_valid_entry(h, bucket, i, now) \
kp_hash_foreach_entry(h, bucket, i) \
if (!clib_u32_loop_gt((now), bucket->timeout[i]))
static_always_inline
kp_hash_t *kp_hash_alloc(u32 buckets, u32 timeout)
{
if (!is_pow2(buckets))
return NULL;
// Allocate 1 more bucket for prefetch
u32 size = ((u64)&((kp_hash_t *)(0))->buckets[0]) +
sizeof(kp_hash_bucket_t) * (buckets + 1);
u8 *mem = 0;
kp_hash_t *h;
vec_alloc_aligned(mem, size, CLIB_CACHE_LINE_BYTES);
h = (kp_hash_t *)mem;
h->buckets_mask = (buckets - 1);
h->timeout = timeout;
return h;
}
static_always_inline
void kp_hash_free(kp_hash_t *h)
{
u8 *mem = (u8 *)h;
vec_free(mem);
}
static_always_inline
u32 kp_hash_hash(u64 k0, u64 k1, u64 k2, u64 k3, u64 k4)
{
#ifdef clib_crc32c_uses_intrinsics
u64 key[5];
key[0] = k0;
key[1] = k1;
key[2] = k2;
key[3] = k3;
key[4] = k4;
return clib_crc32c ((u8 *) key, 40);
#else
u64 tmp = k0 ^ k1 ^ k2 ^ k3 ^ k4;
return (u32)clib_xxhash (tmp);
#endif
}
static_always_inline
void kp_hash_prefetch_bucket(kp_hash_t *ht, u32 hash)
{
kp_hash_bucket_t *bucket = &ht->buckets[hash & ht->buckets_mask];
CLIB_PREFETCH(bucket, sizeof(*bucket), READ);
}
static_always_inline
void kp_hash_get(kp_hash_t *ht, u32 hash, u32 vip, u32 time_now,
u32 *available_index, u32 *found_value)
{
kp_hash_bucket_t *bucket = &ht->buckets[hash & ht->buckets_mask];
*found_value = ~0;
*available_index = ~0;
#if __SSE4_2__ && KP_HASH_DO_NOT_USE_SSE_BUCKETS == 0
u32 bitmask, found_index;
__m128i mask;
// mask[*] = timeout[*] > now
mask = _mm_cmpgt_epi32(_mm_loadu_si128 ((__m128i *) bucket->timeout),
_mm_set1_epi32 (time_now));
// bitmask[*] = now <= timeout[*/4]
bitmask = (~_mm_movemask_epi8(mask)) & 0xffff;
// Get first index with now <= timeout[*], if any.
*available_index = (bitmask)?__builtin_ctz(bitmask)/4:*available_index;
// mask[*] = (timeout[*] > now) && (hash[*] == hash)
mask = _mm_and_si128(mask,
_mm_cmpeq_epi32(
_mm_loadu_si128 ((__m128i *) bucket->hash),
_mm_set1_epi32 (hash)));
// Load the array of vip values
// mask[*] = (timeout[*] > now) && (hash[*] == hash) && (vip[*] == vip)
mask = _mm_and_si128(mask,
_mm_cmpeq_epi32(
_mm_loadu_si128 ((__m128i *) bucket->vip),
_mm_set1_epi32 (vip)));
// mask[*] = (timeout[*x4] > now) && (hash[*x4] == hash) && (vip[*x4] == vip)
bitmask = _mm_movemask_epi8(mask);
// Get first index, if any
found_index = (bitmask)?__builtin_ctzll(bitmask)/4:0;
ASSERT(found_index < 4);
*found_value = (bitmask)?bucket->value[found_index]:*found_value;
bucket->timeout[found_index] =
(bitmask)?time_now + ht->timeout:bucket->timeout[found_index];
#else
u32 i;
for (i = 0; i < KPHASH_ENTRY_PER_BUCKET; i++) {
u8 cmp = (bucket->hash[i] == hash && bucket->vip[i] == vip);
u8 timeouted = clib_u32_loop_gt(time_now, bucket->timeout[i]);
*found_value = (cmp || timeouted)?*found_value:bucket->value[i];
bucket->timeout[i] = (cmp || timeouted)?time_now + ht->timeout:bucket->timeout[i];
*available_index = (timeouted && (*available_index == ~0))?i:*available_index;
if (!cmp)
return;
}
#endif
}
static_always_inline
u32 kp_hash_available_value(kp_hash_t *h, u32 hash, u32 available_index)
{
return h->buckets[hash & h->buckets_mask].value[available_index];
}
static_always_inline
void kp_hash_put(kp_hash_t *h, u32 hash, u32 value, u32 vip,
u32 available_index, u32 time_now)
{
kp_hash_bucket_t *bucket = &h->buckets[hash & h->buckets_mask];
bucket->hash[available_index] = hash;
bucket->value[available_index] = value;
bucket->timeout[available_index] = time_now + h->timeout;
bucket->vip[available_index] = vip;
}
static_always_inline
u32 kp_hash_elts(kp_hash_t *h, u32 time_now)
{
u32 tot = 0;
kp_hash_bucket_t *bucket;
u32 i;
kp_hash_foreach_valid_entry(h, bucket, i, time_now) {
tot++;
}
return tot;
}
#endif /* KP_PLUGIN_KP_KPHASH_H_ */

203
test/test_kubeproxy.py Normal file
View File

@ -0,0 +1,203 @@
import socket
import unittest
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from framework import VppTestCase, running_extended_tests
from util import ppp
""" TestKP is a subclass of VPPTestCase classes.
TestKP class defines Four NAT test case for:
- IP4 to IP4 NAT
- IP4 to IP6 NAT
- IP6 to IP4 NAT
- IP6 to IP6 NAT
"""
class TestKP(VppTestCase):
""" Kube-proxy Test Case """
@classmethod
def setUpClass(cls):
super(TestKP, cls).setUpClass()
cls.pods = range(5)
cls.packets = range(5)
try:
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
for i in cls.interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.disable_ipv6_ra()
i.resolve_arp()
i.resolve_ndp()
dst4 = socket.inet_pton(socket.AF_INET, "10.0.0.0")
dst6 = socket.inet_pton(socket.AF_INET6, "2002::")
cls.vapi.ip_add_del_route(dst4, 24, cls.pg1.remote_ip4n)
cls.vapi.ip_add_del_route(dst6, 16, cls.pg1.remote_ip6n, is_ipv6=1)
except Exception:
super(TestKP, cls).tearDownClass()
raise
def tearDown(self):
super(TestKP, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show ku vip verbose"))
def getIPv4Flow(self, id):
return (IP(dst="90.0.%u.%u" % (id / 255, id % 255),
src="40.0.%u.%u" % (id / 255, id % 255)) /
UDP(sport=10000 + id, dport=3306))
def getIPv6Flow(self, id):
return (IPv6(dst="2001::%u" % (id), src="fd00:f00d:ffff::%u" % (id)) /
UDP(sport=10000 + id, dport=3306))
def generatePackets(self, src_if, isv4):
self.reset_packet_infos()
pkts = []
for pktid in self.packets:
info = self.create_packet_info(src_if, self.pg1)
payload = self.info_to_payload(info)
ip = self.getIPv4Flow(pktid) if isv4 else self.getIPv6Flow(pktid)
packet = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
ip /
Raw(payload))
self.extend_packet(packet, 128)
info.data = packet.copy()
pkts.append(packet)
return pkts
def checkInner(self, udp):
self.assertEqual(udp.dport, 3307)
def checkCapture(self, nat4, isv4):
self.pg0.assert_nothing_captured()
out = self.pg1.get_capture(len(self.packets))
load = [0] * len(self.pods)
self.info = None
for p in out:
try:
podid = 0
udp = None
if nat4:
ip = p[IP]
podid = int(ip.dst.split(".")[3])
self.assertEqual(ip.version, 4)
self.assertEqual(ip.flags, 0)
self.assertEqual(ip.dst, "10.0.0.%u" % podid)
self.assertEqual(ip.proto, 17)
self.assertEqual(len(ip.options), 0)
self.assertGreaterEqual(ip.ttl, 63)
udp = p[UDP]
else:
ip = p[IPv6]
podid = ip.dst.split(":")
podid = podid[len(podid) - 1]
podid = 0 if podid == "" else int(podid)
self.assertEqual(ip.version, 6)
self.assertEqual(ip.tc, 0)
self.assertEqual(ip.fl, 0)
self.assertEqual(
socket.inet_pton(socket.AF_INET6, ip.dst),
socket.inet_pton(socket.AF_INET6, "2002::%u" % podid)
)
self.assertEqual(ip.nh, 17)
self.assertGreaterEqual(ip.hlim, 63)
udp = UDP(str(p[IPv6].payload))
# self.assertEqual(len(ip.options), 0)
self.checkInner(udp)
load[podid] += 1
except:
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
# This is just to roughly check that the balancing algorithm
# is not completly biased.
for podid in self.pods:
if load[podid] < len(self.packets) / (len(self.pods) * 2):
self.log(
"Pod isn't balanced: load[%d] = %d" % (podid, load[podid]))
raise Exception("Kube-proxy algorithm is biased")
def test_kp_ip4_nat4(self):
""" Kube-proxy NAT44 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat4")
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 10.0.0.%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=True))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=True, isv4=True)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 10.0.0.%u del" % (podid))
self.vapi.cli("ku vip 90.0.0.0/8 nat4 del")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_kp_ip6_nat4(self):
""" Kube-proxy NAT64 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat4")
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 10.0.0.%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=False))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=True, isv4=False)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 10.0.0.%u del" % (podid))
self.vapi.cli("ku vip 2001::/16 nat4 del")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_kp_ip4_nat6(self):
""" Kube-proxy NAT46 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat6")
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 2002::%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=True))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=False, isv4=True)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 2002::%u" % (podid))
self.vapi.cli("ku vip 90.0.0.0/8 nat6 del")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_kp_ip6_nat6(self):
""" Kube-proxy NAT66 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat6")
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 2002::%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=False))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=False, isv4=False)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 2002::%u del" % (podid))
self.vapi.cli("ku vip 2001::/16 nat6 del")