14#ifndef MPICH_IGNORE_CXX_SEEK
15#define MPICH_IGNORE_CXX_SEEK
19#include <boost/mpi.hpp>
21#include <boost/config.hpp>
41#include "objectcache.h"
46#include "moduleregistry.h"
48#include <boost/dll/alias.hpp>
52namespace mpi = ::boost::mpi;
78 bool hasObject(
const Port *p);
81 typename Type::const_ptr accept(
const Port *port);
83 typename Type::const_ptr accept(
const std::string &port);
85 typename Type::const_ptr expect(
const Port *port);
87 typename Type::const_ptr expect(
const std::string &port);
89 void addDependency(std::shared_ptr<PortTask> dep);
91 void addObject(
const std::string &port,
Object::ptr obj);
98 bool dependenciesDone();
101 bool waitDependencies();
105 std::map<const Port *, Object::const_ptr>
m_input;
118 friend class Renderer;
122 static bool setup(
const std::string &shmname,
int moduleID,
int rank);
124 Module(
const std::string &name,
const int moduleID, mpi::communicator comm);
126 virtual void eventLoop();
129 virtual bool dispatch(
bool block =
true,
bool *messageReceived =
nullptr);
131 Parameter *addParameterGeneric(
const std::string &name, std::shared_ptr<Parameter> parameter)
override;
132 bool removeParameter(
Parameter *param)
override;
134 const std::string &name()
const;
135 const mpi::communicator &comm()
const;
136 const mpi::communicator &commShmGroup()
const;
139 int shmLeader(
int rank = -1)
const;
142 unsigned hardware_concurrency()
const;
144 ObjectCache::CacheMode setCacheMode(ObjectCache::CacheMode mode,
bool update =
true);
145 ObjectCache::CacheMode cacheMode()
const;
147 Port *createInputPort(
const std::string &name,
const std::string &description =
"",
const int flags = 0);
148 Port *createOutputPort(
const std::string &name,
const std::string &description =
"",
const int flags = 0);
149 bool destroyPort(
const std::string &portName);
150 bool destroyPort(
const Port *port);
165 ObjectList getObjects(
const std::string &portName);
166 bool hasObject(
const Port *port)
const;
167 bool hasObject(
const std::string &portName)
const;
172 typename Type::const_ptr accept(
Port *port);
174 typename Type::const_ptr accept(
const std::string &port);
175 template<
class Interface>
177 template<
class Interface>
181 typename Type::const_ptr expect(
Port *port);
183 typename Type::const_ptr expect(
const std::string &port);
186 void requestPortMapping(
unsigned short forwardPort,
unsigned short localPort);
188 void removePortMapping(
unsigned short forwardPort);
193 template<
class Payload>
194 bool sendMessageWithPayload(
message::Message &message, Payload &payload)
const;
197 void sendText(
int type,
const std::string &msg)
const;
200 void sendInfo(
const char *fmt, ...) const
202 __attribute__((format(printf, 2, 3)))
207 void sendWarning(
const char *fmt, ...) const
209 __attribute__((format(printf, 2, 3)))
214 void sendError(
const char *fmt, ...) const
216 __attribute__((format(printf, 2, 3)))
223 __attribute__((format(printf, 3, 4)))
228 void sendInfo(
const std::string &text)
const;
230 void sendWarning(
const std::string &text)
const;
232 void sendError(
const std::string &text)
const;
236 int schedulingPolicy()
const;
237 void setSchedulingPolicy(
int schedulingPolicy );
239 int reducePolicy()
const;
240 void setReducePolicy(
int reduceRequirement );
242 void virtual prepareQuit();
246 bool isConnected(
const Port &port)
const;
247 bool isConnected(
const std::string &portname)
const;
248 std::string getModuleName(
int id)
const;
249 int mirrorId()
const;
250 std::set<int> getMirrors()
const;
253 void setObjectReceivePolicy(
int pol);
254 int objectReceivePolicy()
const;
255 void startIteration();
265 void setDefaultCacheMode(ObjectCache::CacheMode mode);
273 bool cancelRequested(
bool collective =
false);
274 bool wasCancelRequested()
const;
276 virtual bool addInputObject(
int sender,
const std::string &senderPort,
const std::string &portName,
279 objectAdded(
int sender,
const std::string &senderPort,
282 bool syncMessageProcessing()
const;
283 void setSyncMessageProcessing(
bool sync);
285 virtual void connectionAdded(
const Port *from,
const Port *to);
286 virtual void connectionRemoved(
const Port *from,
const Port *to);
289 bool changeParameter(
const Parameter *p)
override;
291 int openmpThreads()
const;
292 void setOpenmpThreads(
int,
bool updateParam =
true);
294 void enableBenchmark(
bool benchmark,
bool updateParam =
true);
296 virtual bool prepare();
297 virtual bool reduce(
int timestep);
298 virtual bool cancelExecute();
299 int numTimesteps()
const;
301 void setStatus(
const std::string &text, message::UpdateStatus::Importance prio = message::UpdateStatus::Low);
310 std::shared_ptr<StateTracker> m_stateTracker;
312 int m_schedulingPolicy;
315 bool havePort(
const std::string &name);
316 Port *findInputPort(
const std::string &name);
317 const Port *findInputPort(
const std::string &name)
const;
318 Port *findOutputPort(
const std::string &name);
319 const Port *findOutputPort(
const std::string &name)
const;
324 virtual bool parameterAdded(
const int senderId,
const std::string &name,
const message::AddParameter &msg,
325 const std::string &moduleName);
327 virtual bool parameterChanged(
const int senderId,
const std::string &name,
const message::SetParameter &msg);
331 virtual bool compute();
332 virtual bool compute(std::shared_ptr<PortTask> task)
const;
334 std::map<std::string, Port> outputPorts;
335 std::map<std::string, Port> inputPorts;
338 ObjectCache::CacheMode m_defaultCacheMode;
339 bool m_prioritizeVisible;
340 void updateCacheMode();
341 bool m_syncMessageProcessing;
343 void updateOutputMode();
344 std::streambuf *m_origStreambuf =
nullptr, *m_streambuf =
nullptr;
348 double m_benchmarkStart;
349 double m_avgComputeTime;
350 mpi::communicator m_comm, m_commShmGroup, m_commShmLeaders;
351 std::vector<int> m_shmLeaders;
354 bool m_cancelRequested =
false, m_cancelExecuteCalled =
false, m_executeAfterCancelFound =
false;
355 bool m_prepared, m_computed, m_reduced;
358 IntParameter *m_concurrency =
nullptr;
360 std::shared_ptr<PortTask> m_lastTask;
361 std::deque<std::shared_ptr<PortTask>> m_tasks;
363 unsigned m_hardware_concurrency = 1;
376#define MODULE_MAIN_THREAD(X, THREAD_MODE) \
377 static std::shared_ptr<vistle::Module> newModuleInstance(const std::string &name, int moduleId, \
378 mpi::communicator comm) \
380 vistle::Module::setup("dummy shm", moduleId, comm.rank()); \
381 return std::shared_ptr<X>(new X(name, moduleId, comm)); \
383 static vistle::ModuleRegistry::RegisterClass registerModule##X(VISTLE_MODULE_NAME, newModuleInstance);
385#define MODULE_MAIN_THREAD(X, THREAD_MODE) \
386 static std::shared_ptr<vistle::Module> newModuleInstance(const std::string &name, int moduleId, \
387 mpi::communicator comm) \
389 vistle::Module::setup("dummy shm", moduleId, comm.rank()); \
390 return std::shared_ptr<X>(new X(name, moduleId, comm)); \
392 BOOST_DLL_ALIAS(newModuleInstance, newModule)
395#define MODULE_DEBUG(X)
398#define MODULE_MAIN_THREAD(X, THREAD_MODE) \
399 int main(int argc, char **argv) \
402 std::cerr << "module requires exactly 4 parameters" << std::endl; \
405 int rank = -1, size = -1; \
407 std::string shmname = argv[1]; \
408 const std::string name = argv[2]; \
409 int moduleID = atoi(argv[3]); \
410 mpi::environment mpi_environment(argc, argv, THREAD_MODE, true); \
411 vistle::registerTypes(); \
412 mpi::communicator comm_world; \
413 rank = comm_world.rank(); \
414 size = comm_world.size(); \
415 vistle::Module::setup(shmname, moduleID, rank); \
417 X module(name, moduleID, comm_world); \
418 module.eventLoop(); \
420 comm_world.barrier(); \
421 } catch (vistle::exception & e) { \
422 std::cerr << "[" << rank << "/" << size << "]: fatal exception: " << e.what() << std::endl; \
423 std::cerr << " info: " << e.info() << std::endl; \
424 std::cerr << e.where() << std::endl; \
426 } catch (std::exception & e) { \
427 std::cerr << "[" << rank << "/" << size << "]: fatal exception: " << e.what() << std::endl; \
434#define MODULE_DEBUG(X)
436#define MODULE_DEBUG(X) \
437 std::cerr << #X << ": PID " << get_process_handle() << std::endl; \
438 std::cerr << " attach debugger within 10 s" << std::endl; \
440 std::cerr << " continuing..." << std::endl;
444#define MODULE_MAIN(X) MODULE_MAIN_THREAD(X, boost::mpi::threading::funneled)
Definition: messagesender.h:12
message::MessageQueue * sendMessageQueue
Definition: module.h:268
int m_executionCount
Definition: module.h:262
const std::string m_name
Definition: module.h:257
const int m_id
Definition: module.h:260
int m_rank
Definition: module.h:258
std::set< Port * > m_withOutput
Definition: module.h:263
const Interface * acceptInterface(Port *port)
std::deque< message::Buffer > messageBacklog
Definition: module.h:270
int m_size
Definition: module.h:259
message::MessageQueue * receiveMessageQueue
Definition: module.h:269
const Interface * acceptInterface(const std::string &port)
std::shared_ptr< const Object > const_ptr
Definition: object.h:68
std::shared_ptr< Object > ptr
Definition: object.h:67
Definition: parametermanager.h:16
Definition: parameter.h:26
std::mutex m_mutex
Definition: module.h:112
std::map< const Port *, Object::const_ptr > m_input
Definition: module.h:105
std::set< std::shared_ptr< PortTask > > m_dependencies
Definition: module.h:108
std::map< std::string, Port * > m_portsByString
Definition: module.h:107
std::shared_future< bool > m_future
Definition: module.h:113
std::map< Port *, std::deque< Object::ptr > > m_objects
Definition: module.h:109
std::set< Port * > m_ports
Definition: module.h:106
std::map< Port *, std::deque< bool > > m_passThrough
Definition: module.h:110
base class for Vistle read modules
Definition: reader.h:22
notification that a module has created a parameter
Definition: messages.h:460
Definition: message.h:286
trigger execution of a module function
Definition: messages.h:271
Definition: messagequeue.h:18
Definition: message.h:157
notification that a module has removed a parameter
Definition: messages.h:483
request parameter value update or notify that a parameter value has been changed
Definition: messages.h:501
Definition: shm_reference.h:15
base class for Vistle modules
std::string module(const std::string &prefix)
Definition: directory.cpp:61
Definition: allobjects.cpp:30
std::vector< char, allocator< char > > buffer
Definition: buffer.h:9
std::deque< obj_const_ptr > ObjectList
Definition: port.h:18
V_MODULEEXPORT double getRealTime(Object::const_ptr obj)
Definition: module.cpp:162
V_MODULEEXPORT int getTimestep(Object::const_ptr obj)
Definition: module.cpp:145
Definition: statetracker.h:80