[Net] ENet poll now only service the connection once.

It used to call `enet_host_service` until all events were consumed, but
that also meant constantly polling the connection leading to potentially
unbounded processing time.

It now only service the connection once, and instead consumes all the
retrieved events via `enet_host_check_events`.
This commit is contained in:
Jordan Schidlowsky 2021-09-27 11:07:00 +02:00 committed by Fabio Alessandrelli
parent 397d895fb7
commit 373d5ea103
4 changed files with 187 additions and 168 deletions

View file

@ -117,6 +117,47 @@ Ref<ENetPacketPeer> ENetConnection::connect_to_host(const String &p_address, int
return out;
}
ENetConnection::EventType ENetConnection::_parse_event(const ENetEvent &p_event, Event &r_event) {
switch (p_event.type) {
case ENET_EVENT_TYPE_CONNECT: {
if (p_event.peer->data == nullptr) {
Ref<ENetPacketPeer> pp = memnew(ENetPacketPeer(p_event.peer));
peers.push_back(pp);
}
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
r_event.data = p_event.data;
return EVENT_CONNECT;
} break;
case ENET_EVENT_TYPE_DISCONNECT: {
// A peer disconnected.
if (p_event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
pp->_on_disconnect();
peers.erase(pp);
r_event.peer = pp;
r_event.data = p_event.data;
return EVENT_DISCONNECT;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_RECEIVE: {
// Packet reveived.
if (p_event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
r_event.channel_id = p_event.channelID;
r_event.packet = p_event.packet;
return EVENT_RECEIVE;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_NONE:
return EVENT_NONE;
default:
return EVENT_NONE;
}
}
ENetConnection::EventType ENetConnection::service(int p_timeout, Event &r_event) {
ERR_FAIL_COND_V_MSG(!host, EVENT_ERROR, "The ENetConnection instance isn't currently active.");
ERR_FAIL_COND_V(r_event.peer.is_valid(), EVENT_ERROR);
@ -140,44 +181,19 @@ ENetConnection::EventType ENetConnection::service(int p_timeout, Event &r_event)
} else if (ret == 0) {
return EVENT_NONE;
}
switch (event.type) {
case ENET_EVENT_TYPE_CONNECT: {
if (event.peer->data == nullptr) {
Ref<ENetPacketPeer> pp = memnew(ENetPacketPeer(event.peer));
peers.push_back(pp);
}
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
r_event.data = event.data;
return EVENT_CONNECT;
} break;
case ENET_EVENT_TYPE_DISCONNECT: {
// A peer disconnected.
if (event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
pp->_on_disconnect();
peers.erase(pp);
r_event.peer = pp;
r_event.data = event.data;
return EVENT_DISCONNECT;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_RECEIVE: {
// Packet reveived.
if (event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
r_event.channel_id = event.channelID;
r_event.packet = event.packet;
return EVENT_RECEIVE;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_NONE:
return EVENT_NONE;
default:
return EVENT_NONE;
return _parse_event(event, r_event);
}
int ENetConnection::check_events(EventType &r_type, Event &r_event) {
ERR_FAIL_COND_V_MSG(!host, -1, "The ENetConnection instance isn't currently active.");
ENetEvent event;
int ret = enet_host_check_events(host, &event);
if (ret < 0) {
r_type = EVENT_ERROR;
return ret;
}
r_type = _parse_event(event, r_event);
return ret;
}
void ENetConnection::flush() {

View file

@ -79,6 +79,7 @@ private:
ENetHost *host = nullptr;
List<Ref<ENetPacketPeer>> peers;
EventType _parse_event(const ENetEvent &p_event, Event &r_event);
Error _create(ENetAddress *p_address, int p_max_peers, int p_max_channels, int p_in_bandwidth, int p_out_bandwidth);
Array _service(int p_timeout = 0);
void _broadcast(int p_channel, PackedByteArray p_packet, int p_flags);
@ -110,6 +111,7 @@ public:
void destroy();
Ref<ENetPacketPeer> connect_to_host(const String &p_address, int p_port, int p_channels, int p_data = 0);
EventType service(int p_timeout, Event &r_event);
int check_events(EventType &r_type, Event &r_event);
void flush();
void bandwidth_limit(int p_in_bandwidth = 0, int p_out_bandwidth = 0);
void channel_limit(int p_max_channels);

View file

@ -114,32 +114,21 @@ Error ENetMultiplayerPeer::add_mesh_peer(int p_id, Ref<ENetConnection> p_host) {
return OK;
}
bool ENetMultiplayerPeer::_poll_server() {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.value->get_meta(SNAME("_net_id")));
peers.erase(E.key);
}
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return true;
}
switch (ret) {
bool ENetMultiplayerPeer::_parse_server_event(ENetConnection::EventType p_type, ENetConnection::Event &p_event) {
switch (p_type) {
case ENetConnection::EVENT_CONNECT: {
if (is_refusing_new_connections()) {
event.peer->reset();
p_event.peer->reset();
return false;
}
// Client joined with invalid ID, probably trying to exploit us.
if (event.data < 2 || peers.has((int)event.data)) {
event.peer->reset();
if (p_event.data < 2 || peers.has((int)p_event.data)) {
p_event.peer->reset();
return false;
}
int id = event.data;
event.peer->set_meta(SNAME("_net_id"), id);
peers[id] = event.peer;
int id = p_event.data;
p_event.peer->set_meta(SNAME("_net_id"), id);
peers[id] = p_event.peer;
emit_signal(SNAME("peer_connected"), id);
if (server_relay) {
@ -148,7 +137,7 @@ bool ENetMultiplayerPeer::_poll_server() {
return false;
}
case ENetConnection::EVENT_DISCONNECT: {
int id = event.peer->get_meta(SNAME("_net_id"));
int id = p_event.peer->get_meta(SNAME("_net_id"));
if (!peers.has(id)) {
// Never fully connected.
return false;
@ -162,28 +151,28 @@ bool ENetMultiplayerPeer::_poll_server() {
return false;
}
case ENetConnection::EVENT_RECEIVE: {
if (event.channel_id == SYSCH_CONFIG) {
_destroy_unused(event.packet);
if (p_event.channel_id == SYSCH_CONFIG) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Only server can send config messages");
} else {
if (event.packet->dataLength < 8) {
_destroy_unused(event.packet);
if (p_event.packet->dataLength < 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Invalid packet size");
}
uint32_t source = decode_uint32(&event.packet->data[0]);
int target = decode_uint32(&event.packet->data[4]);
uint32_t source = decode_uint32(&p_event.packet->data[0]);
int target = decode_uint32(&p_event.packet->data[4]);
uint32_t id = event.peer->get_meta(SNAME("_net_id"));
uint32_t id = p_event.peer->get_meta(SNAME("_net_id"));
// Someone is cheating and trying to fake the source!
if (source != id) {
_destroy_unused(event.packet);
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Someone is cheating and trying to fake the source!");
}
Packet packet;
packet.packet = event.packet;
packet.channel = event.channel_id;
packet.packet = p_event.packet;
packet.channel = p_event.channel_id;
packet.from = id;
// Even if relaying is disabled, these targets are valid as incoming packets.
@ -194,9 +183,9 @@ bool ENetMultiplayerPeer::_poll_server() {
if (server_relay && target != 1) {
packet.packet->referenceCount++;
_relay(source, target, event.channel_id, event.packet);
_relay(source, target, p_event.channel_id, p_event.packet);
packet.packet->referenceCount--;
_destroy_unused(event.packet);
_destroy_unused(p_event.packet);
}
// Destroy packet later
}
@ -207,23 +196,8 @@ bool ENetMultiplayerPeer::_poll_server() {
}
}
bool ENetMultiplayerPeer::_poll_client() {
if (peers.has(1) && !peers[1]->is_active()) {
if (connection_status == CONNECTION_CONNECTED) {
// Client just disconnected from server.
emit_signal(SNAME("server_disconnected"));
} else {
emit_signal(SNAME("connection_failed"));
}
close_connection();
return true;
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return true;
}
switch (ret) {
bool ENetMultiplayerPeer::_parse_client_event(ENetConnection::EventType p_type, ENetConnection::Event &p_event) {
switch (p_type) {
case ENetConnection::EVENT_CONNECT: {
connection_status = CONNECTION_CONNECTED;
emit_signal(SNAME("peer_connected"), 1);
@ -241,15 +215,15 @@ bool ENetMultiplayerPeer::_poll_client() {
return true;
}
case ENetConnection::EVENT_RECEIVE: {
if (event.channel_id == SYSCH_CONFIG) {
if (p_event.channel_id == SYSCH_CONFIG) {
// Config message
if (event.packet->dataLength != 8) {
_destroy_unused(event.packet);
if (p_event.packet->dataLength != 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V(false);
}
int msg = decode_uint32(&event.packet->data[0]);
int id = decode_uint32(&event.packet->data[4]);
int msg = decode_uint32(&p_event.packet->data[0]);
int id = decode_uint32(&p_event.packet->data[4]);
switch (msg) {
case SYSMSG_ADD_PEER: {
@ -262,18 +236,18 @@ bool ENetMultiplayerPeer::_poll_client() {
emit_signal(SNAME("peer_disconnected"), id);
} break;
}
_destroy_unused(event.packet);
_destroy_unused(p_event.packet);
} else {
if (event.packet->dataLength < 8) {
_destroy_unused(event.packet);
if (p_event.packet->dataLength < 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Invalid packet size");
}
uint32_t source = decode_uint32(&event.packet->data[0]);
uint32_t source = decode_uint32(&p_event.packet->data[0]);
Packet packet;
packet.packet = event.packet;
packet.packet = p_event.packet;
packet.from = source;
packet.channel = event.channel_id;
packet.channel = p_event.channel_id;
packet.packet->referenceCount++;
incoming_packets.push_back(packet);
@ -286,61 +260,37 @@ bool ENetMultiplayerPeer::_poll_client() {
}
}
bool ENetMultiplayerPeer::_poll_mesh() {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
if (hosts.has(E.key)) {
hosts.erase(E.key);
bool ENetMultiplayerPeer::_parse_mesh_event(ENetConnection::EventType p_type, ENetConnection::Event &p_event, int p_peer_id) {
switch (p_type) {
case ENetConnection::EVENT_CONNECT:
p_event.peer->reset();
return false;
case ENetConnection::EVENT_DISCONNECT:
if (peers.has(p_peer_id)) {
emit_signal(SNAME("peer_disconnected"), p_peer_id);
peers.erase(p_peer_id);
}
}
}
bool should_stop = true;
for (KeyValue<int, Ref<ENetConnection>> &E : hosts) {
ENetConnection::Event event;
ENetConnection::EventType ret = E.value->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
if (peers.has(E.key)) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
hosts.erase(p_peer_id);
return true;
case ENetConnection::EVENT_RECEIVE: {
if (p_event.packet->dataLength < 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Invalid packet size");
}
hosts.erase(E.key);
continue;
}
switch (ret) {
case ENetConnection::EVENT_CONNECT:
should_stop = false;
event.peer->reset();
break;
case ENetConnection::EVENT_DISCONNECT:
should_stop = false;
if (peers.has(E.key)) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
}
hosts.erase(E.key);
break;
case ENetConnection::EVENT_RECEIVE: {
should_stop = false;
if (event.packet->dataLength < 8) {
_destroy_unused(event.packet);
ERR_CONTINUE_MSG(true, "Invalid packet size");
}
Packet packet;
packet.packet = event.packet;
packet.from = E.key;
packet.channel = event.channel_id;
Packet packet;
packet.packet = p_event.packet;
packet.from = p_peer_id;
packet.channel = p_event.channel_id;
packet.packet->referenceCount++;
incoming_packets.push_back(packet);
} break;
default:
break; // Nothing to do
}
packet.packet->referenceCount++;
incoming_packets.push_back(packet);
return false;
} break;
default:
// Nothing to do
return true;
}
return should_stop;
}
void ENetMultiplayerPeer::poll() {
@ -348,26 +298,77 @@ void ENetMultiplayerPeer::poll() {
_pop_current_packet();
while (true) {
switch (active_mode) {
case MODE_CLIENT:
if (_poll_client()) {
return;
switch (active_mode) {
case MODE_CLIENT: {
if (peers.has(1) && !peers[1]->is_active()) {
if (connection_status == CONNECTION_CONNECTED) {
// Client just disconnected from server.
emit_signal(SNAME("server_disconnected"));
} else {
emit_signal(SNAME("connection_failed"));
}
break;
case MODE_SERVER:
if (_poll_server()) {
return;
}
break;
case MODE_MESH:
if (_poll_mesh()) {
return;
}
break;
default:
close_connection();
return;
}
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return;
}
do {
if (_parse_client_event(ret, event)) {
return;
}
} while (hosts[0]->check_events(ret, event) > 0);
} break;
case MODE_SERVER: {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.value->get_meta(SNAME("_net_id")));
peers.erase(E.key);
}
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return;
}
do {
if (_parse_server_event(ret, event)) {
return;
}
} while (hosts[0]->check_events(ret, event) > 0);
} break;
case MODE_MESH: {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
if (hosts.has(E.key)) {
hosts.erase(E.key);
}
}
}
for (KeyValue<int, Ref<ENetConnection>> &E : hosts) {
ENetConnection::Event event;
ENetConnection::EventType ret = E.value->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
if (peers.has(E.key)) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
}
hosts.erase(E.key);
continue;
}
do {
if (_parse_mesh_event(ret, event, E.key)) {
break; // Keep polling the others.
}
} while (E.value->check_events(ret, event) > 0);
}
} break;
default:
return;
}
}

View file

@ -84,9 +84,9 @@ private:
Packet current_packet;
void _pop_current_packet();
bool _poll_server();
bool _poll_client();
bool _poll_mesh();
bool _parse_server_event(ENetConnection::EventType p_event_type, ENetConnection::Event &p_event);
bool _parse_client_event(ENetConnection::EventType p_event_type, ENetConnection::Event &p_event);
bool _parse_mesh_event(ENetConnection::EventType p_event_type, ENetConnection::Event &p_event, int p_peer_id);
void _relay(int p_from, int p_to, enet_uint8 p_channel, ENetPacket *p_packet);
void _notify_peers(int p_id, bool p_connected);
void _destroy_unused(ENetPacket *p_packet);