flowprobe: fix flush callbacks when multiple workers
IPFIX buffers are stored on a per worker thread basis. Currently, the flush callbacks will flush only buffers stored for the main thread. And buffers for worker threads will not be sent until their size reach the path MTU configured for the exporter. So if traffic is constant, the problem will unlikely to be visible. Buffers will be sent once they reach the maximum size. However, if traffic stops at some point and flush is triggered in order to make the plugin send all currently buffered data, this will not happen. And collectors will not receive that data. The plugin will keep the remaining data until traffic starts again, the buffers reach the maximum size, and be sent. With this fix, flush buffers for worker threads and for the main thread when the flush callbacks are triggered. This will allow to remove @tag_fixme_vpp_workers from the unit tests that don't set timers. The tests that set timers will still be failing for other multi-worker related problems. Type: fix Change-Id: I9a7d9cef8ddbec7ee68c79309e48e7bc0953d488 Signed-off-by: Alexander Chernavin <achernavin@netgate.com>
This commit is contained in:

committed by
Matthew Smith

parent
93d38823f9
commit
4c7305f124
@ -105,6 +105,9 @@ vlib_node_registration_t flowprobe_input_l2_node;
|
||||
vlib_node_registration_t flowprobe_output_ip4_node;
|
||||
vlib_node_registration_t flowprobe_output_ip6_node;
|
||||
vlib_node_registration_t flowprobe_output_l2_node;
|
||||
vlib_node_registration_t flowprobe_flush_ip4_node;
|
||||
vlib_node_registration_t flowprobe_flush_ip6_node;
|
||||
vlib_node_registration_t flowprobe_flush_l2_node;
|
||||
|
||||
/* No counters at the moment */
|
||||
#define foreach_flowprobe_error \
|
||||
@ -945,18 +948,57 @@ flush_record (flowprobe_variant_t which)
|
||||
void
|
||||
flowprobe_flush_callback_ip4 (void)
|
||||
{
|
||||
vlib_main_t *worker_vm;
|
||||
u32 i;
|
||||
|
||||
/* Flush for each worker thread */
|
||||
for (i = 1; i < vlib_get_n_threads (); i++)
|
||||
{
|
||||
worker_vm = vlib_get_main_by_index (i);
|
||||
if (worker_vm)
|
||||
vlib_node_set_interrupt_pending (worker_vm,
|
||||
flowprobe_flush_ip4_node.index);
|
||||
}
|
||||
|
||||
/* Flush for the main thread */
|
||||
flush_record (FLOW_VARIANT_IP4);
|
||||
}
|
||||
|
||||
void
|
||||
flowprobe_flush_callback_ip6 (void)
|
||||
{
|
||||
vlib_main_t *worker_vm;
|
||||
u32 i;
|
||||
|
||||
/* Flush for each worker thread */
|
||||
for (i = 1; i < vlib_get_n_threads (); i++)
|
||||
{
|
||||
worker_vm = vlib_get_main_by_index (i);
|
||||
if (worker_vm)
|
||||
vlib_node_set_interrupt_pending (worker_vm,
|
||||
flowprobe_flush_ip6_node.index);
|
||||
}
|
||||
|
||||
/* Flush for the main thread */
|
||||
flush_record (FLOW_VARIANT_IP6);
|
||||
}
|
||||
|
||||
void
|
||||
flowprobe_flush_callback_l2 (void)
|
||||
{
|
||||
vlib_main_t *worker_vm;
|
||||
u32 i;
|
||||
|
||||
/* Flush for each worker thread */
|
||||
for (i = 1; i < vlib_get_n_threads (); i++)
|
||||
{
|
||||
worker_vm = vlib_get_main_by_index (i);
|
||||
if (worker_vm)
|
||||
vlib_node_set_interrupt_pending (worker_vm,
|
||||
flowprobe_flush_l2_node.index);
|
||||
}
|
||||
|
||||
/* Flush for the main thread */
|
||||
flush_record (FLOW_VARIANT_L2);
|
||||
flush_record (FLOW_VARIANT_L2_IP4);
|
||||
flush_record (FLOW_VARIANT_L2_IP6);
|
||||
@ -1062,6 +1104,32 @@ flowprobe_walker_process (vlib_main_t * vm,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static uword
|
||||
flowprobe_flush_ip4 (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
|
||||
{
|
||||
flush_record (FLOW_VARIANT_IP4);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static uword
|
||||
flowprobe_flush_ip6 (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
|
||||
{
|
||||
flush_record (FLOW_VARIANT_IP6);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static uword
|
||||
flowprobe_flush_l2 (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
|
||||
{
|
||||
flush_record (FLOW_VARIANT_L2);
|
||||
flush_record (FLOW_VARIANT_L2_IP4);
|
||||
flush_record (FLOW_VARIANT_L2_IP6);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
VLIB_REGISTER_NODE (flowprobe_input_ip4_node) = {
|
||||
.function = flowprobe_input_ip4_node_fn,
|
||||
@ -1135,6 +1203,24 @@ VLIB_REGISTER_NODE (flowprobe_walker_node) = {
|
||||
.type = VLIB_NODE_TYPE_INPUT,
|
||||
.state = VLIB_NODE_STATE_INTERRUPT,
|
||||
};
|
||||
VLIB_REGISTER_NODE (flowprobe_flush_ip4_node) = {
|
||||
.function = flowprobe_flush_ip4,
|
||||
.name = "flowprobe-flush-ip4",
|
||||
.type = VLIB_NODE_TYPE_INPUT,
|
||||
.state = VLIB_NODE_STATE_INTERRUPT,
|
||||
};
|
||||
VLIB_REGISTER_NODE (flowprobe_flush_ip6_node) = {
|
||||
.function = flowprobe_flush_ip6,
|
||||
.name = "flowprobe-flush-ip6",
|
||||
.type = VLIB_NODE_TYPE_INPUT,
|
||||
.state = VLIB_NODE_STATE_INTERRUPT,
|
||||
};
|
||||
VLIB_REGISTER_NODE (flowprobe_flush_l2_node) = {
|
||||
.function = flowprobe_flush_l2,
|
||||
.name = "flowprobe-flush-l2",
|
||||
.type = VLIB_NODE_TYPE_INPUT,
|
||||
.state = VLIB_NODE_STATE_INTERRUPT,
|
||||
};
|
||||
/* *INDENT-ON* */
|
||||
|
||||
/*
|
||||
|
@ -1228,7 +1228,6 @@ class DatapathTestsHolder(object):
|
||||
self.logger.info("FFP_TEST_FINISH_0002")
|
||||
|
||||
|
||||
@tag_fixme_vpp_workers
|
||||
class DatapathTx(MethodHolder, DatapathTestsHolder):
|
||||
"""Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
|
||||
|
||||
@ -1309,7 +1308,6 @@ class DatapathTx(MethodHolder, DatapathTestsHolder):
|
||||
ipfix.remove_vpp_config()
|
||||
|
||||
|
||||
@tag_fixme_vpp_workers
|
||||
class DatapathRx(MethodHolder, DatapathTestsHolder):
|
||||
"""Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
|
||||
|
||||
|
Reference in New Issue
Block a user