mirror of
https://github.com/debauchee/barrier.git
synced 2026-02-13 07:06:10 +08:00
Make ownership of SocketMultiplexerJob explicit
This commit is contained in:
@@ -33,6 +33,20 @@
|
||||
// SocketMultiplexer
|
||||
//
|
||||
|
||||
class CursorMultiplexerJob : public ISocketMultiplexerJob {
|
||||
public:
|
||||
MultiplexerJobStatus run(bool readable, bool writable, bool error) override
|
||||
{
|
||||
return {false, {}};
|
||||
}
|
||||
|
||||
ArchSocket getSocket() const override { return {}; }
|
||||
bool isReadable() const override { return false; }
|
||||
bool isWritable() const override { return false; }
|
||||
bool isCursor() const override { return true; }
|
||||
};
|
||||
|
||||
|
||||
SocketMultiplexer::SocketMultiplexer() :
|
||||
m_mutex(new Mutex),
|
||||
m_thread(NULL),
|
||||
@@ -43,12 +57,6 @@ SocketMultiplexer::SocketMultiplexer() :
|
||||
m_jobListLocker(NULL),
|
||||
m_jobListLockLocker(NULL)
|
||||
{
|
||||
// this pointer just has to be unique and not NULL. it will
|
||||
// never be dereferenced. it's used to identify cursor nodes
|
||||
// in the jobs list.
|
||||
// TODO: Remove this evilness
|
||||
m_cursorMark = reinterpret_cast<ISocketMultiplexerJob*>(this);
|
||||
|
||||
// start thread
|
||||
m_thread = new Thread(new TMethodJob<SocketMultiplexer>(
|
||||
this, &SocketMultiplexer::serviceThread));
|
||||
@@ -66,16 +74,9 @@ SocketMultiplexer::~SocketMultiplexer()
|
||||
delete m_jobListLocker;
|
||||
delete m_jobListLockLocker;
|
||||
delete m_mutex;
|
||||
|
||||
// clean up jobs
|
||||
for (SocketJobMap::iterator i = m_socketJobMap.begin();
|
||||
i != m_socketJobMap.end(); ++i) {
|
||||
delete *(i->second);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
|
||||
void SocketMultiplexer::addSocket(ISocket* socket, std::unique_ptr<ISocketMultiplexerJob>&& job)
|
||||
{
|
||||
assert(socket != NULL);
|
||||
assert(job != NULL);
|
||||
@@ -95,16 +96,12 @@ SocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
|
||||
// we *must* put the job at the end so the order of jobs in
|
||||
// the list continue to match the order of jobs in pfds in
|
||||
// serviceThread().
|
||||
JobCursor j = m_socketJobs.insert(m_socketJobs.end(), job);
|
||||
JobCursor j = m_socketJobs.insert(m_socketJobs.end(), std::move(job));
|
||||
m_update = true;
|
||||
m_socketJobMap.insert(std::make_pair(socket, j));
|
||||
}
|
||||
else {
|
||||
JobCursor j = i->second;
|
||||
if (*j != job) {
|
||||
delete *j;
|
||||
*j = job;
|
||||
}
|
||||
*(i->second) = std::move(job);
|
||||
m_update = true;
|
||||
}
|
||||
|
||||
@@ -131,10 +128,9 @@ SocketMultiplexer::removeSocket(ISocket* socket)
|
||||
// to match the order of jobs in pfds in serviceThread().
|
||||
SocketJobMap::iterator i = m_socketJobMap.find(socket);
|
||||
if (i != m_socketJobMap.end()) {
|
||||
if (*(i->second) != NULL) {
|
||||
delete *(i->second);
|
||||
*(i->second) = NULL;
|
||||
m_update = true;
|
||||
if (*(i->second)) {
|
||||
i->second->reset();
|
||||
m_update = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,14 +169,13 @@ SocketMultiplexer::serviceThread(void*)
|
||||
JobCursor cursor = newCursor();
|
||||
JobCursor jobCursor = nextCursor(cursor);
|
||||
while (jobCursor != m_socketJobs.end()) {
|
||||
ISocketMultiplexerJob* job = *jobCursor;
|
||||
if (job != NULL) {
|
||||
pfd.m_socket = job->getSocket();
|
||||
if (*jobCursor) {
|
||||
pfd.m_socket = (*jobCursor)->getSocket();
|
||||
pfd.m_events = 0;
|
||||
if (job->isReadable()) {
|
||||
if ((*jobCursor)->isReadable()) {
|
||||
pfd.m_events |= IArchNetwork::kPOLLIN;
|
||||
}
|
||||
if (job->isWritable()) {
|
||||
if ((*jobCursor)->isWritable()) {
|
||||
pfd.m_events |= IArchNetwork::kPOLLOUT;
|
||||
}
|
||||
pfds.push_back(pfd);
|
||||
@@ -221,15 +216,16 @@ SocketMultiplexer::serviceThread(void*)
|
||||
IArchNetwork::kPOLLNVAL)) != 0);
|
||||
|
||||
// run job
|
||||
ISocketMultiplexerJob* job = *jobCursor;
|
||||
ISocketMultiplexerJob* newJob = job->run(read, write, error);
|
||||
MultiplexerJobStatus status = (*jobCursor)->run(read, write, error);
|
||||
|
||||
// save job, if different
|
||||
if (newJob != job) {
|
||||
if (!status.continue_servicing) {
|
||||
Lock lock(m_mutex);
|
||||
delete job;
|
||||
*jobCursor = newJob;
|
||||
m_update = true;
|
||||
jobCursor->reset();
|
||||
m_update = true;
|
||||
} else if (status.new_job) {
|
||||
Lock lock(m_mutex);
|
||||
*jobCursor = std::move(status.new_job);
|
||||
m_update = true;
|
||||
}
|
||||
++i;
|
||||
}
|
||||
@@ -262,7 +258,7 @@ SocketMultiplexer::JobCursor
|
||||
SocketMultiplexer::newCursor()
|
||||
{
|
||||
Lock lock(m_mutex);
|
||||
return m_socketJobs.insert(m_socketJobs.begin(), m_cursorMark);
|
||||
return m_socketJobs.insert(m_socketJobs.begin(), std::make_unique<CursorMultiplexerJob>());
|
||||
}
|
||||
|
||||
SocketMultiplexer::JobCursor
|
||||
@@ -272,7 +268,7 @@ SocketMultiplexer::nextCursor(JobCursor cursor)
|
||||
JobCursor j = m_socketJobs.end();
|
||||
JobCursor i = cursor;
|
||||
while (++i != m_socketJobs.end()) {
|
||||
if (*i != m_cursorMark) {
|
||||
if (*i && !(*i)->isCursor()) {
|
||||
// found a real job (as opposed to a cursor)
|
||||
j = i;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user