sys_net refactor (#11740)

This commit is contained in:
RipleyTom 2022-04-09 14:51:22 +02:00 committed by GitHub
parent f4d0286109
commit 72b2876b6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 4018 additions and 3181 deletions

View file

@ -16,7 +16,7 @@ AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: true
AllowShortFunctionsOnASingleLine: Empty
AllowShortLoopsOnASingleLine: false
AllowShortLambdasOnASingleLine: Inline
AllowShortLambdasOnASingleLine: Empty
Cpp11BracedListStyle: true
IndentCaseLabels: false
SortIncludes: false

View file

@ -189,6 +189,14 @@ target_sources(rpcs3_emu PRIVATE
Cell/lv2/sys_mmapper.cpp
Cell/lv2/sys_mutex.cpp
Cell/lv2/sys_net.cpp
Cell/lv2/sys_net/lv2_socket.cpp
Cell/lv2/sys_net/lv2_socket_native.cpp
Cell/lv2/sys_net/lv2_socket_raw.cpp
Cell/lv2/sys_net/lv2_socket_p2p.cpp
Cell/lv2/sys_net/lv2_socket_p2ps.cpp
Cell/lv2/sys_net/network_context.cpp
Cell/lv2/sys_net/nt_p2p_port.cpp
Cell/lv2/sys_net/sys_net_helpers.cpp
Cell/lv2/sys_overlay.cpp
Cell/lv2/sys_ppu_thread.cpp
Cell/lv2/sys_process.cpp

File diff suppressed because it is too large Load diff

View file

@ -322,130 +322,6 @@ struct sys_net_linger
be_t<s32> l_linger;
};
// Custom structure for sockets
// We map host sockets to sequential IDs to return as descriptors because syscalls expect socket IDs to be under 1024.
struct lv2_socket final
{
#ifdef _WIN32
using socket_type = uptr;
#else
using socket_type = int;
#endif
static const u32 id_base = 24;
static const u32 id_step = 1;
static const u32 id_count = 1000;
// Poll events
enum class poll
{
read,
write,
error,
__bitset_enum_max
};
lv2_socket(socket_type s, s32 s_type, s32 family);
~lv2_socket();
shared_mutex mutex;
#ifdef _WIN32
// Tracks connect for WSAPoll workaround
bool is_connecting = false;
#endif
// Native socket (must be non-blocking)
socket_type socket;
// Events selected for polling
atomic_bs_t<poll> events{};
// Non-blocking IO option
s32 so_nbio = 0;
// Connection result
s32 so_error = 0;
// Unsupported option
s32 so_tcp_maxseg = 1500;
const lv2_socket_type type;
const lv2_socket_family family;
// SYS_NET_SOCK_DGRAM_P2P and SYS_NET_SOCK_STREAM_P2P socket specific information
struct p2p_i
{
// Port(actual bound port) and Virtual Port(indicated by u16 at the start of the packet)
u16 port = 0, vport = 0;
// Queue containing received packets from network_thread for SYS_NET_SOCK_DGRAM_P2P sockets
std::queue<std::pair<sys_net_sockaddr_in_p2p, std::vector<u8>>> data{};
} p2p;
struct p2ps_i
{
enum tcp_flags : u8
{
FIN = (1 << 0),
SYN = (1 << 1),
RST = (1 << 2),
PSH = (1 << 3),
ACK = (1 << 4),
URG = (1 << 5),
ECE = (1 << 6),
CWR = (1 << 7),
};
static constexpr be_t<u32> U2S_sig = (static_cast<u32>('U') << 24 | static_cast<u32>('2') << 16 | static_cast<u32>('S') << 8 | static_cast<u32>('0'));
static constexpr usz MAX_RECEIVED_BUFFER = (1024*1024*10);
// P2P stream socket specific
struct encapsulated_tcp
{
be_t<u32> signature = lv2_socket::p2ps_i::U2S_sig; // Signature to verify it's P2P Stream data
be_t<u32> length = 0; // Length of data
be_t<u64> seq = 0; // This should be u32 but changed to u64 for simplicity
be_t<u64> ack = 0;
be_t<u16> src_port = 0; // fake source tcp port
be_t<u16> dst_port = 0; // fake dest tcp port(should be == vport)
be_t<u16> checksum = 0;
u8 flags = 0;
};
enum stream_status
{
stream_closed, // Default when port is not listening nor connected
stream_listening, // Stream is listening, accepting SYN packets
stream_handshaking, // Currently handshaking
stream_connected, // This is an established connection(after tcp handshake)
};
stream_status status = stream_status::stream_closed;
usz max_backlog = 0; // set on listen
std::queue<s32> backlog;
u16 op_port = 0, op_vport = 0;
u32 op_addr = 0;
u64 data_beg_seq = 0; // Seq of first byte of received_data
u64 data_available = 0; // Amount of continuous data available(calculated on ACK send)
std::map<u64, std::vector<u8>> received_data; // holds seq/data of data received
u64 cur_seq = 0; // SEQ of next packet to be sent
} p2ps;
// Value keepers
#ifdef _WIN32
s32 so_reuseaddr = 0;
s32 so_reuseport = 0;
#endif
// Event processing workload (pair of thread id and the processing function)
std::vector<std::pair<u32, std::function<bool(bs_t<lv2_socket::poll>)>>> queue;
};
class ppu_thread;
// Syscalls

View file

@ -0,0 +1,128 @@
#include "stdafx.h"
#include "lv2_socket.h"
LOG_CHANNEL(sys_net);
lv2_socket::lv2_socket(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol)
{
this->family = family;
this->type = type;
this->protocol = protocol;
}
std::unique_lock<shared_mutex> lv2_socket::lock()
{
return std::unique_lock(mutex);
}
lv2_socket_family lv2_socket::get_family() const
{
return family;
}
lv2_socket_type lv2_socket::get_type() const
{
return type;
}
lv2_ip_protocol lv2_socket::get_protocol() const
{
return protocol;
}
std::size_t lv2_socket::get_queue_size() const
{
return queue.size();
}
socket_type lv2_socket::get_socket() const
{
return socket;
}
#ifdef _WIN32
bool lv2_socket::is_connecting() const
{
return connecting;
}
void lv2_socket::set_connecting(bool connecting)
{
this->connecting = connecting;
}
#endif
void lv2_socket::set_lv2_id(u32 id)
{
lv2_id = id;
}
bs_t<lv2_socket::poll_t> lv2_socket::get_events() const
{
return events.load();
}
void lv2_socket::set_poll_event(bs_t<lv2_socket::poll_t> event)
{
events += event;
}
void lv2_socket::poll_queue(u32 ppu_id, bs_t<lv2_socket::poll_t> event, std::function<bool(bs_t<lv2_socket::poll_t>)> poll_cb)
{
set_poll_event(event);
queue.emplace_back(ppu_id, poll_cb);
}
void lv2_socket::clear_queue(u32 ppu_id)
{
std::lock_guard lock(mutex);
for (auto it = queue.begin(); it != queue.end();)
{
if (it->first == ppu_id)
{
it = queue.erase(it);
continue;
}
it++;
}
if (queue.empty())
{
events.store({});
}
}
void lv2_socket::handle_events(const pollfd& native_pfd, [[maybe_unused]] bool unset_connecting)
{
bs_t<lv2_socket::poll_t> events_happening{};
if (native_pfd.revents & (POLLIN | POLLHUP) && events.test_and_reset(lv2_socket::poll_t::read))
events_happening += lv2_socket::poll_t::read;
if (native_pfd.revents & POLLOUT && events.test_and_reset(lv2_socket::poll_t::write))
events_happening += lv2_socket::poll_t::write;
if (native_pfd.revents & POLLERR && events.test_and_reset(lv2_socket::poll_t::error))
events_happening += lv2_socket::poll_t::error;
if (events_happening)
{
std::lock_guard lock(mutex);
#ifdef _WIN32
if (unset_connecting)
set_connecting(false);
#endif
for (auto it = queue.begin(); events_happening && it != queue.end();)
{
if (it->second(events_happening))
{
it = queue.erase(it);
continue;
}
it++;
}
if (queue.empty())
{
events.store({});
}
}
}

View file

@ -0,0 +1,135 @@
#pragma once
#include <functional>
#include <optional>
#include "Utilities/mutex.h"
#include "Emu/IdManager.h"
#include "Emu/Cell/lv2/sys_net.h"
#ifdef _WIN32
#include <winsock2.h>
#include <WS2tcpip.h>
#else
#ifdef __clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#endif
#include <poll.h>
#ifdef __clang__
#pragma GCC diagnostic pop
#endif
#endif
#ifdef _WIN32
using socket_type = uptr;
#else
using socket_type = int;
#endif
class lv2_socket
{
public:
// Poll events
enum class poll_t
{
read,
write,
error,
__bitset_enum_max
};
union sockopt_data
{
char ch[128];
be_t<s32> _int = 0;
sys_net_timeval timeo;
sys_net_linger linger;
};
public:
lv2_socket(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol);
virtual ~lv2_socket() = default;
std::unique_lock<shared_mutex> lock();
void set_lv2_id(u32 id);
bs_t<poll_t> get_events() const;
void set_poll_event(bs_t<poll_t> event);
void poll_queue(u32 ppu_id, bs_t<poll_t> event, std::function<bool(bs_t<poll_t>)> poll_cb);
void clear_queue(u32 ppu_id);
void handle_events(const pollfd& native_fd, bool unset_connecting = false);
lv2_socket_family get_family() const;
lv2_socket_type get_type() const;
lv2_ip_protocol get_protocol() const;
std::size_t get_queue_size() const;
socket_type get_socket() const;
#ifdef _WIN32
bool is_connecting() const;
void set_connecting(bool is_connecting);
#endif
public:
virtual std::tuple<bool, s32, sys_net_sockaddr> accept(bool is_lock = true) = 0;
virtual s32 bind(const sys_net_sockaddr &addr, s32 ps3_id) = 0;
virtual std::optional<s32> connect(const sys_net_sockaddr &addr) = 0;
virtual s32 connect_followup() = 0;
virtual std::pair<s32, sys_net_sockaddr> getpeername() = 0;
virtual std::pair<s32, sys_net_sockaddr> getsockname() = 0;
virtual std::tuple<s32, sockopt_data, u32> getsockopt(s32 level, s32 optname, u32 len) = 0;
virtual s32 setsockopt(s32 level, s32 optname, const std::vector<u8>& optval) = 0;
virtual s32 listen(s32 backlog) = 0;
virtual std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> recvfrom(s32 flags, u32 len, bool is_lock = true) = 0;
virtual std::optional<s32> sendto(s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock = true) = 0;
virtual void close() = 0;
virtual s32 shutdown(s32 how) = 0;
virtual s32 poll(sys_net_pollfd& sn_pfd, pollfd& native_pfd) = 0;
virtual s32 select(bs_t<poll_t> selected, pollfd& native_pfd) = 0;
public:
// IDM data
static const u32 id_base = 24;
static const u32 id_step = 1;
static const u32 id_count = 1000;
protected:
shared_mutex mutex;
u32 lv2_id = 0;
socket_type socket = 0;
lv2_socket_family family{};
lv2_socket_type type{};
lv2_ip_protocol protocol{};
// Events selected for polling
atomic_bs_t<poll_t> events{};
// Event processing workload (pair of thread id and the processing function)
std::vector<std::pair<u32, std::function<bool(bs_t<poll_t>)>>> queue;
// Socket options value keepers
// Non-blocking IO option
s32 so_nbio = 0;
// Error, only used for connection result for non blocking stream sockets
s32 so_error = 0;
// Unsupported option
s32 so_tcp_maxseg = 1500;
#ifdef _WIN32
s32 so_reuseaddr = 0;
s32 so_reuseport = 0;
// Tracks connect for WSAPoll workaround
bool connecting = false;
#endif
};

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,68 @@
#pragma once
#ifdef _WIN32
#include <winsock2.h>
#include <WS2tcpip.h>
#else
#ifdef __clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#endif
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#ifdef __clang__
#pragma GCC diagnostic pop
#endif
#endif
#include "lv2_socket.h"
class lv2_socket_native final : public lv2_socket
{
public:
lv2_socket_native(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol);
~lv2_socket_native();
s32 create_socket();
std::tuple<bool, s32, sys_net_sockaddr> accept(bool is_lock = true) override;
s32 bind(const sys_net_sockaddr& addr, s32 ps3_id) override;
std::optional<s32> connect(const sys_net_sockaddr& addr) override;
s32 connect_followup() override;
std::pair<s32, sys_net_sockaddr> getpeername() override;
std::pair<s32, sys_net_sockaddr> getsockname() override;
std::tuple<s32, sockopt_data, u32> getsockopt(s32 level, s32 optname, u32 len) override;
s32 setsockopt(s32 level, s32 optname, const std::vector<u8>& optval) override;
std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> recvfrom(s32 flags, u32 len, bool is_lock = true) override;
std::optional<s32> sendto(s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock = true) override;
s32 poll(sys_net_pollfd& sn_pfd, pollfd& native_pfd) override;
s32 select(bs_t<poll_t> selected, pollfd& native_pfd) override;
s32 listen(s32 backlog) override;
void close() override;
s32 shutdown(s32 how) override;
private:
void set_socket(socket_type socket, lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol);
void set_default_buffers();
void set_non_blocking();
private:
// Value keepers
#ifdef _WIN32
s32 so_reuseaddr = 0;
s32 so_reuseport = 0;
#endif
};

View file

@ -0,0 +1,295 @@
#include "stdafx.h"
#include "lv2_socket_p2p.h"
#include "Emu/NP/np_helpers.h"
#include "network_context.h"
#include "sys_net_helpers.h"
LOG_CHANNEL(sys_net);
lv2_socket_p2p::lv2_socket_p2p(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol)
: lv2_socket(family, type, protocol)
{
}
void lv2_socket_p2p::handle_new_data(sys_net_sockaddr_in_p2p p2p_addr, std::vector<u8> p2p_data)
{
std::lock_guard lock(mutex);
sys_net.trace("Received a P2P packet for vport %d and saved it", p2p_addr.sin_vport);
data.push(std::make_pair(std::move(p2p_addr), std::move(p2p_data)));
// Check if poll is happening
if (events.test_and_reset(lv2_socket::poll_t::read))
{
bs_t<lv2_socket::poll_t> read_event = lv2_socket::poll_t::read;
for (auto it = queue.begin(); it != queue.end();)
{
if (it->second(read_event))
{
it = queue.erase(it);
continue;
}
it++;
}
if (queue.empty())
{
events.store({});
}
}
}
std::tuple<bool, s32, sys_net_sockaddr> lv2_socket_p2p::accept([[maybe_unused]] bool is_lock)
{
sys_net.fatal("[P2P] accept() called on a P2P socket");
return {};
}
std::optional<s32> lv2_socket_p2p::connect([[maybe_unused]] const sys_net_sockaddr& addr)
{
sys_net.fatal("[P2P] connect() called on a P2P socket");
return {};
}
s32 lv2_socket_p2p::connect_followup()
{
sys_net.fatal("[P2P] connect_followup() called on a P2P socket");
return {};
}
std::pair<s32, sys_net_sockaddr> lv2_socket_p2p::getpeername()
{
sys_net.fatal("[P2P] getpeername() called on a P2P socket");
return {};
}
s32 lv2_socket_p2p::listen([[maybe_unused]] s32 backlog)
{
sys_net.fatal("[P2P] listen() called on a P2P socket");
return {};
}
s32 lv2_socket_p2p::bind(const sys_net_sockaddr& addr, s32 ps3_id)
{
const auto* psa_in_p2p = reinterpret_cast<const sys_net_sockaddr_in_p2p*>(&addr);
u16 p2p_port = psa_in_p2p->sin_port;
u16 p2p_vport = psa_in_p2p->sin_vport;
sys_net.notice("[P2P] Trying to bind %s:%d:%d", np::ip_to_string(std::bit_cast<u32>(psa_in_p2p->sin_addr)), p2p_port, p2p_vport);
ensure(p2p_vport != 0);
if (p2p_port != 3658)
{
sys_net.warning("[P2P] Attempting to bind a socket to a port != 3658");
}
socket_type real_socket{};
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (!nc.list_p2p_ports.contains(p2p_port))
{
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
}
auto& pport = nc.list_p2p_ports.at(p2p_port);
real_socket = pport.p2p_socket;
{
std::lock_guard lock(pport.bound_p2p_vports_mutex);
if (pport.bound_p2p_vports.count(p2p_vport) != 0)
{
return -SYS_NET_EADDRINUSE;
}
pport.bound_p2p_vports.insert(std::make_pair(p2p_vport, ps3_id));
}
}
{
std::lock_guard lock(mutex);
port = p2p_port;
vport = p2p_vport;
socket = real_socket;
}
return CELL_OK;
}
std::pair<s32, sys_net_sockaddr> lv2_socket_p2p::getsockname()
{
std::lock_guard lock(mutex);
// Unbound socket
if (!socket)
{
return {CELL_OK, {}};
}
sys_net_sockaddr sn_addr{};
sys_net_sockaddr_in_p2p* paddr = reinterpret_cast<sys_net_sockaddr_in_p2p*>(&sn_addr);
paddr->sin_len = sizeof(sys_net_sockaddr_in);
paddr->sin_family = SYS_NET_AF_INET;
paddr->sin_port = port;
paddr->sin_vport = vport;
paddr->sin_addr = bound_addr;
return {CELL_OK, sn_addr};
}
std::tuple<s32, lv2_socket::sockopt_data, u32> lv2_socket_p2p::getsockopt([[maybe_unused]] s32 level, [[maybe_unused]] s32 optname, [[maybe_unused]] u32 len)
{
// TODO
return {};
}
s32 lv2_socket_p2p::setsockopt(s32 level, s32 optname, const std::vector<u8>& optval)
{
// TODO
int native_int = *reinterpret_cast<const be_t<s32>*>(optval.data());
if (level == SYS_NET_SOL_SOCKET && optname == SYS_NET_SO_NBIO)
{
so_nbio = native_int;
}
return {};
}
std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> lv2_socket_p2p::recvfrom([[maybe_unused]] s32 flags, u32 len, bool is_lock)
{
std::unique_lock<shared_mutex> lock(mutex, std::defer_lock);
if (is_lock)
{
lock.lock();
}
sys_net.trace("[P2P] p2p_data for vport %d contains %d elements", vport, data.size());
if (data.empty())
{
return {{-SYS_NET_EWOULDBLOCK, {}, {}}};
}
std::vector<u8> res_buf(len);
const auto& p2p_data = data.front();
s32 native_result = std::min(len, static_cast<u32>(p2p_data.second.size()));
memcpy(res_buf.data(), p2p_data.second.data(), native_result);
sys_net_sockaddr sn_addr;
memcpy(&sn_addr, &p2p_data.first, sizeof(sn_addr));
data.pop();
return {{native_result, res_buf, sn_addr}};
}
std::optional<s32> lv2_socket_p2p::sendto(s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock)
{
std::unique_lock<shared_mutex> lock(mutex, std::defer_lock);
if (is_lock)
{
lock.lock();
}
ensure(opt_sn_addr);
ensure(socket); // ensures it has been bound
ensure(buf.size() <= (65535 - sizeof(u16))); // catch games using full payload for future fragmentation implementation if necessary
const u16 p2p_port = reinterpret_cast<const sys_net_sockaddr_in*>(&*opt_sn_addr)->sin_port;
const u16 p2p_vport = reinterpret_cast<const sys_net_sockaddr_in_p2p*>(&*opt_sn_addr)->sin_vport;
auto native_addr = sys_net_addr_to_native_addr(*opt_sn_addr);
char ip_str[16];
inet_ntop(AF_INET, &native_addr.sin_addr, ip_str, sizeof(ip_str));
sys_net.trace("[P2P] Sending a packet to %s:%d:%d", ip_str, p2p_port, p2p_vport);
std::vector<u8> p2p_data(buf.size() + sizeof(u16));
reinterpret_cast<le_t<u16>&>(p2p_data[0]) = p2p_vport;
memcpy(p2p_data.data() + sizeof(u16), buf.data(), buf.size());
int native_flags = 0;
if (flags & SYS_NET_MSG_WAITALL)
{
native_flags |= MSG_WAITALL;
}
auto native_result = ::sendto(socket, reinterpret_cast<const char*>(p2p_data.data()), p2p_data.size(), native_flags, reinterpret_cast<struct sockaddr*>(&native_addr), sizeof(native_addr));
if (native_result >= 0)
{
return {native_result};
}
s32 result = get_last_error(!so_nbio && (flags & SYS_NET_MSG_DONTWAIT) == 0);
if (result)
{
return {-result};
}
// Note that this can only happen if the send buffer is full
return std::nullopt;
}
void lv2_socket_p2p::close()
{
if (!port || !vport)
{
return;
}
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard lock(nc.list_p2p_ports_mutex);
ensure(nc.list_p2p_ports.contains(port));
auto& p2p_port = nc.list_p2p_ports.at(port);
{
std::lock_guard lock(p2p_port.bound_p2p_vports_mutex);
p2p_port.bound_p2p_vports.erase(vport);
}
}
}
s32 lv2_socket_p2p::shutdown([[maybe_unused]] s32 how)
{
sys_net.todo("[P2P] shutdown");
return CELL_OK;
}
s32 lv2_socket_p2p::poll(sys_net_pollfd& sn_pfd, [[maybe_unused]] pollfd& native_pfd)
{
std::lock_guard lock(mutex);
ensure(vport);
sys_net.trace("[P2P] poll checking for 0x%X", sn_pfd.events);
// Check if it's a bound P2P socket
if ((sn_pfd.events & SYS_NET_POLLIN) && !data.empty())
{
sys_net.trace("[P2P] p2p_data for vport %d contains %d elements", vport, data.size());
sn_pfd.revents |= SYS_NET_POLLIN;
}
// Data can always be written on a dgram socket
if (sn_pfd.events & SYS_NET_POLLOUT)
{
sn_pfd.revents |= SYS_NET_POLLOUT;
}
return sn_pfd.revents ? 1 : 0;
}
s32 lv2_socket_p2p::select(bs_t<lv2_socket::poll_t> selected, [[maybe_unused]] pollfd& native_pfd)
{
std::lock_guard lock(mutex);
// Check if it's a bound P2P socket
if ((selected & lv2_socket::poll_t::read) && vport && !data.empty())
{
sys_net.trace("[P2P] p2p_data for vport %d contains %d elements", vport, data.size());
return 1;
}
return 0;
}

