From 5471d34089277ad5c622fd9c392b9270229d9e3d Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Thu, 14 Jan 2010 00:28:08 +0000 Subject: Created the minicron system. This is a cute little cron like implementation that allows a program to signal slots on a schedule, possibly a dynamic schedule. --- src/minicron.cpp | 360 +++++++++++++++++++++++++++++++++++++++++++++++++ src/minicron.h | 226 +++++++++++++++++++++++++++++++ src/tests/minicron.cpp | 53 ++++++++ 3 files changed, 639 insertions(+) create mode 100644 src/minicron.cpp create mode 100644 src/minicron.h create mode 100644 src/tests/minicron.cpp diff --git a/src/minicron.cpp b/src/minicron.cpp new file mode 100644 index 0000000..f20a382 --- /dev/null +++ b/src/minicron.cpp @@ -0,0 +1,360 @@ +#include "bu/minicron.h" + +#include +#include + +Bu::MiniCron::MiniCron() : + jidNext( 1 ) +{ +} + +Bu::MiniCron::~MiniCron() +{ +} + +bool Bu::MiniCron::hasJobs() +{ + return !hJobs.isEmpty(); +} + +time_t Bu::MiniCron::getNextRun() +{ + if( hasJobs() ) + return hJobs.peek()->getNextRun(); + return -1; +} + +void Bu::MiniCron::poll() +{ + time_t tNow = time( NULL ); + + while( !hJobs.isEmpty() ) + { + if( hJobs.peek()->getNextRun() <= tNow ) + { + Job *pJob = hJobs.dequeue(); + pJob->run(); + if( pJob->bContinue ) + { + hJobs.enqueue( pJob ); + } + else + { + delete pJob; + } + } + else + { + break; + } + } +} + +Bu::MiniCron::JobId Bu::MiniCron::addJob( Bu::MiniCron::CronSignal sigJob, + const Bu::MiniCron::Timer &t ) +{ + JobId jid = jidNext++; + Job *pJob = new Job( jid ); + pJob->sigJob = sigJob; + pJob->pTimer = t.clone(); + pJob->tNextRun = pJob->pTimer->nextTime(); + hJobs.enqueue( pJob ); + + return jid; +} + +Bu::MiniCron::JobId Bu::MiniCron::addJobOnce( Bu::MiniCron::CronSignal sigJob, + const Bu::MiniCron::Timer &t ) +{ + JobId jid = jidNext++; + Job *pJob = new Job( jid, false ); + pJob->sigJob = sigJob; + pJob->pTimer = t.clone(); + pJob->tNextRun = pJob->pTimer->nextTime(); + hJobs.enqueue( pJob ); + + return jid; +} + +void Bu::MiniCron::removeJob( JobId jid ) +{ + Bu::List lJobs; + while( !hJobs.isEmpty() ) + { + Job *pJob = hJobs.dequeue(); + if( pJob->getId() == jid ) + { + delete pJob; + } + else + lJobs.append( pJob ); + } + + for( Bu::List::iterator i = lJobs.begin(); i; i++ ) + { + hJobs.enqueue( *i ); + } +} + +Bu::MiniCron::Job::Job( JobId jid, bool bRepeat ) : + pTimer( NULL ), + bContinue( bRepeat ), + jid( jid ), + tAdded( time( NULL ) ), + iRunCount( 0 ) +{ +} + +Bu::MiniCron::Job::~Job() +{ + delete pTimer; + pTimer = NULL; +} + +void Bu::MiniCron::Job::run() +{ + iRunCount++; + tNextRun = pTimer->nextTime(); + sigJob( *this ); +} + +time_t Bu::MiniCron::Job::getNextRun() +{ + return tNextRun; +} + +void Bu::MiniCron::Job::calcNextRun() +{ + if( pTimer ) + tNextRun = pTimer->nextTime(); +} + +void Bu::MiniCron::Job::setTimer( const Timer &t ) +{ + delete pTimer; + pTimer = t.clone(); +} + +void Bu::MiniCron::Job::stop() +{ + bContinue = false; +} + +void Bu::MiniCron::Job::resume() +{ + bContinue = true; +} + +Bu::MiniCron::JobId Bu::MiniCron::Job::getId() +{ + return jid; +} + +time_t Bu::MiniCron::Job::getTimeCreated() +{ + return tAdded; +} + +int Bu::MiniCron::Job::getRunCount() +{ + return iRunCount; +} + +Bu::MiniCron::Timer::Timer() +{ +} + +Bu::MiniCron::Timer::~Timer() +{ +} + +Bu::MiniCron::TimerInterval::TimerInterval( time_t tFirst, time_t tInterval ) : + tNext( tFirst ), + tInterval( tInterval ) +{ +} + +Bu::MiniCron::TimerInterval::~TimerInterval() +{ +} + +time_t Bu::MiniCron::TimerInterval::nextTime() +{ + time_t tRet = tNext; + tNext += tInterval; + return tRet; +} + +Bu::MiniCron::TimerBasic::TimerBasic( const Bu::FString &s ) : + tLast( -1 ), + sSpec( s ) +{ +} + +Bu::MiniCron::TimerBasic::~TimerBasic() +{ +} + +time_t Bu::MiniCron::TimerBasic::nextTime() +{ + if( tLast == -1 ) + tLast = time( NULL ); + + Bu::FString::const_iterator i = sSpec.begin(); + switch( lex( i ) ) + { + case tokDaily: + { + int iHour = lexInt( i ); + int iMin = lexInt( i ); + + struct tm t; + ::memcpy( &t, localtime( &tLast ), sizeof(struct tm) ); + if( iHour < t.tm_hour || + (iHour == t.tm_hour && iMin <= t.tm_min) ) + { + t.tm_mday++; + } + t.tm_hour = iHour; + t.tm_min = iMin; + t.tm_sec = 0; + tLast = mktime( &t ); + } + break; + + case tokHourly: + { + int iMin = lexInt( i ); + + struct tm t; + ::memcpy( &t, localtime( &tLast ), sizeof(struct tm) ); + if( iMin <= t.tm_min ) + t.tm_hour++; + t.tm_min = iMin; + t.tm_sec = 0; + tLast = mktime( &t ); + } + break; + + case tokWeekly: + { + int iDay = lexInt( i ); + int iHour = lexInt( i ); + int iMin = lexInt( i ); + + struct tm t; + ::memcpy( &t, localtime( &tLast ), sizeof(struct tm) ); + if( iDay < t.tm_wday || + (iDay == t.tm_wday && iHour < t.tm_hour) || + (iDay == t.tm_wday && iHour == t.tm_hour + && iMin <= t.tm_min) ) + { + if( iDay <= t.tm_wday ) + t.tm_mday += 7 - (t.tm_wday-iDay); + else + t.tm_mday += 7 - (iDay-t.tm_wday); + } + else + { + t.tm_mday += (iDay-t.tm_wday); + } + t.tm_hour = iHour; + t.tm_min = iMin; + t.tm_sec = 0; + tLast = mktime( &t ); + } + break; + + case tokMonthly: + break; + + case tokYearly: + break; + + default: + break; + } + + return tLast; +} + +Bu::MiniCron::TimerBasic::Token Bu::MiniCron::TimerBasic::lex( + Bu::FString::const_iterator &i ) +{ + if( !i ) + { + return tokEos; + } + + Bu::FString::const_iterator b = i; + + for(; b && (*b == ' ' || *b == '\t'); b++ ) { i = b+1; } + for(; b && *b != ' ' && *b != '\t'; b++ ) { } + + Bu::FString sTok( i, b ); + i = b; + + if( sTok == "daily" ) + return tokDaily; + else if( sTok == "hourly" ) + return tokHourly; + else if( sTok == "weekly" ) + return tokWeekly; + else if( sTok == "monthly" ) + return tokMonthly; + else if( sTok == "yearly" ) + return tokYearly; + else if( sTok == "sun" ) + { + iVal = 0; + return valInt; + } + else if( sTok == "mon" ) + { + iVal = 1; + return valInt; + } + else if( sTok == "tue" ) + { + iVal = 2; + return valInt; + } + else if( sTok == "wed" ) + { + iVal = 3; + return valInt; + } + else if( sTok == "thu" ) + { + iVal = 4; + return valInt; + } + else if( sTok == "fri" ) + { + iVal = 5; + return valInt; + } + else if( sTok == "sat" ) + { + iVal = 6; + return valInt; + } + else if( sTok[0] >= '0' && sTok[0] <= '9' ) + { + iVal = strtol( sTok.getStr(), NULL, 0 ); + return valInt; + } + + return tokErr; +} + +int Bu::MiniCron::TimerBasic::lexInt( Bu::FString::const_iterator &i ) +{ + Token t = lex( i ); + if( t == tokEos ) + return 0; + if( t != valInt ) + throw Bu::ExceptionBase("Expected int, got something else."); + return iVal; +} + diff --git a/src/minicron.h b/src/minicron.h new file mode 100644 index 0000000..b100ad0 --- /dev/null +++ b/src/minicron.h @@ -0,0 +1,226 @@ +#ifndef BU_MINICRON_H +#define BU_MINICRON_H + +#include "bu/signals.h" +#include "bu/heap.h" +#include "bu/fstring.h" + +#include + +namespace Bu +{ + /** + * A simple cron like system designed to be embedded in any program. This + * class creates a simple cron system that can run any number of jobs at + * customizable intervals or schedules. It does not support some of the + * more complex scheduling that some cron systems can do such as load + * balancing directly, but this could be done on the job side. + * + * This system is synchronous, it does not use any threads on it's own, but + * it is threadsafe, so a cron thread could be created if desired. + * + * The operation is fairly simple, jobs can be added at any time, and use + * any timer they would like, even custom timers. When it is time for a + * job to be run it signals the slot provided when the job was added. Every + * job slot recieves a handle to the job object so that it may control it's + * own lifetime and get information about itself. In addition, every job + * is assigned a unique ID that can be used to control it's operation + * at any time. + * + * By default a job will continually reschedule itself after being run + * unless it calls stop() on it's job object, it is removed using + * removeJob() on the cron object, or it is added with addJobOnce. + */ + class MiniCron + { + public: + class Job; + class Timer; + typedef Bu::Signal1 CronSignal; + typedef int JobId; + + MiniCron(); + virtual ~MiniCron(); + + /** + * Tells you if there are jobs registered in the MiniCron. + *@returns true if there are jobs, false otherwise. + */ + virtual bool hasJobs(); + + /** + * If there are jobs, tells you the time the next one will execute. + *@returns The timestamp that the next job will execute at. + */ + virtual time_t getNextRun(); + + /** + * Call this regularly to execute all jobs that should be executed. + * This will loop until all jobs who's run time match the current time + * or are below the current time (we've missed them). + * If there is nothing to run, the runtime of this funcion is constant, + * it is very fast. Otherwise it executes at log(N) per job run, + * O(N*log(N)). + */ + virtual void poll(); + + /** + * Add a job for repeated scheduling. Pass in a slot to signal, and a + * Timer object to use to do the scheduling. This function returns a + * JobId which can be used at a later time to control the execution of + * the job. + */ + virtual JobId addJob( CronSignal sigJob, const Timer &t ); + + /** + * Add a job for one time scheduling. Pass in a slot to signal, and a + * Timer object to use to schodule the one run of this job. This + * function returns a JobId which can be used at a later time to control + * the execution of the job. + */ + virtual JobId addJobOnce( CronSignal sigJob, const Timer &t ); + + /** + * Remove a job, preventing all future runs of the job. If there is no + * job matching the given JobId then nothing will happen. However, this + * function is relatively expensive compared to the others in this class + * and has a worse case runtime of 2*N*log(N), still not that bad, and + * a O(N*log(N)). + */ + virtual void removeJob( JobId jid ); + + class Timer + { + public: + Timer(); + virtual ~Timer(); + + virtual time_t nextTime()=0; + virtual Timer *clone() const = 0; + }; + + class TimerInterval : public Timer + { + public: + TimerInterval( time_t tFirst, time_t tInterval ); + virtual ~TimerInterval(); + + virtual time_t nextTime(); + virtual Timer *clone() const + { return new TimerInterval( *this ); } + private: + time_t tNext; + time_t tInterval; + }; + + class TimerBasic : public Timer + { + public: + TimerBasic( const Bu::FString &s ); + virtual ~TimerBasic(); + + virtual time_t nextTime(); + virtual Timer *clone() const + { return new TimerBasic( *this ); } + + private: + enum Token + { + tokDaily, + tokHourly, + tokWeekly, + tokMonthly, + tokYearly, + valInt, + tokErr, + tokEos + }; + Token lex( Bu::FString::const_iterator &i ); + int lexInt( Bu::FString::const_iterator &i ); + int iVal; //< A temp variable for parsing. + time_t tLast; + Bu::FString sSpec; + }; + + class Job + { + friend class Bu::MiniCron; + public: + Job( JobId jid, bool bRepeat=true ); + virtual ~Job(); + + /** + * Execute this job once, increment the runcount and schedule the + * next occurance of it. + */ + void run(); + + /** + * Get the time this job will next run. + */ + time_t getNextRun(); + + /** + * Compute the time this job will next run. + */ + void calcNextRun(); + + /** + * Replace the current job timer with a new one, this will trigger + * a re-schedule. + */ + void setTimer( const Timer &t ); + + /** + * Stop execution of this job, never execute this job again. + */ + void stop(); + + /** + * Undo a previous stop. This will cause a job that has been + * stopped or even added with addJobOnce to be set for repeated + * scheduling. + */ + void resume(); + + /** + * Get the unique ID of this job. + */ + JobId getId(); + + /** + * Get the timestamp this job was created. + */ + time_t getTimeCreated(); + + /** + * Get the current run count of this job, how many times it has been + * executed. This is incremented before the slot is signaled. + */ + int getRunCount(); + + private: + CronSignal sigJob; + time_t tNextRun; + Timer *pTimer; + bool bContinue; + JobId jid; + time_t tAdded; + int iRunCount; + }; + + private: + struct JobPtrCmp + { + bool operator()( const Job *pLeft, const Job *pRight ) + { + return pLeft->tNextRun < pRight->tNextRun; + } + }; + typedef Bu::Heap JobHeap; + JobHeap hJobs; + JobId jidNext; + }; +}; + +#endif diff --git a/src/tests/minicron.cpp b/src/tests/minicron.cpp new file mode 100644 index 0000000..715a74d --- /dev/null +++ b/src/tests/minicron.cpp @@ -0,0 +1,53 @@ +#include "bu/minicron.h" +#include "bu/sio.h" + +#include + +using namespace Bu; + +Bu::MiniCron mCron; + +void job0( Bu::MiniCron::Job &job ) +{ + sio << time( NULL ) << ": job0( id = " << job.getId() << ", count = " + << job.getRunCount() << ")" << sio.nl; +} + +void job1( Bu::MiniCron::Job &job ) +{ + sio << time( NULL ) << ": job1( id = " << job.getId() << ", count = " + << job.getRunCount() << ")" << sio.nl; + mCron.removeJob( 4 ); +} + +void job2( Bu::MiniCron::Job &job ) +{ + sio << time( NULL ) << ": job2( id = " << job.getId() << ", count = " + << job.getRunCount() << ")" << sio.nl; +} + +void job3( Bu::MiniCron::Job &job ) +{ + sio << time( NULL ) << ": job3( id = " << job.getId() << ", count = " + << job.getRunCount() << ")" << sio.nl; +} + +int main() +{ + + mCron.addJob( slot( &job0 ), MiniCron::TimerInterval( time(NULL)+3, 5 ) ); + mCron.addJob( slot( &job1 ), MiniCron::TimerInterval( time(NULL)+10, 8 ) ); + mCron.addJob( slot( &job2 ), MiniCron::TimerBasic("weekly wed 17") ); + mCron.addJob( slot( &job3 ), MiniCron::TimerInterval( time(NULL)+1, 2 ) ); + + sio << time( NULL ) << ": Program started." << sio.nl; + + for(;;) + { + usleep( 50000 ); + mCron.poll(); + } + + return 0; +} + -- cgit v1.2.3