diff --git a/src/gui/src/IpcReader.cpp b/src/gui/src/IpcReader.cpp index 1e7f2551..9754af11 100644 --- a/src/gui/src/IpcReader.cpp +++ b/src/gui/src/IpcReader.cpp @@ -84,7 +84,7 @@ void IpcReader::readStream(char* buffer, int length) // qt is such a fucker with mutexes (can't lock/unlock between // threads?! wtf?!). i'd just rather not go there (patches welcome). while (!m_ReadyRead) { - QThread::usleep(100); + QThread::usleep(50); } m_ReadyRead = false; } diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index 618ae48c..8c0fec73 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -65,8 +65,12 @@ CIpcClientProxy::~CIpcClientProxy() m_stream.getInputShutdownEvent(), m_stream.getEventTarget()); EVENTQUEUE->removeHandler( m_stream.getOutputShutdownEvent(), m_stream.getEventTarget()); - + + // don't delete the stream while it's being used. + ARCH->lockMutex(m_mutex); delete &m_stream; + ARCH->unlockMutex(m_mutex); + ARCH->closeMutex(m_mutex); } void @@ -86,6 +90,9 @@ CIpcClientProxy::handleWriteError(const CEvent&, void*) void CIpcClientProxy::handleData(const CEvent&, void*) { + // don't allow the dtor to destroy the stream while we're using it. + CArchMutexLock lock(m_mutex); + UInt8 code[1]; UInt32 n = m_stream.read(code, 1); while (n != 0) { @@ -124,6 +131,7 @@ CIpcClientProxy::send(const CIpcMessage& message) { // don't allow other threads to write until we've finished the entire // message. stream write is locked, but only for that single write. + // also, don't allow the dtor to destroy the stream while we're using it. CArchMutexLock lock(m_mutex); LOG((CLOG_DEBUG "ipc client proxy write: %d", message.m_type)); diff --git a/src/lib/ipc/CIpcLogOutputter.cpp b/src/lib/ipc/CIpcLogOutputter.cpp index 408bb292..93e6fbf6 100644 --- a/src/lib/ipc/CIpcLogOutputter.cpp +++ b/src/lib/ipc/CIpcLogOutputter.cpp @@ -26,6 +26,7 @@ #include "CArch.h" #include "CThread.h" #include "TMethodJob.h" +#include "XArch.h" CIpcLogOutputter::CIpcLogOutputter(CIpcServer& ipcServer) : m_ipcServer(ipcServer), @@ -39,6 +40,9 @@ m_running(true) CIpcLogOutputter::~CIpcLogOutputter() { + m_running = false; + m_bufferThread->wait(5); + ARCH->closeMutex(m_bufferMutex); delete m_bufferThread; } @@ -51,8 +55,6 @@ CIpcLogOutputter::open(const char* title) void CIpcLogOutputter::close() { - m_running = false; - m_bufferThread->wait(5); } void @@ -88,19 +90,26 @@ CIpcLogOutputter::write(ELevel, const char* text, bool force) void CIpcLogOutputter::bufferThread(void*) { - while (m_running) { - while (m_running && m_buffer.size() == 0) { - ARCH->sleep(.1); - } + try { + while (m_running) { + while (m_running && m_buffer.size() == 0) { + ARCH->sleep(.1); + } - if (!m_running) { - break; - } + if (!m_running) { + break; + } - if (m_ipcServer.hasClients(kIpcClientGui)) { - sendBuffer(); + if (m_ipcServer.hasClients(kIpcClientGui)) { + sendBuffer(); + } } } + catch (XArch& e) { + LOG((CLOG_ERR "ipc log buffer thread error, %s", e.what().c_str())); + } + + LOG((CLOG_DEBUG "ipc log buffer thread finished")); } CString* diff --git a/src/lib/ipc/CIpcServer.cpp b/src/lib/ipc/CIpcServer.cpp index 4616a629..873f0cb9 100644 --- a/src/lib/ipc/CIpcServer.cpp +++ b/src/lib/ipc/CIpcServer.cpp @@ -31,6 +31,7 @@ CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown; CIpcServer::CIpcServer() : m_address(CNetworkAddress(IPC_HOST, IPC_PORT)) { + m_clientsMutex = ARCH->newMutex(); m_address.resolve(); EVENTQUEUE->adoptHandler( @@ -41,11 +42,14 @@ m_address(CNetworkAddress(IPC_HOST, IPC_PORT)) CIpcServer::~CIpcServer() { - CClientSet::iterator it; + ARCH->lockMutex(m_clientsMutex); + CClientList::iterator it; for (it = m_clients.begin(); it != m_clients.end(); it++) { delete *it; } m_clients.empty(); + ARCH->unlockMutex(m_clientsMutex); + ARCH->closeMutex(m_clientsMutex); EVENTQUEUE->removeHandler(m_socket.getConnectingEvent(), &m_socket); } @@ -65,8 +69,11 @@ CIpcServer::handleClientConnecting(const CEvent&, void*) } LOG((CLOG_DEBUG "accepted ipc client connection")); + + ARCH->lockMutex(m_clientsMutex); CIpcClientProxy* proxy = new CIpcClientProxy(*stream); - m_clients.insert(proxy); + m_clients.push_back(proxy); + ARCH->unlockMutex(m_clientsMutex); EVENTQUEUE->adoptHandler( CIpcClientProxy::getDisconnectedEvent(), proxy, @@ -85,20 +92,22 @@ CIpcServer::handleClientDisconnected(const CEvent& e, void*) EVENTQUEUE->removeHandler( CIpcClientProxy::getDisconnectedEvent(), proxy); - CClientSet::iterator& it = m_clients.find(proxy); + CArchMutexLock lock(m_clientsMutex); + m_clients.remove(proxy); delete proxy; - m_clients.erase(it); LOG((CLOG_DEBUG "ipc client proxy removed, connected=%d", m_clients.size())); } bool CIpcServer::hasClients(EIpcClientType clientType) const { - if (m_clients.size() == 0) { + CArchMutexLock lock(m_clientsMutex); + + if (m_clients.empty()) { return false; } - CClientSet::iterator it; + CClientList::const_iterator it; for (it = m_clients.begin(); it != m_clients.end(); it++) { // at least one client is alive and type matches, there are clients. CIpcClientProxy* p = *it; @@ -121,7 +130,9 @@ CIpcServer::getClientConnectedEvent() void CIpcServer::send(const CIpcMessage& message, EIpcClientType filterType) { - CClientSet::iterator it; + CArchMutexLock lock(m_clientsMutex); + + CClientList::iterator it; for (it = m_clients.begin(); it != m_clients.end(); it++) { CIpcClientProxy* proxy = *it; if (proxy->m_clientType == filterType) { diff --git a/src/lib/ipc/CIpcServer.h b/src/lib/ipc/CIpcServer.h index ff58e0ac..089fdad7 100644 --- a/src/lib/ipc/CIpcServer.h +++ b/src/lib/ipc/CIpcServer.h @@ -20,7 +20,8 @@ #include "CTCPListenSocket.h" #include "CNetworkAddress.h" #include "Ipc.h" -#include +#include +#include "CArch.h" class CEvent; class CIpcClientProxy; @@ -65,11 +66,12 @@ private: void handleClientMessage(const CEvent&, void*); private: - typedef std::set CClientSet; + typedef std::list CClientList; CTCPListenSocket m_socket; CNetworkAddress m_address; - CClientSet m_clients; + CClientList m_clients; + CArchMutex m_clientsMutex; static CEvent::Type s_clientConnectedEvent; };