http: large POST handling

Type: improvement
Change-Id: I28b8e8ccbff6f97e669b0048011b187decbfc892
Signed-off-by: Matus Fabian <matfabia@cisco.com>
This commit is contained in:
Matus Fabian
2024-09-04 18:04:54 +02:00
committed by Florin Coras
parent c4b5d10115
commit 9bb0762357
4 changed files with 246 additions and 77 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/onsi/gomega/ghttp"
"github.com/onsi/gomega/gmeasure"
"io"
"math/rand"
"net"
"net/http"
"net/http/httptrace"
@ -30,8 +31,8 @@ func init() {
HttpInvalidContentLengthTest, HttpInvalidTargetSyntaxTest, HttpStaticPathTraversalTest, HttpUriDecodeTest,
HttpHeadersTest, HttpStaticFileHandlerTest, HttpStaticFileHandlerDefaultMaxAgeTest, HttpClientTest, HttpClientErrRespTest, HttpClientPostFormTest,
HttpClientPostFileTest, HttpClientPostFilePtrTest, AuthorityFormTargetTest, HttpRequestLineTest)
RegisterNoTopoSoloTests(HttpStaticPromTest, HttpTpsTest, HttpTpsInterruptModeTest, PromConcurrentConnectionsTest,
PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest)
RegisterNoTopoSoloTests(HttpStaticPromTest, HttpGetTpsTest, HttpGetTpsInterruptModeTest, PromConcurrentConnectionsTest,
PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest, HttpPostTpsTest, HttpPostTpsInterruptModeTest)
}
const wwwRootPath = "/tmp/www_root"
@ -53,18 +54,51 @@ func httpDownloadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data in
experiment.RecordValue("Download Speed", (float64(resp.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2))
}
func HttpTpsInterruptModeTest(s *NoTopoSuite) {
HttpTpsTest(s)
func HttpGetTpsInterruptModeTest(s *NoTopoSuite) {
HttpGetTpsTest(s)
}
func HttpTpsTest(s *NoTopoSuite) {
func HttpGetTpsTest(s *NoTopoSuite) {
vpp := s.GetContainerByName("vpp").VppInstance
serverAddress := s.VppAddr()
url := "http://" + serverAddress + ":8080/test_file_10M"
vpp.Vppctl("http tps uri tcp://0.0.0.0/8080")
s.RunBenchmark("HTTP tps 10M", 10, 0, httpDownloadBenchmark, url)
s.RunBenchmark("HTTP tps download 10M", 10, 0, httpDownloadBenchmark, url)
}
func httpUploadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data interface{}) {
url, isValid := data.(string)
s.AssertEqual(true, isValid)
body := make([]byte, 10485760)
_, err := rand.Read(body)
client := NewHttpClient()
req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
s.AssertNil(err, fmt.Sprint(err))
t := time.Now()
resp, err := client.Do(req)
s.AssertNil(err, fmt.Sprint(err))
defer resp.Body.Close()
s.AssertHttpStatus(resp, 200)
_, err = io.ReadAll(resp.Body)
s.AssertNil(err, fmt.Sprint(err))
duration := time.Since(t)
experiment.RecordValue("Upload Speed", (float64(req.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2))
}
func HttpPostTpsInterruptModeTest(s *NoTopoSuite) {
HttpPostTpsTest(s)
}
func HttpPostTpsTest(s *NoTopoSuite) {
vpp := s.GetContainerByName("vpp").VppInstance
serverAddress := s.VppAddr()
url := "http://" + serverAddress + ":8080/test_file_10M"
vpp.Vppctl("http tps uri tcp://0.0.0.0/8080")
s.RunBenchmark("HTTP tps upload 10M", 10, 0, httpUploadBenchmark, url)
}
func HttpPersistentConnectionTest(s *NoTopoSuite) {

View File

@ -20,6 +20,8 @@
#include <http/http_header_names.h>
#include <http/http_content_types.h>
#define HTS_RX_BUF_SIZE (64 << 10)
typedef struct
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@ -28,6 +30,7 @@ typedef struct
u64 data_len;
u64 data_offset;
u32 vpp_session_index;
u32 to_recv;
union
{
/** threshold after which connection is closed */
@ -36,6 +39,7 @@ typedef struct
u32 close_rate;
};
u8 *uri;
u8 *rx_buf;
http_header_t *resp_headers;
} hts_session_t;
@ -105,6 +109,8 @@ hts_session_free (hts_session_t *hs)
if (htm->debug_level > 0)
clib_warning ("Freeing session %u", hs->session_index);
vec_free (hs->rx_buf);
if (CLIB_DEBUG)
clib_memset (hs, 0xfa, sizeof (*hs));
@ -227,6 +233,8 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
http_msg_t msg;
session_t *ts;
u8 *headers_buf = 0;
u32 n_segs = 1;
svm_fifo_seg_t seg[2];
int rv;
if (vec_len (hs->resp_headers))
@ -235,6 +243,9 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
vec_free (hs->resp_headers);
msg.data.headers_offset = 0;
msg.data.headers_len = vec_len (headers_buf);
seg[1].data = headers_buf;
seg[1].len = msg.data.headers_len;
n_segs = 2;
}
else
{
@ -248,17 +259,14 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
msg.data.body_len = hs->data_len;
msg.data.body_offset = msg.data.headers_len;
msg.data.len = msg.data.body_len + msg.data.headers_len;
seg[0].data = (u8 *) &msg;
seg[0].len = sizeof (msg);
ts = session_get (hs->vpp_session_index, hs->thread_index);
rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg);
ASSERT (rv == sizeof (msg));
if (msg.data.headers_len)
{
rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (headers_buf), headers_buf);
ASSERT (rv == msg.data.headers_len);
vec_free (headers_buf);
}
rv = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs,
0 /* allow partial */);
vec_free (headers_buf);
ASSERT (rv == (sizeof (msg) + msg.data.headers_len));
if (!msg.data.body_len)
{
@ -323,6 +331,40 @@ done:
return rc;
}
static inline void
hts_session_rx_body (hts_session_t *hs, session_t *ts)
{
hts_main_t *htm = &hts_main;
u32 n_deq;
int rv;
n_deq = svm_fifo_max_dequeue (ts->rx_fifo);
if (!htm->no_zc)
{
svm_fifo_dequeue_drop_all (ts->rx_fifo);
}
else
{
n_deq = clib_min (n_deq, HTS_RX_BUF_SIZE);
rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hs->rx_buf);
ASSERT (rv == n_deq);
}
hs->to_recv -= n_deq;
if (hs->close_threshold > 0)
{
if ((f64) (hs->data_len - hs->to_recv) / hs->data_len >
hs->close_threshold)
hts_disconnect_transport (hs);
}
if (hs->to_recv == 0)
{
hts_start_send_data (hs, HTTP_STATUS_OK);
vec_free (hs->rx_buf);
}
}
static int
hts_ts_rx_callback (session_t *ts)
{
@ -333,44 +375,77 @@ hts_ts_rx_callback (session_t *ts)
int rv;
hs = hts_session_get (ts->thread_index, ts->opaque);
hs->data_len = 0;
hs->resp_headers = 0;
/* Read the http message header */
rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
ASSERT (rv == sizeof (msg));
if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET)
if (hs->to_recv == 0)
{
http_add_header (&hs->resp_headers,
http_header_name_token (HTTP_HEADER_ALLOW),
http_token_lit ("GET"));
hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
goto done;
hs->data_len = 0;
hs->resp_headers = 0;
hs->rx_buf = 0;
/* Read the http message header */
rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
ASSERT (rv == sizeof (msg));
if (msg.type != HTTP_MSG_REQUEST)
{
hts_start_send_data (hs, HTTP_STATUS_INTERNAL_ERROR);
goto done;
}
if (msg.method_type != HTTP_REQ_GET && msg.method_type != HTTP_REQ_POST)
{
http_add_header (&hs->resp_headers,
http_header_name_token (HTTP_HEADER_ALLOW),
http_token_lit ("GET, POST"));
hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
goto done;
}
if (msg.data.target_path_len == 0 ||
msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
{
hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
goto done;
}
vec_validate (target, msg.data.target_path_len - 1);
rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
msg.data.target_path_len, target);
ASSERT (rv == msg.data.target_path_len);
if (htm->debug_level)
clib_warning ("%s request target: %v",
msg.method_type == HTTP_REQ_GET ? "GET" : "POST",
target);
if (msg.method_type == HTTP_REQ_GET)
{
if (try_test_file (hs, target))
hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
vec_free (target);
}
else
{
vec_free (target);
if (!msg.data.body_len)
{
hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
goto done;
}
/* drop everything up to body */
svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
hs->to_recv = msg.data.body_len;
if (htm->no_zc)
vec_validate (hs->rx_buf, HTS_RX_BUF_SIZE - 1);
hts_session_rx_body (hs, ts);
return 0;
}
done:
svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
}
else
hts_session_rx_body (hs, ts);
if (msg.data.target_path_len == 0 ||
msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
{
hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
goto done;
}
vec_validate (target, msg.data.target_path_len - 1);
rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
msg.data.target_path_len, target);
ASSERT (rv == msg.data.target_path_len);
if (htm->debug_level)
clib_warning ("Request target: %v", target);
if (try_test_file (hs, target))
hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
vec_free (target);
done:
svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
return 0;
}
@ -397,6 +472,7 @@ hts_ts_accept_callback (session_t *ts)
hs = hts_session_alloc (ts->thread_index);
hs->vpp_session_index = ts->session_index;
hs->to_recv = 0;
ts->opaque = hs->session_index;
ts->session_state = SESSION_STATE_READY;

