From de37da2e9ded9f06c2f6804c2f064bc69609d035 Mon Sep 17 00:00:00 2001 From: Fabio Alessandrelli Date: Sat, 19 Oct 2024 10:25:02 +0200 Subject: [PATCH] [WS] Implement wslay unbuffered message parsing Ensure we never read more than we can store during poll. Raise default max packets to 4096 to maintain the same performance for the first 2048 packets. --- .../doc_classes/WebSocketMultiplayerPeer.xml | 2 +- .../websocket/doc_classes/WebSocketPeer.xml | 2 +- modules/websocket/packet_buffer.h | 8 +++ modules/websocket/websocket_peer.h | 2 +- modules/websocket/wsl_peer.cpp | 60 ++++++++++++++----- modules/websocket/wsl_peer.h | 15 +++++ 6 files changed, 72 insertions(+), 17 deletions(-) diff --git a/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml b/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml index 0978e1fcee4..b2e1cb345b9 100644 --- a/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml +++ b/modules/websocket/doc_classes/WebSocketMultiplayerPeer.xml @@ -60,7 +60,7 @@ The inbound buffer size for connected peers. See [member WebSocketPeer.inbound_buffer_size] for more details. - + The maximum number of queued packets for connected peers. See [member WebSocketPeer.max_queued_packets] for more details. diff --git a/modules/websocket/doc_classes/WebSocketPeer.xml b/modules/websocket/doc_classes/WebSocketPeer.xml index 238dd305368..3776a6915cb 100644 --- a/modules/websocket/doc_classes/WebSocketPeer.xml +++ b/modules/websocket/doc_classes/WebSocketPeer.xml @@ -158,7 +158,7 @@ The size of the input buffer in bytes (roughly the maximum amount of memory that will be allocated for the inbound packets). - + The maximum amount of packets that will be allowed in the queues (both inbound and outbound). diff --git a/modules/websocket/packet_buffer.h b/modules/websocket/packet_buffer.h index f98ee12ef9a..4ab05799128 100644 --- a/modules/websocket/packet_buffer.h +++ b/modules/websocket/packet_buffer.h @@ -104,6 +104,14 @@ public: return _queued; } + int payload_space_left() const { + return _payload.space_left(); + } + + int packets_space_left() const { + return _packets.size() - _queued; + } + void clear() { _payload.resize(0); _packets.resize(0); diff --git a/modules/websocket/websocket_peer.h b/modules/websocket/websocket_peer.h index ef0197cf6c9..b564b55b87c 100644 --- a/modules/websocket/websocket_peer.h +++ b/modules/websocket/websocket_peer.h @@ -71,7 +71,7 @@ protected: int outbound_buffer_size = DEFAULT_BUFFER_SIZE; int inbound_buffer_size = DEFAULT_BUFFER_SIZE; - int max_queued_packets = 2048; + int max_queued_packets = 4096; public: static WebSocketPeer *create(bool p_notify_postinitialize = true) { diff --git a/modules/websocket/wsl_peer.cpp b/modules/websocket/wsl_peer.cpp index 0c0a046805c..95713669cc8 100644 --- a/modules/websocket/wsl_peer.cpp +++ b/modules/websocket/wsl_peer.cpp @@ -295,6 +295,7 @@ Error WSLPeer::_do_server_handshake() { resolver.stop(); // Response sent, initialize wslay context. wslay_event_context_server_init(&wsl_ctx, &_wsl_callbacks, this); + wslay_event_config_set_no_buffering(wsl_ctx, 1); wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size); in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets); packet_buffer.resize(inbound_buffer_size); @@ -403,6 +404,7 @@ void WSLPeer::_do_client_handshake() { ERR_FAIL_MSG("Invalid response headers."); } wslay_event_context_client_init(&wsl_ctx, &_wsl_callbacks, this); + wslay_event_config_set_no_buffering(wsl_ctx, 1); wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size); in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets); packet_buffer.resize(inbound_buffer_size); @@ -568,8 +570,15 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); return -1; } + // Make sure we don't read more than what our buffer can hold. + size_t buffer_limit = MIN(peer->in_buffer.payload_space_left(), peer->in_buffer.packets_space_left() * 2); // The minimum size of a websocket message is 2 bytes. + size_t to_read = MIN(len, buffer_limit); + if (to_read == 0) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + return -1; + } int read = 0; - Error err = conn->get_partial_data(data, len, read); + Error err = conn->get_partial_data(data, to_read, read); if (err != OK) { print_verbose("Websocket get data error: " + itos(err) + ", read (should be 0!): " + itos(read)); wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); @@ -582,6 +591,37 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, return read; } +void WSLPeer::_wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data) { + WSLPeer *peer = (WSLPeer *)user_data; + uint8_t op = arg->opcode; + if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) { + // Get ready to process a data package. + PendingMessage &pm = peer->pending_message; + pm.opcode = op; + pm.payload_size = arg->payload_length; + } +} + +void WSLPeer::_wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data) { + WSLPeer *peer = (WSLPeer *)user_data; + PendingMessage &pm = peer->pending_message; + if (pm.opcode != 0) { + // Only write the payload. + peer->in_buffer.write_packet(arg->data, arg->data_length, nullptr); + } +} + +void WSLPeer::_wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data) { + WSLPeer *peer = (WSLPeer *)user_data; + PendingMessage &pm = peer->pending_message; + if (pm.opcode != 0) { + // Only write the packet (since it's now completed). + uint8_t is_string = pm.opcode == WSLAY_TEXT_FRAME ? 1 : 0; + peer->in_buffer.write_packet(nullptr, pm.payload_size, &is_string); + pm.clear(); + } +} + ssize_t WSLPeer::_wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) { WSLPeer *peer = (WSLPeer *)user_data; Ref conn = peer->connection; @@ -627,25 +667,16 @@ void WSLPeer::_wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct w return; } - if (peer->ready_state == STATE_CLOSING) { - return; - } - - if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) { - // Message. - uint8_t is_string = arg->opcode == WSLAY_TEXT_FRAME ? 1 : 0; - peer->in_buffer.write_packet(arg->msg, arg->msg_length, &is_string); - } - // Ping or pong. + // Ping, pong, or message (already parsed in chunks). } wslay_event_callbacks WSLPeer::_wsl_callbacks = { _wsl_recv_callback, _wsl_send_callback, _wsl_genmask_callback, - nullptr, /* on_frame_recv_start_callback */ - nullptr, /* on_frame_recv_callback */ - nullptr, /* on_frame_recv_end_callback */ + _wsl_recv_start_callback, + _wsl_frame_recv_chunk_callback, + _wsl_frame_recv_end_callback, _wsl_msg_recv_callback }; @@ -783,6 +814,7 @@ void WSLPeer::close(int p_code, String p_reason) { in_buffer.clear(); packet_buffer.resize(0); + pending_message.clear(); } IPAddress WSLPeer::get_connected_host() const { diff --git a/modules/websocket/wsl_peer.h b/modules/websocket/wsl_peer.h index fb01da7ce29..ace0be68476 100644 --- a/modules/websocket/wsl_peer.h +++ b/modules/websocket/wsl_peer.h @@ -53,6 +53,10 @@ private: // Callbacks. static ssize_t _wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, size_t len, int flags, void *user_data); + static void _wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data); + static void _wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data); + static void _wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data); + static ssize_t _wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data); static int _wsl_genmask_callback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, void *user_data); static void _wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data); @@ -80,6 +84,16 @@ private: Resolver() {} }; + struct PendingMessage { + size_t payload_size = 0; + uint8_t opcode = 0; + + void clear() { + payload_size = 0; + opcode = 0; + } + }; + Resolver resolver; // WebSocket connection state. @@ -99,6 +113,7 @@ private: int close_code = -1; String close_reason; uint8_t was_string = 0; + PendingMessage pending_message; // WebSocket configuration. bool use_tls = true;