1
0
mirror of https://github.com/rwengine/openrw.git synced 2024-09-15 06:52:34 +02:00

Overhaul worker interface

This commit is contained in:
Daniel Evans 2014-06-04 11:53:11 +01:00
parent 51e4fb9989
commit f24ceeb42f
10 changed files with 272 additions and 320 deletions

View File

@ -0,0 +1,85 @@
#pragma once
#ifndef _LOADCONTEXT_HPP_
#define _LOADCONTEXT_HPP_
#include <queue>
#include <thread>
#include <mutex>
#include <functional>
#include <fstream>
class WorkContext;
class LoadWorker
{
WorkContext* _context;
public:
bool _running;
std::thread _thread;
void start();
LoadWorker( WorkContext* context )
: _context( context ), _running(true),
_thread( std::bind(&LoadWorker::start, this) ) { }
~LoadWorker( )
{
_running = false;
_thread.join();
}
};
class WorkJob
{
WorkContext* _context;
public:
WorkJob(WorkContext* context)
: _context(context) {}
virtual ~WorkJob() {}
/**
* @brief getContext
* @return The loading context for this Loader
*/
WorkContext* getContext() const { return _context; }
virtual void work() = 0;
virtual void complete() {}
};
class WorkContext
{
std::queue<WorkJob*> _workQueue;
std::queue<WorkJob*> _completeQueue;
LoadWorker _worker;
std::mutex _inMutex;
std::mutex _outMutex;
public:
WorkContext()
: _worker(this) { }
void queueJob( WorkJob* job )
{
std::lock_guard<std::mutex> guard(_inMutex);
_workQueue.push( job );
}
// Called by the worker thread - don't touch;
void workNext();
const std::queue<WorkJob*> getWorkQueue() const { return _workQueue; }
const std::queue<WorkJob*> getCompleteQueue() const { return _completeQueue; }
void update();
};
#endif

View File

