feat(Core/Compression): Move packets compression from map to network thread (#18602)

* feat(Code/Compression): Move packets compression from map to network thread.

* Code style fix

* Remove unicode letter
This commit is contained in:
Anton Popovichenko
2024-03-28 12:57:29 +01:00
committed by GitHub
parent 3ff8de2086
commit 73340b94e3
14 changed files with 122 additions and 118 deletions

View File

@@ -305,7 +305,7 @@ void BattlegroundSA::StartShips()
UpdateData data;
WorldPacket pkt;
GetBGObject(i)->BuildValuesUpdateBlockForPlayer(&data, itr->second);
data.BuildPacket(&pkt);
data.BuildPacket(pkt);
itr->second->GetSession()->SendPacket(&pkt);
}
}
@@ -1106,7 +1106,7 @@ void BattlegroundSA::SendTransportInit(Player* player)
if (BgObjects[BG_SA_BOAT_TWO])
GetBGObject(BG_SA_BOAT_TWO)->BuildCreateUpdateBlockForPlayer(&transData, player);
WorldPacket packet;
transData.BuildPacket(&packet);
transData.BuildPacket(packet);
player->GetSession()->SendPacket(&packet);
}
}
@@ -1121,7 +1121,7 @@ void BattlegroundSA::SendTransportsRemove(Player* player)
if (BgObjects[BG_SA_BOAT_TWO])
GetBGObject(BG_SA_BOAT_TWO)->BuildOutOfRangeUpdateBlock(&transData);
WorldPacket packet;
transData.BuildPacket(&packet);
transData.BuildPacket(packet);
player->GetSession()->SendPacket(&packet);
}
}

View File

@@ -512,7 +512,7 @@ void GameObject::Update(uint32 diff)
UpdateData udata;
WorldPacket packet;
BuildValuesUpdateBlockForPlayer(&udata, caster->ToPlayer());
udata.BuildPacket(&packet);
udata.BuildPacket(packet);
caster->ToPlayer()->GetSession()->SendPacket(&packet);
SendCustomAnim(GetGoAnimProgress());

View File

@@ -250,7 +250,7 @@ void Object::SendUpdateToPlayer(Player* player)
WorldPacket packet;
BuildCreateUpdateBlockForPlayer(&upd, player);
upd.BuildPacket(&packet);
upd.BuildPacket(packet);
player->GetSession()->SendPacket(&packet);
}

View File

@@ -22,7 +22,6 @@
#include "Opcodes.h"
#include "World.h"
#include "WorldPacket.h"
#include "zlib.h"
UpdateData::UpdateData() : m_blockCount(0)
{
@@ -46,103 +45,25 @@ void UpdateData::AddUpdateBlock(const UpdateData& block)
m_blockCount += block.m_blockCount;
}
void UpdateData::Compress(void* dst, uint32* dst_size, void* src, int src_size)
bool UpdateData::BuildPacket(WorldPacket& packet)
{
z_stream c_stream;
ASSERT(packet.empty());
c_stream.zalloc = (alloc_func)0;
c_stream.zfree = (free_func)0;
c_stream.opaque = (voidpf)0;
packet.reserve(4 + (m_outOfRangeGUIDs.empty() ? 0 : 1 + 4 + 9 * m_outOfRangeGUIDs.size()) + m_data.wpos());
// default Z_BEST_SPEED (1)
int z_res = deflateInit(&c_stream, sWorld->getIntConfig(CONFIG_COMPRESSION));
if (z_res != Z_OK)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateInit) Error code: {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
c_stream.next_out = (Bytef*)dst;
c_stream.avail_out = *dst_size;
c_stream.next_in = (Bytef*)src;
c_stream.avail_in = (uInt)src_size;
z_res = deflate(&c_stream, Z_NO_FLUSH);
if (z_res != Z_OK)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate) Error code: {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
if (c_stream.avail_in != 0)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate not greedy)");
*dst_size = 0;
return;
}
z_res = deflate(&c_stream, Z_FINISH);
if (z_res != Z_STREAM_END)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate should report Z_STREAM_END instead {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
z_res = deflateEnd(&c_stream);
if (z_res != Z_OK)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateEnd) Error code: {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
*dst_size = c_stream.total_out;
}
bool UpdateData::BuildPacket(WorldPacket* packet)
{
ASSERT(packet->empty()); // shouldn't happen
ByteBuffer buf(4 + (m_outOfRangeGUIDs.empty() ? 0 : 1 + 4 + 9 * m_outOfRangeGUIDs.size()) + m_data.wpos());
buf << (uint32) (!m_outOfRangeGUIDs.empty() ? m_blockCount + 1 : m_blockCount);
packet << (uint32) (!m_outOfRangeGUIDs.empty() ? m_blockCount + 1 : m_blockCount);
if (!m_outOfRangeGUIDs.empty())
{
buf << (uint8) UPDATETYPE_OUT_OF_RANGE_OBJECTS;
buf << (uint32) m_outOfRangeGUIDs.size();
packet << (uint8) UPDATETYPE_OUT_OF_RANGE_OBJECTS;
packet << (uint32) m_outOfRangeGUIDs.size();
for (ObjectGuid const& guid : m_outOfRangeGUIDs)
{
buf << guid.WriteAsPacked();
}
packet << guid.WriteAsPacked();
}
buf.append(m_data);
size_t pSize = buf.wpos(); // use real used data size
if (pSize > 100) // compress large packets
{
uint32 destsize = compressBound(pSize);
packet->resize(destsize + sizeof(uint32));
packet->put<uint32>(0, pSize);
Compress(const_cast<uint8*>(packet->contents()) + sizeof(uint32), &destsize, (void*)buf.contents(), pSize);
if (destsize == 0)
return false;
packet->resize(destsize + sizeof(uint32));
packet->SetOpcode(SMSG_COMPRESSED_UPDATE_OBJECT);
}
else // send small packets without compression
{
packet->append(buf);
packet->SetOpcode(SMSG_UPDATE_OBJECT);
}
packet.append(m_data);
packet.SetOpcode(SMSG_UPDATE_OBJECT);
return true;
}

