feat(Core/Threading): replace ace threading (#4821)

This commit is contained in:
Kargatum
2021-04-17 00:45:29 +07:00
committed by GitHub
parent b9e84d8278
commit b2861be1cd
50 changed files with 300 additions and 342 deletions

View File

@@ -1035,7 +1035,7 @@ private:
AddonsList m_addonsList;
uint32 recruiterId;
bool isRecruiter;
ACE_Based::LockedQueue<WorldPacket*, ACE_Thread_Mutex> _recvQueue;
LockedQueue<WorldPacket*> _recvQueue;
uint32 m_currentVendorEntry;
uint64 m_currentBankerGUID;
time_t timeWhoCommandAllowed;

View File

@@ -126,7 +126,7 @@ void WorldSocket::CloseSocket(std::string const& reason)
sLog->outDebug(LOG_FILTER_CLOSE_SOCKET, "Socket closed because of: %s", reason.c_str());
{
ACE_GUARD (LockType, Guard, m_OutBufferLock);
std::lock_guard<std::mutex> guard(m_OutBufferLock);
if (closing_)
return;
@@ -136,7 +136,7 @@ void WorldSocket::CloseSocket(std::string const& reason)
}
{
ACE_GUARD (LockType, Guard, m_SessionLock);
std::lock_guard<std::mutex> guard(m_SessionLock);
m_Session = nullptr;
}
@@ -149,7 +149,7 @@ const std::string& WorldSocket::GetRemoteAddress(void) const
int WorldSocket::SendPacket(WorldPacket const& pct)
{
ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
std::lock_guard<std::mutex> guard(m_OutBufferLock);
if (closing_)
return -1;
@@ -306,20 +306,20 @@ int WorldSocket::handle_input(ACE_HANDLE)
return Update(); // another interesting line ;)
}
ACE_NOTREACHED(return -1);
return -1;
}
int WorldSocket::handle_output(ACE_HANDLE)
{
ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
if (closing_)
return -1;
std::lock_guard<std::mutex> guard(m_OutBufferLock);
size_t send_len = m_OutBuffer->length();
if (send_len == 0)
return handle_output_queue(Guard);
return handle_output_queue();
#ifdef MSG_NOSIGNAL
ssize_t n = peer().send (m_OutBuffer->rd_ptr(), send_len, MSG_NOSIGNAL);
@@ -332,7 +332,7 @@ int WorldSocket::handle_output(ACE_HANDLE)
else if (n == -1)
{
if (errno == EWOULDBLOCK || errno == EAGAIN)
return schedule_wakeup_output (Guard);
return schedule_wakeup_output();
return -1;
}
@@ -343,22 +343,22 @@ int WorldSocket::handle_output(ACE_HANDLE)
// move the data to the base of the buffer
m_OutBuffer->crunch();
return schedule_wakeup_output (Guard);
return schedule_wakeup_output();
}
else //now n == send_len
{
m_OutBuffer->reset();
return handle_output_queue (Guard);
return handle_output_queue();
}
ACE_NOTREACHED (return 0);
}
int WorldSocket::handle_output_queue(GuardType& g)
int WorldSocket::handle_output_queue()
{
if (msg_queue()->is_empty())
return cancel_wakeup_output(g);
return cancel_wakeup_output();
ACE_Message_Block* mblk;
@@ -387,7 +387,7 @@ int WorldSocket::handle_output_queue(GuardType& g)
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero);
return schedule_wakeup_output (g);
return schedule_wakeup_output();
}
mblk->release();
@@ -404,23 +404,23 @@ int WorldSocket::handle_output_queue(GuardType& g)
return -1;
}
return schedule_wakeup_output (g);
return schedule_wakeup_output();
}
else //now n == send_len
{
mblk->release();
return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK;
return msg_queue()->is_empty() ? cancel_wakeup_output() : ACE_Event_Handler::WRITE_MASK;
}
ACE_NOTREACHED(return -1);
return -1;
}
int WorldSocket::handle_close(ACE_HANDLE h, ACE_Reactor_Mask)
{
// Critical section
{
ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
std::lock_guard<std::mutex> guard(m_OutBufferLock);
closing_ = true;
@@ -430,7 +430,7 @@ int WorldSocket::handle_close(ACE_HANDLE h, ACE_Reactor_Mask)
// Critical section
{
ACE_GUARD_RETURN (LockType, Guard, m_SessionLock, -1);
std::lock_guard<decltype(m_SessionLock)> guard(m_SessionLock);
m_Session = nullptr;
}
@@ -448,7 +448,8 @@ int WorldSocket::Update(void)
return 0;
{
ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, 0);
std::lock_guard<std::mutex> guard(m_OutBufferLock);
if (m_OutBuffer->length() == 0 && msg_queue()->is_empty())
return 0;
}
@@ -612,15 +613,13 @@ int WorldSocket::handle_input_missing_data(void)
return size_t(n) == recv_size ? 1 : 2;
}
int WorldSocket::cancel_wakeup_output(GuardType& g)
int WorldSocket::cancel_wakeup_output()
{
if (!m_OutActive)
return 0;
m_OutActive = false;
g.release();
if (reactor()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
{
@@ -632,15 +631,13 @@ int WorldSocket::cancel_wakeup_output(GuardType& g)
return 0;
}
int WorldSocket::schedule_wakeup_output(GuardType& g)
int WorldSocket::schedule_wakeup_output()
{
if (m_OutActive)
return 0;
m_OutActive = true;
g.release();
if (reactor()->schedule_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
{
@@ -694,7 +691,7 @@ int WorldSocket::ProcessIncoming(WorldPacket* new_pct)
return 0;
default:
{
ACE_GUARD_RETURN (LockType, Guard, m_SessionLock, -1);
std::lock_guard<std::mutex> guard(m_SessionLock);
if (m_Session != nullptr)
{
@@ -1050,7 +1047,7 @@ int WorldSocket::HandlePing(WorldPacket& recvPacket)
if (max_count && m_OverSpeedPings > max_count)
{
ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1);
std::lock_guard<std::mutex> guard(m_SessionLock);
if (m_Session && AccountMgr::IsPlayerAccount(m_Session->GetSecurity()))
{
@@ -1071,7 +1068,7 @@ int WorldSocket::HandlePing(WorldPacket& recvPacket)
// critical section
{
ACE_GUARD_RETURN(LockType, Guard, m_SessionLock, -1);
std::lock_guard<std::mutex> guard(m_SessionLock);
if (m_Session)
{

View File

@@ -20,8 +20,8 @@
#include <ace/SOCK_Stream.h>
#include <ace/Svc_Handler.h>
#include <ace/Synch_Traits.h>
#include <ace/Thread_Mutex.h>
#include <ace/Unbounded_Queue.h>
#include <mutex>
#if !defined (ACE_LACKS_PRAGMA_ONCE)
#pragma once
@@ -78,10 +78,6 @@ public:
friend class WorldSocketMgr;
/// Mutex type used for various synchronizations.
typedef ACE_Thread_Mutex LockType;
typedef ACE_Guard<LockType> GuardType;
/// Check if socket is closed.
bool IsClosed (void) const;
@@ -131,11 +127,11 @@ private:
/// Help functions to mark/unmark the socket for output.
/// @param g the guard is for m_OutBufferLock, the function will release it
int cancel_wakeup_output (GuardType& g);
int schedule_wakeup_output (GuardType& g);
int cancel_wakeup_output();
int schedule_wakeup_output();
/// Drain the queue if its not empty.
int handle_output_queue (GuardType& g);
int handle_output_queue();
/// process one incoming packet.
/// @param new_pct received packet, note that you need to delete it.
@@ -161,7 +157,7 @@ private:
AuthCrypt m_Crypt;
/// Mutex lock to protect m_Session
LockType m_SessionLock;
std::mutex m_SessionLock;
/// Session to which received packets are routed
WorldSession* m_Session;
@@ -178,7 +174,7 @@ private:
ACE_Message_Block m_Header;
/// Mutex for protecting output related data.
LockType m_OutBufferLock;
std::mutex m_OutBufferLock;
/// Buffer used for writing output.
ACE_Message_Block* m_OutBuffer;

View File

@@ -92,7 +92,7 @@ public:
int AddSocket (WorldSocket* sock)
{
ACORE_GUARD(ACE_Thread_Mutex, m_NewSockets_Lock);
std::lock_guard<std::mutex> guard(m_NewSockets_Lock);
++m_Connections;
sock->AddReference();
@@ -112,7 +112,7 @@ public:
protected:
void AddNewSockets()
{
ACORE_GUARD(ACE_Thread_Mutex, m_NewSockets_Lock);
std::lock_guard<std::mutex> guard(m_NewSockets_Lock);
if (m_NewSockets.empty())
return;
@@ -194,7 +194,7 @@ private:
SocketSet m_Sockets;
SocketSet m_NewSockets;
ACE_Thread_Mutex m_NewSockets_Lock;
std::mutex m_NewSockets_Lock;
};
WorldSocketMgr::WorldSocketMgr() :

View File

@@ -14,7 +14,6 @@
#define __WORLDSOCKETMGR_H
#include "Common.h"
#include <ace/Thread_Mutex.h>
class WorldSocket;
class ReactorRunnable;