@ -4,15 +4,15 @@
#include <iostream>
class LoadContext;
class WorkContext;
class DataLoader
{
LoadContext* _context;
WorkContext* _context;
public:
DataLoader(LoadContext* context)
DataLoader(WorkContext* context)
: _context(context) {}
virtual ~DataLoader() {}
@ -21,7 +21,7 @@ public:
* @brief getContext
* @return The loading context for this Loader
*/
LoadContext* getContext() const { return _context; }
WorkContext* getContext() const { return _context; }
/**
* @brief load the data contained in a set of bytes

View File

@ -1,86 +0,0 @@
#pragma once
#ifndef _LOADCONTEXT_HPP_
#define _LOADCONTEXT_HPP_
#include <queue>
#include <thread>
#include <functional>
#include <fstream>
class DataLoader;
class LoadContext;
struct LoadRequest
{
DataLoader* loader;
const char* data;
size_t size;
std::string filename;
};
class LoadWorker
{
LoadContext* _context;
public:
bool _started;
bool _running;
std::thread _thread;
void start();
LoadWorker( LoadContext* context )
: _context( context ), _started(false), _running(true),
_thread( std::bind(&LoadWorker::start, this) ) { }
~LoadWorker( )
{
_started = true;
_running = false;
_thread.join();
}
};
class LoadContext
{
std::queue<LoadRequest> _loadQueue;
std::queue<LoadRequest> _createQueue;
unsigned int _loaded;
unsigned int _loading;
LoadWorker _worker;
public:
LoadContext()
: _loaded(0), _loading(0), _worker(this) { }
void add(DataLoader* loader, const char* data, size_t size)
{
_loadQueue.push({loader, data, size, ""});
}
void add(DataLoader* loader, const std::string& filename)
{
_loadQueue.push({loader, nullptr, 0, filename});
}
void loadNext();
const std::queue<LoadRequest> getLoadQueue() const { return _loadQueue; }
unsigned int getLoaded() const { return _createQueue.size(); }
unsigned int getIncomplete() const { return _loadQueue.size() + _loading + _createQueue.size(); }
unsigned int getComplete() const { return _loaded; }
unsigned int getTotal() const { return getIncomplete() + getComplete(); }
void flushCreation();
void wait();
};
#endif

View File

@ -9,7 +9,7 @@
#include <vector>
#include <string>
#include <memory>
#include <WorkContext.hpp>
class Model;
@ -25,4 +25,28 @@ public:
Model* loadFromMemory(char *data, GameData* gameData);
};
#include <functional>
class LoaderIMG;
class LoadModelJob : public WorkJob
{
public:
typedef std::function<void ( Model* )> ModelCallback;
private:
GameData* _gameData;
LoaderIMG* _archive;
std::string _file;
ModelCallback _callback;
char* _data;
public:
LoadModelJob(WorkContext* context, GameData* gd, LoaderIMG& archive, const std::string& file, ModelCallback cb);
void work();
void complete();
};
#endif

View File

@ -0,0 +1,38 @@
#include <WorkContext.hpp>
#include <loaders/DataLoader.hpp>
void LoadWorker::start()
{
while( _running ) {
_context->workNext();
std::this_thread::yield();
}
}
void WorkContext::workNext()
{
WorkJob* j = nullptr;
{
std::lock_guard<std::mutex> guard( _inMutex );
if( _workQueue.empty() ) return;
j = _workQueue.front(); _workQueue.pop();
}
j->work();
{
std::lock_guard<std::mutex> guard( _outMutex );
_completeQueue.push( j );
}
}
void WorkContext::update()
{
std::lock_guard<std::mutex> guard( _outMutex );
while( ! _completeQueue.empty() ) {
WorkJob* j = _completeQueue.front(); _completeQueue.pop();
j->complete();
delete j;
}
}

View File

@ -1,69 +0,0 @@
#include <loaders/LoadContext.hpp>
#include <loaders/DataLoader.hpp>
void LoadWorker::start()
{
while( !_started ) {
std::this_thread::yield();
}
while( _running ) {
_context->loadNext();
std::this_thread::yield();
}
}
void LoadContext::loadNext()
{
// MUTEX
if( _loadQueue.size() == 0 ) {
// UMUTEX
return;
}
_loading++;
auto tl = _loadQueue.front();
_loadQueue.pop();
// UMUTEX
if( tl.data == nullptr ) {
// TODO: instigate opening of the data bytes.
}
bool result = tl.loader->load(tl.data, tl.size);
delete tl.data;
// MUTEX
if( result ) {
_createQueue.push(tl);
}
else {
// Create is rip
_loaded++;
}
_loading--;
// UMUTEX
}
void LoadContext::flushCreation()
{
// MUTEX
while( ! _createQueue.empty() ) {
auto tl = _createQueue.front();
_createQueue.pop();
tl.loader->create();
_loaded++;
}
// UMUTEX
}
void LoadContext::wait()
{
while( _loadQueue.size() > 0 || _loading > 0 ) std::this_thread::yield();
}

View File

@ -311,3 +311,28 @@ RW::BSSectionHeader LoaderDFF::readHeader(char *data, size_t &dataI)
{
return readStructure<RW::BSSectionHeader>(data, dataI);
}
#include <loaders/LoaderIMG.hpp>
LoadModelJob::LoadModelJob(WorkContext *context, GameData* gd, LoaderIMG &archive, const std::string &file, ModelCallback cb)
: WorkJob(context), _gameData(gd), _archive(&archive), _file(file), _callback(cb), _data(nullptr)
{
}
void LoadModelJob::work()
{
_data = _archive->loadToMemory(_file);
}
void LoadModelJob::complete()
{
// TODO allow some of the loading to process in a seperate thread.
LoaderDFF loader;
Model* m = loader.loadFromMemory(_data, _gameData);
_callback(m);
delete[] _data;
}

View File

@ -1,147 +0,0 @@
#include <boost/test/unit_test.hpp>
#include <loaders/LoadContext.hpp>
#include <loaders/DataLoader.hpp>
class TestLoader : public DataLoader
{
public:
bool _loaded, _loadResult, _created;
TestLoader(LoadContext* context, bool result )
: DataLoader(context), _loaded(false), _loadResult(result),
_created(false) { }
bool load( const char* bytes, size_t size )
{ std::this_thread::sleep_for(std::chrono::milliseconds(5)); _loaded = true; return _loadResult; }
void create() { _created = true; }
};
BOOST_AUTO_TEST_SUITE(LoaderTests)
BOOST_AUTO_TEST_CASE(test_interface)
{
{
LoadContext context;
TestLoader loader( &context, true );
BOOST_CHECK_EQUAL( loader.getContext(), &context );
// Test that the TestLoader works as expected.
const char* databytes = new const char[1]{ 0x00 };
BOOST_CHECK( ! loader._loaded );
BOOST_CHECK( ! loader._created );
BOOST_CHECK( loader.load(databytes, 1) );
BOOST_CHECK( loader._loaded );
BOOST_CHECK( ! loader._created );
loader.create();
BOOST_CHECK( loader._created );
}
}
BOOST_AUTO_TEST_CASE(test_context)
{
{
LoadContext context;
TestLoader test(&context, true);
const char* databytes = new const char[1]{ 0x00 };
context.add(&test, databytes, 1);
BOOST_CHECK_EQUAL( context.getIncomplete(), 1 );
BOOST_CHECK_EQUAL( context.getTotal(), 1 );
BOOST_CHECK_EQUAL( context.getComplete(), 0 );
auto& queue = context.getLoadQueue();
BOOST_REQUIRE( queue.size() == 1 );
BOOST_CHECK_EQUAL( queue.front().loader, &test );
BOOST_CHECK_EQUAL( queue.front().data, databytes );
BOOST_CHECK_EQUAL( queue.front().size, 1 );
// Worker thread tier.
context.loadNext();
BOOST_CHECK_EQUAL( context.getIncomplete(), 1 );
BOOST_CHECK_EQUAL( context.getTotal(), 1 );
BOOST_CHECK_EQUAL( context.getComplete(), 0 );
// Back on main thread
BOOST_CHECK_EQUAL( context.getLoaded(), 1 );
context.flushCreation();
BOOST_CHECK_EQUAL( context.getIncomplete(), 0 );
BOOST_CHECK_EQUAL( context.getTotal(), 1 );
BOOST_CHECK_EQUAL( context.getComplete(), 1 );
}
}
BOOST_AUTO_TEST_CASE(test_thread)
{
{
LoadContext context;
LoadWorker worker(&context);
TestLoader test(&context, true);
const char* databytes = new const char[1]{ 0x00 };
context.add(&test, databytes, 1);
BOOST_CHECK_EQUAL( context.getIncomplete(), 1 );
BOOST_CHECK_EQUAL( context.getLoaded(), 0 );
worker._started = true;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
BOOST_CHECK_EQUAL( context.getLoaded(), 1 );
BOOST_CHECK_EQUAL( context.getIncomplete(), 1 );
context.flushCreation();
BOOST_CHECK_EQUAL( context.getLoaded(), 0 );
BOOST_CHECK_EQUAL( context.getIncomplete(), 0 );
BOOST_CHECK_EQUAL( context.getComplete(), 1 );
}
{
LoadContext context;
LoadWorker worker(&context);
TestLoader test(&context, true);
const char* databytes = new const char[1]{ 0x00 };
context.add(&test, databytes, 1);
BOOST_CHECK_EQUAL( context.getIncomplete(), 1 );
BOOST_CHECK_EQUAL( context.getLoaded(), 0 );
worker._started = true;
context.wait();
BOOST_CHECK_EQUAL( context.getLoaded(), 1 );
BOOST_CHECK_EQUAL( context.getIncomplete(), 1 );
context.flushCreation();
BOOST_CHECK_EQUAL( context.getLoaded(), 0 );
BOOST_CHECK_EQUAL( context.getIncomplete(), 0 );
BOOST_CHECK_EQUAL( context.getComplete(), 1 );
}
}
BOOST_AUTO_TEST_SUITE_END()

View File

@ -1,26 +1,60 @@
#include <boost/test/unit_test.hpp>
#include "test_globals.hpp"
#include <render/Model.hpp>
#include <WorkContext.hpp>
BOOST_AUTO_TEST_SUITE(LoaderDFFTests)
BOOST_AUTO_TEST_CASE(test_open_dff)
{
LoaderIMG loader;
LoaderIMG archive;
BOOST_REQUIRE( loader.load(Global::getGamePath() + "/models/gta3") );
BOOST_REQUIRE( archive.load(Global::getGamePath() + "/models/gta3") );
{
auto d = archive.loadToMemory("landstal.dff");
LoaderDFF loader;
Model* m = loader.loadFromMemory(d, &Global::get().e->gameData);
BOOST_REQUIRE( m != nullptr );
BOOST_CHECK( m->frames.size() > 0 );
delete m;
}
}
BOOST_AUTO_TEST_CASE(test_modeljob)
{
LoaderIMG archive;
BOOST_REQUIRE( archive.load(Global::getGamePath() + "/models/gta3") );
{
WorkContext ctx;
Model* m = nullptr;
bool done = false;
LoadModelJob* lmj = new LoadModelJob(&ctx, &Global::get().e->gameData, archive, "landstal.dff",
[&](Model* model) { m = model; done = true; });
ctx.queueJob(lmj);
while( ! done ) {
ctx.update();
std::this_thread::yield();
}
BOOST_REQUIRE( m != nullptr );
BOOST_CHECK( m->frames.size() > 0 );
delete m;
}
auto d = loader.loadToMemory("landstal.dff");
LoaderDFF dffloader;
Model* m = dffloader.loadFromMemory(d, &Global::get().e->gameData);
BOOST_REQUIRE( m != nullptr );
BOOST_CHECK( m->frames.size() > 0 );
delete d;
delete m;
}
BOOST_AUTO_TEST_SUITE_END()

48
tests/test_worker.cpp Normal file
View File

@ -0,0 +1,48 @@
#include <boost/test/unit_test.hpp>
#include <WorkContext.hpp>
class TestJob : public WorkJob
{
public:
bool *_worked, *_completed;
TestJob( WorkContext* context, bool *w, bool *c )
: WorkJob(context), _worked(w), _completed(c)
{}
void work() { *_worked = true; }
void complete() { *_completed = true; }
};
BOOST_AUTO_TEST_SUITE(WorkTests)
BOOST_AUTO_TEST_CASE(test_interface)
{
{
WorkContext context;
bool worked = false, completed = false;
TestJob* job = new TestJob(&context, &worked, &completed);
BOOST_CHECK( ! worked );
BOOST_CHECK( ! completed );
context.queueJob(job);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
BOOST_CHECK( worked );
BOOST_CHECK( ! completed );
context.update();;
BOOST_CHECK( worked );
BOOST_CHECK( completed );
}
}
BOOST_AUTO_TEST_SUITE_END()