Stabilize net parsing and reduce texture-cache churn

This commit is contained in:
Kelsi 2026-02-22 07:44:32 -08:00
parent ae88b226b5
commit 6d55c19987
7 changed files with 143 additions and 27 deletions

View file

@ -7,6 +7,8 @@
#include <sstream>
#include <cstdio>
#include <fstream>
#include <cstdlib>
#include <cstring>
namespace {
constexpr size_t kMaxReceiveBufferBytes = 8 * 1024 * 1024;
@ -40,6 +42,13 @@ inline bool isLoginPipelineCmsg(uint16_t opcode) {
return false;
}
}
inline bool envFlagEnabled(const char* key, bool defaultValue = false) {
const char* raw = std::getenv(key);
if (!raw || !*raw) return defaultValue;
return !(raw[0] == '0' || raw[0] == 'f' || raw[0] == 'F' ||
raw[0] == 'n' || raw[0] == 'N');
}
} // namespace
namespace wowee {
@ -58,6 +67,19 @@ static const uint8_t DECRYPT_KEY[] = {
WorldSocket::WorldSocket() {
net::ensureInit();
// Always reserve baseline receive capacity (safe, behavior-preserving).
receiveBuffer.reserve(64 * 1024);
useFastRecvAppend_ = envFlagEnabled("WOWEE_NET_FAST_RECV_APPEND", false);
useParseScratchQueue_ = envFlagEnabled("WOWEE_NET_PARSE_SCRATCH", false);
if (useParseScratchQueue_) {
LOG_WARNING("WOWEE_NET_PARSE_SCRATCH is temporarily disabled (known unstable); forcing off");
useParseScratchQueue_ = false;
}
if (useParseScratchQueue_) {
parsedPacketsScratch_.reserve(64);
}
LOG_INFO("WorldSocket net opts: fast_recv_append=", useFastRecvAppend_ ? "on" : "off",
" parse_scratch=", useParseScratchQueue_ ? "on" : "off");
}
WorldSocket::~WorldSocket() {
@ -118,6 +140,7 @@ void WorldSocket::disconnect() {
encryptionEnabled = false;
useVanillaCrypt = false;
receiveBuffer.clear();
parsedPacketsScratch_.clear();
headerBytesDecrypted = 0;
LOG_INFO("Disconnected from world server");
}
@ -270,8 +293,22 @@ void WorldSocket::update() {
if (received > 0) {
receivedAny = true;
++readOps;
bytesReadThisTick += static_cast<size_t>(received);
receiveBuffer.insert(receiveBuffer.end(), buffer, buffer + received);
size_t receivedSize = static_cast<size_t>(received);
bytesReadThisTick += receivedSize;
if (useFastRecvAppend_) {
size_t oldSize = receiveBuffer.size();
if (oldSize > kMaxReceiveBufferBytes || receivedSize > (kMaxReceiveBufferBytes - oldSize)) {
LOG_ERROR("World socket receive buffer would overflow (old=", oldSize,
" incoming=", receivedSize, " max=", kMaxReceiveBufferBytes,
"). Disconnecting to recover framing.");
disconnect();
return;
}
receiveBuffer.resize(oldSize + receivedSize);
std::memcpy(receiveBuffer.data() + oldSize, buffer, receivedSize);
} else {
receiveBuffer.insert(receiveBuffer.end(), buffer, buffer + received);
}
if (receiveBuffer.size() > kMaxReceiveBufferBytes) {
LOG_ERROR("World socket receive buffer overflow (", receiveBuffer.size(),
" bytes). Disconnecting to recover framing.");
@ -327,8 +364,21 @@ void WorldSocket::tryParsePackets() {
int parsedThisTick = 0;
size_t parseOffset = 0;
size_t localHeaderBytesDecrypted = headerBytesDecrypted;
std::vector<Packet> parsedPackets;
parsedPackets.reserve(32);
std::vector<Packet> parsedPacketsLocal;
std::vector<Packet>* parsedPackets = &parsedPacketsLocal;
if (useParseScratchQueue_) {
parsedPacketsScratch_.clear();
// Keep a warm queue to reduce steady-state allocations, but avoid
// retaining pathological capacity after burst/misaligned streams.
if (parsedPacketsScratch_.capacity() > 1024) {
std::vector<Packet>().swap(parsedPacketsScratch_);
} else if (parsedPacketsScratch_.capacity() < 64) {
parsedPacketsScratch_.reserve(64);
}
parsedPackets = &parsedPacketsScratch_;
} else {
parsedPacketsLocal.reserve(32);
}
while ((receiveBuffer.size() - parseOffset) >= 4 && parsedThisTick < kMaxParsedPacketsPerUpdate) {
uint8_t rawHeader[4] = {0, 0, 0, 0};
std::memcpy(rawHeader, receiveBuffer.data() + parseOffset, 4);
@ -408,12 +458,23 @@ void WorldSocket::tryParsePackets() {
break;
}
// Extract payload (skip header)
std::vector<uint8_t> packetData(receiveBuffer.begin() + parseOffset + 4,
receiveBuffer.begin() + parseOffset + totalSize);
// Queue packet; callbacks run after buffer state is finalized.
parsedPackets.emplace_back(opcode, std::move(packetData));
// Extract payload (skip header). Guard allocation failures so malformed
// streams cannot unwind into application-level OOM crashes.
try {
std::vector<uint8_t> packetData(payloadLen);
if (payloadLen > 0) {
std::memcpy(packetData.data(), receiveBuffer.data() + parseOffset + 4, payloadLen);
}
// Queue packet; callbacks run after buffer state is finalized.
parsedPackets->emplace_back(opcode, std::move(packetData));
} catch (const std::bad_alloc& e) {
LOG_ERROR("OOM while queuing world packet opcode=0x", std::hex, opcode, std::dec,
" payload=", payloadLen, " buffered=", receiveBuffer.size(),
" parseOffset=", parseOffset, " what=", e.what(),
". Disconnecting to recover.");
disconnect();
return;
}
parseOffset += totalSize;
localHeaderBytesDecrypted = 0;
++parsedThisTick;
@ -425,7 +486,7 @@ void WorldSocket::tryParsePackets() {
headerBytesDecrypted = localHeaderBytesDecrypted;
if (packetCallback) {
for (const auto& packet : parsedPackets) {
for (const auto& packet : *parsedPackets) {
if (!connected) break;
packetCallback(packet);
}