View File

@@ -56,7 +56,7 @@ public:
void AddOutOfRangeGUID(ObjectGuid guid);
void AddUpdateBlock(const ByteBuffer& block);
void AddUpdateBlock(const UpdateData& block);
bool BuildPacket(WorldPacket* packet);
bool BuildPacket(WorldPacket& packet);
[[nodiscard]] bool HasData() const { return m_blockCount > 0 || !m_outOfRangeGUIDs.empty(); }
void Clear();
@@ -64,7 +64,5 @@ protected:
uint32 m_blockCount;
GuidVector m_outOfRangeGUIDs;
ByteBuffer m_data;
void Compress(void* dst, uint32* dst_size, void* src, int src_size);
};
#endif

View File

@@ -1739,7 +1739,7 @@ void Player::UpdateTriggerVisibility()
if (!udata.HasData())
return;
udata.BuildPacket(&packet);
udata.BuildPacket(packet);
GetSession()->SendPacket(&packet);
}
@@ -1791,7 +1791,7 @@ void Player::UpdateForQuestWorldObjects()
}
}
udata.BuildPacket(&packet);
udata.BuildPacket(packet);
GetSession()->SendPacket(&packet);
}

View File

@@ -10595,7 +10595,7 @@ void Unit::SetOwnerGUID(ObjectGuid owner)
UpdateData udata;
WorldPacket packet;
BuildValuesUpdateBlockForPlayer(&udata, player);
udata.BuildPacket(&packet);
udata.BuildPacket(packet);
player->SendDirectMessage(&packet);
RemoveFieldNotifyFlag(UF_FLAG_OWNER);

View File

@@ -99,7 +99,7 @@ void VisibleNotifier::SendToSelf()
return;
WorldPacket packet;
i_data.BuildPacket(&packet);
i_data.BuildPacket(packet);
i_player.GetSession()->SendPacket(&packet);
for (std::vector<Unit*>::const_iterator it = i_visibleNow.begin(); it != i_visibleNow.end(); ++it)

View File

@@ -520,7 +520,7 @@ bool Group::AddMember(Player* player)
player->BuildValuesUpdateBlockForPlayer(&newData, itrMember);
if (newData.HasData())
{
newData.BuildPacket(&newDataPacket);
newData.BuildPacket(newDataPacket);
itrMember->SendDirectMessage(&newDataPacket);
}
}
@@ -529,7 +529,7 @@ bool Group::AddMember(Player* player)
if (groupData.HasData())
{
groupData.BuildPacket(&groupDataPacket);
groupData.BuildPacket(groupDataPacket);
player->SendDirectMessage(&groupDataPacket);
}