View file

@ -0,0 +1,43 @@
#pragma once
#include "lv2_socket.h"
class lv2_socket_p2p : public lv2_socket
{
public:
lv2_socket_p2p(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol);
std::tuple<bool, s32, sys_net_sockaddr> accept(bool is_lock = true) override;
s32 bind(const sys_net_sockaddr &addr, s32 ps3_id) override;
std::optional<s32> connect(const sys_net_sockaddr &addr) override;
s32 connect_followup() override;
std::pair<s32, sys_net_sockaddr> getpeername() override;
std::pair<s32, sys_net_sockaddr> getsockname() override;
std::tuple<s32, sockopt_data, u32> getsockopt(s32 level, s32 optname, u32 len) override;
s32 setsockopt(s32 level, s32 optname, const std::vector<u8>& optval) override;
s32 listen(s32 backlog) override;
std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> recvfrom(s32 flags, u32 len, bool is_lock = true) override;
std::optional<s32> sendto(s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock = true) override;
void close() override;
s32 shutdown(s32 how) override;
s32 poll(sys_net_pollfd& sn_pfd, pollfd& native_pfd) override;
s32 select(bs_t<poll_t> selected, pollfd& native_pfd) override;
void handle_new_data(sys_net_sockaddr_in_p2p p2p_addr, std::vector<u8> p2p_data);
protected:
// Port(actual bound port) and Virtual Port(indicated by u16 at the start of the packet)
u16 port = 3658, vport = 0;
u32 bound_addr = 0;
// Queue containing received packets from network_thread for SYS_NET_SOCK_DGRAM_P2P sockets
std::queue<std::pair<sys_net_sockaddr_in_p2p, std::vector<u8>>> data{};
// ID of the real socket for the actual bound udp port
socket_type socket{};
};

