[WS] Implement wslay unbuffered message parsing

Ensure we never read more than we can store during poll.

Raise default max packets to 4096 to maintain the same performance for
the first 2048 packets.
This commit is contained in:
Fabio Alessandrelli 2024-10-19 10:25:02 +02:00
parent 44fa552343
commit de37da2e9d
6 changed files with 72 additions and 17 deletions

View file

@ -60,7 +60,7 @@
<member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535"> <member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535">
The inbound buffer size for connected peers. See [member WebSocketPeer.inbound_buffer_size] for more details. The inbound buffer size for connected peers. See [member WebSocketPeer.inbound_buffer_size] for more details.
</member> </member>
<member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="2048"> <member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="4096">
The maximum number of queued packets for connected peers. See [member WebSocketPeer.max_queued_packets] for more details. The maximum number of queued packets for connected peers. See [member WebSocketPeer.max_queued_packets] for more details.
</member> </member>
<member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535"> <member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535">

View file

@ -158,7 +158,7 @@
<member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535"> <member name="inbound_buffer_size" type="int" setter="set_inbound_buffer_size" getter="get_inbound_buffer_size" default="65535">
The size of the input buffer in bytes (roughly the maximum amount of memory that will be allocated for the inbound packets). The size of the input buffer in bytes (roughly the maximum amount of memory that will be allocated for the inbound packets).
</member> </member>
<member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="2048"> <member name="max_queued_packets" type="int" setter="set_max_queued_packets" getter="get_max_queued_packets" default="4096">
The maximum amount of packets that will be allowed in the queues (both inbound and outbound). The maximum amount of packets that will be allowed in the queues (both inbound and outbound).
</member> </member>
<member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535"> <member name="outbound_buffer_size" type="int" setter="set_outbound_buffer_size" getter="get_outbound_buffer_size" default="65535">

View file