View File

@@ -644,7 +644,7 @@ bool Map::AddToMap(MotionTransport* obj, bool /*checkTransport*/)
UpdateData data;
obj->BuildCreateUpdateBlockForPlayer(&data, itr->GetSource());
WorldPacket packet;
data.BuildPacket(&packet);
data.BuildPacket(packet);
itr->GetSource()->SendDirectMessage(&packet);
}
}
@@ -971,7 +971,7 @@ void Map::RemoveFromMap(MotionTransport* obj, bool remove)
UpdateData data;
obj->BuildOutOfRangeUpdateBlock(&data);
WorldPacket packet;
data.BuildPacket(&packet);
data.BuildPacket(packet);
for (Map::PlayerList::const_iterator itr = players.begin(); itr != players.end(); ++itr)
if (itr->GetSource()->GetTransport() != obj)
itr->GetSource()->SendDirectMessage(&packet);
@@ -2536,7 +2536,7 @@ void Map::SendInitSelf(Player* player)
player->BuildCreateUpdateBlockForPlayer(&data, player);
// build and send self update packet before sending to player his own auras
data.BuildPacket(&packet);
data.BuildPacket(packet);
player->SendDirectMessage(&packet);
// send to player his own auras (this is needed here for timely initialization of some fields on client)
@@ -2552,7 +2552,7 @@ void Map::SendInitSelf(Player* player)
if (player != (*itr) && player->HaveAtClient(*itr))
(*itr)->BuildCreateUpdateBlockForPlayer(&data, player);
data.BuildPacket(&packet);
data.BuildPacket(packet);
player->SendDirectMessage(&packet);
}
@@ -2565,7 +2565,7 @@ void Map::SendInitTransports(Player* player)
(*itr)->BuildCreateUpdateBlockForPlayer(&transData, player);
WorldPacket packet;
transData.BuildPacket(&packet);
transData.BuildPacket(packet);
player->GetSession()->SendPacket(&packet);
}
@@ -2590,7 +2590,7 @@ void Map::SendRemoveTransports(Player* player)
}
WorldPacket packet;
transData.BuildPacket(&packet);
transData.BuildPacket(packet);
player->GetSession()->SendPacket(&packet);
}
@@ -2621,7 +2621,7 @@ void Map::SendObjectUpdates()
WorldPacket packet; // here we allocate a std::vector with a size of 0x10000
for (UpdateDataMapType::iterator iter = update_players.begin(); iter != update_players.end(); ++iter)
{
iter->second.BuildPacket(&packet);
iter->second.BuildPacket(packet);
iter->first->GetSession()->SendPacket(&packet);
packet.clear(); // clean the string
}

View File

