api: add new stream message convention
Instead of having to wrap dump/detail calls in control ping, send details messages in between a normal reply / request pair. As expressed in the below service statement. Example: service { rpc map_domains_gets returns map_domains_get_reply stream map_domain_details; }; define map_domains_get { u32 client_index; u32 context; u32 cursor; }; define map_domains_get_reply { u32 context; i32 retval; u32 cursor; }; To avoid blocking the main thread for too long, the replies are now sent in client message queue size chunks. The reply message returns VNET_API_ERROR_EAGAIN when there is more to read. The API handler must also include a "cursor" that is used to the next call to the get function. API handler example: REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains, ({ send_domain_details (cursor, rp, mp->context); })); The macro starts from cursor and iterates through the pool until vl_api_process_may_suspend() returns true or the iteration reaches the end of the list. Client Example: cursor = 0 d = [] while True: rv, details = map_domains_get(cursor=cursor) d += details if rv.retval == 0 or rv.retval != -165: break cursor = rv.cursor or the convenience iterator: for x in vpp.details_iter(vpp.api.map_domains_get): pass or list(details_iter(map_domains_get)) Change-Id: Iad9f6b41b0ef886adb584c97708dd91cf552749e Type: feature Signed-off-by: Ole Troan <ot@cisco.com>
This commit is contained in:
@ -13,7 +13,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
option version = "4.1.1";
|
||||
option version = "4.2.1";
|
||||
|
||||
import "vnet/ip/ip_types.api";
|
||||
import "vnet/interface_types.api";
|
||||
@ -91,8 +91,28 @@ autoreply define map_add_del_rule
|
||||
/** \brief Get list of map domains
|
||||
@param client_index - opaque cookie to identify the sender
|
||||
*/
|
||||
service {
|
||||
rpc map_domains_get returns map_domains_get_reply
|
||||
stream map_domain_details;
|
||||
};
|
||||
|
||||
define map_domains_get
|
||||
{
|
||||
u32 client_index;
|
||||
u32 context;
|
||||
u32 cursor;
|
||||
};
|
||||
|
||||
define map_domains_get_reply
|
||||
{
|
||||
u32 context;
|
||||
i32 retval;
|
||||
u32 cursor;
|
||||
};
|
||||
|
||||
define map_domain_dump
|
||||
{
|
||||
option deprecated="v20.12";
|
||||
u32 client_index;
|
||||
u32 context;
|
||||
};
|
||||
|
@ -85,15 +85,49 @@ vl_api_map_add_del_rule_t_handler (vl_api_map_add_del_rule_t * mp)
|
||||
REPLY_MACRO (VL_API_MAP_ADD_DEL_RULE_REPLY);
|
||||
}
|
||||
|
||||
static void
|
||||
send_domain_details (u32 map_domain_index, vl_api_registration_t * rp,
|
||||
u32 context)
|
||||
{
|
||||
map_main_t *mm = &map_main;
|
||||
vl_api_map_domain_details_t *rmp;
|
||||
map_domain_t *d = pool_elt_at_index (mm->domains, map_domain_index);
|
||||
|
||||
/* Make sure every field is initiated (or don't skip the clib_memset()) */
|
||||
map_domain_extra_t *de =
|
||||
vec_elt_at_index (mm->domain_extras, map_domain_index);
|
||||
int tag_len = clib_min (ARRAY_LEN (rmp->tag), vec_len (de->tag) + 1);
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
REPLY_MACRO_DETAILS4(VL_API_MAP_DOMAIN_DETAILS, rp, context,
|
||||
({
|
||||
rmp->domain_index = htonl (map_domain_index);
|
||||
clib_memcpy (&rmp->ip6_prefix.address, &d->ip6_prefix,
|
||||
sizeof (rmp->ip6_prefix.address));
|
||||
clib_memcpy (&rmp->ip4_prefix.address, &d->ip4_prefix,
|
||||
sizeof (rmp->ip4_prefix.address));
|
||||
clib_memcpy (&rmp->ip6_src.address, &d->ip6_src,
|
||||
sizeof (rmp->ip6_src.address));
|
||||
rmp->ip6_prefix.len = d->ip6_prefix_len;
|
||||
rmp->ip4_prefix.len = d->ip4_prefix_len;
|
||||
rmp->ip6_src.len = d->ip6_src_len;
|
||||
rmp->ea_bits_len = d->ea_bits_len;
|
||||
rmp->psid_offset = d->psid_offset;
|
||||
rmp->psid_length = d->psid_length;
|
||||
rmp->flags = d->flags;
|
||||
rmp->mtu = htons (d->mtu);
|
||||
memcpy (rmp->tag, de->tag, tag_len - 1);
|
||||
rmp->tag[tag_len - 1] = '\0';
|
||||
}));
|
||||
/* *INDENT-ON* */
|
||||
}
|
||||
|
||||
static void
|
||||
vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp)
|
||||
{
|
||||
vl_api_map_domain_details_t *rmp;
|
||||
map_main_t *mm = &map_main;
|
||||
map_domain_t *d;
|
||||
map_domain_extra_t *de;
|
||||
int i;
|
||||
vl_api_registration_t *reg;
|
||||
u32 map_domain_index;
|
||||
|
||||
if (pool_elts (mm->domains) == 0)
|
||||
return;
|
||||
@ -103,33 +137,28 @@ vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp)
|
||||
return;
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
pool_foreach(d, mm->domains,
|
||||
pool_foreach_index(i, mm->domains,
|
||||
({
|
||||
map_domain_index = d - mm->domains;
|
||||
de = vec_elt_at_index(mm->domain_extras, map_domain_index);
|
||||
int tag_len = clib_min(ARRAY_LEN(rmp->tag), vec_len(de->tag) + 1);
|
||||
send_domain_details(i, reg, mp->context);
|
||||
}));
|
||||
/* *INDENT-ON* */
|
||||
}
|
||||
|
||||
/* Make sure every field is initiated (or don't skip the clib_memset()) */
|
||||
rmp = vl_msg_api_alloc (sizeof (*rmp) + tag_len);
|
||||
static void
|
||||
vl_api_map_domains_get_t_handler (vl_api_map_domains_get_t * mp)
|
||||
{
|
||||
map_main_t *mm = &map_main;
|
||||
vl_api_map_domains_get_reply_t *rmp;
|
||||
|
||||
rmp->_vl_msg_id = htons(VL_API_MAP_DOMAIN_DETAILS + mm->msg_id_base);
|
||||
rmp->context = mp->context;
|
||||
rmp->domain_index = htonl(map_domain_index);
|
||||
clib_memcpy(&rmp->ip6_prefix.address, &d->ip6_prefix, sizeof(rmp->ip6_prefix.address));
|
||||
clib_memcpy(&rmp->ip4_prefix.address, &d->ip4_prefix, sizeof(rmp->ip4_prefix.address));
|
||||
clib_memcpy(&rmp->ip6_src.address, &d->ip6_src, sizeof(rmp->ip6_src.address));
|
||||
rmp->ip6_prefix.len = d->ip6_prefix_len;
|
||||
rmp->ip4_prefix.len = d->ip4_prefix_len;
|
||||
rmp->ip6_src.len = d->ip6_src_len;
|
||||
rmp->ea_bits_len = d->ea_bits_len;
|
||||
rmp->psid_offset = d->psid_offset;
|
||||
rmp->psid_length = d->psid_length;
|
||||
rmp->flags = d->flags;
|
||||
rmp->mtu = htons(d->mtu);
|
||||
memcpy(rmp->tag, de->tag, tag_len-1);
|
||||
rmp->tag[tag_len-1] = '\0';
|
||||
i32 rv = 0;
|
||||
|
||||
vl_api_send_msg (reg, (u8 *) rmp);
|
||||
if (pool_elts (mm->domains) == 0)
|
||||
return;
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains,
|
||||
({
|
||||
send_domain_details (cursor, rp, mp->context);
|
||||
}));
|
||||
/* *INDENT-ON* */
|
||||
}
|
||||
|
@ -100,6 +100,48 @@ class TestMAP(VppTestCase):
|
||||
self.assertEqual(rv[0].tag, tag,
|
||||
"output produced incorrect tag value.")
|
||||
|
||||
def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
|
||||
ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
|
||||
ip6_dst = ipaddress.ip_network(ip6_pfx_str)
|
||||
mod = ip4_pfx.num_addresses / 1024
|
||||
indicies = []
|
||||
for i in range(ip4_pfx.num_addresses):
|
||||
rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str,
|
||||
ip4_prefix=str(ip4_pfx[i]) + "/32",
|
||||
ip6_src=ip6_src_str)
|
||||
indicies.append(rv.index)
|
||||
return indicies
|
||||
|
||||
def test_api_map_domains_get(self):
|
||||
# Create a bunch of domains
|
||||
domains = self.create_domains('130.67.0.0/24', '2001::/32',
|
||||
'2001::1/128')
|
||||
self.assertEqual(len(domains), 256)
|
||||
|
||||
d = []
|
||||
cursor = 0
|
||||
|
||||
# Invalid cursor
|
||||
rv, details = self.vapi.map_domains_get(cursor=1234)
|
||||
self.assertEqual(rv.retval, -7)
|
||||
|
||||
# Delete a domain in the middle of walk
|
||||
rv, details = self.vapi.map_domains_get(cursor=0)
|
||||
self.assertEqual(rv.retval, -165)
|
||||
self.vapi.map_del_domain(index=rv.cursor)
|
||||
domains.remove(rv.cursor)
|
||||
|
||||
# Continue at point of deleted cursor
|
||||
rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
|
||||
self.assertEqual(rv.retval, -165)
|
||||
|
||||
d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
|
||||
self.assertEqual(len(d), 255)
|
||||
|
||||
# Clean up
|
||||
for i in domains:
|
||||
self.vapi.map_del_domain(index=i)
|
||||
|
||||
def test_map_e_udp(self):
|
||||
""" MAP-E UDP"""
|
||||
|
||||
@ -916,5 +958,6 @@ class TestMAP(VppTestCase):
|
||||
ip6_nh_address="4001::1",
|
||||
is_add=0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner=VppTestRunner)
|
||||
|
@ -176,10 +176,11 @@ def vla_is_last_check(name, block):
|
||||
|
||||
|
||||
class Service():
|
||||
def __init__(self, caller, reply, events=None, stream=False):
|
||||
def __init__(self, caller, reply, events=None, stream_message=None, stream=False):
|
||||
self.caller = caller
|
||||
self.reply = reply
|
||||
self.stream = stream
|
||||
self.stream_message = stream_message
|
||||
self.events = [] if events is None else events
|
||||
|
||||
|
||||
@ -511,6 +512,10 @@ class VPPAPIParser(object):
|
||||
else:
|
||||
p[0] = Service(p[2], p[4])
|
||||
|
||||
def p_service_statement2(self, p):
|
||||
'''service_statement : RPC ID RETURNS ID STREAM ID ';' '''
|
||||
p[0] = Service(p[2], p[4], stream_message=p[6], stream=True)
|
||||
|
||||
def p_event_list(self, p):
|
||||
'''event_list : events
|
||||
| event_list events '''
|
||||
|
@ -26,6 +26,8 @@ def walk_services(s):
|
||||
d = {'reply': e.reply}
|
||||
if e.stream:
|
||||
d['stream'] = True
|
||||
if e.stream_message:
|
||||
d['stream_msg'] = e.stream_message
|
||||
if e.events:
|
||||
d['events'] = e.events
|
||||
r[e.caller] = d
|
||||
|
@ -90,6 +90,15 @@ do { \
|
||||
vl_api_send_msg (rp, (u8 *)rmp); \
|
||||
} while(0);
|
||||
|
||||
#define REPLY_MACRO_DETAILS4(t, rp, context, body) \
|
||||
do { \
|
||||
rmp = vl_msg_api_alloc (sizeof (*rmp)); \
|
||||
rmp->_vl_msg_id = htons((t)+(REPLY_MSG_ID_BASE)); \
|
||||
rmp->context = context; \
|
||||
do {body;} while (0); \
|
||||
vl_api_send_msg (rp, (u8 *)rmp); \
|
||||
} while(0);
|
||||
|
||||
#define REPLY_MACRO3(t, n, body) \
|
||||
do { \
|
||||
vl_api_registration_t *rp; \
|
||||
@ -153,6 +162,34 @@ do { \
|
||||
vl_api_send_msg (rp, (u8 *)rmp); \
|
||||
} while(0);
|
||||
|
||||
#define REPLY_AND_DETAILS_MACRO(t, p, body) \
|
||||
do { \
|
||||
vl_api_registration_t *rp; \
|
||||
rp = vl_api_client_index_to_registration (mp->client_index); \
|
||||
if (rp == 0) \
|
||||
return; \
|
||||
u32 cursor = clib_net_to_host_u32 (mp->cursor); \
|
||||
vlib_main_t *vm = vlib_get_main (); \
|
||||
f64 start = vlib_time_now (vm); \
|
||||
if (pool_is_free_index (p, cursor)) { \
|
||||
cursor = pool_next_index (p, cursor); \
|
||||
if (cursor == ~0) \
|
||||
rv = VNET_API_ERROR_INVALID_VALUE; \
|
||||
} \
|
||||
while (cursor != ~0) { \
|
||||
do {body;} while (0); \
|
||||
cursor = pool_next_index (p, cursor); \
|
||||
if (vl_api_process_may_suspend (vm, rp, start)) { \
|
||||
if (cursor != ~0) \
|
||||
rv = VNET_API_ERROR_EAGAIN; \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
REPLY_MACRO2 (t, ({ \
|
||||
rmp->cursor = clib_host_to_net_u32 (cursor); \
|
||||
})); \
|
||||
} while(0);
|
||||
|
||||
/* "trust, but verify" */
|
||||
|
||||
static inline uword
|
||||
|
@ -53,6 +53,28 @@ vl_api_can_send_msg (vl_api_registration_t * rp)
|
||||
return vl_mem_api_can_send (rp->vl_input_queue);
|
||||
}
|
||||
|
||||
/*
|
||||
* Suggests to an API handler to relinguish control. Currently limits
|
||||
* an handler to a maximum of 1ms or it earlier if the client queue is
|
||||
* full.
|
||||
*
|
||||
* May be enhanced in the future based on other performance
|
||||
* characteristics of the main thread.
|
||||
*/
|
||||
#define VL_API_MAX_TIME_IN_HANDLER 0.001 /* 1 ms */
|
||||
always_inline int
|
||||
vl_api_process_may_suspend (vlib_main_t * vm, vl_api_registration_t * rp,
|
||||
f64 start)
|
||||
{
|
||||
/* Is client queue full (leave space for reply message) */
|
||||
if (rp->registration_type <= REGISTRATION_TYPE_SHMEM &&
|
||||
rp->vl_input_queue->cursize + 1 >= rp->vl_input_queue->maxsize)
|
||||
return true;
|
||||
if (vlib_time_now (vm) > start + VL_API_MAX_TIME_IN_HANDLER)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
always_inline vl_api_registration_t *
|
||||
vl_api_client_index_to_registration (u32 index)
|
||||
{
|
||||
|
@ -156,6 +156,7 @@ _(MISSING_CERT_KEY, -161, "Missing certifcate or key") \
|
||||
_(LIMIT_EXCEEDED, -162, "limit exceeded") \
|
||||
_(IKE_NO_PORT, -163, "port not managed by IKE") \
|
||||
_(UDP_PORT_TAKEN, -164, "UDP port already taken") \
|
||||
_(EAGAIN, -165, "Retry stream call with cursor") \
|
||||
|
||||
typedef enum
|
||||
{
|
||||
|
@ -472,12 +472,7 @@ class VPPApiClient(object):
|
||||
|
||||
# Create function for client side messages.
|
||||
if name in self.services:
|
||||
if 'stream' in self.services[name] and \
|
||||
self.services[name]['stream']:
|
||||
multipart = True
|
||||
else:
|
||||
multipart = False
|
||||
f = self.make_function(msg, i, multipart, do_async)
|
||||
f = self.make_function(msg, i, self.services[name], do_async)
|
||||
setattr(self._api, name, FuncWrapper(f))
|
||||
else:
|
||||
self.logger.debug(
|
||||
@ -644,7 +639,7 @@ class VPPApiClient(object):
|
||||
n[1]['avg'], n[1]['max'])
|
||||
return s
|
||||
|
||||
def _call_vpp(self, i, msgdef, multipart, **kwargs):
|
||||
def _call_vpp(self, i, msgdef, service, **kwargs):
|
||||
"""Given a message, send the message and await a reply.
|
||||
|
||||
msgdef - the message packing definition
|
||||
@ -686,10 +681,21 @@ class VPPApiClient(object):
|
||||
|
||||
self.transport.write(b)
|
||||
|
||||
if multipart:
|
||||
# Send a ping after the request - we use its response
|
||||
# to detect that we have seen all results.
|
||||
self._control_ping(context)
|
||||
msgreply = service['reply']
|
||||
stream = True if 'stream' in service else False
|
||||
if stream:
|
||||
if 'stream_msg' in service:
|
||||
# New service['reply'] = _reply and service['stream_message'] = _details
|
||||
stream_message = service['stream_msg']
|
||||
modern =True
|
||||
else:
|
||||
# Old service['reply'] = _details
|
||||
stream_message = msgreply
|
||||
msgreply = 'control_ping_reply'
|
||||
modern = False
|
||||
# Send a ping after the request - we use its response
|
||||
# to detect that we have seen all results.
|
||||
self._control_ping(context)
|
||||
|
||||
# Block until we get a reply.
|
||||
rl = []
|
||||
@ -702,11 +708,14 @@ class VPPApiClient(object):
|
||||
# Message being queued
|
||||
self.message_queue.put_nowait(r)
|
||||
continue
|
||||
|
||||
if not multipart:
|
||||
if msgname != msgreply and (stream and (msgname != stream_message)):
|
||||
print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
|
||||
if not stream:
|
||||
rl = r
|
||||
break
|
||||
if msgname == 'control_ping_reply':
|
||||
if msgname == msgreply:
|
||||
if modern: # Return both reply and list
|
||||
rl = r, rl
|
||||
break
|
||||
|
||||
rl.append(r)
|
||||
@ -847,6 +856,19 @@ class VPPApiClient(object):
|
||||
self.logger, self.read_timeout, self.use_socket,
|
||||
self.server_address)
|
||||
|
||||
def details_iter(self, f, **kwargs):
|
||||
cursor = 0
|
||||
while True:
|
||||
kwargs['cursor'] = cursor
|
||||
rv, details = f(**kwargs)
|
||||
#
|
||||
# Convert to yield from details when we only support python 3
|
||||
#
|
||||
for d in details:
|
||||
yield d
|
||||
if rv.retval == 0 or rv.retval != -165:
|
||||
break
|
||||
cursor = rv.cursor
|
||||
|
||||
# Provide the old name for backward compatibility.
|
||||
VPP = VPPApiClient
|
||||
|
Reference in New Issue
Block a user