Websocket peer outbound buffer fixes. Expose outbound buffered amount.
This commit is contained in:
parent
d22f487dfe
commit
023548c0a5
10 changed files with 61 additions and 4 deletions
|
@ -39,6 +39,13 @@
|
||||||
[b]Note:[/b] Not available in the HTML5 export.
|
[b]Note:[/b] Not available in the HTML5 export.
|
||||||
</description>
|
</description>
|
||||||
</method>
|
</method>
|
||||||
|
<method name="get_current_outbound_buffered_amount" qualifiers="const">
|
||||||
|
<return type="int">
|
||||||
|
</return>
|
||||||
|
<description>
|
||||||
|
Returns the current amount of data in the outbound websocket buffer. [b]Note:[/b] HTML5 exports use WebSocket.bufferedAmount, while other platforms use an internal buffer.
|
||||||
|
</description>
|
||||||
|
</method>
|
||||||
<method name="get_write_mode" qualifiers="const">
|
<method name="get_write_mode" qualifiers="const">
|
||||||
<return type="int" enum="WebSocketPeer.WriteMode">
|
<return type="int" enum="WebSocketPeer.WriteMode">
|
||||||
</return>
|
</return>
|
||||||
|
|
|
@ -95,7 +95,7 @@ Error EMWSClient::connect_to_host(String p_host, String p_path, uint16_t p_port,
|
||||||
return FAILED;
|
return FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
static_cast<Ref<EMWSPeer>>(_peer)->set_sock(_js_id, _in_buf_size, _in_pkt_size);
|
static_cast<Ref<EMWSPeer>>(_peer)->set_sock(_js_id, _in_buf_size, _in_pkt_size, _out_buf_size);
|
||||||
|
|
||||||
return OK;
|
return OK;
|
||||||
}
|
}
|
||||||
|
@ -136,12 +136,14 @@ int EMWSClient::get_max_packet_size() const {
|
||||||
Error EMWSClient::set_buffers(int p_in_buffer, int p_in_packets, int p_out_buffer, int p_out_packets) {
|
Error EMWSClient::set_buffers(int p_in_buffer, int p_in_packets, int p_out_buffer, int p_out_packets) {
|
||||||
_in_buf_size = nearest_shift(p_in_buffer - 1) + 10;
|
_in_buf_size = nearest_shift(p_in_buffer - 1) + 10;
|
||||||
_in_pkt_size = nearest_shift(p_in_packets - 1);
|
_in_pkt_size = nearest_shift(p_in_packets - 1);
|
||||||
|
_out_buf_size = nearest_shift(p_out_buffer - 1) + 10;
|
||||||
return OK;
|
return OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
EMWSClient::EMWSClient() {
|
EMWSClient::EMWSClient() {
|
||||||
_in_buf_size = nearest_shift((int)GLOBAL_GET(WSC_IN_BUF) - 1) + 10;
|
_in_buf_size = nearest_shift((int)GLOBAL_GET(WSC_IN_BUF) - 1) + 10;
|
||||||
_in_pkt_size = nearest_shift((int)GLOBAL_GET(WSC_IN_PKT) - 1);
|
_in_pkt_size = nearest_shift((int)GLOBAL_GET(WSC_IN_PKT) - 1);
|
||||||
|
_out_buf_size = nearest_shift((int)GLOBAL_GET(WSC_OUT_BUF) - 1) + 10;
|
||||||
_is_connecting = false;
|
_is_connecting = false;
|
||||||
_peer = Ref<EMWSPeer>(memnew(EMWSPeer));
|
_peer = Ref<EMWSPeer>(memnew(EMWSPeer));
|
||||||
_js_id = 0;
|
_js_id = 0;
|
||||||
|
|
|
@ -45,6 +45,7 @@ private:
|
||||||
bool _is_connecting;
|
bool _is_connecting;
|
||||||
int _in_buf_size;
|
int _in_buf_size;
|
||||||
int _in_pkt_size;
|
int _in_pkt_size;
|
||||||
|
int _out_buf_size;
|
||||||
|
|
||||||
static void _esws_on_connect(void *obj, char *proto);
|
static void _esws_on_connect(void *obj, char *proto);
|
||||||
static void _esws_on_message(void *obj, const uint8_t *p_data, int p_data_size, int p_is_string);
|
static void _esws_on_message(void *obj, const uint8_t *p_data, int p_data_size, int p_is_string);
|
||||||
|
|
|
@ -33,10 +33,11 @@
|
||||||
#include "emws_peer.h"
|
#include "emws_peer.h"
|
||||||
#include "core/io/ip.h"
|
#include "core/io/ip.h"
|
||||||
|
|
||||||
void EMWSPeer::set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size) {
|
void EMWSPeer::set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size, unsigned int p_out_buf_size) {
|
||||||
peer_sock = p_sock;
|
peer_sock = p_sock;
|
||||||
_in_buffer.resize(p_in_pkt_size, p_in_buf_size);
|
_in_buffer.resize(p_in_pkt_size, p_in_buf_size);
|
||||||
_packet_buffer.resize((1 << p_in_buf_size));
|
_packet_buffer.resize((1 << p_in_buf_size));
|
||||||
|
_out_buf_size = p_out_buf_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
void EMWSPeer::set_write_mode(WriteMode p_mode) {
|
void EMWSPeer::set_write_mode(WriteMode p_mode) {
|
||||||
|
@ -53,7 +54,10 @@ Error EMWSPeer::read_msg(const uint8_t *p_data, uint32_t p_size, bool p_is_strin
|
||||||
}
|
}
|
||||||
|
|
||||||
Error EMWSPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
|
Error EMWSPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
|
||||||
|
ERR_FAIL_COND_V(_out_buf_size && (godot_js_websocket_buffered_amount(peer_sock) >= (1ULL << _out_buf_size)), ERR_OUT_OF_MEMORY);
|
||||||
|
|
||||||
int is_bin = write_mode == WebSocketPeer::WRITE_MODE_BINARY ? 1 : 0;
|
int is_bin = write_mode == WebSocketPeer::WRITE_MODE_BINARY ? 1 : 0;
|
||||||
|
|
||||||
godot_js_websocket_send(peer_sock, p_buffer, p_buffer_size, is_bin);
|
godot_js_websocket_send(peer_sock, p_buffer, p_buffer_size, is_bin);
|
||||||
return OK;
|
return OK;
|
||||||
};
|
};
|
||||||
|
@ -77,6 +81,13 @@ int EMWSPeer::get_available_packet_count() const {
|
||||||
return _in_buffer.packets_left();
|
return _in_buffer.packets_left();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
int EMWSPeer::get_current_outbound_buffered_amount() const {
|
||||||
|
if (peer_sock != -1) {
|
||||||
|
return godot_js_websocket_buffered_amount(peer_sock);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
bool EMWSPeer::was_string_packet() const {
|
bool EMWSPeer::was_string_packet() const {
|
||||||
return _is_string;
|
return _is_string;
|
||||||
};
|
};
|
||||||
|
@ -107,6 +118,7 @@ void EMWSPeer::set_no_delay(bool p_enabled) {
|
||||||
}
|
}
|
||||||
|
|
||||||
EMWSPeer::EMWSPeer() {
|
EMWSPeer::EMWSPeer() {
|
||||||
|
_out_buf_size = 0;
|
||||||
peer_sock = -1;
|
peer_sock = -1;
|
||||||
write_mode = WRITE_MODE_BINARY;
|
write_mode = WRITE_MODE_BINARY;
|
||||||
close();
|
close();
|
||||||
|
|
|
@ -48,6 +48,7 @@ typedef void (*WSOnError)(void *p_ref);
|
||||||
|
|
||||||
extern int godot_js_websocket_create(void *p_ref, const char *p_url, const char *p_proto, WSOnOpen p_on_open, WSOnMessage p_on_message, WSOnError p_on_error, WSOnClose p_on_close);
|
extern int godot_js_websocket_create(void *p_ref, const char *p_url, const char *p_proto, WSOnOpen p_on_open, WSOnMessage p_on_message, WSOnError p_on_error, WSOnClose p_on_close);
|
||||||
extern int godot_js_websocket_send(int p_id, const uint8_t *p_buf, int p_buf_len, int p_raw);
|
extern int godot_js_websocket_send(int p_id, const uint8_t *p_buf, int p_buf_len, int p_raw);
|
||||||
|
extern int godot_js_websocket_buffered_amount(int p_id);
|
||||||
extern void godot_js_websocket_close(int p_id, int p_code, const char *p_reason);
|
extern void godot_js_websocket_close(int p_id, int p_code, const char *p_reason);
|
||||||
extern void godot_js_websocket_destroy(int p_id);
|
extern void godot_js_websocket_destroy(int p_id);
|
||||||
}
|
}
|
||||||
|
@ -62,14 +63,16 @@ private:
|
||||||
PoolVector<uint8_t> _packet_buffer;
|
PoolVector<uint8_t> _packet_buffer;
|
||||||
PacketBuffer<uint8_t> _in_buffer;
|
PacketBuffer<uint8_t> _in_buffer;
|
||||||
uint8_t _is_string;
|
uint8_t _is_string;
|
||||||
|
int _out_buf_size;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Error read_msg(const uint8_t *p_data, uint32_t p_size, bool p_is_string);
|
Error read_msg(const uint8_t *p_data, uint32_t p_size, bool p_is_string);
|
||||||
void set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size);
|
void set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size, unsigned int p_out_buf_size);
|
||||||
virtual int get_available_packet_count() const;
|
virtual int get_available_packet_count() const;
|
||||||
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size);
|
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size);
|
||||||
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size);
|
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size);
|
||||||
virtual int get_max_packet_size() const { return _packet_buffer.size(); };
|
virtual int get_max_packet_size() const { return _packet_buffer.size(); };
|
||||||
|
virtual int get_current_outbound_buffered_amount() const;
|
||||||
|
|
||||||
virtual void close(int p_code = 1000, String p_reason = "");
|
virtual void close(int p_code = 1000, String p_reason = "");
|
||||||
virtual bool is_connected_to_host() const;
|
virtual bool is_connected_to_host() const;
|
||||||
|
|
|
@ -101,6 +101,15 @@ const GodotWebSocket = {
|
||||||
return 0;
|
return 0;
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// Get current bufferedAmount
|
||||||
|
bufferedAmount: function (p_id) {
|
||||||
|
const ref = IDHandler.get(p_id);
|
||||||
|
if (!ref) {
|
||||||
|
return 0; // Godot object is gone.
|
||||||
|
}
|
||||||
|
return ref.bufferedAmount;
|
||||||
|
},
|
||||||
|
|
||||||
create: function (socket, p_on_open, p_on_message, p_on_error, p_on_close) {
|
create: function (socket, p_on_open, p_on_message, p_on_error, p_on_close) {
|
||||||
const id = IDHandler.add(socket);
|
const id = IDHandler.add(socket);
|
||||||
socket.onopen = GodotWebSocket._onopen.bind(null, id, p_on_open);
|
socket.onopen = GodotWebSocket._onopen.bind(null, id, p_on_open);
|
||||||
|
@ -171,6 +180,11 @@ const GodotWebSocket = {
|
||||||
return GodotWebSocket.send(p_id, out);
|
return GodotWebSocket.send(p_id, out);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
godot_js_websocket_buffered_amount__sig: 'ii',
|
||||||
|
godot_js_websocket_buffered_amount: function (p_id) {
|
||||||
|
return GodotWebSocket.bufferedAmount(p_id);
|
||||||
|
},
|
||||||
|
|
||||||
godot_js_websocket_close__sig: 'viii',
|
godot_js_websocket_close__sig: 'viii',
|
||||||
godot_js_websocket_close: function (p_id, p_code, p_reason) {
|
godot_js_websocket_close: function (p_id, p_code, p_reason) {
|
||||||
const code = p_code;
|
const code = p_code;
|
||||||
|
|
|
@ -47,6 +47,7 @@ void WebSocketPeer::_bind_methods() {
|
||||||
ClassDB::bind_method(D_METHOD("get_connected_host"), &WebSocketPeer::get_connected_host);
|
ClassDB::bind_method(D_METHOD("get_connected_host"), &WebSocketPeer::get_connected_host);
|
||||||
ClassDB::bind_method(D_METHOD("get_connected_port"), &WebSocketPeer::get_connected_port);
|
ClassDB::bind_method(D_METHOD("get_connected_port"), &WebSocketPeer::get_connected_port);
|
||||||
ClassDB::bind_method(D_METHOD("set_no_delay", "enabled"), &WebSocketPeer::set_no_delay);
|
ClassDB::bind_method(D_METHOD("set_no_delay", "enabled"), &WebSocketPeer::set_no_delay);
|
||||||
|
ClassDB::bind_method(D_METHOD("get_current_outbound_buffered_amount"), &WebSocketPeer::get_current_outbound_buffered_amount);
|
||||||
|
|
||||||
BIND_ENUM_CONSTANT(WRITE_MODE_TEXT);
|
BIND_ENUM_CONSTANT(WRITE_MODE_TEXT);
|
||||||
BIND_ENUM_CONSTANT(WRITE_MODE_BINARY);
|
BIND_ENUM_CONSTANT(WRITE_MODE_BINARY);
|
||||||
|
|
|
@ -53,6 +53,7 @@ public:
|
||||||
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size) = 0;
|
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size) = 0;
|
||||||
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size) = 0;
|
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size) = 0;
|
||||||
virtual int get_max_packet_size() const = 0;
|
virtual int get_max_packet_size() const = 0;
|
||||||
|
virtual int get_current_outbound_buffered_amount() const = 0;
|
||||||
|
|
||||||
virtual WriteMode get_write_mode() const = 0;
|
virtual WriteMode get_write_mode() const = 0;
|
||||||
virtual void set_write_mode(WriteMode p_mode) = 0;
|
virtual void set_write_mode(WriteMode p_mode) = 0;
|
||||||
|
|
|
@ -205,7 +205,9 @@ void WSLPeer::make_context(PeerData *p_data, unsigned int p_in_buf_size, unsigne
|
||||||
ERR_FAIL_COND(p_data == nullptr);
|
ERR_FAIL_COND(p_data == nullptr);
|
||||||
|
|
||||||
_in_buffer.resize(p_in_pkt_size, p_in_buf_size);
|
_in_buffer.resize(p_in_pkt_size, p_in_buf_size);
|
||||||
_packet_buffer.resize((1 << MAX(p_in_buf_size, p_out_buf_size)));
|
_packet_buffer.resize(1 << p_in_buf_size);
|
||||||
|
_out_buf_size = p_out_buf_size;
|
||||||
|
_out_pkt_size = p_out_pkt_size;
|
||||||
|
|
||||||
_data = p_data;
|
_data = p_data;
|
||||||
_data->peer = this;
|
_data->peer = this;
|
||||||
|
@ -239,6 +241,8 @@ void WSLPeer::poll() {
|
||||||
|
|
||||||
Error WSLPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
|
Error WSLPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
|
||||||
ERR_FAIL_COND_V(!is_connected_to_host(), FAILED);
|
ERR_FAIL_COND_V(!is_connected_to_host(), FAILED);
|
||||||
|
ERR_FAIL_COND_V(_out_pkt_size && (wslay_event_get_queued_msg_count(_data->ctx) >= (1ULL << _out_pkt_size)), ERR_OUT_OF_MEMORY);
|
||||||
|
ERR_FAIL_COND_V(_out_buf_size && (wslay_event_get_queued_msg_length(_data->ctx) >= (1ULL << _out_buf_size)), ERR_OUT_OF_MEMORY);
|
||||||
|
|
||||||
struct wslay_event_msg msg; // Should I use fragmented?
|
struct wslay_event_msg msg; // Should I use fragmented?
|
||||||
msg.opcode = write_mode == WRITE_MODE_TEXT ? WSLAY_TEXT_FRAME : WSLAY_BINARY_FRAME;
|
msg.opcode = write_mode == WRITE_MODE_TEXT ? WSLAY_TEXT_FRAME : WSLAY_BINARY_FRAME;
|
||||||
|
@ -280,6 +284,12 @@ int WSLPeer::get_available_packet_count() const {
|
||||||
return _in_buffer.packets_left();
|
return _in_buffer.packets_left();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int WSLPeer::get_current_outbound_buffered_amount() const {
|
||||||
|
ERR_FAIL_COND_V(!_data, 0);
|
||||||
|
|
||||||
|
return wslay_event_get_queued_msg_length(_data->ctx);
|
||||||
|
}
|
||||||
|
|
||||||
bool WSLPeer::was_string_packet() const {
|
bool WSLPeer::was_string_packet() const {
|
||||||
return _is_string;
|
return _is_string;
|
||||||
}
|
}
|
||||||
|
@ -333,6 +343,8 @@ WSLPeer::WSLPeer() {
|
||||||
_is_string = 0;
|
_is_string = 0;
|
||||||
close_code = -1;
|
close_code = -1;
|
||||||
write_mode = WRITE_MODE_BINARY;
|
write_mode = WRITE_MODE_BINARY;
|
||||||
|
_out_buf_size = 0;
|
||||||
|
_out_pkt_size = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
WSLPeer::~WSLPeer() {
|
WSLPeer::~WSLPeer() {
|
||||||
|
|
|
@ -89,6 +89,9 @@ private:
|
||||||
|
|
||||||
WriteMode write_mode;
|
WriteMode write_mode;
|
||||||
|
|
||||||
|
int _out_buf_size;
|
||||||
|
int _out_pkt_size;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
int close_code;
|
int close_code;
|
||||||
String close_reason;
|
String close_reason;
|
||||||
|
@ -98,6 +101,7 @@ public:
|
||||||
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size);
|
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size);
|
||||||
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size);
|
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size);
|
||||||
virtual int get_max_packet_size() const { return _packet_buffer.size(); };
|
virtual int get_max_packet_size() const { return _packet_buffer.size(); };
|
||||||
|
virtual int get_current_outbound_buffered_amount() const;
|
||||||
|
|
||||||
virtual void close_now();
|
virtual void close_now();
|
||||||
virtual void close(int p_code = 1000, String p_reason = "");
|
virtual void close(int p_code = 1000, String p_reason = "");
|
||||||
|
|
Loading…
Reference in a new issue