mirror of
https://github.com/debauchee/barrier.git
synced 2026-05-08 23:14:20 +08:00
Checkpointing centralized event queue stuff. Currently have:
an event queue and events, TCP sockets converted to use events, unix multithreading and network stuff converted, and an X Windows event queue subclass.
This commit is contained in:
@@ -13,14 +13,16 @@
|
||||
*/
|
||||
|
||||
#include "CTCPSocket.h"
|
||||
#include "CNetworkAddress.h"
|
||||
#include "CSocketMultiplexer.h"
|
||||
#include "TSocketMultiplexerMethodJob.h"
|
||||
#include "CBufferedInputStream.h"
|
||||
#include "CBufferedOutputStream.h"
|
||||
#include "CNetworkAddress.h"
|
||||
#include "XIO.h"
|
||||
#include "XSocket.h"
|
||||
#include "XIO.h"
|
||||
#include "CLock.h"
|
||||
#include "CMutex.h"
|
||||
#include "CThread.h"
|
||||
#include "CEventQueue.h"
|
||||
#include "TMethodJob.h"
|
||||
#include "CArch.h"
|
||||
#include "XArch.h"
|
||||
@@ -45,20 +47,17 @@ CTCPSocket::CTCPSocket(CArchSocket socket) :
|
||||
{
|
||||
assert(m_socket != NULL);
|
||||
|
||||
init();
|
||||
|
||||
// socket starts in connected state
|
||||
m_connected = kReadWrite;
|
||||
|
||||
// start handling socket
|
||||
m_thread = new CThread(new TMethodJob<CTCPSocket>(
|
||||
this, &CTCPSocket::ioThread));
|
||||
init();
|
||||
setState(kReadWrite, true);
|
||||
}
|
||||
|
||||
CTCPSocket::~CTCPSocket()
|
||||
{
|
||||
try {
|
||||
close();
|
||||
if (m_socket != NULL) {
|
||||
close();
|
||||
}
|
||||
}
|
||||
catch (...) {
|
||||
// ignore
|
||||
@@ -87,44 +86,28 @@ CTCPSocket::bind(const CNetworkAddress& addr)
|
||||
void
|
||||
CTCPSocket::close()
|
||||
{
|
||||
// see if buffers should be flushed
|
||||
bool doFlush = false;
|
||||
{
|
||||
CLock lock(m_mutex);
|
||||
doFlush = (m_thread != NULL && (m_connected & kWrite) != 0);
|
||||
}
|
||||
|
||||
// flush buffers
|
||||
if (doFlush) {
|
||||
m_output->flush();
|
||||
}
|
||||
m_output->flush();
|
||||
|
||||
// cause ioThread to exit
|
||||
if (m_socket != NULL) {
|
||||
CLock lock(m_mutex);
|
||||
try {
|
||||
ARCH->closeSocketForRead(m_socket);
|
||||
}
|
||||
catch (XArchNetwork&) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
ARCH->closeSocketForWrite(m_socket);
|
||||
}
|
||||
catch (XArchNetwork&) {
|
||||
// ignore
|
||||
}
|
||||
m_connected = kClosed;
|
||||
}
|
||||
// now closed
|
||||
setState(kClosed, true);
|
||||
|
||||
// wait for thread
|
||||
if (m_thread != NULL) {
|
||||
m_thread->wait();
|
||||
delete m_thread;
|
||||
m_thread = NULL;
|
||||
// close buffers
|
||||
try {
|
||||
m_input->close();
|
||||
}
|
||||
catch (...) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
m_output->close();
|
||||
}
|
||||
catch (...) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
// close socket
|
||||
CLock lock(m_mutex);
|
||||
if (m_socket != NULL) {
|
||||
try {
|
||||
ARCH->closeSocket(m_socket);
|
||||
@@ -136,65 +119,28 @@ CTCPSocket::close()
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::setEventTarget(void* target)
|
||||
{
|
||||
CLock lock(m_mutex);
|
||||
m_target = target;
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::connect(const CNetworkAddress& addr)
|
||||
{
|
||||
do {
|
||||
// connect asynchronously so we can check for cancellation.
|
||||
// we can't wrap setting and resetting the blocking flag in
|
||||
// the c'tor/d'tor of a class (to make resetting automatic)
|
||||
// because setBlockingOnSocket() can throw and it might be
|
||||
// called while unwinding the stack due to a throw.
|
||||
try {
|
||||
ARCH->setBlockingOnSocket(m_socket, false);
|
||||
ARCH->connectSocket(m_socket, addr.getAddress());
|
||||
ARCH->setBlockingOnSocket(m_socket, true);
|
||||
|
||||
// connected
|
||||
break;
|
||||
}
|
||||
catch (XArchNetworkConnecting&) {
|
||||
// connection is in progress
|
||||
ARCH->setBlockingOnSocket(m_socket, true);
|
||||
}
|
||||
catch (XArchNetwork& e) {
|
||||
ARCH->setBlockingOnSocket(m_socket, true);
|
||||
throw XSocketConnect(e.what());
|
||||
}
|
||||
|
||||
// wait for connection or failure
|
||||
IArchNetwork::CPollEntry pfds[1];
|
||||
pfds[0].m_socket = m_socket;
|
||||
pfds[0].m_events = IArchNetwork::kPOLLOUT;
|
||||
for (;;) {
|
||||
ARCH->testCancelThread();
|
||||
try {
|
||||
const int status = ARCH->pollSocket(pfds, 1, 0.01);
|
||||
if (status > 0) {
|
||||
if ((pfds[0].m_revents & (IArchNetwork::kPOLLERR |
|
||||
IArchNetwork::kPOLLNVAL)) != 0) {
|
||||
// connection failed
|
||||
ARCH->throwErrorOnSocket(m_socket);
|
||||
}
|
||||
if ((pfds[0].m_revents & IArchNetwork::kPOLLOUT) != 0) {
|
||||
// connection may have failed or succeeded
|
||||
ARCH->throwErrorOnSocket(m_socket);
|
||||
|
||||
// connected!
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (XArchNetwork& e) {
|
||||
throw XSocketConnect(e.what());
|
||||
}
|
||||
}
|
||||
} while (false);
|
||||
|
||||
// start servicing the socket
|
||||
m_connected = kReadWrite;
|
||||
m_thread = new CThread(new TMethodJob<CTCPSocket>(
|
||||
this, &CTCPSocket::ioThread));
|
||||
try {
|
||||
// FIXME -- don't throw if in progress, just return that info
|
||||
ARCH->connectSocket(m_socket, addr.getAddress());
|
||||
setState(kReadWrite, true);
|
||||
}
|
||||
catch (XArchNetworkConnecting&) {
|
||||
// connection is in progress
|
||||
setState(kConnecting, true);
|
||||
}
|
||||
catch (XArchNetwork& e) {
|
||||
throw XSocketConnect(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
IInputStream*
|
||||
@@ -212,15 +158,24 @@ CTCPSocket::getOutputStream()
|
||||
void
|
||||
CTCPSocket::init()
|
||||
{
|
||||
m_mutex = new CMutex;
|
||||
m_thread = NULL;
|
||||
m_connected = kClosed;
|
||||
m_input = new CBufferedInputStream(m_mutex,
|
||||
m_mutex = new CMutex;
|
||||
m_input = new CBufferedInputStream(m_mutex,
|
||||
new TMethodJob<CTCPSocket>(
|
||||
this, &CTCPSocket::emptyInput),
|
||||
new TMethodJob<CTCPSocket>(
|
||||
this, &CTCPSocket::closeInput));
|
||||
m_output = new CBufferedOutputStream(m_mutex,
|
||||
m_output = new CBufferedOutputStream(m_mutex,
|
||||
new TMethodJob<CTCPSocket>(
|
||||
this, &CTCPSocket::fillOutput),
|
||||
new TMethodJob<CTCPSocket>(
|
||||
this, &CTCPSocket::closeOutput));
|
||||
m_state = kUnconnected;
|
||||
m_target = NULL;
|
||||
m_job = NULL;
|
||||
|
||||
// make socket non-blocking
|
||||
// FIXME -- check for error
|
||||
ARCH->setBlockingOnSocket(m_socket, false);
|
||||
|
||||
// turn off Nagle algorithm. we send lots of very short messages
|
||||
// that should be sent without (much) delay. for example, the
|
||||
@@ -241,115 +196,101 @@ CTCPSocket::init()
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::ioThread(void*)
|
||||
ISocketMultiplexerJob*
|
||||
CTCPSocket::newMultiplexerJob(JobFunc func, bool readable, bool writable)
|
||||
{
|
||||
try {
|
||||
ioService();
|
||||
ioCleanup();
|
||||
}
|
||||
catch (...) {
|
||||
ioCleanup();
|
||||
throw;
|
||||
}
|
||||
return new TSocketMultiplexerMethodJob<CTCPSocket>(
|
||||
this, func, m_socket, readable, writable);
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::ioCleanup()
|
||||
ISocketMultiplexerJob*
|
||||
CTCPSocket::setState(State state, bool setJob)
|
||||
{
|
||||
try {
|
||||
m_input->close();
|
||||
if (m_state == state || m_state == kClosed) {
|
||||
return m_job;
|
||||
}
|
||||
catch (...) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
m_output->close();
|
||||
}
|
||||
catch (...) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::ioService()
|
||||
{
|
||||
assert(m_socket != NULL);
|
||||
State oldState = m_state;
|
||||
m_state = state;
|
||||
|
||||
// now service the connection
|
||||
IArchNetwork::CPollEntry pfds[1];
|
||||
pfds[0].m_socket = m_socket;
|
||||
for (;;) {
|
||||
{
|
||||
// choose events to poll for
|
||||
CLock lock(m_mutex);
|
||||
pfds[0].m_events = 0;
|
||||
if (m_connected == 0) {
|
||||
return;
|
||||
}
|
||||
if ((m_connected & kRead) != 0) {
|
||||
// still open for reading
|
||||
pfds[0].m_events |= IArchNetwork::kPOLLIN;
|
||||
}
|
||||
if ((m_connected & kWrite) != 0 && m_output->getSize() > 0) {
|
||||
// data queued for writing
|
||||
pfds[0].m_events |= IArchNetwork::kPOLLOUT;
|
||||
}
|
||||
bool read = (m_input->getSize() > 0);
|
||||
bool write = (m_output->getSize() > 0);
|
||||
CEvent::Type eventType = 0;
|
||||
m_job = NULL;
|
||||
switch (m_state) {
|
||||
case kUnconnected:
|
||||
assert(0 && "cannot re-enter unconnected state");
|
||||
break;
|
||||
|
||||
case kConnecting:
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnecting, false, true);
|
||||
break;
|
||||
|
||||
case kReadWrite:
|
||||
if (oldState == kConnecting) {
|
||||
eventType = IDataSocket::getConnectedEvent();
|
||||
}
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, true, write);
|
||||
break;
|
||||
|
||||
try {
|
||||
// check for status
|
||||
const int status = ARCH->pollSocket(pfds, 1, 0.01);
|
||||
|
||||
// transfer data and handle errors
|
||||
if (status == 1) {
|
||||
if ((pfds[0].m_revents & (IArchNetwork::kPOLLERR |
|
||||
IArchNetwork::kPOLLNVAL)) != 0) {
|
||||
// stream is no good anymore so bail
|
||||
CLock lock(m_mutex);
|
||||
m_input->hangup();
|
||||
return;
|
||||
}
|
||||
|
||||
// read some data
|
||||
if (pfds[0].m_revents & IArchNetwork::kPOLLIN) {
|
||||
UInt8 buffer[4096];
|
||||
size_t n = ARCH->readSocket(m_socket,
|
||||
buffer, sizeof(buffer));
|
||||
CLock lock(m_mutex);
|
||||
if (n > 0) {
|
||||
m_input->write(buffer, n);
|
||||
}
|
||||
else {
|
||||
// stream hungup
|
||||
m_input->hangup();
|
||||
m_connected &= ~kRead;
|
||||
}
|
||||
}
|
||||
|
||||
// write some data
|
||||
if (pfds[0].m_revents & IArchNetwork::kPOLLOUT) {
|
||||
CLock lock(m_mutex);
|
||||
|
||||
// get amount of data to write
|
||||
UInt32 n = m_output->getSize();
|
||||
|
||||
// write data
|
||||
const void* buffer = m_output->peek(n);
|
||||
size_t n2 = ARCH->writeSocket(m_socket, buffer, n);
|
||||
|
||||
// discard written data
|
||||
if (n2 > 0) {
|
||||
m_output->pop(n2);
|
||||
}
|
||||
}
|
||||
}
|
||||
case kReadOnly:
|
||||
if (!write) {
|
||||
eventType = IDataSocket::getShutdownOutputEvent();
|
||||
}
|
||||
catch (XArchNetwork&) {
|
||||
// socket has failed
|
||||
return;
|
||||
if (oldState == kWriteOnly) {
|
||||
goto shutdown;
|
||||
}
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, true, write);
|
||||
break;
|
||||
|
||||
case kWriteOnly:
|
||||
if (!read) {
|
||||
m_input->hangup();
|
||||
eventType = IDataSocket::getShutdownInputEvent();
|
||||
}
|
||||
if (oldState == kReadOnly) {
|
||||
goto shutdown;
|
||||
}
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, false, write);
|
||||
break;
|
||||
|
||||
case kShutdown:
|
||||
shutdown:
|
||||
if (!read && !write) {
|
||||
eventType = ISocket::getDisconnectedEvent();
|
||||
m_state = kClosed;
|
||||
}
|
||||
else {
|
||||
m_state = kShutdown;
|
||||
}
|
||||
break;
|
||||
|
||||
case kClosed:
|
||||
m_input->hangup();
|
||||
if (oldState == kConnecting) {
|
||||
eventType = IDataSocket::getConnectionFailedEvent();
|
||||
}
|
||||
else {
|
||||
eventType = ISocket::getDisconnectedEvent();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// notify
|
||||
if (eventType != 0) {
|
||||
sendEvent(eventType);
|
||||
}
|
||||
|
||||
// cut over to new job. multiplexer will delete the old job.
|
||||
if (setJob) {
|
||||
if (m_job == NULL) {
|
||||
CSocketMultiplexer::getInstance()->removeSocket(this);
|
||||
}
|
||||
else {
|
||||
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
|
||||
}
|
||||
}
|
||||
return m_job;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -358,7 +299,7 @@ CTCPSocket::closeInput(void*)
|
||||
// note -- m_mutex should already be locked
|
||||
try {
|
||||
ARCH->closeSocketForRead(m_socket);
|
||||
m_connected &= ~kRead;
|
||||
setState(kWriteOnly, true);
|
||||
}
|
||||
catch (XArchNetwork&) {
|
||||
// ignore
|
||||
@@ -370,10 +311,156 @@ CTCPSocket::closeOutput(void*)
|
||||
{
|
||||
// note -- m_mutex should already be locked
|
||||
try {
|
||||
ARCH->closeSocketForWrite(m_socket);
|
||||
m_connected &= ~kWrite;
|
||||
// ARCH->closeSocketForWrite(m_socket);
|
||||
setState(kReadOnly, true);
|
||||
}
|
||||
catch (XArchNetwork&) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::emptyInput(void*)
|
||||
{
|
||||
// note -- m_mutex should already be locked
|
||||
bool write = (m_output->getSize() > 0);
|
||||
if (m_state == kWriteOnly && !write) {
|
||||
m_state = kShutdown;
|
||||
}
|
||||
if (m_state == kWriteOnly) {
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, false, write);
|
||||
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
|
||||
m_input->hangup();
|
||||
sendEvent(IDataSocket::getShutdownInputEvent());
|
||||
}
|
||||
else if (m_state == kShutdown) {
|
||||
m_job = NULL;
|
||||
CSocketMultiplexer::getInstance()->removeSocket(this);
|
||||
if (!write) {
|
||||
sendEvent(ISocket::getDisconnectedEvent());
|
||||
m_state = kClosed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::fillOutput(void*)
|
||||
{
|
||||
// note -- m_mutex should already be locked
|
||||
if (m_state == kReadWrite) {
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, true, true);
|
||||
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
|
||||
}
|
||||
else if (m_state == kWriteOnly) {
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, false, true);
|
||||
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
|
||||
}
|
||||
}
|
||||
|
||||
ISocketMultiplexerJob*
|
||||
CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
|
||||
bool, bool write, bool error)
|
||||
{
|
||||
CLock lock(m_mutex);
|
||||
|
||||
if (write && !error) {
|
||||
try {
|
||||
// connection may have failed or succeeded
|
||||
ARCH->throwErrorOnSocket(m_socket);
|
||||
}
|
||||
catch (XArchNetwork&) {
|
||||
error = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (error) {
|
||||
return setState(kClosed, false);
|
||||
}
|
||||
|
||||
if (write) {
|
||||
return setState(kReadWrite, false);
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
ISocketMultiplexerJob*
|
||||
CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
|
||||
bool read, bool write, bool error)
|
||||
{
|
||||
CLock lock(m_mutex);
|
||||
if (error) {
|
||||
return setState(kClosed, false);
|
||||
}
|
||||
|
||||
if (write) {
|
||||
// get amount of data to write
|
||||
UInt32 n = m_output->getSize();
|
||||
|
||||
// write data
|
||||
try {
|
||||
const void* buffer = m_output->peek(n);
|
||||
size_t n2 = ARCH->writeSocket(m_socket, buffer, n);
|
||||
|
||||
// discard written data
|
||||
if (n2 > 0) {
|
||||
m_output->pop(n2);
|
||||
}
|
||||
}
|
||||
catch (XArchNetworkDisconnected&) {
|
||||
// stream hungup
|
||||
return setState(kReadOnly, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (read) {
|
||||
UInt8 buffer[4096];
|
||||
size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
|
||||
if (n > 0) {
|
||||
// slurp up as much as possible
|
||||
do {
|
||||
m_input->write(buffer, n);
|
||||
try {
|
||||
n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
|
||||
}
|
||||
catch (XArchNetworkWouldBlock&) {
|
||||
break;
|
||||
}
|
||||
} while (n > 0);
|
||||
|
||||
// notify
|
||||
sendEvent(IDataSocket::getInputEvent());
|
||||
}
|
||||
else {
|
||||
// stream hungup
|
||||
return setState(kWriteOnly, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (write && m_output->getSize() == 0) {
|
||||
if (m_state == kReadOnly) {
|
||||
ARCH->closeSocketForWrite(m_socket);
|
||||
sendEvent(IDataSocket::getShutdownOutputEvent());
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected,
|
||||
true, false);
|
||||
job = m_job;
|
||||
}
|
||||
else if (m_state == kReadWrite || m_state == kReadOnly) {
|
||||
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected,
|
||||
true, false);
|
||||
job = m_job;
|
||||
}
|
||||
else if (m_state == kWriteOnly) {
|
||||
m_job = NULL;
|
||||
job = m_job;
|
||||
}
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
void
|
||||
CTCPSocket::sendEvent(CEvent::Type type)
|
||||
{
|
||||
CEventQueue::getInstance()->addEvent(CEvent(type, m_target, NULL));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user