View file

@ -0,0 +1,798 @@
#include "stdafx.h"
#include <condition_variable>
#include "Utilities/Thread.h"
#include "util/asm.hpp"
#include "util/atomic.hpp"
#include "lv2_socket_p2ps.h"
#include "Emu/NP/np_helpers.h"
#include "nt_p2p_port.h"
#include "network_context.h"
#include "sys_net_helpers.h"
LOG_CHANNEL(sys_net);
// Object in charge of retransmiting packets for STREAM_P2P sockets
class tcp_timeout_monitor
{
public:
void add_message(s32 sock_id, const sockaddr_in* dst, std::vector<u8> data, u64 seq)
{
{
std::lock_guard lock(data_mutex);
const auto now = steady_clock::now();
message msg;
msg.dst_addr = *dst;
msg.sock_id = sock_id;
msg.data = std::move(data);
msg.seq = seq;
msg.initial_sendtime = now;
rtt_info rtt = rtts[sock_id];
const auto expected_time = now + rtt.rtt_time;
msgs.insert(std::make_pair(expected_time, std::move(msg)));
}
wakey.notify_one(); // TODO: Should be improved to only wake if new timeout < old timeout
}
void confirm_data_received(s32 sock_id, u64 ack)
{
std::lock_guard lock(data_mutex);
rtts[sock_id].num_retries = 0;
const auto now = steady_clock::now();
for (auto it = msgs.begin(); it != msgs.end();)
{
auto& msg = it->second;
if (msg.sock_id == sock_id && msg.seq < ack)
{
// Decreases RTT if msg is early
if (now < it->first)
{
const auto actual_rtt = std::chrono::duration_cast<std::chrono::milliseconds>(now - it->second.initial_sendtime);
const auto cur_rtt = rtts[sock_id].rtt_time;
if (cur_rtt > actual_rtt)
{
rtts[sock_id].rtt_time = (actual_rtt + cur_rtt) / 2;
}
}
it = msgs.erase(it);
continue;
}
it++;
}
}
void operator()()
{
while (thread_ctrl::state() != thread_state::aborting)
{
std::unique_lock<std::mutex> lock(data_mutex);
if (msgs.size())
wakey.wait_until(lock, msgs.begin()->first);
else
wakey.wait(lock);
if (thread_ctrl::state() == thread_state::aborting)
return;
const auto now = steady_clock::now();
// Check for messages that haven't been acked
std::set<s32> rtt_increased;
for (auto it = msgs.begin(); it != msgs.end();)
{
if (it->first > now)
break;
// reply is late, increases rtt
auto& msg = it->second;
const auto addr = msg.dst_addr.sin_addr.s_addr;
rtt_info rtt = rtts[msg.sock_id];
// Only increases rtt once per loop(in case a big number of packets are sent at once)
if (!rtt_increased.count(msg.sock_id))
{
rtt.num_retries += 1;
// Increases current rtt by 10%
rtt.rtt_time += (rtt.rtt_time / 10);
rtts[addr] = rtt;
rtt_increased.emplace(msg.sock_id);
}
if (rtt.num_retries >= 10)
{
// Too many retries, need to notify the socket that the connection is dead
idm::check<lv2_socket>(msg.sock_id, [&](lv2_socket& sock)
{
ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P);
auto& sock_p2ps = reinterpret_cast<lv2_socket_p2ps&>(sock);
sock_p2ps.set_status(p2ps_stream_status::stream_closed);
});
it = msgs.erase(it);
continue;
}
// resend the message
const auto res = idm::check<lv2_socket>(msg.sock_id, [&](lv2_socket& sock) -> bool
{
ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P);
auto& sock_p2ps = reinterpret_cast<lv2_socket_p2ps&>(sock);
if (sendto(sock_p2ps.get_socket(), reinterpret_cast<char*>(msg.data.data()), msg.data.size(), 0, reinterpret_cast<const sockaddr*>(&msg.dst_addr), sizeof(msg.dst_addr)) == -1)
{
sock_p2ps.set_status(p2ps_stream_status::stream_closed);
return false;
}
return true;
});
if (!res || !res.ret)
{
it = msgs.erase(it);
continue;
}
// Update key timeout
msgs.insert(std::make_pair(now + rtt.rtt_time, std::move(msg)));
it = msgs.erase(it);
}
}
}
tcp_timeout_monitor& operator=(thread_state)
{
wakey.notify_one();
return *this;
}
public:
static constexpr auto thread_name = "Tcp Over Udp Timeout Manager Thread"sv;
private:
std::condition_variable wakey;
std::mutex data_mutex;
// List of outgoing messages
struct message
{
s32 sock_id = 0;
::sockaddr_in dst_addr{};
std::vector<u8> data;
u64 seq = 0;
steady_clock::time_point initial_sendtime{};
};
std::map<steady_clock::time_point, message> msgs; // (wakeup time, msg)
// List of rtts
struct rtt_info
{
unsigned long num_retries = 0;
std::chrono::milliseconds rtt_time = 50ms;
};
std::unordered_map<s32, rtt_info> rtts; // (sock_id, rtt)
};
void initialize_tcp_timeout_monitor()
{
g_fxo->need<named_thread<tcp_timeout_monitor>>();
}
u16 u2s_tcp_checksum(const u16* buffer, usz size)
{
u32 cksum = 0;
while (size > 1)
{
cksum += *buffer++;
size -= sizeof(u16);
}
if (size)
cksum += *reinterpret_cast<const u8*>(buffer);
cksum = (cksum >> 16) + (cksum & 0xffff);
cksum += (cksum >> 16);
return static_cast<u16>(~cksum);
}
std::vector<u8> generate_u2s_packet(const p2ps_encapsulated_tcp& header, const u8* data, const u32 datasize)
{
const u32 packet_size = (sizeof(u16) + sizeof(p2ps_encapsulated_tcp) + datasize);
ensure(packet_size < 65535); // packet size shouldn't be bigger than possible UDP payload
std::vector<u8> packet(packet_size);
u8* packet_data = packet.data();
*reinterpret_cast<le_t<u16>*>(packet_data) = header.dst_port;
memcpy(packet_data + sizeof(u16), &header, sizeof(p2ps_encapsulated_tcp));
if (datasize)
memcpy(packet_data + sizeof(u16) + sizeof(p2ps_encapsulated_tcp), data, datasize);
auto* hdr_ptr = reinterpret_cast<p2ps_encapsulated_tcp*>(packet_data + sizeof(u16));
hdr_ptr->checksum = 0;
hdr_ptr->checksum = u2s_tcp_checksum(utils::bless<u16>(hdr_ptr), sizeof(p2ps_encapsulated_tcp) + datasize);
return packet;
}
lv2_socket_p2ps::lv2_socket_p2ps(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol)
: lv2_socket_p2p(family, type, protocol)
{
}
lv2_socket_p2ps::lv2_socket_p2ps(socket_type socket, u16 port, u16 vport, u32 op_addr, u16 op_port, u16 op_vport, u64 cur_seq, u64 data_beg_seq)
: lv2_socket_p2p(SYS_NET_AF_INET, SYS_NET_SOCK_STREAM_P2P, SYS_NET_IPPROTO_IP)
{
this->socket = socket;
this->port = port;
this->vport = vport;
this->op_addr = op_addr;
this->op_port = op_port;
this->op_vport = op_vport;
this->cur_seq = cur_seq;
this->data_beg_seq = data_beg_seq;
status = p2ps_stream_status::stream_connected;
}
bool lv2_socket_p2ps::handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr)
{
std::lock_guard lock(mutex);
if (status != p2ps_stream_status::stream_connected && status != p2ps_stream_status::stream_handshaking)
return false;
nt_p2p_port::dump_packet(tcp_header);
if (tcp_header->flags == p2ps_tcp_flags::ACK)
{
auto& tcpm = g_fxo->get<named_thread<tcp_timeout_monitor>>();
tcpm.confirm_data_received(lv2_id, tcp_header->ack);
}
auto send_ack = [&]()
{
auto final_ack = data_beg_seq;
while (received_data.contains(final_ack))
{
final_ack += received_data.at(final_ack).size();
}
data_available = final_ack - data_beg_seq;
p2ps_encapsulated_tcp send_hdr;
send_hdr.src_port = tcp_header->dst_port;
send_hdr.dst_port = tcp_header->src_port;
send_hdr.flags = p2ps_tcp_flags::ACK;
send_hdr.ack = final_ack;
auto packet = generate_u2s_packet(send_hdr, nullptr, 0);
sys_net.trace("Sent ack %d", final_ack);
send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), 0, false);
// check if polling is happening
if (data_available && events.test_and_reset(lv2_socket::poll_t::read))
{
bs_t<lv2_socket::poll_t> read_event = lv2_socket::poll_t::read;
for (auto it = queue.begin(); it != queue.end();)
{
if (it->second(read_event))
{
it = queue.erase(it);
continue;
}
it++;
}
if (queue.empty())
{
events.store({});
}
}
};
if (status == p2ps_stream_status::stream_handshaking)
{
// Only expect SYN|ACK
if (tcp_header->flags == (p2ps_tcp_flags::SYN | p2ps_tcp_flags::ACK))
{
sys_net.trace("Received SYN|ACK, status is now connected");
data_beg_seq = tcp_header->seq + 1;
status = p2ps_stream_status::stream_connected;
send_ack();
}
return true;
}
else if (status == p2ps_stream_status::stream_connected)
{
if (tcp_header->seq < data_beg_seq)
{
// Data has already been processed
sys_net.trace("Data has already been processed");
if (tcp_header->flags != p2ps_tcp_flags::ACK && tcp_header->flags != p2ps_tcp_flags::RST)
send_ack();
return true;
}
switch (tcp_header->flags)
{
case p2ps_tcp_flags::PSH:
case 0:
{
if (!received_data.count(tcp_header->seq))
{
// New data
received_data.emplace(tcp_header->seq, std::vector<u8>(data, data + tcp_header->length));
}
else
{
sys_net.trace("Data was not new!");
}
send_ack();
return true;
}
case p2ps_tcp_flags::RST:
case p2ps_tcp_flags::FIN:
{
status = p2ps_stream_status::stream_closed;
return false;
}
default:
{
sys_net.error("Unknown U2S TCP flag received");
return true;
}
}
}
return true;
}
bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[maybe_unused]] u8* data, ::sockaddr_storage* op_addr)
{
std::lock_guard lock(mutex);
if (status != p2ps_stream_status::stream_listening)
return false;
// Only valid packet
if (tcp_header->flags == p2ps_tcp_flags::SYN && backlog.size() < max_backlog)
{
if (backlog.size() >= max_backlog)
{
// Send a RST packet on backlog full
sys_net.trace("Backlog was full, sent a RST packet");
p2ps_encapsulated_tcp send_hdr;
send_hdr.src_port = tcp_header->dst_port;
send_hdr.dst_port = tcp_header->src_port;
send_hdr.flags = p2ps_tcp_flags::RST;
auto packet = generate_u2s_packet(send_hdr, nullptr, 0);
send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), 0, false);
}
// Yes, new connection and a backlog is available, create a new lv2_socket for it and send SYN|ACK
// Prepare reply packet
sys_net.notice("Received connection on listening STREAM-P2P socket!");
p2ps_encapsulated_tcp send_hdr;
send_hdr.src_port = tcp_header->dst_port;
send_hdr.dst_port = tcp_header->src_port;
send_hdr.flags = p2ps_tcp_flags::SYN | p2ps_tcp_flags::ACK;
send_hdr.ack = tcp_header->seq + 1;
// Generates random starting SEQ
send_hdr.seq = rand();
// Create new socket
const u32 new_op_addr = reinterpret_cast<struct sockaddr_in*>(op_addr)->sin_addr.s_addr;
const u16 new_op_port = std::bit_cast<u16, be_t<u16>>((reinterpret_cast<struct sockaddr_in*>(op_addr)->sin_port));
const u16 new_op_vport = tcp_header->src_port;
const u64 new_cur_seq = send_hdr.seq + 1;
const u64 new_data_beg_seq = send_hdr.ack;
auto sock_lv2 = std::make_shared<lv2_socket_p2ps>(socket, port, vport, new_op_addr, new_op_port, new_op_vport, new_cur_seq, new_data_beg_seq);
const s32 new_sock_id = idm::import_existing<lv2_socket>(sock_lv2);
sock_lv2->set_lv2_id(new_sock_id);
const u64 key_connected = (reinterpret_cast<struct sockaddr_in*>(op_addr)->sin_addr.s_addr) | (static_cast<u64>(tcp_header->src_port) << 48) | (static_cast<u64>(tcp_header->dst_port) << 32);
{
auto& nc = g_fxo->get<network_context>();
auto& pport = nc.list_p2p_ports.at(port);
pport.bound_p2p_streams.emplace(key_connected, new_sock_id);
}
auto packet = generate_u2s_packet(send_hdr, nullptr, 0);
{
std::lock_guard lock(sock_lv2->mutex);
send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), send_hdr.seq, true);
}
backlog.push(new_sock_id);
if (events.test_and_reset(lv2_socket::poll_t::read))
{
bs_t<lv2_socket::poll_t> read_event = lv2_socket::poll_t::read;
for (auto it = queue.begin(); it != queue.end();)
{
if (it->second(read_event))
{
it = queue.erase(it);
continue;
}
it++;
}
if (queue.empty())
{
events.store({});
}
}
}
else if (tcp_header->flags == p2ps_tcp_flags::SYN)
{
// Send a RST packet on backlog full
sys_net.trace("Backlog was full, sent a RST packet");
p2ps_encapsulated_tcp send_hdr;
send_hdr.src_port = tcp_header->dst_port;
send_hdr.dst_port = tcp_header->src_port;
send_hdr.flags = p2ps_tcp_flags::RST;
auto packet = generate_u2s_packet(send_hdr, nullptr, 0);
send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), 0, false);
}
// Ignore other packets?
return true;
}
void lv2_socket_p2ps::send_u2s_packet(std::vector<u8> data, const ::sockaddr_in* dst, u32 seq, bool require_ack)
{
char ip_str[16];
inet_ntop(AF_INET, &dst->sin_addr, ip_str, sizeof(ip_str));
sys_net.trace("Sending U2S packet on socket %d(id:%d): data(%d, seq %d, require_ack %d) to %s:%d", socket, lv2_id, data.size(), seq, require_ack, ip_str, std::bit_cast<u16, be_t<u16>>(dst->sin_port));
if (::sendto(socket, reinterpret_cast<char*>(data.data()), data.size(), 0, reinterpret_cast<const sockaddr*>(dst), sizeof(sockaddr_in)) == -1)
{
sys_net.error("Attempting to send a u2s packet failed(%s), closing socket!", get_last_error(false));
status = p2ps_stream_status::stream_closed;
return;
}
// Adds to tcp timeout monitor to resend the message until an ack is received
if (require_ack)
{
auto& tcpm = g_fxo->get<named_thread<tcp_timeout_monitor>>();
tcpm.add_message(lv2_id, dst, std::move(data), seq);
}
}
p2ps_stream_status lv2_socket_p2ps::get_status() const
{
return status;
}
void lv2_socket_p2ps::set_status(p2ps_stream_status new_status)
{
status = new_status;
}
std::tuple<bool, s32, sys_net_sockaddr> lv2_socket_p2ps::accept(bool is_lock)
{
std::unique_lock<shared_mutex> lock(mutex, std::defer_lock);
if (is_lock)
{
lock.lock();
}
if (backlog.size() == 0)
{
if (so_nbio)
{
return {true, -SYS_NET_EWOULDBLOCK, {}};
}
return {false, {}, {}};
}
auto p2ps_client = backlog.front();
backlog.pop();
sys_net_sockaddr ps3_addr{};
auto* paddr = reinterpret_cast<sys_net_sockaddr_in_p2p*>(&ps3_addr);
lv2_socket_p2ps* sock_client = reinterpret_cast<lv2_socket_p2ps*>(idm::check_unlocked<lv2_socket>(p2ps_client));
{
std::lock_guard lock(sock_client->mutex);
paddr->sin_family = SYS_NET_AF_INET;
paddr->sin_addr = std::bit_cast<be_t<u32>, u32>(sock_client->op_addr);
paddr->sin_port = sock_client->op_vport;
paddr->sin_vport = sock_client->op_port;
paddr->sin_len = sizeof(sys_net_sockaddr_in_p2p);
}
return {true, p2ps_client, ps3_addr};
}
s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr, s32 ps3_id)
{
const auto* psa_in_p2p = reinterpret_cast<const sys_net_sockaddr_in_p2p*>(&addr);
// For SYS_NET_SOCK_STREAM_P2P sockets, the port is the "fake" tcp port and the vport is the udp port it's bound to
u16 p2p_port = psa_in_p2p->sin_vport;
u16 p2p_vport = psa_in_p2p->sin_port;
sys_net.notice("[P2PS] Trying to bind %s:%d:%d", np::ip_to_string(std::bit_cast<u32>(psa_in_p2p->sin_addr)), p2p_port, p2p_vport);
ensure(p2p_vport != 0);
if (p2p_port != 3658)
{
sys_net.warning("[P2PS] Attempting to bind a socket to a port != 3658");
}
socket_type real_socket{};
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (!nc.list_p2p_ports.contains(p2p_port))
{
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
}
auto& pport = nc.list_p2p_ports.at(p2p_port);
real_socket = pport.p2p_socket;
{
// Ensures the socket & the bound list are updated at the same time to avoid races
std::lock_guard vport_lock(pport.bound_p2p_vports_mutex);
std::lock_guard sock_lock(mutex);
const u64 key = (static_cast<u64>(p2p_vport) << 32);
pport.bound_p2p_streams.emplace(key, ps3_id);
port = p2p_port;
vport = p2p_vport;
socket = real_socket;
}
}
return CELL_OK;
}
std::optional<s32> lv2_socket_p2ps::connect(const sys_net_sockaddr& addr)
{
std::lock_guard lock(mutex);
p2ps_encapsulated_tcp send_hdr;
const auto psa_in_p2p = reinterpret_cast<const sys_net_sockaddr_in_p2p*>(&addr);
auto name = sys_net_addr_to_native_addr(addr);
// This is purposefully inverted, not a bug
const u16 dst_vport = psa_in_p2p->sin_port;
const u16 dst_port = psa_in_p2p->sin_vport;
socket_type real_socket{};
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (!nc.list_p2p_ports.contains(port))
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(port), std::forward_as_tuple(port));
auto& pport = nc.list_p2p_ports.at(port);
real_socket = pport.p2p_socket;
{
std::lock_guard lock(pport.bound_p2p_vports_mutex);
if (vport == 0)
{
// Unassigned vport, assigns one
sys_net.warning("[P2PS] vport was unassigned before connect!");
u16 found_vport = 30000;
while (true)
{
found_vport++;
if (pport.bound_p2p_vports.count(found_vport))
continue;
if (pport.bound_p2p_streams.count(static_cast<u64>(found_vport) << 32))
continue;
break;
}
vport = found_vport;
}
const u64 key = name.sin_addr.s_addr | (static_cast<u64>(vport) << 32) | (static_cast<u64>(dst_vport) << 48);
pport.bound_p2p_streams.emplace(key, lv2_id);
}
}
socket = real_socket;
send_hdr.src_port = vport;
send_hdr.dst_port = dst_vport;
send_hdr.flags = p2ps_tcp_flags::SYN;
send_hdr.seq = rand();
// sock.socket = p2p_socket;
op_addr = name.sin_addr.s_addr;
op_port = dst_port;
op_vport = dst_vport;
cur_seq = send_hdr.seq + 1;
data_beg_seq = 0;
data_available = 0u;
received_data.clear();
status = p2ps_stream_status::stream_handshaking;
std::vector<u8> packet = generate_u2s_packet(send_hdr, nullptr, 0);
name.sin_port = std::bit_cast<u16>(psa_in_p2p->sin_vport); // not a bug
send_u2s_packet(std::move(packet), reinterpret_cast<::sockaddr_in*>(&name), send_hdr.seq, true);
return true;
}
s32 lv2_socket_p2ps::listen(s32 backlog)
{
std::lock_guard lock(mutex);
status = p2ps_stream_status::stream_listening;
max_backlog = backlog;
return CELL_OK;
}
std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> lv2_socket_p2ps::recvfrom([[maybe_unused]] s32 flags, u32 len, bool is_lock)
{
std::unique_lock<shared_mutex> lock(mutex, std::defer_lock);
if (is_lock)
{
lock.lock();
}
if (!data_available)
{
if (so_nbio)
{
return {{-SYS_NET_EWOULDBLOCK, {}, {}}};
}
return std::nullopt;
}
const u32 to_give = std::min<u32>(data_available, len);
sys_net_sockaddr addr{};
std::vector<u8> dest_buf(to_give);
sys_net.trace("STREAM-P2P socket had %u available, given %u", data_available, to_give);
u32 left_to_give = to_give;
while (left_to_give)
{
auto& cur_data = received_data.begin()->second;
auto to_give_for_this_packet = std::min(static_cast<u32>(cur_data.size()), left_to_give);
memcpy(dest_buf.data() + (to_give - left_to_give), cur_data.data(), to_give_for_this_packet);
if (cur_data.size() != to_give_for_this_packet)
{
auto amount_left = cur_data.size() - to_give_for_this_packet;
std::vector<u8> new_vec(amount_left);
memcpy(new_vec.data(), cur_data.data() + to_give_for_this_packet, amount_left);
auto new_key = (received_data.begin()->first) + to_give_for_this_packet;
received_data.emplace(new_key, std::move(new_vec));
}
received_data.erase(received_data.begin());
left_to_give -= to_give_for_this_packet;
}
data_available -= to_give;
data_beg_seq += to_give;
sys_net_sockaddr_in_p2p* addr_p2p = reinterpret_cast<sys_net_sockaddr_in_p2p*>(&addr);
addr_p2p->sin_family = AF_INET;
addr_p2p->sin_addr = std::bit_cast<be_t<u32>, u32>(op_addr);
addr_p2p->sin_port = op_vport;
addr_p2p->sin_vport = op_port;
addr_p2p->sin_len = sizeof(sys_net_sockaddr_in_p2p);
return {{to_give, dest_buf, addr}};
}
std::optional<s32> lv2_socket_p2ps::sendto([[maybe_unused]] s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock)
{
std::unique_lock<shared_mutex> lock(mutex, std::defer_lock);
if (is_lock)
{
lock.lock();
}
constexpr u32 max_data_len = (65535 - (sizeof(u16) + sizeof(p2ps_encapsulated_tcp)));
::sockaddr_in name{};
if (opt_sn_addr)
{
name = sys_net_addr_to_native_addr(*opt_sn_addr);
}
// Prepare address
name.sin_family = AF_INET;
name.sin_port = std::bit_cast<u16, be_t<u16>>(op_port);
name.sin_addr.s_addr = op_addr;
// Prepares encapsulated tcp
p2ps_encapsulated_tcp tcp_header;
tcp_header.src_port = vport;
tcp_header.dst_port = op_vport;
// chop it up
std::vector<std::vector<u8>> stream_packets;
u32 cur_total_len = buf.size();
while (cur_total_len > 0)
{
u32 cur_data_len = std::min(cur_total_len, max_data_len);
tcp_header.length = cur_data_len;
tcp_header.seq = cur_seq;
auto packet = generate_u2s_packet(tcp_header, &buf[buf.size() - cur_total_len], cur_data_len);
send_u2s_packet(std::move(packet), &name, tcp_header.seq, true);
cur_total_len -= cur_data_len;
cur_seq += cur_data_len;
}
return {buf.size()};
}
void lv2_socket_p2ps::close()
{
if (!port || !vport)
{
return;
}
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard lock(nc.list_p2p_ports_mutex);
ensure(nc.list_p2p_ports.contains(port));
auto& p2p_port = nc.list_p2p_ports.at(port);
{
std::lock_guard lock(p2p_port.bound_p2p_vports_mutex);
for (auto it = p2p_port.bound_p2p_streams.begin(); it != p2p_port.bound_p2p_streams.end();)
{
if (static_cast<u32>(it->second) == lv2_id)
{
it = p2p_port.bound_p2p_streams.erase(it);
continue;
}
it++;
}
}
}
}
s32 lv2_socket_p2ps::shutdown([[maybe_unused]] s32 how)
{
sys_net.todo("[P2PS] shutdown");
return CELL_OK;
}
s32 lv2_socket_p2ps::poll(sys_net_pollfd& sn_pfd, [[maybe_unused]] pollfd& native_pfd)
{
std::lock_guard lock(mutex);
sys_net.trace("[P2PS] poll checking for 0x%X", sn_pfd.events);
if (status == p2ps_stream_status::stream_connected)
{
if ((sn_pfd.events & SYS_NET_POLLIN) && data_available)
{
sys_net.trace("[P2PS] p2ps has %u bytes available", data_available);
sn_pfd.revents |= SYS_NET_POLLIN;
}
// Data can only be written if the socket is connected
if (sn_pfd.events & SYS_NET_POLLOUT && status == p2ps_stream_status::stream_connected)
{
sn_pfd.revents |= SYS_NET_POLLOUT;
}
if (sn_pfd.revents)
{
return 1;
}
}
return 0;
}

