vcl: register workers when reattaching to vpp
Type: improvement
Signed-off-by: Maros Ondrejicka <maros.ondrejicka@pantheon.tech>
Change-Id: I82a286e2872338974c1930138c30db78103ae499
committed by Florin Coras
parent d810a6e218
commit 0db15758ed
@@ -10,6 +10,7 @@ import (
 	"git.fd.io/govpp.git/api"
 	"github.com/edwarnicke/exechelper"
 	"github.com/edwarnicke/govpp/binapi/af_packet"
+	"github.com/edwarnicke/govpp/binapi/ethernet_types"
 	interfaces "github.com/edwarnicke/govpp/binapi/interface"
 	"github.com/edwarnicke/govpp/binapi/interface_types"
 	ip_types "github.com/edwarnicke/govpp/binapi/ip_types"
@@ -28,6 +29,8 @@ func RegisterActions() {
 	reg("vpp-envoy", ConfigureEnvoyProxy)
 	reg("http-tps", ConfigureHttpTps)
 	reg("2veths", Configure2Veths)
+	reg("vcl-test-server", RunVclEchoServer)
+	reg("vcl-test-client", RunVclEchoClient)
 }
 
 func configureProxyTcp(ifName0, ipAddr0, ifName1, ipAddr1 string) ConfFn {
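The two new reg calls follow the existing pattern: each action is stored under a string key and later invoked by name inside a container (the test below does this via hstExec("vcl-test-server "+proto, ...)). As a rough, hypothetical sketch only — this is not the actual hs-test registry code — the mechanism amounts to:

package main

// Hypothetical sketch of a name-to-action registry; the real hs-test types
// and RegisterActions implementation are not shown in this diff.
import (
	"fmt"
	"os"
)

type ActionResult struct{ Err error }
type ActionFn func(args []string) *ActionResult

var actions = map[string]ActionFn{}

func reg(name string, fn ActionFn) { actions[name] = fn }

func main() {
	reg("vcl-test-server", func(args []string) *ActionResult { return nil })

	// Inside a container the binary is invoked as e.g. "hs-test vcl-test-server tcp",
	// so args[1] selects the action and args[2] carries the protocol.
	fn, ok := actions[os.Args[1]]
	if !ok {
		fmt.Fprintf(os.Stderr, "unknown action %q\n", os.Args[1])
		os.Exit(1)
	}
	if res := fn(os.Args); res != nil && res.Err != nil {
		fmt.Fprintln(os.Stderr, res.Err)
		os.Exit(1)
	}
}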
@@ -145,11 +148,59 @@ func RunEchoClnInternal() *ActionResult {
 	cmd := fmt.Sprintf("test echo client %s uri tcp://10.10.10.1/1234", getArgs())
 	return ApiCliInband("/tmp/2veths", cmd)
 }
 
-func configure2vethsTopo(ifName, interfaceAddress, namespaceId string, secret uint64) ConfFn {
+func RunVclEchoServer(args []string) *ActionResult {
+	f, err := os.Create("vcl_1.conf")
+	if err != nil {
+		return NewActionResult(err, ActionResultWithStderr(("create vcl config: ")))
+	}
+	fmt.Fprintf(f, vclTemplate, "/tmp/echo-srv/var/run/app_ns_sockets/1", "1")
+	f.Close()
+
+	os.Setenv("VCL_CONFIG", "/vcl_1.conf")
+	cmd := fmt.Sprintf("vcl_test_server -p %s 12346", args[2])
+	errCh := exechelper.Start(cmd)
+	select {
+	case err := <-errCh:
+		writeSyncFile(NewActionResult(err, ActionResultWithDesc("vcl_test_server: ")))
+	default:
+	}
+	writeSyncFile(OkResult())
+	return nil
+}
+
+func RunVclEchoClient(args []string) *ActionResult {
+	outBuff := bytes.NewBuffer([]byte{})
+	errBuff := bytes.NewBuffer([]byte{})
+
+	f, err := os.Create("vcl_2.conf")
+	if err != nil {
+		return NewActionResult(err, ActionResultWithStderr(("create vcl config: ")))
+	}
+	fmt.Fprintf(f, vclTemplate, "/tmp/echo-cln/var/run/app_ns_sockets/2", "2")
+	f.Close()
+
+	os.Setenv("VCL_CONFIG", "/vcl_2.conf")
+	cmd := fmt.Sprintf("vcl_test_client -U -p %s 10.10.10.1 12346", args[2])
+	err = exechelper.Run(cmd,
+		exechelper.WithStdout(outBuff), exechelper.WithStderr(errBuff),
+		exechelper.WithStdout(os.Stdout), exechelper.WithStderr(os.Stderr))
+
+	return NewActionResult(err, ActionResultWithStdout(string(outBuff.String())),
+		ActionResultWithStderr(string(errBuff.String())))
+}
+
+func configure2vethsTopo(ifName, interfaceAddress, namespaceId string, secret uint64, optionalHardwareAddress ...string) ConfFn {
 	return func(ctx context.Context,
 		vppConn api.Connection) error {
-		swIfIndex, err := configureAfPacket(ctx, vppConn, ifName, interfaceAddress)
+
+		var swIfIndex interface_types.InterfaceIndex
+		var err error
+		if optionalHardwareAddress == nil {
+			swIfIndex, err = configureAfPacket(ctx, vppConn, ifName, interfaceAddress)
+		} else {
+			swIfIndex, err = configureAfPacket(ctx, vppConn, ifName, interfaceAddress, optionalHardwareAddress[0])
+		}
 		if err != nil {
 			fmt.Printf("failed to create af packet: %v", err)
 		}
@@ -191,6 +242,8 @@ func Configure2Veths(args []string) *ActionResult {
 	var fn func(context.Context, api.Connection) error
 	if args[2] == "srv" {
 		fn = configure2vethsTopo("vppsrv", "10.10.10.1/24", "1", 1)
+	} else if args[2] == "srv-with-preset-hw-addr" {
+		fn = configure2vethsTopo("vppsrv", "10.10.10.1/24", "1", 1, "00:00:5e:00:53:01")
 	} else {
 		fn = configure2vethsTopo("vppcln", "10.10.10.2/24", "2", 2)
 	}
@@ -204,14 +257,23 @@ func Configure2Veths(args []string) *ActionResult {
 }
 
 func configureAfPacket(ctx context.Context, vppCon api.Connection,
-	name, interfaceAddress string) (interface_types.InterfaceIndex, error) {
+	name, interfaceAddress string, optionalHardwareAddress ...string) (interface_types.InterfaceIndex, error) {
+	var err error
 	ifaceClient := interfaces.NewServiceClient(vppCon)
-	afPacketCreate := &af_packet.AfPacketCreateV2{
+	afPacketCreate := af_packet.AfPacketCreateV2{
 		UseRandomHwAddr: true,
 		HostIfName:      name,
 		NumRxQueues:     1,
 	}
-	afPacketCreateRsp, err := af_packet.NewServiceClient(vppCon).AfPacketCreateV2(ctx, afPacketCreate)
+	if len(optionalHardwareAddress) > 0 {
+		afPacketCreate.HwAddr, err = ethernet_types.ParseMacAddress(optionalHardwareAddress[0])
+		if err != nil {
+			fmt.Printf("failed to parse mac address: %v", err)
+			return 0, err
+		}
+		afPacketCreate.UseRandomHwAddr = false
+	}
+	afPacketCreateRsp, err := af_packet.NewServiceClient(vppCon).AfPacketCreateV2(ctx, &afPacketCreate)
 	if err != nil {
 		fmt.Printf("failed to create af packet: %v", err)
 		return 0, err
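The new trailing variadic parameter keeps existing callers unchanged while letting the srv-with-preset-hw-addr topology pin a MAC address. A minimal, self-contained sketch of the same optional-argument convention, using the standard library's net.ParseMAC purely as a stand-in for ethernet_types.ParseMacAddress:

package main

import (
	"fmt"
	"net"
)

// createInterface mirrors the shape of the new configureAfPacket signature:
// no trailing argument means "use a random MAC", one trailing argument pins it.
func createInterface(name string, optionalHardwareAddress ...string) error {
	useRandomHwAddr := true
	var hwAddr net.HardwareAddr
	if len(optionalHardwareAddress) > 0 {
		var err error
		hwAddr, err = net.ParseMAC(optionalHardwareAddress[0])
		if err != nil {
			return fmt.Errorf("failed to parse mac address: %w", err)
		}
		useRandomHwAddr = false
	}
	fmt.Printf("%s: random=%v hw=%v\n", name, useRandomHwAddr, hwAddr)
	return nil
}

func main() {
	_ = createInterface("vppcln")                      // random MAC, as before
	_ = createInterface("vppsrv", "00:00:5e:00:53:01") // preset MAC, new path
}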
@@ -77,7 +77,7 @@ func writeSyncFile(res *ActionResult) error {
 		defer f.Close()
 		f.Write([]byte(str))
 	} else {
-		return fmt.Errorf("sync file exists, delete the file frst")
+		return fmt.Errorf("sync file exists, delete the file first")
 	}
 	return nil
 }
@@ -108,7 +108,7 @@ func ActionResultWithStderr(s string) ActionResultOptionFn {
 
 func ActionResultWithStdout(s string) ActionResultOptionFn {
 	return func(res *ActionResult) {
-		res.ErrOutput = s
+		res.StdOutput = s
 	}
 }
 
@@ -10,8 +10,8 @@ type NetType string
 
 const (
 	NetNs NetType = "netns"
-	Veth = "veth"
-	Tap = "tap"
+	Veth string = "veth"
+	Tap string = "tap"
 )
 
 type NetConfig struct {
@@ -50,6 +50,16 @@ plugins {
 
 `
 
+const vclTemplate = `vcl {
+app-socket-api %[1]s
+app-scope-global
+app-scope-local
+namespace-id %[2]s
+namespace-secret %[2]s
+use-mq-eventfd
+}
+`
+
 const TopologyDir string = "topo/"
 
 type Stanza struct {
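For reference, RunVclEchoServer above fills this template with the server's app-socket path and namespace "1", so the generated vcl_1.conf should look roughly like the following (shown for illustration; not part of the diff):

vcl {
app-socket-api /tmp/echo-srv/var/run/app_ns_sockets/1
app-scope-global
app-scope-local
namespace-id 1
namespace-secret 1
use-mq-eventfd
}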
@@ -142,7 +152,7 @@ func hstExec(args string, instance string) (string, error) {
 func waitForSyncFile(fname string) (*JsonResult, error) {
 	var res JsonResult
 
-	for i := 0; i < 60; i++ {
+	for i := 0; i < 360; i++ {
 		f, err := os.Open(fname)
 		if err == nil {
 			defer f.Close()
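The retry limit grows from 60 to 360 iterations so the sync-file wait can outlast the new retry-attach test, which the test itself warns can take around 3 minutes. The per-iteration sleep is not visible in this hunk; a hedged sketch of the pattern, assuming a one-second interval and a made-up file path:

package main

import (
	"fmt"
	"os"
	"time"
)

// waitForFile polls for fname much like waitForSyncFile does; the 1s interval
// is an assumption, the real interval is not shown in the hunk.
func waitForFile(fname string, attempts int) (*os.File, error) {
	for i := 0; i < attempts; i++ {
		if f, err := os.Open(fname); err == nil {
			return f, nil
		}
		time.Sleep(1 * time.Second)
	}
	return nil, fmt.Errorf("no sync file found")
}

func main() {
	// 360 attempts at ~1s each comfortably covers a test that runs for minutes.
	if f, err := waitForFile("/tmp/sync/rc", 360); err == nil {
		f.Close()
	}
}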
@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/edwarnicke/exechelper"
 )
@@ -84,3 +85,104 @@ func (s *Veths2Suite) testVclEcho(proto string) {
 	}
 	fmt.Println(o)
 }
+
+func (s *Veths2Suite) TestVclRetryAttach() {
+	s.testRetryAttach("tcp")
+}
+
+func (s *Veths2Suite) testRetryAttach(proto string) {
+	t := s.T()
+
+	exechelper.Run("docker volume create --name=echo-srv-vol")
+	exechelper.Run("docker volume create --name=echo-cln-vol")
+
+	srvInstance := "vpp-vcl-test-srv"
+	clnInstance := "vpp-vcl-test-cln"
+	echoSrv := "echo-srv"
+	echoCln := "echo-cln"
+
+	err := dockerRun(srvInstance, "-v echo-srv-vol:/tmp/2veths")
+	if err != nil {
+		t.Errorf("%v", err)
+		return
+	}
+	defer func() { exechelper.Run("docker stop " + srvInstance) }()
+
+	err = dockerRun(clnInstance, "-v echo-cln-vol:/tmp/2veths")
+	if err != nil {
+		t.Errorf("%v", err)
+		return
+	}
+	defer func() { exechelper.Run("docker stop " + clnInstance) }()
+
+	err = dockerRun(echoSrv, fmt.Sprintf("-v echo-srv-vol:/tmp/%s", echoSrv))
+	if err != nil {
+		t.Errorf("%v", err)
+		return
+	}
+	defer func() { exechelper.Run("docker stop " + echoSrv) }()
+
+	err = dockerRun(echoCln, fmt.Sprintf("-v echo-cln-vol:/tmp/%s", echoCln))
+	if err != nil {
+		t.Errorf("%v", err)
+		return
+	}
+	defer func() { exechelper.Run("docker stop " + echoCln) }()
+
+	_, err = hstExec("2veths srv-with-preset-hw-addr", srvInstance)
+	if err != nil {
+		t.Errorf("%v", err)
+		return
+	}
+
+	_, err = hstExec("2veths cln", clnInstance)
+	if err != nil {
+		t.Errorf("%v", err)
+		return
+	}
+
+	_, err = hstExec("vcl-test-server "+proto, echoSrv)
+	if err != nil {
+		t.Errorf("vcl test server: %v", err)
+		return
+	}
+
+	fmt.Println("This whole test case can take around 3 minutes to run. Please be patient.")
+	fmt.Println("... Running first echo client test, before disconnect.")
+	_, err = hstExec("vcl-test-client "+proto, echoCln)
+	if err != nil {
+		t.Errorf("vcl test client: %v", err)
+		return
+	}
+	fmt.Println("... First test ended. Stopping VPP server now.")
+
+	// Stop server-vpp-instance, start it again and then run vcl-test-client once more
+	stopVppCommand := "/bin/bash -c 'ps -C vpp_main -o pid= | xargs kill -9'"
+	_, err = dockerExec(stopVppCommand, srvInstance)
+	if err != nil {
+		t.Errorf("error while stopping vpp: %v", err)
+		return
+	}
+	time.Sleep(5 * time.Second) // Give parent process time to reap the killed child process
+	stopVppCommand = "/bin/bash -c 'ps -C hs-test -o pid= | xargs kill -9'"
+	_, err = dockerExec(stopVppCommand, srvInstance)
+	if err != nil {
+		t.Errorf("error while stopping hs-test: %v", err)
+		return
+	}
+	_, err = hstExec("2veths srv-with-preset-hw-addr", srvInstance)
+	if err != nil {
+		t.Errorf("%v", err)
+		return
+	}
+
+	fmt.Println("... VPP server is starting again, so waiting for a bit.")
+	time.Sleep(30 * time.Second) // Wait a moment for the re-attachment to happen
+
+	fmt.Println("... Running second echo client test, after disconnect and re-attachment.")
+	_, err = hstExec("vcl-test-client "+proto, echoCln)
+	if err != nil {
+		t.Errorf("vcl test client: %v", err)
+	}
+	fmt.Println("Done.")
+}
@@ -341,6 +341,10 @@ typedef struct vppcom_main_t_
   /** Lock to protect worker registrations */
   clib_spinlock_t workers_lock;
 
+  /** Counter to determine order of execution of `vcl_api_retry_attach`
+   * function by multiple workers */
+  int reattach_count;
+
   /** Lock to protect segment hash table */
   clib_rwlock_t segment_table_lock;
 
@@ -59,6 +59,8 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
     }
 
   wrk->api_client_handle = mp->api_client_handle;
+  /* reattaching via `vcl_api_retry_attach` wants wrk->vpp_wrk_index to be 0 */
+  wrk->vpp_wrk_index = 0;
   segment_handle = mp->segment_handle;
   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
     {
@@ -1268,13 +1268,56 @@ vcl_api_attach (void)
   return vcl_bapi_attach ();
 }
 
+int
+vcl_is_first_reattach_to_execute ()
+{
+  if (vcm->reattach_count == 0)
+    return 1;
+
+  return 0;
+}
+
+void
+vcl_set_reattach_counter ()
+{
+  ++vcm->reattach_count;
+
+  if (vcm->reattach_count == vec_len (vcm->workers))
+    vcm->reattach_count = 0;
+}
+
+/**
+ * Reattach vcl to vpp after it has previously been disconnected.
+ *
+ * The logic should be:
+ * - first worker to hit `vcl_api_retry_attach` should attach to vpp,
+ *   to reproduce the `vcl_api_attach` in `vppcom_app_create`.
+ * - the rest of the workers should `reproduce vcl_worker_register_with_vpp`
+ *   from `vppcom_worker_register` since they were already allocated.
+ */
+
 static void
 vcl_api_retry_attach (vcl_worker_t *wrk)
 {
   vcl_session_t *s;
 
-  if (vcl_api_attach ())
-    return;
+  clib_spinlock_lock (&vcm->workers_lock);
+  if (vcl_is_first_reattach_to_execute ())
+    {
+      if (vcl_api_attach ())
+        {
+          clib_spinlock_unlock (&vcm->workers_lock);
+          return;
+        }
+      vcl_set_reattach_counter ();
+      clib_spinlock_unlock (&vcm->workers_lock);
+    }
+  else
+    {
+      vcl_set_reattach_counter ();
+      clib_spinlock_unlock (&vcm->workers_lock);
+      vcl_worker_register_with_vpp ();
+    }
 
   /* Treat listeners as configuration that needs to be re-added to vpp */
   pool_foreach (s, wrk->sessions)
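To make the ordering concrete: with N registered workers, the first worker to reach vcl_api_retry_attach after a disconnect finds the counter at 0 and performs the full attach; every other worker only re-registers with vpp; and once all N have passed through, the counter wraps back to 0 so the next disconnect behaves the same way. A standalone toy (not VPP code) that mimics just the counter arithmetic:

/* Hypothetical, self-contained illustration of the ordering that
 * vcl_is_first_reattach_to_execute() and vcl_set_reattach_counter()
 * implement; vcm->reattach_count and vec_len(vcm->workers) are modeled
 * by a plain int and a fixed worker count. */
#include <stdio.h>

static int reattach_count;

int
main (void)
{
  int n_workers = 3;

  for (int cycle = 0; cycle < 2; cycle++)
    for (int w = 0; w < n_workers; w++)
      {
        int first = (reattach_count == 0); /* vcl_is_first_reattach_to_execute */
        printf ("cycle %d worker %d: %s\n", cycle, w,
                first ? "attach to vpp" : "register worker with vpp");
        if (++reattach_count == n_workers) /* vcl_set_reattach_counter */
          reattach_count = 0;
      }
  return 0;
}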