Merge pull request #53129 from Faless/net/4.x_enet_opts

[Net] ENet poll optimizations, fragmented unreliable transfer.
This commit is contained in:
Fabio Alessandrelli 2021-12-15 16:02:55 +01:00 committed by GitHub
commit c5879dbe07
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 195 additions and 170 deletions

View file

@ -117,6 +117,47 @@ Ref<ENetPacketPeer> ENetConnection::connect_to_host(const String &p_address, int
return out;
}
ENetConnection::EventType ENetConnection::_parse_event(const ENetEvent &p_event, Event &r_event) {
switch (p_event.type) {
case ENET_EVENT_TYPE_CONNECT: {
if (p_event.peer->data == nullptr) {
Ref<ENetPacketPeer> pp = memnew(ENetPacketPeer(p_event.peer));
peers.push_back(pp);
}
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
r_event.data = p_event.data;
return EVENT_CONNECT;
} break;
case ENET_EVENT_TYPE_DISCONNECT: {
// A peer disconnected.
if (p_event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
pp->_on_disconnect();
peers.erase(pp);
r_event.peer = pp;
r_event.data = p_event.data;
return EVENT_DISCONNECT;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_RECEIVE: {
// Packet reveived.
if (p_event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)p_event.peer->data);
r_event.channel_id = p_event.channelID;
r_event.packet = p_event.packet;
return EVENT_RECEIVE;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_NONE:
return EVENT_NONE;
default:
return EVENT_NONE;
}
}
ENetConnection::EventType ENetConnection::service(int p_timeout, Event &r_event) {
ERR_FAIL_COND_V_MSG(!host, EVENT_ERROR, "The ENetConnection instance isn't currently active.");
ERR_FAIL_COND_V(r_event.peer.is_valid(), EVENT_ERROR);
@ -140,44 +181,19 @@ ENetConnection::EventType ENetConnection::service(int p_timeout, Event &r_event)
} else if (ret == 0) {
return EVENT_NONE;
}
switch (event.type) {
case ENET_EVENT_TYPE_CONNECT: {
if (event.peer->data == nullptr) {
Ref<ENetPacketPeer> pp = memnew(ENetPacketPeer(event.peer));
peers.push_back(pp);
}
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
r_event.data = event.data;
return EVENT_CONNECT;
} break;
case ENET_EVENT_TYPE_DISCONNECT: {
// A peer disconnected.
if (event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
pp->_on_disconnect();
peers.erase(pp);
r_event.peer = pp;
r_event.data = event.data;
return EVENT_DISCONNECT;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_RECEIVE: {
// Packet reveived.
if (event.peer->data != nullptr) {
Ref<ENetPacketPeer> pp = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
r_event.peer = Ref<ENetPacketPeer>((ENetPacketPeer *)event.peer->data);
r_event.channel_id = event.channelID;
r_event.packet = event.packet;
return EVENT_RECEIVE;
}
return EVENT_ERROR;
} break;
case ENET_EVENT_TYPE_NONE:
return EVENT_NONE;
default:
return EVENT_NONE;
return _parse_event(event, r_event);
}
int ENetConnection::check_events(EventType &r_type, Event &r_event) {
ERR_FAIL_COND_V_MSG(!host, -1, "The ENetConnection instance isn't currently active.");
ENetEvent event;
int ret = enet_host_check_events(host, &event);
if (ret < 0) {
r_type = EVENT_ERROR;
return ret;
}
r_type = _parse_event(event, r_event);
return ret;
}
void ENetConnection::flush() {

View file

@ -79,6 +79,7 @@ private:
ENetHost *host = nullptr;
List<Ref<ENetPacketPeer>> peers;
EventType _parse_event(const ENetEvent &p_event, Event &r_event);
Error _create(ENetAddress *p_address, int p_max_peers, int p_max_channels, int p_in_bandwidth, int p_out_bandwidth);
Array _service(int p_timeout = 0);
void _broadcast(int p_channel, PackedByteArray p_packet, int p_flags);
@ -110,6 +111,7 @@ public:
void destroy();
Ref<ENetPacketPeer> connect_to_host(const String &p_address, int p_port, int p_channels, int p_data = 0);
EventType service(int p_timeout, Event &r_event);
int check_events(EventType &r_type, Event &r_event);
void flush();
void bandwidth_limit(int p_in_bandwidth = 0, int p_out_bandwidth = 0);
void channel_limit(int p_max_channels);

View file

@ -114,32 +114,21 @@ Error ENetMultiplayerPeer::add_mesh_peer(int p_id, Ref<ENetConnection> p_host) {
return OK;
}
bool ENetMultiplayerPeer::_poll_server() {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.value->get_meta(SNAME("_net_id")));
peers.erase(E.key);
}
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return true;
}
switch (ret) {
bool ENetMultiplayerPeer::_parse_server_event(ENetConnection::EventType p_type, ENetConnection::Event &p_event) {
switch (p_type) {
case ENetConnection::EVENT_CONNECT: {
if (is_refusing_new_connections()) {
event.peer->reset();
p_event.peer->reset();
return false;
}
// Client joined with invalid ID, probably trying to exploit us.
if (event.data < 2 || peers.has((int)event.data)) {
event.peer->reset();
if (p_event.data < 2 || peers.has((int)p_event.data)) {
p_event.peer->reset();
return false;
}
int id = event.data;
event.peer->set_meta(SNAME("_net_id"), id);
peers[id] = event.peer;
int id = p_event.data;
p_event.peer->set_meta(SNAME("_net_id"), id);
peers[id] = p_event.peer;
emit_signal(SNAME("peer_connected"), id);
if (server_relay) {
@ -148,7 +137,7 @@ bool ENetMultiplayerPeer::_poll_server() {
return false;
}
case ENetConnection::EVENT_DISCONNECT: {
int id = event.peer->get_meta(SNAME("_net_id"));
int id = p_event.peer->get_meta(SNAME("_net_id"));
if (!peers.has(id)) {
// Never fully connected.
return false;
@ -162,28 +151,28 @@ bool ENetMultiplayerPeer::_poll_server() {
return false;
}
case ENetConnection::EVENT_RECEIVE: {
if (event.channel_id == SYSCH_CONFIG) {
_destroy_unused(event.packet);
if (p_event.channel_id == SYSCH_CONFIG) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Only server can send config messages");
} else {
if (event.packet->dataLength < 8) {
_destroy_unused(event.packet);
if (p_event.packet->dataLength < 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Invalid packet size");
}
uint32_t source = decode_uint32(&event.packet->data[0]);
int target = decode_uint32(&event.packet->data[4]);
uint32_t source = decode_uint32(&p_event.packet->data[0]);
int target = decode_uint32(&p_event.packet->data[4]);
uint32_t id = event.peer->get_meta(SNAME("_net_id"));
uint32_t id = p_event.peer->get_meta(SNAME("_net_id"));
// Someone is cheating and trying to fake the source!
if (source != id) {
_destroy_unused(event.packet);
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Someone is cheating and trying to fake the source!");
}
Packet packet;
packet.packet = event.packet;
packet.channel = event.channel_id;
packet.packet = p_event.packet;
packet.channel = p_event.channel_id;
packet.from = id;
// Even if relaying is disabled, these targets are valid as incoming packets.
@ -194,9 +183,9 @@ bool ENetMultiplayerPeer::_poll_server() {
if (server_relay && target != 1) {
packet.packet->referenceCount++;
_relay(source, target, event.channel_id, event.packet);
_relay(source, target, p_event.channel_id, p_event.packet);
packet.packet->referenceCount--;
_destroy_unused(event.packet);
_destroy_unused(p_event.packet);
}
// Destroy packet later
}
@ -207,23 +196,8 @@ bool ENetMultiplayerPeer::_poll_server() {
}
}
bool ENetMultiplayerPeer::_poll_client() {
if (peers.has(1) && !peers[1]->is_active()) {
if (connection_status == CONNECTION_CONNECTED) {
// Client just disconnected from server.
emit_signal(SNAME("server_disconnected"));
} else {
emit_signal(SNAME("connection_failed"));
}
close_connection();
return true;
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return true;
}
switch (ret) {
bool ENetMultiplayerPeer::_parse_client_event(ENetConnection::EventType p_type, ENetConnection::Event &p_event) {
switch (p_type) {
case ENetConnection::EVENT_CONNECT: {
connection_status = CONNECTION_CONNECTED;
emit_signal(SNAME("peer_connected"), 1);
@ -241,15 +215,15 @@ bool ENetMultiplayerPeer::_poll_client() {
return true;
}
case ENetConnection::EVENT_RECEIVE: {
if (event.channel_id == SYSCH_CONFIG) {
if (p_event.channel_id == SYSCH_CONFIG) {
// Config message
if (event.packet->dataLength != 8) {
_destroy_unused(event.packet);
if (p_event.packet->dataLength != 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V(false);
}
int msg = decode_uint32(&event.packet->data[0]);
int id = decode_uint32(&event.packet->data[4]);
int msg = decode_uint32(&p_event.packet->data[0]);
int id = decode_uint32(&p_event.packet->data[4]);
switch (msg) {
case SYSMSG_ADD_PEER: {
@ -262,18 +236,18 @@ bool ENetMultiplayerPeer::_poll_client() {
emit_signal(SNAME("peer_disconnected"), id);
} break;
}
_destroy_unused(event.packet);
_destroy_unused(p_event.packet);
} else {
if (event.packet->dataLength < 8) {
_destroy_unused(event.packet);
if (p_event.packet->dataLength < 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Invalid packet size");
}
uint32_t source = decode_uint32(&event.packet->data[0]);
uint32_t source = decode_uint32(&p_event.packet->data[0]);
Packet packet;
packet.packet = event.packet;
packet.packet = p_event.packet;
packet.from = source;
packet.channel = event.channel_id;
packet.channel = p_event.channel_id;
packet.packet->referenceCount++;
incoming_packets.push_back(packet);
@ -286,61 +260,37 @@ bool ENetMultiplayerPeer::_poll_client() {
}
}
bool ENetMultiplayerPeer::_poll_mesh() {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
if (hosts.has(E.key)) {
hosts.erase(E.key);
bool ENetMultiplayerPeer::_parse_mesh_event(ENetConnection::EventType p_type, ENetConnection::Event &p_event, int p_peer_id) {
switch (p_type) {
case ENetConnection::EVENT_CONNECT:
p_event.peer->reset();
return false;
case ENetConnection::EVENT_DISCONNECT:
if (peers.has(p_peer_id)) {
emit_signal(SNAME("peer_disconnected"), p_peer_id);
peers.erase(p_peer_id);
}
}
}
bool should_stop = true;
for (KeyValue<int, Ref<ENetConnection>> &E : hosts) {
ENetConnection::Event event;
ENetConnection::EventType ret = E.value->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
if (peers.has(E.key)) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
hosts.erase(p_peer_id);
return true;
case ENetConnection::EVENT_RECEIVE: {
if (p_event.packet->dataLength < 8) {
_destroy_unused(p_event.packet);
ERR_FAIL_V_MSG(false, "Invalid packet size");
}
hosts.erase(E.key);
continue;
}
switch (ret) {
case ENetConnection::EVENT_CONNECT:
should_stop = false;
event.peer->reset();
break;
case ENetConnection::EVENT_DISCONNECT:
should_stop = false;
if (peers.has(E.key)) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
}
hosts.erase(E.key);
break;
case ENetConnection::EVENT_RECEIVE: {
should_stop = false;
if (event.packet->dataLength < 8) {
_destroy_unused(event.packet);
ERR_CONTINUE_MSG(true, "Invalid packet size");
}
Packet packet;
packet.packet = event.packet;
packet.from = E.key;
packet.channel = event.channel_id;
Packet packet;
packet.packet = p_event.packet;
packet.from = p_peer_id;
packet.channel = p_event.channel_id;
packet.packet->referenceCount++;
incoming_packets.push_back(packet);
} break;
default:
break; // Nothing to do
}
packet.packet->referenceCount++;
incoming_packets.push_back(packet);
return false;
} break;
default:
// Nothing to do
return true;
}
return should_stop;
}
void ENetMultiplayerPeer::poll() {
@ -348,26 +298,77 @@ void ENetMultiplayerPeer::poll() {
_pop_current_packet();
while (true) {
switch (active_mode) {
case MODE_CLIENT:
if (_poll_client()) {
return;
switch (active_mode) {
case MODE_CLIENT: {
if (peers.has(1) && !peers[1]->is_active()) {
if (connection_status == CONNECTION_CONNECTED) {
// Client just disconnected from server.
emit_signal(SNAME("server_disconnected"));
} else {
emit_signal(SNAME("connection_failed"));
}
break;
case MODE_SERVER:
if (_poll_server()) {
return;
}
break;
case MODE_MESH:
if (_poll_mesh()) {
return;
}
break;
default:
close_connection();
return;
}
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return;
}
do {
if (_parse_client_event(ret, event)) {
return;
}
} while (hosts[0]->check_events(ret, event) > 0);
} break;
case MODE_SERVER: {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.value->get_meta(SNAME("_net_id")));
peers.erase(E.key);
}
}
ENetConnection::Event event;
ENetConnection::EventType ret = hosts[0]->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
return;
}
do {
if (_parse_server_event(ret, event)) {
return;
}
} while (hosts[0]->check_events(ret, event) > 0);
} break;
case MODE_MESH: {
for (const KeyValue<int, Ref<ENetPacketPeer>> &E : peers) {
if (!(E.value->is_active())) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
if (hosts.has(E.key)) {
hosts.erase(E.key);
}
}
}
for (KeyValue<int, Ref<ENetConnection>> &E : hosts) {
ENetConnection::Event event;
ENetConnection::EventType ret = E.value->service(0, event);
if (ret == ENetConnection::EVENT_ERROR) {
if (peers.has(E.key)) {
emit_signal(SNAME("peer_disconnected"), E.key);
peers.erase(E.key);
}
hosts.erase(E.key);
continue;
}
do {
if (_parse_mesh_event(ret, event, E.key)) {
break; // Keep polling the others.
}
} while (E.value->check_events(ret, event) > 0);
}
} break;
default:
return;
}
}
@ -441,11 +442,11 @@ Error ENetMultiplayerPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size
} else {
switch (get_transfer_mode()) {
case Multiplayer::TRANSFER_MODE_UNRELIABLE: {
packet_flags = ENET_PACKET_FLAG_UNSEQUENCED;
packet_flags = ENET_PACKET_FLAG_UNSEQUENCED | ENET_PACKET_FLAG_UNRELIABLE_FRAGMENT;
channel = SYSCH_UNRELIABLE;
} break;
case Multiplayer::TRANSFER_MODE_UNRELIABLE_ORDERED: {
packet_flags = 0;
packet_flags = ENET_PACKET_FLAG_UNRELIABLE_FRAGMENT;
channel = SYSCH_UNRELIABLE;
} break;
case Multiplayer::TRANSFER_MODE_RELIABLE: {
@ -455,6 +456,12 @@ Error ENetMultiplayerPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size
}
}
#ifdef DEBUG_ENABLED
if ((packet_flags & ENET_PACKET_FLAG_UNRELIABLE_FRAGMENT) && p_buffer_size + 8 > ENET_HOST_DEFAULT_MTU) {
WARN_PRINT_ONCE(vformat("Sending %d bytes unrealiably which is above the MTU (%d), this will result in higher packet loss", p_buffer_size + 8, ENET_HOST_DEFAULT_MTU));
}
#endif
ENetPacket *packet = enet_packet_create(nullptr, p_buffer_size + 8, packet_flags);
encode_uint32(unique_id, &packet->data[0]); // Source ID
encode_uint32(target_peer, &packet->data[4]); // Dest ID

View file

@ -84,9 +84,9 @@ private:
Packet current_packet;
void _pop_current_packet();
bool _poll_server();
bool _poll_client();
bool _poll_mesh();
bool _parse_server_event(ENetConnection::EventType p_event_type, ENetConnection::Event &p_event);
bool _parse_client_event(ENetConnection::EventType p_event_type, ENetConnection::Event &p_event);
bool _parse_mesh_event(ENetConnection::EventType p_event_type, ENetConnection::Event &p_event, int p_peer_id);
void _relay(int p_from, int p_to, enet_uint8 p_channel, ENetPacket *p_packet);
void _notify_peers(int p_id, bool p_connected);
void _destroy_unused(ENetPacket *p_packet);