View file

@ -0,0 +1,102 @@
#pragma once
#ifdef _WIN32
#include <winsock2.h>
#include <WS2tcpip.h>
#else
#ifdef __clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#endif
#include <netinet/in.h>
#ifdef __clang__
#pragma GCC diagnostic pop
#endif
#endif
#include "lv2_socket_p2p.h"
constexpr be_t<u32> P2PS_U2S_SIG = (static_cast<u32>('U') << 24 | static_cast<u32>('2') << 16 | static_cast<u32>('S') << 8 | static_cast<u32>('0'));
struct p2ps_encapsulated_tcp
{
be_t<u32> signature = P2PS_U2S_SIG; // Signature to verify it's P2P Stream data
be_t<u32> length = 0; // Length of data
be_t<u64> seq = 0; // This should be u32 but changed to u64 for simplicity
be_t<u64> ack = 0;
be_t<u16> src_port = 0; // fake source tcp port
be_t<u16> dst_port = 0; // fake dest tcp port(should be == vport)
be_t<u16> checksum = 0;
u8 flags = 0;
};
enum p2ps_stream_status
{
stream_closed, // Default when port is not listening nor connected
stream_listening, // Stream is listening, accepting SYN packets
stream_handshaking, // Currently handshaking
stream_connected, // This is an established connection(after tcp handshake)
};
enum p2ps_tcp_flags : u8
{
FIN = (1 << 0),
SYN = (1 << 1),
RST = (1 << 2),
PSH = (1 << 3),
ACK = (1 << 4),
URG = (1 << 5),
ECE = (1 << 6),
CWR = (1 << 7),
};
void initialize_tcp_timeout_monitor();
u16 u2s_tcp_checksum(const u16* buffer, usz size);
std::vector<u8> generate_u2s_packet(const p2ps_encapsulated_tcp& header, const u8* data, const u32 datasize);
class lv2_socket_p2ps final : public lv2_socket_p2p
{
public:
lv2_socket_p2ps(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol);
lv2_socket_p2ps(socket_type socket, u16 port, u16 vport, u32 op_addr, u16 op_port, u16 op_vport, u64 cur_seq, u64 data_beg_seq);
p2ps_stream_status get_status() const;
void set_status(p2ps_stream_status new_status);
bool handle_connected(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
bool handle_listening(p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
void send_u2s_packet(std::vector<u8> data, const ::sockaddr_in* dst, u32 seq, bool require_ack);
std::tuple<bool, s32, sys_net_sockaddr> accept(bool is_lock = true) override;
s32 bind(const sys_net_sockaddr& addr, s32 ps3_id) override;
std::optional<s32> connect(const sys_net_sockaddr& addr) override;
//std::pair<s32, sys_net_sockaddr> getsockname() override;
s32 listen(s32 backlog) override;
std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> recvfrom(s32 flags, u32 len, bool is_lock = true) override;
std::optional<s32> sendto(s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock = true) override;
void close() override;
s32 shutdown(s32 how) override;
s32 poll(sys_net_pollfd& sn_pfd, pollfd& native_pfd) override;
protected:
static constexpr usz MAX_RECEIVED_BUFFER = (1024 * 1024 * 10);
p2ps_stream_status status = p2ps_stream_status::stream_closed;
usz max_backlog = 0; // set on listen
std::queue<s32> backlog;
u16 op_port = 0, op_vport = 0;
u32 op_addr = 0;
u64 data_beg_seq = 0; // Seq of first byte of received_data
u64 data_available = 0; // Amount of continuous data available(calculated on ACK send)
std::map<u64, std::vector<u8>> received_data; // holds seq/data of data received
u64 cur_seq = 0; // SEQ of next packet to be sent
};

View file

@ -0,0 +1,98 @@
#include "stdafx.h"
#include "lv2_socket_raw.h"
LOG_CHANNEL(sys_net);
lv2_socket_raw::lv2_socket_raw(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol)
: lv2_socket(family, type, protocol)
{
}
std::tuple<bool, s32, sys_net_sockaddr> lv2_socket_raw::accept([[maybe_unused]] bool is_lock)
{
sys_net.todo("lv2_socket_raw::accept");
return {};
}
s32 lv2_socket_raw::bind([[maybe_unused]] const sys_net_sockaddr& addr, [[maybe_unused]] s32 ps3_id)
{
sys_net.todo("lv2_socket_raw::bind");
return {};
}
std::optional<s32> lv2_socket_raw::connect([[maybe_unused]] const sys_net_sockaddr& addr)
{
sys_net.todo("lv2_socket_raw::connect");
return {};
}
s32 lv2_socket_raw::connect_followup()
{
sys_net.todo("lv2_socket_raw::connect_followup");
return {};
}
std::pair<s32, sys_net_sockaddr> lv2_socket_raw::getpeername()
{
sys_net.todo("lv2_socket_raw::getpeername");
return {{}, {}};
}
std::pair<s32, sys_net_sockaddr> lv2_socket_raw::getsockname()
{
sys_net.todo("lv2_socket_raw::getsockname");
return {};
}
std::tuple<s32, lv2_socket::sockopt_data, u32> lv2_socket_raw::getsockopt([[maybe_unused]] s32 level, [[maybe_unused]] s32 optname, [[maybe_unused]] u32 len)
{
sys_net.todo("lv2_socket_raw::getsockopt");
return {};
}
s32 lv2_socket_raw::setsockopt([[maybe_unused]] s32 level, [[maybe_unused]] s32 optname, [[maybe_unused]] const std::vector<u8>& optval)
{
sys_net.todo("lv2_socket_raw::setsockopt");
return {};
}
s32 lv2_socket_raw::listen([[maybe_unused]] s32 backlog)
{
sys_net.todo("lv2_socket_raw::listen");
return {};
}
std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> lv2_socket_raw::recvfrom([[maybe_unused]] s32 flags, [[maybe_unused]] u32 len, [[maybe_unused]] bool is_lock)
{
sys_net.todo("lv2_socket_raw::recvfrom");
return {};
}
std::optional<s32> lv2_socket_raw::sendto([[maybe_unused]] s32 flags, [[maybe_unused]] const std::vector<u8>& buf, [[maybe_unused]] std::optional<sys_net_sockaddr> opt_sn_addr, [[maybe_unused]] bool is_lock)
{
sys_net.todo("lv2_socket_raw::sendto");
return {};
}
void lv2_socket_raw::close()
{
sys_net.todo("lv2_socket_raw::close");
}
s32 lv2_socket_raw::shutdown([[maybe_unused]] s32 how)
{
sys_net.todo("lv2_socket_raw::shutdown");
return {};
}
s32 lv2_socket_raw::poll([[maybe_unused]] sys_net_pollfd& sn_pfd, [[maybe_unused]] pollfd& native_pfd)
{
sys_net.todo("lv2_socket_raw::poll");
return {};
}
s32 lv2_socket_raw::select([[maybe_unused]] bs_t<lv2_socket::poll_t> selected, [[maybe_unused]] pollfd& native_pfd)
{
sys_net.todo("lv2_socket_raw::select");
return {};
}

View file

@ -0,0 +1,32 @@
#pragma once
#include "lv2_socket.h"
class lv2_socket_raw final : public lv2_socket
{
public:
lv2_socket_raw(lv2_socket_family family, lv2_socket_type type, lv2_ip_protocol protocol);
std::tuple<bool, s32, sys_net_sockaddr> accept(bool is_lock = true) override;
s32 bind(const sys_net_sockaddr& addr, s32 ps3_id) override;
std::optional<s32> connect(const sys_net_sockaddr& addr) override;
s32 connect_followup() override;
std::pair<s32, sys_net_sockaddr> getpeername() override;
std::pair<s32, sys_net_sockaddr> getsockname() override;
std::tuple<s32, sockopt_data, u32> getsockopt(s32 level, s32 optname, u32 len) override;
s32 setsockopt(s32 level, s32 optname, const std::vector<u8>& optval) override;
s32 listen(s32 backlog) override;
std::optional<std::tuple<s32, std::vector<u8>, sys_net_sockaddr>> recvfrom(s32 flags, u32 len, bool is_lock = true) override;
std::optional<s32> sendto(s32 flags, const std::vector<u8>& buf, std::optional<sys_net_sockaddr> opt_sn_addr, bool is_lock = true) override;
void close() override;
s32 shutdown(s32 how) override;
s32 poll(sys_net_pollfd& sn_pfd, pollfd& native_pfd) override;
s32 select(bs_t<poll_t> selected, pollfd& native_pfd) override;
};

View file

@ -0,0 +1,212 @@
#include "stdafx.h"
#include "Emu/Cell/lv2/sys_sync.h"
#include "network_context.h"
#include "Emu/system_config.h"
#include "sys_net_helpers.h"
LOG_CHANNEL(sys_net);
// Used by RPCN to send signaling packets to RPCN server(for UDP hole punching)
s32 send_packet_from_p2p_port(const std::vector<u8>& data, const sockaddr_in& addr)
{
s32 res{};
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (nc.list_p2p_ports.contains(3658))
{
auto& def_port = nc.list_p2p_ports.at(3658);
res = ::sendto(def_port.p2p_socket, reinterpret_cast<const char*>(data.data()), data.size(), 0, reinterpret_cast<const sockaddr*>(&addr), sizeof(sockaddr_in));
}
else
{
sys_net.error("send_packet_from_p2p_port: port %d not present", 3658);
}
}
return res;
}
std::vector<std::vector<u8>> get_rpcn_msgs()
{
std::vector<std::vector<u8>> msgs;
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (nc.list_p2p_ports.contains(3658))
{
auto& def_port = nc.list_p2p_ports.at(3658);
{
std::lock_guard lock(def_port.s_rpcn_mutex);
msgs = std::move(def_port.rpcn_msgs);
def_port.rpcn_msgs.clear();
}
}
else
{
sys_net.error("get_rpcn_msgs: port %d not present", 3658);
}
}
return msgs;
}
std::vector<std::pair<std::pair<u32, u16>, std::vector<u8>>> get_sign_msgs()
{
std::vector<std::pair<std::pair<u32, u16>, std::vector<u8>>> msgs;
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (nc.list_p2p_ports.contains(3658))
{
auto& def_port = nc.list_p2p_ports.at(3658);
{
std::lock_guard lock(def_port.s_sign_mutex);
msgs = std::move(def_port.sign_msgs);
def_port.sign_msgs.clear();
}
}
else
{
sys_net.error("get_sign_msgs: port %d not present", 3658);
}
}
return msgs;
}
void need_network()
{
g_fxo->need<network_context>();
initialize_tcp_timeout_monitor();
}
network_thread::network_thread() noexcept
{
if (g_cfg.net.psn_status == np_psn_status::psn_rpcn)
list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(3658), std::forward_as_tuple(3658));
}
network_thread::~network_thread()
{
}
void network_thread::operator()()
{
std::vector<std::shared_ptr<lv2_socket>> socklist;
socklist.reserve(lv2_socket::id_count);
s_to_awake.clear();
::pollfd fds[lv2_socket::id_count]{};
#ifdef _WIN32
bool connecting[lv2_socket::id_count]{};
bool was_connecting[lv2_socket::id_count]{};
#endif
::pollfd p2p_fd[lv2_socket::id_count]{};
while (thread_ctrl::state() != thread_state::aborting)
{
// Wait with 1ms timeout
#ifdef _WIN32
windows_poll(fds, ::size32(socklist), 1, connecting);
#else
::poll(fds, socklist.size(), 1);
#endif
// Check P2P sockets for incoming packets(timeout could probably be set at 0)
{
std::lock_guard lock(list_p2p_ports_mutex);
std::memset(p2p_fd, 0, sizeof(p2p_fd));
auto num_p2p_sockets = 0;
for (const auto& p2p_port : list_p2p_ports)
{
p2p_fd[num_p2p_sockets].events = POLLIN;
p2p_fd[num_p2p_sockets].revents = 0;
p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket;
num_p2p_sockets++;
}
if (num_p2p_sockets)
{
#ifdef _WIN32
const auto ret_p2p = WSAPoll(p2p_fd, num_p2p_sockets, 1);
#else
const auto ret_p2p = ::poll(p2p_fd, num_p2p_sockets, 1);
#endif
if (ret_p2p > 0)
{
auto fd_index = 0;
for (auto& p2p_port : list_p2p_ports)
{
if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM)
{
while (p2p_port.second.recv_data())
;
}
fd_index++;
}
}
else if (ret_p2p < 0)
{
sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false));
}
}
}
std::lock_guard lock(s_nw_mutex);
for (usz i = 0; i < socklist.size(); i++)
{
#ifdef _WIN32
socklist[i]->handle_events(fds[i], was_connecting[i] && !connecting[i]);
#else
socklist[i]->handle_events(fds[i]);
#endif
}
s_to_awake.erase(std::unique(s_to_awake.begin(), s_to_awake.end()), s_to_awake.end());
for (ppu_thread* ppu : s_to_awake)
{
network_clear_queue(*ppu);
lv2_obj::append(ppu);
}
if (!s_to_awake.empty())
{
lv2_obj::awake_all();
}
s_to_awake.clear();
socklist.clear();
// Obtain all non P2P active sockets
idm::select<lv2_socket>([&](u32 id, lv2_socket& s)
{
if (s.get_type() != SYS_NET_SOCK_DGRAM_P2P && s.get_type() != SYS_NET_SOCK_STREAM_P2P)
{
socklist.emplace_back(idm::get_unlocked<lv2_socket>(id));
}
});
for (usz i = 0; i < socklist.size(); i++)
{
auto events = socklist[i]->get_events();
fds[i].fd = events ? socklist[i]->get_socket() : -1;
fds[i].events =
(events & lv2_socket::poll_t::read ? POLLIN : 0) |
(events & lv2_socket::poll_t::write ? POLLOUT : 0) |
0;
fds[i].revents = 0;
#ifdef _WIN32
const auto cur_connecting = socklist[i]->is_connecting();
was_connecting[i] = connecting;
connecting[i] = connecting;
#endif
}
}
}