@ -104,6 +104,14 @@ public:
return _queued; return _queued;
} }
int payload_space_left() const {
return _payload.space_left();
}
int packets_space_left() const {
return _packets.size() - _queued;
}
void clear() { void clear() {
_payload.resize(0); _payload.resize(0);
_packets.resize(0); _packets.resize(0);

View file

@ -71,7 +71,7 @@ protected:
int outbound_buffer_size = DEFAULT_BUFFER_SIZE; int outbound_buffer_size = DEFAULT_BUFFER_SIZE;
int inbound_buffer_size = DEFAULT_BUFFER_SIZE; int inbound_buffer_size = DEFAULT_BUFFER_SIZE;
int max_queued_packets = 2048; int max_queued_packets = 4096;
public: public:
static WebSocketPeer *create(bool p_notify_postinitialize = true) { static WebSocketPeer *create(bool p_notify_postinitialize = true) {

View file

@ -295,6 +295,7 @@ Error WSLPeer::_do_server_handshake() {
resolver.stop(); resolver.stop();
// Response sent, initialize wslay context. // Response sent, initialize wslay context.
wslay_event_context_server_init(&wsl_ctx, &_wsl_callbacks, this); wslay_event_context_server_init(&wsl_ctx, &_wsl_callbacks, this);
wslay_event_config_set_no_buffering(wsl_ctx, 1);
wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size); wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size);
in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets); in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets);
packet_buffer.resize(inbound_buffer_size); packet_buffer.resize(inbound_buffer_size);
@ -403,6 +404,7 @@ void WSLPeer::_do_client_handshake() {
ERR_FAIL_MSG("Invalid response headers."); ERR_FAIL_MSG("Invalid response headers.");
} }
wslay_event_context_client_init(&wsl_ctx, &_wsl_callbacks, this); wslay_event_context_client_init(&wsl_ctx, &_wsl_callbacks, this);
wslay_event_config_set_no_buffering(wsl_ctx, 1);
wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size); wslay_event_config_set_max_recv_msg_length(wsl_ctx, inbound_buffer_size);
in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets); in_buffer.resize(nearest_shift(inbound_buffer_size), max_queued_packets);
packet_buffer.resize(inbound_buffer_size); packet_buffer.resize(inbound_buffer_size);
@ -568,8 +570,15 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data,
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
return -1; return -1;
} }
// Make sure we don't read more than what our buffer can hold.
size_t buffer_limit = MIN(peer->in_buffer.payload_space_left(), peer->in_buffer.packets_space_left() * 2); // The minimum size of a websocket message is 2 bytes.
size_t to_read = MIN(len, buffer_limit);
if (to_read == 0) {
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
return -1;
}
int read = 0; int read = 0;
Error err = conn->get_partial_data(data, len, read); Error err = conn->get_partial_data(data, to_read, read);
if (err != OK) { if (err != OK) {
print_verbose("Websocket get data error: " + itos(err) + ", read (should be 0!): " + itos(read)); print_verbose("Websocket get data error: " + itos(err) + ", read (should be 0!): " + itos(read));
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
@ -582,6 +591,37 @@ ssize_t WSLPeer::_wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data,
return read; return read;
} }
void WSLPeer::_wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data) {
WSLPeer *peer = (WSLPeer *)user_data;
uint8_t op = arg->opcode;
if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) {
// Get ready to process a data package.
PendingMessage &pm = peer->pending_message;
pm.opcode = op;
pm.payload_size = arg->payload_length;
}
}
void WSLPeer::_wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data) {
WSLPeer *peer = (WSLPeer *)user_data;
PendingMessage &pm = peer->pending_message;
if (pm.opcode != 0) {
// Only write the payload.
peer->in_buffer.write_packet(arg->data, arg->data_length, nullptr);
}
}
void WSLPeer::_wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data) {
WSLPeer *peer = (WSLPeer *)user_data;
PendingMessage &pm = peer->pending_message;
if (pm.opcode != 0) {
// Only write the packet (since it's now completed).
uint8_t is_string = pm.opcode == WSLAY_TEXT_FRAME ? 1 : 0;
peer->in_buffer.write_packet(nullptr, pm.payload_size, &is_string);
pm.clear();
}
}
ssize_t WSLPeer::_wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) { ssize_t WSLPeer::_wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) {
WSLPeer *peer = (WSLPeer *)user_data; WSLPeer *peer = (WSLPeer *)user_data;
Ref<StreamPeer> conn = peer->connection; Ref<StreamPeer> conn = peer->connection;
@ -627,25 +667,16 @@ void WSLPeer::_wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct w
return; return;
} }
if (peer->ready_state == STATE_CLOSING) { // Ping, pong, or message (already parsed in chunks).
return;
}
if (op == WSLAY_TEXT_FRAME || op == WSLAY_BINARY_FRAME) {
// Message.
uint8_t is_string = arg->opcode == WSLAY_TEXT_FRAME ? 1 : 0;
peer->in_buffer.write_packet(arg->msg, arg->msg_length, &is_string);
}
// Ping or pong.
} }
wslay_event_callbacks WSLPeer::_wsl_callbacks = { wslay_event_callbacks WSLPeer::_wsl_callbacks = {
_wsl_recv_callback, _wsl_recv_callback,
_wsl_send_callback, _wsl_send_callback,
_wsl_genmask_callback, _wsl_genmask_callback,
nullptr, /* on_frame_recv_start_callback */ _wsl_recv_start_callback,
nullptr, /* on_frame_recv_callback */ _wsl_frame_recv_chunk_callback,
nullptr, /* on_frame_recv_end_callback */ _wsl_frame_recv_end_callback,
_wsl_msg_recv_callback _wsl_msg_recv_callback
}; };
@ -783,6 +814,7 @@ void WSLPeer::close(int p_code, String p_reason) {
in_buffer.clear(); in_buffer.clear();
packet_buffer.resize(0); packet_buffer.resize(0);
pending_message.clear();
} }
IPAddress WSLPeer::get_connected_host() const { IPAddress WSLPeer::get_connected_host() const {

View file

@ -53,6 +53,10 @@ private:
// Callbacks. // Callbacks.
static ssize_t _wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, size_t len, int flags, void *user_data); static ssize_t _wsl_recv_callback(wslay_event_context_ptr ctx, uint8_t *data, size_t len, int flags, void *user_data);
static void _wsl_recv_start_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg *arg, void *user_data);
static void _wsl_frame_recv_chunk_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg *arg, void *user_data);
static void _wsl_frame_recv_end_callback(wslay_event_context_ptr ctx, void *user_data);
static ssize_t _wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data); static ssize_t _wsl_send_callback(wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data);
static int _wsl_genmask_callback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, void *user_data); static int _wsl_genmask_callback(wslay_event_context_ptr ctx, uint8_t *buf, size_t len, void *user_data);
static void _wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data); static void _wsl_msg_recv_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data);
@ -80,6 +84,16 @@ private:
Resolver() {} Resolver() {}
}; };
struct PendingMessage {
size_t payload_size = 0;
uint8_t opcode = 0;
void clear() {
payload_size = 0;
opcode = 0;
}
};
Resolver resolver; Resolver resolver;
// WebSocket connection state. // WebSocket connection state.
@ -99,6 +113,7 @@ private:
int close_code = -1; int close_code = -1;
String close_reason; String close_reason;
uint8_t was_string = 0; uint8_t was_string = 0;
PendingMessage pending_message;
// WebSocket configuration. // WebSocket configuration.
bool use_tls = true; bool use_tls = true;