fix: stabilize turtle world entry session handling

This commit is contained in:
Kelsi 2026-03-15 01:21:23 -07:00
parent 4dba20b757
commit b0fafe5efa
20 changed files with 2283 additions and 1380 deletions

View file

@ -1,6 +1,7 @@
#include "network/world_socket.hpp"
#include "network/packet.hpp"
#include "network/net_platform.hpp"
#include "game/opcode_table.hpp"
#include "auth/crypto.hpp"
#include "core/logger.hpp"
#include <iomanip>
@ -9,10 +10,49 @@
#include <fstream>
#include <cstdlib>
#include <cstring>
#include <chrono>
#include <thread>
namespace {
constexpr size_t kMaxReceiveBufferBytes = 8 * 1024 * 1024;
constexpr int kMaxParsedPacketsPerUpdate = 220;
constexpr int kDefaultMaxParsedPacketsPerUpdate = 16;
constexpr int kAbsoluteMaxParsedPacketsPerUpdate = 220;
constexpr int kMinParsedPacketsPerUpdate = 8;
constexpr int kDefaultMaxPacketCallbacksPerUpdate = 6;
constexpr int kAbsoluteMaxPacketCallbacksPerUpdate = 64;
constexpr int kMinPacketCallbacksPerUpdate = 1;
constexpr int kMaxRecvCallsPerUpdate = 64;
constexpr size_t kMaxRecvBytesPerUpdate = 512 * 1024;
constexpr size_t kMaxQueuedPacketCallbacks = 4096;
constexpr int kAsyncPumpSleepMs = 2;
inline int parsedPacketsBudgetPerUpdate() {
static int budget = []() {
const char* raw = std::getenv("WOWEE_NET_MAX_PARSED_PACKETS");
if (!raw || !*raw) return kDefaultMaxParsedPacketsPerUpdate;
char* end = nullptr;
long parsed = std::strtol(raw, &end, 10);
if (end == raw) return kDefaultMaxParsedPacketsPerUpdate;
if (parsed < kMinParsedPacketsPerUpdate) return kMinParsedPacketsPerUpdate;
if (parsed > kAbsoluteMaxParsedPacketsPerUpdate) return kAbsoluteMaxParsedPacketsPerUpdate;
return static_cast<int>(parsed);
}();
return budget;
}
inline int packetCallbacksBudgetPerUpdate() {
static int budget = []() {
const char* raw = std::getenv("WOWEE_NET_MAX_PACKET_CALLBACKS");
if (!raw || !*raw) return kDefaultMaxPacketCallbacksPerUpdate;
char* end = nullptr;
long parsed = std::strtol(raw, &end, 10);
if (end == raw) return kDefaultMaxPacketCallbacksPerUpdate;
if (parsed < kMinPacketCallbacksPerUpdate) return kMinPacketCallbacksPerUpdate;
if (parsed > kAbsoluteMaxPacketCallbacksPerUpdate) return kAbsoluteMaxPacketCallbacksPerUpdate;
return static_cast<int>(parsed);
}();
return budget;
}
inline bool isLoginPipelineSmsg(uint16_t opcode) {
switch (opcode) {
@ -49,6 +89,14 @@ inline bool envFlagEnabled(const char* key, bool defaultValue = false) {
return !(raw[0] == '0' || raw[0] == 'f' || raw[0] == 'F' ||
raw[0] == 'n' || raw[0] == 'N');
}
const char* opcodeNameForTrace(uint16_t wireOpcode) {
const auto* table = wowee::game::getActiveOpcodeTable();
if (!table) return "UNKNOWN";
auto logical = table->fromWire(wireOpcode);
if (!logical) return "UNKNOWN";
return wowee::game::OpcodeTable::logicalToName(*logical);
}
} // namespace
namespace wowee {
@ -71,6 +119,7 @@ WorldSocket::WorldSocket() {
receiveBuffer.reserve(64 * 1024);
useFastRecvAppend_ = envFlagEnabled("WOWEE_NET_FAST_RECV_APPEND", true);
useParseScratchQueue_ = envFlagEnabled("WOWEE_NET_PARSE_SCRATCH", false);
useAsyncPump_ = envFlagEnabled("WOWEE_NET_ASYNC_PUMP", true);
if (useParseScratchQueue_) {
LOG_WARNING("WOWEE_NET_PARSE_SCRATCH is temporarily disabled (known unstable); forcing off");
useParseScratchQueue_ = false;
@ -79,7 +128,10 @@ WorldSocket::WorldSocket() {
parsedPacketsScratch_.reserve(64);
}
LOG_INFO("WorldSocket net opts: fast_recv_append=", useFastRecvAppend_ ? "on" : "off",
" parse_scratch=", useParseScratchQueue_ ? "on" : "off");
" async_pump=", useAsyncPump_ ? "on" : "off",
" parse_scratch=", useParseScratchQueue_ ? "on" : "off",
" max_parsed_packets=", parsedPacketsBudgetPerUpdate(),
" max_packet_callbacks=", packetCallbacksBudgetPerUpdate());
}
WorldSocket::~WorldSocket() {
@ -89,6 +141,8 @@ WorldSocket::~WorldSocket() {
bool WorldSocket::connect(const std::string& host, uint16_t port) {
LOG_INFO("Connecting to world server: ", host, ":", port);
stopAsyncPump();
// Create socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == INVALID_SOCK) {
@ -165,32 +219,59 @@ bool WorldSocket::connect(const std::string& host, uint16_t port) {
connected = true;
LOG_INFO("Connected to world server: ", host, ":", port);
startAsyncPump();
return true;
}
void WorldSocket::disconnect() {
stopAsyncPump();
{
std::lock_guard<std::mutex> lock(ioMutex_);
closeSocketNoJoin();
encryptionEnabled = false;
useVanillaCrypt = false;
receiveBuffer.clear();
receiveReadOffset_ = 0;
parsedPacketsScratch_.clear();
headerBytesDecrypted = 0;
packetTraceStart_ = {};
packetTraceUntil_ = {};
packetTraceReason_.clear();
}
{
std::lock_guard<std::mutex> lock(callbackMutex_);
pendingPacketCallbacks_.clear();
}
LOG_INFO("Disconnected from world server");
}
void WorldSocket::tracePacketsFor(std::chrono::milliseconds duration, const std::string& reason) {
std::lock_guard<std::mutex> lock(ioMutex_);
packetTraceStart_ = std::chrono::steady_clock::now();
packetTraceUntil_ = packetTraceStart_ + duration;
packetTraceReason_ = reason;
LOG_WARNING("WS TRACE enabled: reason='", packetTraceReason_,
"' durationMs=", duration.count());
}
bool WorldSocket::isConnected() const {
std::lock_guard<std::mutex> lock(ioMutex_);
return connected;
}
void WorldSocket::closeSocketNoJoin() {
if (sockfd != INVALID_SOCK) {
net::closeSocket(sockfd);
sockfd = INVALID_SOCK;
}
connected = false;
encryptionEnabled = false;
useVanillaCrypt = false;
receiveBuffer.clear();
receiveReadOffset_ = 0;
parsedPacketsScratch_.clear();
headerBytesDecrypted = 0;
LOG_INFO("Disconnected from world server");
}
bool WorldSocket::isConnected() const {
return connected;
}
void WorldSocket::send(const Packet& packet) {
if (!connected) return;
static const bool kLogCharCreatePayload = envFlagEnabled("WOWEE_NET_LOG_CHAR_CREATE", false);
static const bool kLogSwapItemPackets = envFlagEnabled("WOWEE_NET_LOG_SWAP_ITEM", false);
std::lock_guard<std::mutex> lock(ioMutex_);
if (!connected || sockfd == INVALID_SOCK) return;
const auto& data = packet.getData();
uint16_t opcode = packet.getOpcode();
@ -254,6 +335,17 @@ void WorldSocket::send(const Packet& packet) {
LOG_INFO("WS TX opcode=0x", std::hex, opcode, std::dec, " payloadLen=", payloadLen, " data=[", hex, "]");
}
const auto traceNow = std::chrono::steady_clock::now();
if (packetTraceUntil_ > traceNow) {
const auto elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(
traceNow - packetTraceStart_).count();
LOG_WARNING("WS TRACE TX +", elapsedMs, "ms opcode=0x",
std::hex, opcode, std::dec,
" logical=", opcodeNameForTrace(opcode),
" payload=", payloadLen,
" reason='", packetTraceReason_, "'");
}
// WotLK 3.3.5 CMSG header (6 bytes total):
// - size (2 bytes, big-endian) = payloadLen + 4 (opcode is 4 bytes for CMSG)
// - opcode (4 bytes, little-endian)
@ -317,7 +409,46 @@ void WorldSocket::send(const Packet& packet) {
}
void WorldSocket::update() {
if (!connected) return;
if (!useAsyncPump_) {
pumpNetworkIO();
}
dispatchQueuedPackets();
}
void WorldSocket::startAsyncPump() {
if (!useAsyncPump_ || asyncPumpRunning_.load(std::memory_order_acquire)) {
return;
}
asyncPumpStop_.store(false, std::memory_order_release);
asyncPumpThread_ = std::thread(&WorldSocket::asyncPumpLoop, this);
}
void WorldSocket::stopAsyncPump() {
asyncPumpStop_.store(true, std::memory_order_release);
if (asyncPumpThread_.joinable()) {
asyncPumpThread_.join();
}
asyncPumpRunning_.store(false, std::memory_order_release);
}
void WorldSocket::asyncPumpLoop() {
asyncPumpRunning_.store(true, std::memory_order_release);
while (!asyncPumpStop_.load(std::memory_order_acquire)) {
pumpNetworkIO();
{
std::lock_guard<std::mutex> lock(ioMutex_);
if (!connected) {
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(kAsyncPumpSleepMs));
}
asyncPumpRunning_.store(false, std::memory_order_release);
}
void WorldSocket::pumpNetworkIO() {
std::lock_guard<std::mutex> lock(ioMutex_);
if (!connected || sockfd == INVALID_SOCK) return;
auto bufferedBytes = [&]() -> size_t {
return (receiveBuffer.size() >= receiveReadOffset_)
? (receiveBuffer.size() - receiveReadOffset_)
@ -343,7 +474,8 @@ void WorldSocket::update() {
bool receivedAny = false;
size_t bytesReadThisTick = 0;
int readOps = 0;
while (connected) {
while (connected && readOps < kMaxRecvCallsPerUpdate &&
bytesReadThisTick < kMaxRecvBytesPerUpdate) {
uint8_t buffer[4096];
ssize_t received = net::portableRecv(sockfd, buffer, sizeof(buffer));
@ -362,7 +494,7 @@ void WorldSocket::update() {
LOG_ERROR("World socket receive buffer would overflow (buffered=", liveBytes,
" incoming=", receivedSize, " max=", kMaxReceiveBufferBytes,
"). Disconnecting to recover framing.");
disconnect();
closeSocketNoJoin();
return;
}
const size_t oldSize = receiveBuffer.size();
@ -375,7 +507,7 @@ void WorldSocket::update() {
if (newCap < needed) {
LOG_ERROR("World socket receive buffer capacity growth failed (needed=", needed,
" max=", kMaxReceiveBufferBytes, "). Disconnecting to recover framing.");
disconnect();
closeSocketNoJoin();
return;
}
receiveBuffer.reserve(newCap);
@ -387,7 +519,7 @@ void WorldSocket::update() {
if (bufferedBytes() > kMaxReceiveBufferBytes) {
LOG_ERROR("World socket receive buffer overflow (", bufferedBytes(),
" bytes). Disconnecting to recover framing.");
disconnect();
closeSocketNoJoin();
return;
}
continue;
@ -409,7 +541,7 @@ void WorldSocket::update() {
}
LOG_ERROR("Receive failed: ", net::errorString(err));
disconnect();
closeSocketNoJoin();
return;
}
@ -434,10 +566,15 @@ void WorldSocket::update() {
}
}
if (connected && (readOps >= kMaxRecvCallsPerUpdate || bytesReadThisTick >= kMaxRecvBytesPerUpdate)) {
LOG_DEBUG("World socket recv budget reached (calls=", readOps,
", bytes=", bytesReadThisTick, "), deferring remaining socket drain");
}
if (sawClose) {
LOG_INFO("World server connection closed (receivedAny=", receivedAny,
" buffered=", bufferedBytes(), ")");
disconnect();
closeSocketNoJoin();
return;
}
}
@ -462,7 +599,8 @@ void WorldSocket::tryParsePackets() {
} else {
parsedPacketsLocal.reserve(32);
}
while ((receiveBuffer.size() - parseOffset) >= 4 && parsedThisTick < kMaxParsedPacketsPerUpdate) {
const int maxParsedThisTick = parsedPacketsBudgetPerUpdate();
while ((receiveBuffer.size() - parseOffset) >= 4 && parsedThisTick < maxParsedThisTick) {
uint8_t rawHeader[4] = {0, 0, 0, 0};
std::memcpy(rawHeader, receiveBuffer.data() + parseOffset, 4);
@ -491,7 +629,7 @@ void WorldSocket::tryParsePackets() {
static_cast<int>(rawHeader[2]), " ",
static_cast<int>(rawHeader[3]), std::dec,
" enc=", encryptionEnabled, ". Disconnecting to recover stream.");
disconnect();
closeSocketNoJoin();
return;
}
constexpr uint16_t kMaxWorldPacketSize = 0x4000;
@ -503,7 +641,7 @@ void WorldSocket::tryParsePackets() {
static_cast<int>(rawHeader[2]), " ",
static_cast<int>(rawHeader[3]), std::dec,
" enc=", encryptionEnabled, ". Disconnecting to recover stream.");
disconnect();
closeSocketNoJoin();
return;
}
@ -535,6 +673,16 @@ void WorldSocket::tryParsePackets() {
" buffered=", (receiveBuffer.size() - parseOffset),
" enc=", encryptionEnabled ? "yes" : "no");
}
const auto traceNow = std::chrono::steady_clock::now();
if (packetTraceUntil_ > traceNow) {
const auto elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(
traceNow - packetTraceStart_).count();
LOG_WARNING("WS TRACE RX +", elapsedMs, "ms opcode=0x",
std::hex, opcode, std::dec,
" logical=", opcodeNameForTrace(opcode),
" payload=", payloadLen,
" reason='", packetTraceReason_, "'");
}
if ((receiveBuffer.size() - parseOffset) < totalSize) {
// Not enough data yet - header stays decrypted in buffer
@ -555,7 +703,7 @@ void WorldSocket::tryParsePackets() {
" payload=", payloadLen, " buffered=", receiveBuffer.size(),
" parseOffset=", parseOffset, " what=", e.what(),
". Disconnecting to recover.");
disconnect();
closeSocketNoJoin();
return;
}
parseOffset += totalSize;
@ -578,23 +726,57 @@ void WorldSocket::tryParsePackets() {
}
headerBytesDecrypted = localHeaderBytesDecrypted;
if (packetCallback) {
for (const auto& packet : *parsedPackets) {
if (!connected) break;
packetCallback(packet);
// Queue parsed packets for main-thread dispatch.
if (!parsedPackets->empty()) {
std::lock_guard<std::mutex> callbackLock(callbackMutex_);
for (auto& packet : *parsedPackets) {
pendingPacketCallbacks_.push_back(std::move(packet));
}
if (pendingPacketCallbacks_.size() > kMaxQueuedPacketCallbacks) {
LOG_ERROR("World socket callback queue overflow (", pendingPacketCallbacks_.size(),
" packets). Disconnecting to recover.");
pendingPacketCallbacks_.clear();
closeSocketNoJoin();
return;
}
}
const size_t buffered = (receiveBuffer.size() >= receiveReadOffset_)
? (receiveBuffer.size() - receiveReadOffset_)
: 0;
if (parsedThisTick >= kMaxParsedPacketsPerUpdate && buffered >= 4) {
if (parsedThisTick >= maxParsedThisTick && buffered >= 4) {
LOG_DEBUG("World socket parse budget reached (", parsedThisTick,
" packets); deferring remaining buffered data=", buffered, " bytes");
}
}
void WorldSocket::dispatchQueuedPackets() {
std::deque<Packet> localPackets;
{
std::lock_guard<std::mutex> lock(callbackMutex_);
if (!packetCallback || pendingPacketCallbacks_.empty()) {
return;
}
const int maxCallbacksThisTick = packetCallbacksBudgetPerUpdate();
for (int i = 0; i < maxCallbacksThisTick && !pendingPacketCallbacks_.empty(); ++i) {
localPackets.push_back(std::move(pendingPacketCallbacks_.front()));
pendingPacketCallbacks_.pop_front();
}
if (!pendingPacketCallbacks_.empty()) {
LOG_DEBUG("World socket callback budget reached (", localPackets.size(),
" callbacks); deferring ", pendingPacketCallbacks_.size(),
" queued packet callbacks");
}
}
while (!localPackets.empty()) {
packetCallback(localPackets.front());
localPackets.pop_front();
}
}
void WorldSocket::initEncryption(const std::vector<uint8_t>& sessionKey, uint32_t build) {
std::lock_guard<std::mutex> lock(ioMutex_);
if (sessionKey.size() != 40) {
LOG_ERROR("Invalid session key size: ", sessionKey.size(), " (expected 40)");
return;