View file

@ -0,0 +1,27 @@
#pragma once
#include <vector>
#include <map>
#include "Utilities/mutex.h"
#include "Emu/Cell/PPUThread.h"
#include "nt_p2p_port.h"
struct network_thread
{
std::vector<ppu_thread*> s_to_awake;
shared_mutex s_nw_mutex;
shared_mutex list_p2p_ports_mutex;
std::map<u16, nt_p2p_port> list_p2p_ports{};
static constexpr auto thread_name = "Network Thread";
network_thread() noexcept;
~network_thread();
void operator()();
};
using network_context = named_thread<network_thread>;

View file

@ -0,0 +1,241 @@
#include "stdafx.h"
#include <fcntl.h>
#include "nt_p2p_port.h"
#include "lv2_socket_native.h"
#include "lv2_socket_p2ps.h"
#include "util/asm.hpp"
#include "sys_net_helpers.h"
#include "Emu/NP/signaling_handler.h"
LOG_CHANNEL(sys_net);
nt_p2p_port::nt_p2p_port(u16 port)
: port(port)
{
// Creates and bind P2P Socket
p2p_socket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_IP);
if (p2p_socket == -1)
sys_net.fatal("Failed to create DGRAM socket for P2P socket!");
#ifdef _WIN32
u_long _true = 1;
::ioctlsocket(p2p_socket, FIONBIO, &_true);
#else
::fcntl(p2p_socket, F_SETFL, ::fcntl(p2p_socket, F_GETFL, 0) | O_NONBLOCK);
#endif
u32 optval = 131072; // value obtained from DECR for a SOCK_DGRAM_P2P socket(should maybe be bigger for actual socket?)
if (setsockopt(p2p_socket, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<const char*>(&optval), sizeof(optval)) != 0)
sys_net.fatal("Error setsockopt SO_RCVBUF on P2P socket");
::sockaddr_in p2p_saddr{};
p2p_saddr.sin_family = AF_INET;
p2p_saddr.sin_port = std::bit_cast<u16, be_t<u16>>(port); // htons(port);
p2p_saddr.sin_addr.s_addr = 0; // binds to 0.0.0.0
const auto ret_bind = ::bind(p2p_socket, reinterpret_cast<sockaddr*>(&p2p_saddr), sizeof(p2p_saddr));
if (ret_bind == -1)
sys_net.fatal("Failed to bind DGRAM socket to %d for P2P!", port);
sys_net.notice("P2P port %d was bound!", port);
}
nt_p2p_port::~nt_p2p_port()
{
if (p2p_socket)
{
#ifdef _WIN32
::closesocket(p2p_socket);
#else
::close(p2p_socket);
#endif
}
}
void nt_p2p_port::dump_packet(p2ps_encapsulated_tcp* tcph)
{
sys_net.trace("PACKET DUMP:\nsrc_port: %d\ndst_port: %d\nflags: %d\nseq: %d\nack: %d\nlen: %d", tcph->src_port, tcph->dst_port, tcph->flags, tcph->seq, tcph->ack, tcph->length);
}
bool nt_p2p_port::handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr)
{
const auto sock = idm::check<lv2_socket>(sock_id, [&](lv2_socket& sock) -> bool
{
ensure(sock.get_type() == SYS_NET_SOCK_STREAM_P2P);
auto& sock_p2ps = reinterpret_cast<lv2_socket_p2ps&>(sock);
return sock_p2ps.handle_connected(tcp_header, data, op_addr);
});
if (!sock || !sock.ret)
return false;
return true;
}
bool nt_p2p_port::handle_listening(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr)
{
auto sock = idm::get<lv2_socket>(sock_id);
if (!sock)
return false;
auto& sock_p2ps = reinterpret_cast<lv2_socket_p2ps&>(*sock.get());
return sock_p2ps.handle_listening(tcp_header, data, op_addr);
}
bool nt_p2p_port::recv_data()
{
::sockaddr_storage native_addr;
::socklen_t native_addrlen = sizeof(native_addr);
const auto recv_res = ::recvfrom(p2p_socket, reinterpret_cast<char*>(p2p_recv_data.data()), p2p_recv_data.size(), 0, reinterpret_cast<struct sockaddr*>(&native_addr), &native_addrlen);
if (recv_res == -1)
{
auto lerr = get_last_error(false);
if (lerr != SYS_NET_EINPROGRESS && lerr != SYS_NET_EWOULDBLOCK)
sys_net.error("Error recvfrom on P2P socket: %d", lerr);
return false;
}
if (recv_res < static_cast<s32>(sizeof(u16)))
{
sys_net.error("Received badly formed packet on P2P port(no vport)!");
return true;
}
u16 dst_vport = reinterpret_cast<le_t<u16>&>(p2p_recv_data[0]);
if (dst_vport == 0) // Reserved for messages from RPCN server
{
std::vector<u8> rpcn_msg(recv_res - sizeof(u16));
memcpy(rpcn_msg.data(), p2p_recv_data.data() + sizeof(u16), recv_res - sizeof(u16));
std::lock_guard lock(s_rpcn_mutex);
rpcn_msgs.push_back(std::move(rpcn_msg));
return true;
}
if (dst_vport == 65535) // Reserved for signaling
{
std::vector<u8> sign_msg(recv_res - sizeof(u16));
memcpy(sign_msg.data(), p2p_recv_data.data() + sizeof(u16), recv_res - sizeof(u16));
std::pair<std::pair<u32, u16>, std::vector<u8>> msg;
msg.first.first = reinterpret_cast<struct sockaddr_in*>(&native_addr)->sin_addr.s_addr;
msg.first.second = std::bit_cast<u16, be_t<u16>>(reinterpret_cast<struct sockaddr_in*>(&native_addr)->sin_port);
msg.second = std::move(sign_msg);
{
std::lock_guard lock(s_sign_mutex);
sign_msgs.push_back(std::move(msg));
}
auto& sigh = g_fxo->get<named_thread<signaling_handler>>();
sigh.wake_up();
return true;
}
{
std::lock_guard lock(bound_p2p_vports_mutex);
if (bound_p2p_vports.contains(dst_vport))
{
sys_net_sockaddr_in_p2p p2p_addr{};
p2p_addr.sin_len = sizeof(sys_net_sockaddr_in);
p2p_addr.sin_family = SYS_NET_AF_INET;
p2p_addr.sin_addr = std::bit_cast<be_t<u32>, u32>(reinterpret_cast<struct sockaddr_in*>(&native_addr)->sin_addr.s_addr);
p2p_addr.sin_vport = dst_vport;
p2p_addr.sin_port = std::bit_cast<be_t<u16>, u16>(reinterpret_cast<struct sockaddr_in*>(&native_addr)->sin_port);
std::vector<u8> p2p_data(recv_res - sizeof(u16));
memcpy(p2p_data.data(), p2p_recv_data.data() + sizeof(u16), recv_res - sizeof(u16));
const auto sock = idm::check<lv2_socket>(bound_p2p_vports.at(dst_vport), [&](lv2_socket& sock)
{
ensure(sock.get_type() == SYS_NET_SOCK_DGRAM_P2P);
auto& sock_p2p = reinterpret_cast<lv2_socket_p2p&>(sock);
sock_p2p.handle_new_data(std::move(p2p_addr), std::move(p2p_data));
});
// Should not happen in theory
if (!sock)
bound_p2p_vports.erase(dst_vport);
return true;
}
}
// Not directed at a bound DGRAM_P2P vport so check if the packet is a STREAM-P2P packet
const auto sp_size = recv_res - sizeof(u16);
u8* sp_data = p2p_recv_data.data() + sizeof(u16);
if (sp_size < sizeof(p2ps_encapsulated_tcp))
{
sys_net.notice("Received P2P packet targeted at unbound vport(likely) or invalid(vport=%d)", dst_vport);
return true;
}
auto* tcp_header = reinterpret_cast<p2ps_encapsulated_tcp*>(sp_data);
// Validate signature & length
if (tcp_header->signature != P2PS_U2S_SIG)
{
sys_net.notice("Received P2P packet targeted at unbound vport(vport=%d)", dst_vport);
return true;
}
if (tcp_header->length != (sp_size - sizeof(p2ps_encapsulated_tcp)))
{
sys_net.error("Received STREAM-P2P packet tcp length didn't match packet length");
return true;
}
// Sanity check
if (tcp_header->dst_port != dst_vport)
{
sys_net.error("Received STREAM-P2P packet with dst_port != vport");
return true;
}
// Validate checksum
u16 given_checksum = tcp_header->checksum;
tcp_header->checksum = 0;
if (given_checksum != u2s_tcp_checksum(reinterpret_cast<const u16*>(sp_data), sp_size))
{
sys_net.error("Checksum is invalid, dropping packet!");
return true;
}
// The packet is valid, check if it's bound
const u64 key_connected = (reinterpret_cast<struct sockaddr_in*>(&native_addr)->sin_addr.s_addr) | (static_cast<u64>(tcp_header->src_port) << 48) | (static_cast<u64>(tcp_header->dst_port) << 32);
const u64 key_listening = (static_cast<u64>(tcp_header->dst_port) << 32);
{
std::lock_guard lock(bound_p2p_vports_mutex);
if (bound_p2p_streams.contains(key_connected))
{
const auto sock_id = bound_p2p_streams.at(key_connected);
sys_net.trace("Received packet for connected STREAM-P2P socket(s=%d)", sock_id);
handle_connected(sock_id, tcp_header, sp_data + sizeof(p2ps_encapsulated_tcp), &native_addr);
return true;
}
if (bound_p2p_streams.contains(key_listening))
{
const auto sock_id = bound_p2p_streams.at(key_listening);
sys_net.trace("Received packet for listening STREAM-P2P socket(s=%d)", sock_id);
handle_listening(sock_id, tcp_header, sp_data + sizeof(p2ps_encapsulated_tcp), &native_addr);
return true;
}
}
sys_net.notice("Received a STREAM-P2P packet with no bound target");
return true;
}