View File

@ -527,13 +527,18 @@ v_find_index (u8 *vec, u32 offset, u32 num, char *str)
static void
http_identify_optional_query (http_conn_t *hc)
{
u32 pos = vec_search (hc->rx_buf, '?');
if (~0 != pos)
int i;
for (i = hc->target_path_offset;
i < (hc->target_path_offset + hc->target_path_len); i++)
{
hc->target_query_offset = pos + 1;
hc->target_query_len =
hc->target_path_offset + hc->target_path_len - hc->target_query_offset;
hc->target_path_len = hc->target_path_len - hc->target_query_len - 1;
if (hc->rx_buf[i] == '?')
{
hc->target_query_offset = i + 1;
hc->target_query_len = hc->target_path_offset + hc->target_path_len -
hc->target_query_offset;
hc->target_path_len = hc->target_path_len - hc->target_query_len - 1;
break;
}
}
}
@ -674,7 +679,9 @@ http_parse_request_line (http_conn_t *hc, http_status_code_t *ec)
}
/* parse request-target */
HTTP_DBG (0, "http at %d", i);
target_len = i - hc->target_path_offset;
HTTP_DBG (0, "target_len %d", target_len);
if (target_len < 1)
{
clib_warning ("request-target not present");
@ -911,7 +918,7 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp)
http_msg_t msg = {};
app_worker_t *app_wrk;
session_t *as;
u32 len, max_enq;
u32 len, max_enq, body_sent;
http_status_code_t ec;
http_main_t *hm = &http_main;
@ -972,16 +979,16 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp)
http_read_message_drop (hc, len);
if (hc->body_len == 0)
body_sent = len - hc->control_data_len;
hc->to_recv = hc->body_len - body_sent;
if (hc->to_recv == 0)
{
/* no response body, we are done */
hc->to_recv = 0;
/* all sent, we are done */
http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD);
}
else
{
/* stream response body */
hc->to_recv = hc->body_len;
/* stream rest of the response body */
http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA);
}
@ -1006,7 +1013,7 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
http_msg_t msg;
session_t *as;
int rv;
u32 len, max_enq;
u32 len, max_enq, max_deq, body_sent;
rv = http_read_message (hc);
@ -1034,16 +1041,20 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
if (rv)
goto error;
/* send "control data" and request body */
/* send at least "control data" which is necessary minimum,
* if there is some space send also portion of body */
as = session_get_from_handle (hc->h_pa_session_handle);
len = hc->control_data_len + hc->body_len;
max_enq = svm_fifo_max_enqueue (as->rx_fifo);
if (max_enq < len)
if (max_enq < hc->control_data_len)
{
/* TODO stream body of large POST */
clib_warning ("not enough room for data in app's rx fifo");
clib_warning ("not enough room for control data in app's rx fifo");
ec = HTTP_STATUS_INTERNAL_ERROR;
goto error;
}
/* do not dequeue more than one HTTP request, we do not support pipelining */
max_deq =
clib_min (hc->control_data_len + hc->body_len, vec_len (hc->rx_buf));
len = clib_min (max_enq, max_deq);
msg.type = HTTP_MSG_REQUEST;
msg.method_type = hc->method;
@ -1065,9 +1076,21 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, 0 /* allow partial */);
ASSERT (rv == (sizeof (msg) + len));
/* drop everything, we do not support pipelining */
http_read_message_drop_all (hc);
http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY);
body_sent = len - hc->control_data_len;
hc->to_recv = hc->body_len - body_sent;
if (hc->to_recv == 0)
{
/* drop everything, we do not support pipelining */
http_read_message_drop_all (hc);
/* all sent, we are done */
http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY);
}
else
{
http_read_message_drop (hc, len);
/* stream rest of the response body */
http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA);
}
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
if (app_wrk)
@ -1408,8 +1431,12 @@ http_state_client_io_more_data (http_conn_t *hc, transport_send_params_t *sp)
hc->to_recv -= rv;
HTTP_DBG (1, "drained %d from ts; remains %d", rv, hc->to_recv);
/* Finished transaction:
* server back to HTTP_STATE_WAIT_APP_REPLY
* client to HTTP_STATE_WAIT_APP_METHOD */
if (hc->to_recv == 0)
http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD);
http_state_change (hc, hc->is_server ? HTTP_STATE_WAIT_APP_REPLY :
HTTP_STATE_WAIT_APP_METHOD);
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
if (app_wrk)

