Checkpoint. Code does not run. Still converting over to new

event loop model.  Streams, stream filters, and sockets are
converted.  Client proxies are almost converted.  CServer is
in progress.  Removed all HTTP code.  Haven't converted the
necessary win32 arch stuff.
This commit is contained in:
crs
2004-02-01 21:09:22 +00:00
parent 618aa7fedd
commit 848aee7a3a
64 changed files with 2581 additions and 2942 deletions

View File

@@ -16,14 +16,10 @@
#include "CNetworkAddress.h"
#include "CSocketMultiplexer.h"
#include "TSocketMultiplexerMethodJob.h"
#include "CBufferedInputStream.h"
#include "CBufferedOutputStream.h"
#include "XSocket.h"
#include "XIO.h"
#include "CLock.h"
#include "CMutex.h"
#include "CEventQueue.h"
#include "TMethodJob.h"
#include "IEventJob.h"
#include "CArch.h"
#include "XArch.h"
@@ -31,7 +27,10 @@
// CTCPSocket
//
CTCPSocket::CTCPSocket()
CTCPSocket::CTCPSocket() :
m_mutex(),
m_flushed(&m_mutex, true),
m_eventFilter(NULL)
{
try {
m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM);
@@ -39,34 +38,32 @@ CTCPSocket::CTCPSocket()
catch (XArchNetwork& e) {
throw XSocketCreate(e.what());
}
init();
}
CTCPSocket::CTCPSocket(CArchSocket socket) :
m_socket(socket)
m_mutex(),
m_socket(socket),
m_flushed(&m_mutex, true),
m_eventFilter(NULL)
{
assert(m_socket != NULL);
// socket starts in connected state
init();
setState(kReadWrite, true);
onConnected();
setJob(newJob());
}
CTCPSocket::~CTCPSocket()
{
try {
if (m_socket != NULL) {
close();
}
close();
}
catch (...) {
// ignore
}
// clean up
delete m_input;
delete m_output;
delete m_mutex;
}
void
@@ -86,92 +83,202 @@ CTCPSocket::bind(const CNetworkAddress& addr)
void
CTCPSocket::close()
{
// flush buffers
m_output->flush();
CLock lock(&m_mutex);
// now closed
setState(kClosed, true);
// clear buffers and enter disconnected state
if (m_connected) {
sendSocketEvent(getDisconnectedEvent());
}
onDisconnected();
// close buffers
try {
m_input->close();
}
catch (...) {
// ignore
}
try {
m_output->close();
}
catch (...) {
// ignore
}
// remove ourself from the multiplexer
setJob(NULL);
// close socket
CLock lock(m_mutex);
// close the socket
if (m_socket != NULL) {
CArchSocket socket = m_socket;
m_socket = NULL;
try {
ARCH->closeSocket(m_socket);
m_socket = NULL;
ARCH->closeSocket(socket);
}
catch (XArchNetwork& e) {
throw XSocketIOClose(e.what());
// FIXME -- just discard this for now
//throw XSocketIOClose(e.what());
}
}
}
void
CTCPSocket::setEventTarget(void* target)
void*
CTCPSocket::getEventTarget() const
{
CLock lock(m_mutex);
m_target = target;
return const_cast<void*>(reinterpret_cast<const void*>(this));
}
UInt32
CTCPSocket::read(void* buffer, UInt32 n)
{
// copy data directly from our input buffer
CLock lock(&m_mutex);
UInt32 size = m_inputBuffer.getSize();
if (n > size) {
n = size;
}
if (buffer != NULL) {
memcpy(buffer, m_inputBuffer.peek(n), n);
}
m_inputBuffer.pop(n);
// if no more data and we cannot read or write then send disconnected
if (n > 0 && !m_readable && !m_writable) {
sendSocketEvent(getDisconnectedEvent());
}
return n;
}
void
CTCPSocket::write(const void* buffer, UInt32 n)
{
CLock lock(&m_mutex);
// must not have shutdown output
if (!m_writable) {
sendStreamEvent(getOutputErrorEvent());
return;
}
// ignore empty writes
if (n == 0) {
return;
}
// copy data to the output buffer
bool wasEmpty = (m_outputBuffer.getSize() == 0);
m_outputBuffer.write(buffer, n);
// there's data to write
m_flushed = false;
// make sure we're waiting to write
if (wasEmpty) {
setJob(newJob());
}
}
void
CTCPSocket::flush()
{
CLock lock(&m_mutex);
while (m_flushed == false) {
m_flushed.wait();
}
}
void
CTCPSocket::shutdownInput()
{
CLock lock(&m_mutex);
// shutdown socket for reading
try {
ARCH->closeSocketForRead(m_socket);
}
catch (XArchNetwork&) {
// ignore
}
// shutdown buffer for reading
if (m_readable) {
sendStreamEvent(getInputShutdownEvent());
onInputShutdown();
setJob(newJob());
}
}
void
CTCPSocket::shutdownOutput()
{
CLock lock(&m_mutex);
// shutdown socket for writing
try {
ARCH->closeSocketForWrite(m_socket);
}
catch (XArchNetwork&) {
// ignore
}
// shutdown buffer for writing
if (m_writable) {
sendStreamEvent(getOutputShutdownEvent());
onOutputShutdown();
setJob(newJob());
}
}
void
CTCPSocket::setEventFilter(IEventJob* filter)
{
CLock lock(&m_mutex);
m_eventFilter = filter;
}
bool
CTCPSocket::isReady() const
{
CLock lock(&m_mutex);
return (m_inputBuffer.getSize() > 0);
}
UInt32
CTCPSocket::getSize() const
{
CLock lock(&m_mutex);
return m_inputBuffer.getSize();
}
IEventJob*
CTCPSocket::getEventFilter() const
{
CLock lock(&m_mutex);
return m_eventFilter;
}
void
CTCPSocket::connect(const CNetworkAddress& addr)
{
CLock lock(&m_mutex);
// fail on attempts to reconnect
if (m_socket == NULL || m_connected) {
sendSocketEvent(getConnectionFailedEvent());
return;
}
try {
// FIXME -- don't throw if in progress, just return that info
ARCH->connectSocket(m_socket, addr.getAddress());
setState(kReadWrite, true);
sendSocketEvent(getConnectedEvent());
onConnected();
setJob(newJob());
}
catch (XArchNetworkConnecting&) {
// connection is in progress
setState(kConnecting, true);
m_writable = true;
setJob(newJob());
}
catch (XArchNetwork& e) {
throw XSocketConnect(e.what());
}
}
IInputStream*
CTCPSocket::getInputStream()
{
return m_input;
}
IOutputStream*
CTCPSocket::getOutputStream()
{
return m_output;
}
void
CTCPSocket::init()
{
m_mutex = new CMutex;
m_input = new CBufferedInputStream(m_mutex,
new TMethodJob<CTCPSocket>(
this, &CTCPSocket::emptyInput),
new TMethodJob<CTCPSocket>(
this, &CTCPSocket::closeInput));
m_output = new CBufferedOutputStream(m_mutex,
new TMethodJob<CTCPSocket>(
this, &CTCPSocket::fillOutput),
new TMethodJob<CTCPSocket>(
this, &CTCPSocket::closeOutput));
m_state = kUnconnected;
m_target = NULL;
m_job = NULL;
// default state
m_connected = false;
m_readable = false;
m_writable = false;
// make socket non-blocking
// FIXME -- check for error
@@ -196,172 +303,97 @@ CTCPSocket::init()
}
}
ISocketMultiplexerJob*
CTCPSocket::newMultiplexerJob(JobFunc func, bool readable, bool writable)
{
return new TSocketMultiplexerMethodJob<CTCPSocket>(
this, func, m_socket, readable, writable);
}
ISocketMultiplexerJob*
CTCPSocket::setState(State state, bool setJob)
{
if (m_state == state || m_state == kClosed) {
return m_job;
}
State oldState = m_state;
m_state = state;
bool read = (m_input->getSize() > 0);
bool write = (m_output->getSize() > 0);
CEvent::Type eventType = 0;
m_job = NULL;
switch (m_state) {
case kUnconnected:
assert(0 && "cannot re-enter unconnected state");
break;
case kConnecting:
m_job = newMultiplexerJob(&CTCPSocket::serviceConnecting, false, true);
break;
case kReadWrite:
if (oldState == kConnecting) {
eventType = IDataSocket::getConnectedEvent();
}
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, true, write);
break;
case kReadOnly:
if (!write) {
eventType = IDataSocket::getShutdownOutputEvent();
}
if (oldState == kWriteOnly) {
goto shutdown;
}
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, true, write);
break;
case kWriteOnly:
if (!read) {
m_input->hangup();
eventType = IDataSocket::getShutdownInputEvent();
}
if (oldState == kReadOnly) {
goto shutdown;
}
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, false, write);
break;
case kShutdown:
shutdown:
if (!read && !write) {
eventType = ISocket::getDisconnectedEvent();
m_state = kClosed;
}
else {
m_state = kShutdown;
}
break;
case kClosed:
m_input->hangup();
if (oldState == kConnecting) {
eventType = IDataSocket::getConnectionFailedEvent();
}
else {
eventType = ISocket::getDisconnectedEvent();
}
break;
}
// notify
if (eventType != 0) {
sendEvent(eventType);
}
// cut over to new job. multiplexer will delete the old job.
if (setJob) {
if (m_job == NULL) {
CSocketMultiplexer::getInstance()->removeSocket(this);
}
else {
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
}
}
return m_job;
}
void
CTCPSocket::closeInput(void*)
CTCPSocket::setJob(ISocketMultiplexerJob* job)
{
// note -- m_mutex should already be locked
try {
ARCH->closeSocketForRead(m_socket);
setState(kWriteOnly, true);
}
catch (XArchNetwork&) {
// ignore
}
}
void
CTCPSocket::closeOutput(void*)
{
// note -- m_mutex should already be locked
try {
// ARCH->closeSocketForWrite(m_socket);
setState(kReadOnly, true);
}
catch (XArchNetwork&) {
// ignore
}
}
void
CTCPSocket::emptyInput(void*)
{
// note -- m_mutex should already be locked
bool write = (m_output->getSize() > 0);
if (m_state == kWriteOnly && !write) {
m_state = kShutdown;
}
if (m_state == kWriteOnly) {
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, false, write);
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
m_input->hangup();
sendEvent(IDataSocket::getShutdownInputEvent());
}
else if (m_state == kShutdown) {
m_job = NULL;
// multiplexer will delete the old job
if (job == NULL) {
CSocketMultiplexer::getInstance()->removeSocket(this);
if (!write) {
sendEvent(ISocket::getDisconnectedEvent());
m_state = kClosed;
}
}
else {
CSocketMultiplexer::getInstance()->addSocket(this, job);
}
}
ISocketMultiplexerJob*
CTCPSocket::newJob()
{
// note -- must have m_mutex locked on entry
if (m_socket == NULL || !(m_readable || m_writable)) {
return NULL;
}
else if (!m_connected) {
assert(!m_readable);
return new TSocketMultiplexerMethodJob<CTCPSocket>(
this, &CTCPSocket::serviceConnecting,
m_socket, m_readable, m_writable);
}
else {
return new TSocketMultiplexerMethodJob<CTCPSocket>(
this, &CTCPSocket::serviceConnected,
m_socket, m_readable,
m_writable && (m_outputBuffer.getSize() > 0));
}
}
void
CTCPSocket::fillOutput(void*)
CTCPSocket::sendSocketEvent(CEvent::Type type)
{
// note -- m_mutex should already be locked
if (m_state == kReadWrite) {
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, true, true);
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
EVENTQUEUE->addEvent(CEvent(type, this, NULL));
}
void
CTCPSocket::sendStreamEvent(CEvent::Type type)
{
if (m_eventFilter != NULL) {
m_eventFilter->run(CEvent(type, this, NULL));
}
else if (m_state == kWriteOnly) {
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected, false, true);
CSocketMultiplexer::getInstance()->addSocket(this, m_job);
else {
EVENTQUEUE->addEvent(CEvent(type, this, NULL));
}
}
void
CTCPSocket::onConnected()
{
m_connected = true;
m_readable = true;
m_writable = true;
}
void
CTCPSocket::onInputShutdown()
{
m_inputBuffer.pop(m_inputBuffer.getSize());
m_readable = false;
}
void
CTCPSocket::onOutputShutdown()
{
m_outputBuffer.pop(m_outputBuffer.getSize());
m_writable = false;
// we're now flushed
m_flushed = true;
m_flushed.broadcast();
}
void
CTCPSocket::onDisconnected()
{
// disconnected
onInputShutdown();
onOutputShutdown();
m_connected = false;
}
ISocketMultiplexerJob*
CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
bool, bool write, bool error)
{
CLock lock(m_mutex);
CLock lock(&m_mutex);
if (write && !error) {
try {
@@ -374,11 +406,15 @@ CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
}
if (error) {
return setState(kClosed, false);
sendSocketEvent(getConnectionFailedEvent());
onDisconnected();
return newJob();
}
if (write) {
return setState(kReadWrite, false);
sendSocketEvent(getConnectedEvent());
onConnected();
return newJob();
}
return job;
@@ -388,79 +424,99 @@ ISocketMultiplexerJob*
CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
bool read, bool write, bool error)
{
CLock lock(m_mutex);
CLock lock(&m_mutex);
if (error) {
return setState(kClosed, false);
sendSocketEvent(getDisconnectedEvent());
onDisconnected();
return newJob();
}
if (write) {
// get amount of data to write
UInt32 n = m_output->getSize();
bool needNewJob = false;
// write data
if (write) {
try {
const void* buffer = m_output->peek(n);
size_t n2 = ARCH->writeSocket(m_socket, buffer, n);
// write data
UInt32 n = m_outputBuffer.getSize();
const void* buffer = m_outputBuffer.peek(n);
n = (UInt32)ARCH->writeSocket(m_socket, buffer, n);
// discard written data
if (n2 > 0) {
m_output->pop(n2);
if (n > 0) {
m_outputBuffer.pop(n);
if (m_outputBuffer.getSize() == 0) {
sendStreamEvent(getOutputFlushedEvent());
m_flushed = true;
m_flushed.broadcast();
needNewJob = true;
}
}
}
catch (XArchNetworkShutdown&) {
// remote read end of stream hungup. our output side
// has therefore shutdown.
onOutputShutdown();
sendStreamEvent(getOutputShutdownEvent());
if (!m_readable && m_inputBuffer.getSize() == 0) {
sendSocketEvent(getDisconnectedEvent());
}
needNewJob = true;
}
catch (XArchNetworkDisconnected&) {
// stream hungup
onDisconnected();
sendSocketEvent(getDisconnectedEvent());
needNewJob = true;
}
catch (XArchNetwork&) {
// other write error
onDisconnected();
sendStreamEvent(getOutputErrorEvent());
sendSocketEvent(getDisconnectedEvent());
needNewJob = true;
}
}
if (read && m_readable) {
try {
UInt8 buffer[4096];
size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
if (n > 0) {
bool wasEmpty = (m_inputBuffer.getSize() == 0);
// slurp up as much as possible
do {
m_inputBuffer.write(buffer, n);
n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
} while (n > 0);
// send input ready if input buffer was empty
if (wasEmpty) {
sendStreamEvent(getInputReadyEvent());
}
}
else {
// remote write end of stream hungup. our input side
// has therefore shutdown but don't flush our buffer
// since there's still data to be read.
sendStreamEvent(getInputShutdownEvent());
if (!m_writable && m_inputBuffer.getSize() == 0) {
sendSocketEvent(getDisconnectedEvent());
}
m_readable = false;
needNewJob = true;
}
}
catch (XArchNetworkDisconnected&) {
// stream hungup
return setState(kReadOnly, false);
sendSocketEvent(getDisconnectedEvent());
onDisconnected();
needNewJob = true;
}
catch (XArchNetwork&) {
// ignore other read error
}
}
if (read) {
UInt8 buffer[4096];
size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
if (n > 0) {
// slurp up as much as possible
do {
m_input->write(buffer, n);
try {
n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
}
catch (XArchNetworkWouldBlock&) {
break;
}
} while (n > 0);
// notify
sendEvent(IDataSocket::getInputEvent());
}
else {
// stream hungup
return setState(kWriteOnly, false);
}
}
if (write && m_output->getSize() == 0) {
if (m_state == kReadOnly) {
ARCH->closeSocketForWrite(m_socket);
sendEvent(IDataSocket::getShutdownOutputEvent());
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected,
true, false);
job = m_job;
}
else if (m_state == kReadWrite || m_state == kReadOnly) {
m_job = newMultiplexerJob(&CTCPSocket::serviceConnected,
true, false);
job = m_job;
}
else if (m_state == kWriteOnly) {
m_job = NULL;
job = m_job;
}
}
return job;
}
void
CTCPSocket::sendEvent(CEvent::Type type)
{
CEventQueue::getInstance()->addEvent(CEvent(type, m_target, NULL));
return needNewJob ? newJob() : job;
}