View file

@ -0,0 +1,51 @@
#pragma once
#include "lv2_socket_p2ps.h"
#ifdef _WIN32
#include <winsock2.h>
#include <WS2tcpip.h>
#else
#ifdef __clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#endif
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#ifdef __clang__
#pragma GCC diagnostic pop
#endif
#endif
struct nt_p2p_port
{
// Real socket where P2P packets are received/sent
socket_type p2p_socket = 0;
u16 port = 0;
shared_mutex bound_p2p_vports_mutex;
// For DGRAM_P2P sockets(vport, sock_id)
std::map<u16, s32> bound_p2p_vports{};
// For STREAM_P2P sockets(key, sock_id)
// key is ( (src_vport) << 48 | (dst_vport) << 32 | addr ) with src_vport and addr being 0 for listening sockets
std::map<u64, s32> bound_p2p_streams{};
// Queued messages from RPCN
shared_mutex s_rpcn_mutex;
std::vector<std::vector<u8>> rpcn_msgs{};
// Queued signaling messages
shared_mutex s_sign_mutex;
std::vector<std::pair<std::pair<u32, u16>, std::vector<u8>>> sign_msgs{};
std::array<u8, 65535> p2p_recv_data{};
nt_p2p_port(u16 port);
~nt_p2p_port();
static void dump_packet(p2ps_encapsulated_tcp* tcph);
bool handle_connected(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
bool handle_listening(s32 sock_id, p2ps_encapsulated_tcp* tcp_header, u8* data, ::sockaddr_storage* op_addr);
bool recv_data();
};

View file

@ -0,0 +1,212 @@
#include "stdafx.h"
#include "Emu/IdManager.h"
#include "Emu/Cell/PPUThread.h"
#include "lv2_socket.h"
#include "sys_net_helpers.h"
LOG_CHANNEL(sys_net);
int get_native_error()
{
int native_error;
#ifdef _WIN32
native_error = WSAGetLastError();
#else
native_error = errno;
#endif
return native_error;
}
sys_net_error get_last_error(bool is_blocking, int native_error)
{
// Convert the error code for socket functions to a one for sys_net
sys_net_error result{};
const char* name{};
if (!native_error)
{
native_error = get_native_error();
}
#ifdef _WIN32
#define ERROR_CASE(error) \
case WSA##error: \
result = SYS_NET_##error; \
name = #error; \
break;
#else
#define ERROR_CASE(error) \
case error: \
result = SYS_NET_##error; \
name = #error; \
break;
#endif
switch (native_error)
{
#ifndef _WIN32
ERROR_CASE(ENOENT);
ERROR_CASE(ENOMEM);
ERROR_CASE(EBUSY);
ERROR_CASE(ENOSPC);
ERROR_CASE(EPIPE);
#endif
// TODO: We don't currently support EFAULT or EINTR
// ERROR_CASE(EFAULT);
// ERROR_CASE(EINTR);
ERROR_CASE(EBADF);
ERROR_CASE(EACCES);
ERROR_CASE(EINVAL);
ERROR_CASE(EMFILE);
ERROR_CASE(EWOULDBLOCK);
ERROR_CASE(EINPROGRESS);
ERROR_CASE(EALREADY);
ERROR_CASE(EDESTADDRREQ);
ERROR_CASE(EMSGSIZE);
ERROR_CASE(EPROTOTYPE);
ERROR_CASE(ENOPROTOOPT);
ERROR_CASE(EPROTONOSUPPORT);
ERROR_CASE(EOPNOTSUPP);
ERROR_CASE(EPFNOSUPPORT);
ERROR_CASE(EAFNOSUPPORT);
ERROR_CASE(EADDRINUSE);
ERROR_CASE(EADDRNOTAVAIL);
ERROR_CASE(ENETDOWN);
ERROR_CASE(ENETUNREACH);
ERROR_CASE(ECONNABORTED);
ERROR_CASE(ECONNRESET);
ERROR_CASE(ENOBUFS);
ERROR_CASE(EISCONN);
ERROR_CASE(ENOTCONN);
ERROR_CASE(ESHUTDOWN);
ERROR_CASE(ETOOMANYREFS);
ERROR_CASE(ETIMEDOUT);
ERROR_CASE(ECONNREFUSED);
ERROR_CASE(EHOSTDOWN);
ERROR_CASE(EHOSTUNREACH);
default:
fmt::throw_exception("sys_net get_last_error(is_blocking=%d, native_error=%d): Unknown/illegal socket error", is_blocking, native_error);
}
if (name && result != SYS_NET_EWOULDBLOCK && result != SYS_NET_EINPROGRESS)
{
sys_net.error("Socket error %s", name);
}
if (is_blocking && result == SYS_NET_EWOULDBLOCK)
{
return {};
}
if (is_blocking && result == SYS_NET_EINPROGRESS)
{
return {};
}
return result;
#undef ERROR_CASE
}
sys_net_sockaddr native_addr_to_sys_net_addr(const ::sockaddr_storage& native_addr)
{
ensure(native_addr.ss_family == AF_INET || native_addr.ss_family == AF_UNSPEC);
sys_net_sockaddr sn_addr;
sys_net_sockaddr_in* paddr = reinterpret_cast<sys_net_sockaddr_in*>(&sn_addr);
paddr->sin_len = sizeof(sys_net_sockaddr_in);
paddr->sin_family = SYS_NET_AF_INET;
paddr->sin_port = std::bit_cast<be_t<u16>, u16>(reinterpret_cast<const sockaddr_in*>(&native_addr)->sin_port);
paddr->sin_addr = std::bit_cast<be_t<u32>, u32>(reinterpret_cast<const sockaddr_in*>(&native_addr)->sin_addr.s_addr);
paddr->sin_zero = 0;
return sn_addr;
}
::sockaddr_in sys_net_addr_to_native_addr(const sys_net_sockaddr& sn_addr)
{
ensure(sn_addr.sa_family == SYS_NET_AF_INET);
const sys_net_sockaddr_in* psa_in = reinterpret_cast<const sys_net_sockaddr_in*>(&sn_addr);
::sockaddr_in native_addr{};
native_addr.sin_family = AF_INET;
native_addr.sin_port = std::bit_cast<u16>(psa_in->sin_port);
native_addr.sin_addr.s_addr = std::bit_cast<u32>(psa_in->sin_addr);
#ifdef _WIN32
// Windows doesn't support sending packets to 0.0.0.0 but it works on unixes, send to 127.0.0.1 instead
if (native_addr.sin_addr.s_addr == 0x00000000)
{
sys_net.warning("[Native] Redirected 0.0.0.0 to 127.0.0.1");
native_addr.sin_addr.s_addr = std::bit_cast<u32, be_t<u32>>(0x7F000001);
}
#endif
return native_addr;
}
void network_clear_queue(ppu_thread& ppu)
{
idm::select<lv2_socket>([&](u32, lv2_socket& sock)
{
sock.clear_queue(ppu.id);
});
}
#ifdef _WIN32
// Workaround function for WSAPoll not reporting failed connections
void windows_poll(pollfd* fds, unsigned long nfds, int timeout, bool* connecting)
{
ensure(connecting);
// Don't call WSAPoll with zero nfds (errors 10022 or 10038)
if (std::none_of(fds, fds + nfds, [](pollfd& pfd)
{
return pfd.fd != INVALID_SOCKET;
}))
{
if (timeout > 0)
{
Sleep(timeout);
}
return;
}
int r = ::WSAPoll(fds, nfds, timeout);
if (r == SOCKET_ERROR)
{
sys_net.error("WSAPoll failed: %u", WSAGetLastError());
return;
}
for (unsigned long i = 0; i < nfds; i++)
{
if (connecting[i])
{
if (!fds[i].revents)
{
int error = 0;
socklen_t intlen = sizeof(error);
if (getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &intlen) == -1 || error != 0)
{
// Connection silently failed
connecting[i] = false;
fds[i].revents = POLLERR | POLLHUP | (fds[i].events & (POLLIN | POLLOUT));
}
}
else
{
connecting[i] = false;
}
}
}
}
#endif