@@ -31,9 +31,88 @@
#include "World.h"
#include "WorldSession.h"
#include <memory>
#include "zlib.h"
using boost::asio::ip::tcp;
void compressBuff(void* dst, uint32* dst_size, void* src, int src_size)
{
z_stream c_stream;
c_stream.zalloc = (alloc_func)0;
c_stream.zfree = (free_func)0;
c_stream.opaque = (voidpf)0;
// default Z_BEST_SPEED (1)
int z_res = deflateInit(&c_stream, sWorld->getIntConfig(CONFIG_COMPRESSION));
if (z_res != Z_OK)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateInit) Error code: {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
c_stream.next_out = (Bytef*)dst;
c_stream.avail_out = *dst_size;
c_stream.next_in = (Bytef*)src;
c_stream.avail_in = (uInt)src_size;
z_res = deflate(&c_stream, Z_NO_FLUSH);
if (z_res != Z_OK)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate) Error code: {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
if (c_stream.avail_in != 0)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate not greedy)");
*dst_size = 0;
return;
}
z_res = deflate(&c_stream, Z_FINISH);
if (z_res != Z_STREAM_END)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflate should report Z_STREAM_END instead {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
z_res = deflateEnd(&c_stream);
if (z_res != Z_OK)
{
LOG_ERROR("entities.object", "Can't compress update packet (zlib: deflateEnd) Error code: {} ({})", z_res, zError(z_res));
*dst_size = 0;
return;
}
*dst_size = c_stream.total_out;
}
void EncryptableAndCompressiblePacket::CompressIfNeeded()
{
if (!NeedsCompression())
return;
uint32 pSize = size();
uint32 destsize = compressBound(pSize);
ByteBuffer buf(destsize + sizeof(uint32));
buf.resize(destsize + sizeof(uint32));
buf.put<uint32>(0, pSize);
compressBuff(const_cast<uint8*>(buf.contents()) + sizeof(uint32), &destsize, (void*)contents(), pSize);
if (destsize == 0)
return;
buf.resize(destsize + sizeof(uint32));
ByteBuffer::operator=(std::move(buf));
SetOpcode(SMSG_COMPRESSED_UPDATE_OBJECT);
}
WorldSocket::WorldSocket(tcp::socket&& socket)
: Socket(std::move(socket)), _OverSpeedPings(0), _worldSession(nullptr), _authed(false), _sendBufferSize(4096)
{
@@ -81,10 +160,12 @@ void WorldSocket::CheckIpCallback(PreparedQueryResult result)
bool WorldSocket::Update()
{
EncryptablePacket* queued;
EncryptableAndCompressiblePacket* queued;
MessageBuffer buffer(_sendBufferSize);
while (_bufferQueue.Dequeue(queued))
{
queued->CompressIfNeeded();
ServerPktHeader header(queued->size() + 2, queued->GetOpcode());
if (queued->NeedsEncryption())
_authCrypt.EncryptSend(header.header, header.getHeaderLength());
@@ -427,7 +508,7 @@ void WorldSocket::SendPacket(WorldPacket const& packet)
if (sPacketLog->CanLogPacket())
sPacketLog->LogPacket(packet, SERVER_TO_CLIENT, GetRemoteIpAddress(), GetRemotePort());
_bufferQueue.Enqueue(new EncryptablePacket(packet, _authCrypt.IsInitialized()));
_bufferQueue.Enqueue(new EncryptableAndCompressiblePacket(packet, _authCrypt.IsInitialized()));
}
void WorldSocket::HandleAuthSession(WorldPacket & recvPacket)

View File

@@ -30,17 +30,21 @@
using boost::asio::ip::tcp;
class EncryptablePacket : public WorldPacket
class EncryptableAndCompressiblePacket : public WorldPacket
{
public:
EncryptablePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt)
EncryptableAndCompressiblePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt)
{
SocketQueueLink.store(nullptr, std::memory_order_relaxed);
}
bool NeedsEncryption() const { return _encrypt; }
std::atomic<EncryptablePacket*> SocketQueueLink;
bool NeedsCompression() const { return GetOpcode() == SMSG_UPDATE_OBJECT && size() > 100; }
void CompressIfNeeded();
std::atomic<EncryptableAndCompressiblePacket*> SocketQueueLink;
private:
bool _encrypt;
@@ -125,7 +129,7 @@ private:
MessageBuffer _headerBuffer;
MessageBuffer _packetBuffer;
MPSCQueue<EncryptablePacket, &EncryptablePacket::SocketQueueLink> _bufferQueue;
MPSCQueue<EncryptableAndCompressiblePacket, &EncryptableAndCompressiblePacket::SocketQueueLink> _bufferQueue;
std::size_t _sendBufferSize;
QueryCallbackProcessor _queryProcessor;

View File

@@ -430,7 +430,7 @@ public:
UpdateData data;
WorldPacket pkt;
go->BuildValuesUpdateBlockForPlayer(&data, i->GetSource());
data.BuildPacket(&pkt);
data.BuildPacket(pkt);
i->GetSource()->GetSession()->SendPacket(&pkt);
}
}
@@ -473,7 +473,7 @@ public:
UpdateData data;
WorldPacket pkt;
go->BuildValuesUpdateBlockForPlayer(&data, i->GetSource());
data.BuildPacket(&pkt);
data.BuildPacket(pkt);
i->GetSource()->GetSession()->SendPacket(&pkt);
}
}

View File

@@ -438,7 +438,7 @@ class spell_pri_lightwell_renew : public AuraScript
UpdateData data;
WorldPacket packet;
caster->BuildValuesUpdateBlockForPlayer(&data, player);
data.BuildPacket(&packet);
data.BuildPacket(packet);
player->SendDirectMessage(&packet);
}
}