View File

@ -144,16 +144,48 @@ Following example shows how to parse headers:
vec_free (headers);
}
Finally application reads body:
Finally application reads body (if any), which might be received in multiple pieces (depends on size), so we might need some state machine in ``builtin_app_rx_callback``.
We will add following members to our session context structure:
.. code-block:: C
u8 *body = 0;
if (msg.data.body_len)
typedef struct
{
/* ... */
u32 to_recv;
u8 *resp_body;
} session_ctx_t;
First we prepare vector for response body, do it only once when you are reading metadata:
.. code-block:: C
/* drop everything up to body */
svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
ctx->to_recv = msg.data.body_len;
/* prepare vector for response body */
vec_validate (ctx->resp_body, msg.data.body_len - 1);
vec_reset_length (ctx->resp_body);
Now we can start reading body content, following block of code could be executed multiple times:
.. code-block:: C
/* dequeue */
u32 n_deq = svm_fifo_max_dequeue (ts->rx_fifo);
/* current offset */
u32 curr = vec_len (ctx->resp_body);
rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, ctx->resp_body + curr);
ASSERT (rv == n_deq);
/* update length of the vector */
vec_set_len (ctx->resp_body, curr + n_deq);
/* update number of remaining bytes to receive */
ctx->to_recv -= rv;
/* check if all data received */
if (ctx->to_recv == 0)
{
vec_validate (body, msg.data.body_len - 1);
rv = svm_fifo_peek (ts->rx_fifo, msg.data.body_offset, msg.data.body_len, body);
ASSERT (rv == msg.data.body_len);
/* we are done */
/* send 200 OK response */
}
Sending data