View file

@ -0,0 +1,29 @@
#pragma once
#ifdef _WIN32
#include <winsock2.h>
#include <WS2tcpip.h>
#else
#ifdef __clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#endif
#include <sys/socket.h>
#include <netinet/in.h>
#ifdef __clang__
#pragma GCC diagnostic pop
#endif
#endif
#include "Emu/Cell/lv2/sys_net.h"
int get_native_error();
sys_net_error get_last_error(bool is_blocking, int native_error = 0);
sys_net_sockaddr native_addr_to_sys_net_addr(const ::sockaddr_storage& native_addr);
::sockaddr_in sys_net_addr_to_native_addr(const sys_net_sockaddr& sn_addr);
void network_clear_queue(ppu_thread& ppu);
#ifdef _WIN32
void windows_poll(pollfd* fds, unsigned long nfds, int timeout, bool* connecting);
#endif

View file

@ -809,10 +809,6 @@ bool gdb_thread::cmd_remove_breakpoint(gdb_cmd& cmd)
gdb_thread::gdb_thread() noexcept
{
#ifdef _WIN32
WSADATA wsa_data;
WSAStartup(MAKEWORD(2, 2), &wsa_data);
#endif
}
gdb_thread::~gdb_thread()
@ -826,10 +822,6 @@ gdb_thread::~gdb_thread()
{
closesocket(client_socket);
}
#ifdef _WIN32
WSACleanup();
#endif
}
void gdb_thread::operator()()

View file

@ -60,10 +60,6 @@ namespace rpcn
thread_rpcn(std::thread(&rpcn_client::rpcn_thread, this)), thread_rpcn_reader(std::thread(&rpcn_client::rpcn_reader_thread, this)),
thread_rpcn_writer(std::thread(&rpcn_client::rpcn_writer_thread, this))
{
#ifdef _WIN32
WSADATA wsa_data;
WSAStartup(MAKEWORD(2, 2), &wsa_data);
#endif
g_cfg_rpcn.load();
sem_rpcn.release();

View file

@ -161,6 +161,14 @@
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_gpio.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_native.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_raw.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_p2p.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_p2ps.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\network_context.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\nt_p2p_port.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_net\sys_net_helpers.cpp" />
<ClCompile Include="Emu\Cell\Modules\StaticHLE.cpp" />
<ClCompile Include="Emu\Cell\lv2\sys_overlay.cpp" />
<ClCompile Include="Emu\Cell\PPUAnalyser.cpp" />

View file

@ -762,6 +762,30 @@
<ClCompile Include="Emu\Cell\lv2\sys_net.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_native.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_raw.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_p2p.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\lv2_socket_p2ps.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\network_context.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\nt_p2p_port.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Cell\lv2\sys_net\sys_net_helpers.cpp">
<Filter>Emu\Cell\lv2</Filter>
</ClCompile>
<ClCompile Include="Emu\Io\PadHandler.cpp">
<Filter>Emu\Io</Filter>
</ClCompile>

View file

@ -446,6 +446,9 @@ int main(int argc, char** argv)
{
report_fatal_error("Not enough memory for RPCS3 process.");
}
WSADATA wsa_data;
WSAStartup(MAKEWORD(2, 2), &wsa_data);
#endif
ensure(thread_ctrl::is_main(), "Not main thread");

View file

@ -26,6 +26,7 @@
#include "Emu/Cell/lv2/sys_rsx.h"
#include "Emu/Cell/lv2/sys_vm.h"
#include "Emu/Cell/lv2/sys_net.h"
#include "Emu/Cell/lv2/sys_net/lv2_socket.h"
#include "Emu/Cell/lv2/sys_fs.h"
#include "Emu/Cell/lv2/sys_interrupt.h"
#include "Emu/Cell/Modules/cellSpurs.h"
@ -547,7 +548,7 @@ void kernel_explorer::update()
idm::select<lv2_socket>([&](u32 id, lv2_socket& sock)
{
add_leaf(find_node(root, additional_nodes::sockets), qstr(fmt::format("Socket %u: Type: %s, Family: %s, Wq: %zu", id, sock.type, sock.family, sock.queue.size())));
add_leaf(find_node(root, additional_nodes::sockets), qstr(fmt::format("Socket %u: Type: %s, Family: %s, Wq: %zu", id, sock.get_type(), sock.get_family(), sock.get_queue_size())));
});
idm::select<lv2_memory_container>([&](u32 id, lv2_memory_container& container)