Console/ListenCommand.php 0000666 00000007504 13436752360 0011437 0 ustar 00 listener = $listener;
}
/**
 * Run the queue listener with the settings given on the command line.
 *
 * @return void
 */
public function fire()
{
    $this->setListenerOptions();

    // The queue name depends on the connection, so the connection is read
    // first and handed to getQueue() to work out which queue to listen on.
    $connection = $this->input->getArgument('connection');
    $queue = $this->getQueue($connection);

    // Delay, memory and timeout are passed straight through to the listener.
    $this->listener->listen(
        $connection,
        $queue,
        $this->input->getOption('delay'),
        $this->input->getOption('memory'),
        $this->input->getOption('timeout')
    );
}
/**
 * Work out which queue should be listened on for a connection.
 *
 * The "queue" command option wins when it is non-empty; otherwise the queue
 * configured for the connection is used, falling back to "default".
 *
 * @param  string|null  $connection  Connection name, or null for the configured default.
 * @return string
 */
protected function getQueue($connection)
{
    $config = $this->laravel['config'];

    $name = is_null($connection) ? $config['queue.default'] : $connection;

    $configured = $config->get("queue.connections.{$name}.queue", 'default');

    $requested = $this->input->getOption('queue');

    return $requested ?: $configured;
}
/**
 * Copy the environment, sleep, and tries settings onto the listener and
 * route the listener's output to this command's console output.
 *
 * @return void
 */
protected function setListenerOptions()
{
    $listener = $this->listener;

    $listener->setEnvironment($this->laravel->environment());
    $listener->setSleep($this->option('sleep'));
    $listener->setMaxTries($this->option('tries'));

    // Only the line itself is written; the $type argument is ignored.
    $listener->setOutputHandler(function ($type, $line) {
        $this->output->write($line);
    });
}
/**
 * Describe the positional arguments accepted by the command.
 *
 * @return array
 */
protected function getArguments()
{
    $connection = ['connection', InputArgument::OPTIONAL, 'The name of connection'];

    return [$connection];
}
/**
 * Describe the options accepted by the command.
 *
 * @return array
 */
protected function getOptions()
{
    $mode = InputOption::VALUE_OPTIONAL;

    $options = [];
    $options[] = ['queue', null, $mode, 'The queue to listen on', null];
    $options[] = ['delay', null, $mode, 'Amount of time to delay failed jobs', 0];
    $options[] = ['memory', null, $mode, 'The memory limit in megabytes', 128];
    $options[] = ['timeout', null, $mode, 'Seconds a job may run before timing out', 60];
    $options[] = ['sleep', null, $mode, 'Seconds to wait before checking queue for jobs', 3];
    $options[] = ['tries', null, $mode, 'Number of times to attempt a job before logging it failed', 0];

    return $options;
}
}
Console/RestartCommand.php 0000666 00000001221 13436752360 0011613 0 ustar 00 laravel['cache']->forever('illuminate:queue:restart', time());
$this->info('Broadcasting queue restart signal.');
}
}
Console/FailedTableCommand.php 0000666 00000004105 13436752360 0012327 0 ustar 00 files = $files;
$this->composer = $composer;
}
/**
 * Generate the migration file for the failed jobs table.
 *
 * The table name comes from the "queue.failed.table" configuration value and
 * is substituted into the failed_jobs stub before the file is written.
 *
 * @return void
 */
public function fire()
{
    $table = $this->laravel['config']['queue.failed.table'];
    $className = Str::studly($table);
    $migrationPath = $this->createBaseMigration($table);

    $template = $this->files->get(__DIR__.'/stubs/failed_jobs.stub');
    $placeholders = ['{{table}}', '{{tableClassName}}'];
    $values = [$table, $className];

    $this->files->put($migrationPath, str_replace($placeholders, $values, $template));

    $this->info('Migration created successfully!');

    $this->composer->dumpAutoloads();
}
/**
 * Ask the migration creator for a new "create_<table>_table" migration
 * inside the application's database/migrations directory.
 *
 * @param  string  $table
 * @return string  Whatever the migration creator returns (used by fire() as the file path).
 */
protected function createBaseMigration($table = 'failed_jobs')
{
    $migrationName = 'create_'.$table.'_table';

    $directory = $this->laravel->databasePath().'/migrations';

    $creator = $this->laravel['migration.creator'];

    return $creator->create($migrationName, $directory);
}
}
Console/RetryCommand.php 0000666 00000004213 13436752360 0011300 0 ustar 00 argument('id');
if (count($ids) === 1 && $ids[0] === 'all') {
$ids = Arr::pluck($this->laravel['queue.failer']->all(), 'id');
}
foreach ($ids as $id) {
$this->retryJob($id);
}
}
/**
 * Push a single failed job back onto its queue and forget the failure record.
 *
 * @param  string  $id
 * @return void
 */
protected function retryJob($id)
{
    $record = $this->laravel['queue.failer']->find($id);

    // Nothing stored under this ID: report it and stop.
    if (is_null($record)) {
        $this->error("No failed job matches the given ID [{$id}].");

        return;
    }

    $record = (object) $record;

    $record->payload = $this->resetAttempts($record->payload);

    $this->laravel['queue']
        ->connection($record->connection)
        ->pushRaw($record->payload, $record->queue);

    $this->laravel['queue.failer']->forget($record->id);

    $this->info("The failed job [{$id}] has been pushed back onto the queue!");
}
/**
 * Reset the attempt counter stored in a job payload.
 *
 * If the payload decodes to an array that has an "attempts" entry, that
 * entry is set back to 1 and the payload is re-encoded. A payload that does
 * not decode to an array (invalid JSON or a JSON scalar) is returned
 * unchanged; previously invalid JSON was re-encoded as the literal string
 * "null", which destroyed the job body before it was pushed back.
 *
 * @param  string  $payload
 * @return string
 */
protected function resetAttempts($payload)
{
    $decoded = json_decode($payload, true);

    // Only an array can carry an "attempts" key; leave anything else alone.
    if (! is_array($decoded)) {
        return $payload;
    }

    if (isset($decoded['attempts'])) {
        $decoded['attempts'] = 1;
    }

    return json_encode($decoded);
}
/**
 * Describe the positional arguments accepted by the command.
 *
 * @return array
 */
protected function getArguments()
{
    $id = ['id', InputArgument::IS_ARRAY, 'The ID of the failed job'];

    return [$id];
}
}
Console/TableCommand.php 0000666 00000004051 13436752360 0011222 0 ustar 00 files = $files;
$this->composer = $composer;
}
/**
 * Generate the migration file for the queue jobs table.
 *
 * The table name comes from the "queue.connections.database.table"
 * configuration value and is substituted into the jobs stub before the file
 * is written.
 *
 * @return void
 */
public function fire()
{
    $table = $this->laravel['config']['queue.connections.database.table'];
    $className = Str::studly($table);
    $migrationPath = $this->createBaseMigration($table);

    $template = $this->files->get(__DIR__.'/stubs/jobs.stub');
    $placeholders = ['{{table}}', '{{tableClassName}}'];
    $values = [$table, $className];

    $this->files->put($migrationPath, str_replace($placeholders, $values, $template));

    $this->info('Migration created successfully!');

    $this->composer->dumpAutoloads();
}
/**
 * Ask the migration creator for a new "create_<table>_table" migration
 * inside the application's database/migrations directory.
 *
 * @param  string  $table
 * @return string  Whatever the migration creator returns (used by fire() as the file path).
 */
protected function createBaseMigration($table = 'jobs')
{
    $migrationName = 'create_'.$table.'_table';

    $directory = $this->laravel->databasePath().'/migrations';

    $creator = $this->laravel['migration.creator'];

    return $creator->create($migrationName, $directory);
}
}
Console/stubs/failed_jobs.stub 0000666 00000001274 13436752360 0012467 0 ustar 00 increments('id');
$table->text('connection');
$table->text('queue');
$table->longText('payload');
$table->timestamp('failed_at')->useCurrent();
});
}
/**
 * Undo the migration by dropping the table.
 *
 * The {{table}} placeholder is replaced with the real table name when the
 * stub is turned into a migration file.
 *
 * @return void
 */
public function down()
{
    Schema::drop('{{table}}');
}
}
Console/stubs/jobs.stub 0000666 00000001672 13436752360 0011165 0 ustar 00 bigIncrements('id');
$table->string('queue');
$table->longText('payload');
$table->tinyInteger('attempts')->unsigned();
$table->tinyInteger('reserved')->unsigned();
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->index(['queue', 'reserved', 'reserved_at']);
});
}
/**
 * Undo the migration by dropping the table.
 *
 * The {{table}} placeholder is replaced with the real table name when the
 * stub is turned into a migration file.
 *
 * @return void
 */
public function down()
{
    Schema::drop('{{table}}');
}
}
Console/FlushFailedCommand.php 0000666 00000001147 13436752360 0012364 0 ustar 00 laravel['queue.failer']->flush();
$this->info('All failed jobs deleted successfully!');
}
}
Console/WorkCommand.php 0000666 00000012475 13436752360 0011126 0 ustar 00 worker = $worker;
}
/**
 * Run the queue worker with the settings given on the command line.
 *
 * @return mixed  The result of the worker's sleep() call when the non-daemon
 *                worker is paused for maintenance; otherwise nothing.
 */
public function fire()
{
    // While the application is down, a non-daemon worker just sleeps for the
    // configured interval instead of processing a job.
    if ($this->downForMaintenance()) {
        if (! $this->option('daemon')) {
            return $this->worker->sleep($this->option('sleep'));
        }
    }

    // Processed and failed events are echoed to the console so the job flow
    // can be followed while the worker runs.
    $this->listenForEvents();

    $queue = $this->option('queue');
    $delay = $this->option('delay');
    $memory = $this->option('memory');
    $connection = $this->argument('connection');
    $daemon = $this->option('daemon');

    $this->runWorker($connection, $queue, $delay, $memory, $daemon);
}
/**
 * Subscribe to the job processed and job failed events so each one is
 * reported on the console.
 *
 * @return void
 */
protected function listenForEvents()
{
    // Event class => whether the job should be reported as failed.
    $outcomes = [
        JobProcessed::class => false,
        JobFailed::class => true,
    ];

    foreach ($outcomes as $eventClass => $failed) {
        $this->laravel['events']->listen($eventClass, function ($event) use ($failed) {
            $this->writeOutput($event->job, $failed);
        });
    }
}
/**
 * Start the worker, either as a daemon or for a single pop.
 *
 * @param  string  $connection
 * @param  string  $queue
 * @param  int  $delay
 * @param  int  $memory  Only passed to the worker in daemon mode.
 * @param  bool  $daemon
 * @return array
 */
protected function runWorker($connection, $queue, $delay, $memory, $daemon = false)
{
    $worker = $this->worker;

    $worker->setDaemonExceptionHandler(
        $this->laravel['Illuminate\Contracts\Debug\ExceptionHandler']
    );

    if (! $daemon) {
        return $worker->pop(
            $connection, $queue, $delay,
            $this->option('sleep'), $this->option('tries')
        );
    }

    // The daemon worker is additionally given the default cache driver.
    $worker->setCache($this->laravel['cache']->driver());

    return $worker->daemon(
        $connection, $queue, $delay, $memory,
        $this->option('sleep'), $this->option('tries')
    );
}
/**
 * Print a timestamped "Processed" or "Failed" line for a job.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  bool  $failed
 * @return void
 */
protected function writeOutput(Job $job, $failed)
{
    $status = $failed ? 'Failed' : 'Processed';

    $timestamp = Carbon::now()->format('Y-m-d H:i:s');

    $this->output->writeln('['.$timestamp.'] '.$status.': '.$job->resolveName());
}
/**
 * Tell whether the worker should treat the application as down.
 *
 * The "force" option overrides maintenance mode entirely.
 *
 * @return bool
 */
protected function downForMaintenance()
{
    return $this->option('force')
        ? false
        : $this->laravel->isDownForMaintenance();
}
/**
 * Describe the positional arguments accepted by the command.
 *
 * @return array
 */
protected function getArguments()
{
    $connection = ['connection', InputArgument::OPTIONAL, 'The name of connection', null];

    return [$connection];
}
/**
 * Describe the options accepted by the command.
 *
 * @return array
 */
protected function getOptions()
{
    $optional = InputOption::VALUE_OPTIONAL;
    $flag = InputOption::VALUE_NONE;

    $options = [];
    $options[] = ['queue', null, $optional, 'The queue to listen on'];
    $options[] = ['daemon', null, $flag, 'Run the worker in daemon mode'];
    $options[] = ['delay', null, $optional, 'Amount of time to delay failed jobs', 0];
    $options[] = ['force', null, $flag, 'Force the worker to run even in maintenance mode'];
    $options[] = ['memory', null, $optional, 'The memory limit in megabytes', 128];
    $options[] = ['sleep', null, $optional, 'Number of seconds to sleep when no job is available', 3];
    $options[] = ['tries', null, $optional, 'Number of times to attempt a job before logging it failed', 0];

    return $options;
}
}
Console/ForgetFailedCommand.php 0000666 00000001767 13436752360 0012541 0 ustar 00 laravel['queue.failer']->forget($this->argument('id'))) {
$this->info('Failed job deleted successfully!');
} else {
$this->error('No failed job matches the given ID.');
}
}
/**
 * Describe the positional arguments accepted by the command.
 *
 * @return array
 */
protected function getArguments()
{
    $id = ['id', InputArgument::REQUIRED, 'The ID of the failed job'];

    return [$id];
}
}
Console/ListFailedCommand.php 0000666 00000004646 13436752360 0012225 0 ustar 00 getFailedJobs();
if (count($jobs) == 0) {
return $this->info('No failed jobs!');
}
$this->displayFailedJobs($jobs);
}
/**
 * Turn every stored failed job into a table row.
 *
 * Rows that come back empty from parseFailedJob() are removed; the remaining
 * rows keep their original positions as keys.
 *
 * @return array
 */
protected function getFailedJobs()
{
    $rows = [];

    $records = $this->laravel['queue.failer']->all();

    foreach ($records as $record) {
        $rows[] = $this->parseFailedJob((array) $record);
    }

    return array_filter($rows);
}
/**
 * Convert a failed job record into a display row.
 *
 * The payload column is dropped and the job name extracted from it is
 * inserted at index 3 instead. When no name can be extracted, an empty
 * string is inserted so the row keeps its column count and the values after
 * it stay in their own columns.
 *
 * @param  array  $failed  Failed job record; must contain a "payload" entry.
 * @return array
 */
protected function parseFailedJob(array $failed)
{
    $row = array_values(Arr::except($failed, ['payload']));

    $name = $this->extractJobName($failed['payload']);

    // array_splice() casts a non-array replacement with (array), and
    // (array) null is an empty array: a null name would insert nothing and
    // shift every later column one place to the left.
    array_splice($row, 3, 0, is_null($name) ? [''] : $name);

    return $row;
}
/**
 * Pull a job name out of a JSON payload.
 *
 * If the payload has a "data.command" entry, the first double-quoted string
 * inside it is returned. Otherwise, or when no such string is found, the
 * payload's "job" entry is returned. An empty or undecodable payload gives
 * null.
 *
 * @param  string  $payload
 * @return string|null
 */
private function extractJobName($payload)
{
    $decoded = json_decode($payload, true);

    if (! $decoded) {
        return;
    }

    if (isset($decoded['data']['command'])) {
        preg_match('/"([^"]+)"/', $decoded['data']['command'], $matches);

        if (isset($matches[1])) {
            return $matches[1];
        }
    }

    return Arr::get($decoded, 'job');
}
/**
 * Render the failed job rows as a console table under the command's headers.
 *
 * @param  array  $jobs
 * @return void
 */
protected function displayFailedJobs(array $jobs)
{
    $headers = $this->headers;

    $this->table($headers, $jobs);
}
}
ConsoleServiceProvider.php 0000666 00000002734 13436752360 0011736 0 ustar 00 app->singleton('command.queue.failed', function () {
return new ListFailedCommand;
});
$this->app->singleton('command.queue.retry', function () {
return new RetryCommand;
});
$this->app->singleton('command.queue.forget', function () {
return new ForgetFailedCommand;
});
$this->app->singleton('command.queue.flush', function () {
return new FlushFailedCommand;
});
$this->commands(
'command.queue.failed', 'command.queue.retry',
'command.queue.forget', 'command.queue.flush'
);
}
/**
 * List the container bindings this provider offers.
 *
 * @return array
 */
public function provides()
{
    $services = [
        'command.queue.failed',
        'command.queue.retry',
        'command.queue.forget',
        'command.queue.flush',
    ];

    return $services;
}
}
RedisQueue.php 0000666 00000017727 13436752360 0007363 0 ustar 00 redis = $redis;
$this->default = $default;
$this->connection = $connection;
}
/**
 * Build a payload for the job and push it onto the queue.
 *
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return mixed  The value returned by pushRaw().
 */
public function push($job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    return $this->pushRaw($payload, $queue);
}
/**
 * Append an already-encoded payload to the end of the queue list.
 *
 * @param  string  $payload
 * @param  string  $queue
 * @param  array  $options  Not used by this method.
 * @return mixed  The "id" entry of the decoded payload, if any.
 */
public function pushRaw($payload, $queue = null, array $options = [])
{
    $connection = $this->getConnection();

    $connection->rpush($this->getQueue($queue), $payload);

    $decoded = json_decode($payload, true);

    return Arr::get($decoded, 'id');
}
/**
 * Schedule a job on the queue's ":delayed" sorted set.
 *
 * The entry is scored with the current time plus the delay in seconds.
 *
 * @param  \DateTime|int  $delay
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return mixed  The "id" entry of the decoded payload, if any.
 */
public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $seconds = $this->getSeconds($delay);

    $connection = $this->getConnection();

    $connection->zadd($this->getQueue($queue).':delayed', $this->getTime() + $seconds, $payload);

    $decoded = json_decode($payload, true);

    return Arr::get($decoded, 'id');
}
/**
 * Put a reserved job on the ":delayed" sorted set with an updated attempt count.
 *
 * @param  string  $queue
 * @param  string  $payload
 * @param  int  $delay  Seconds added to the current time to form the score.
 * @param  int  $attempts
 * @return void
 */
public function release($queue, $payload, $delay, $attempts)
{
    $updated = $this->setMeta($payload, 'attempts', $attempts);

    $connection = $this->getConnection();

    $connection->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $updated);
}
/**
 * Take the next job from the front of the queue.
 *
 * When an expiration time is set, due entries from the delayed and reserved
 * sets are first moved back onto the queue. A popped job is recorded on the
 * ":reserved" sorted set, scored with the current time plus the expiration.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    $original = $queue ?: $this->default;

    $key = $this->getQueue($queue);

    if (! is_null($this->expire)) {
        $this->migrateAllExpiredJobs($key);
    }

    $payload = $this->getConnection()->lpop($key);

    // Nothing waiting on the queue.
    if (is_null($payload)) {
        return;
    }

    $this->getConnection()->zadd($key.':reserved', $this->getTime() + $this->expire, $payload);

    return new RedisJob($this->container, $this, $payload, $original);
}
/**
 * Remove a job payload from the queue's ":reserved" sorted set.
 *
 * @param  string  $queue
 * @param  string  $job
 * @return void
 */
public function deleteReserved($queue, $job)
{
    $reservedKey = $this->getQueue($queue).':reserved';

    $this->getConnection()->zrem($reservedKey, $job);
}
/**
 * Move due jobs from the delayed set, then from the reserved set, back onto the queue.
 *
 * @param  string  $queue  Full queue key, as produced by getQueue().
 * @return void
 */
protected function migrateAllExpiredJobs($queue)
{
    foreach ([':delayed', ':reserved'] as $suffix) {
        $this->migrateExpiredJobs($queue.$suffix, $queue);
    }
}
/**
 * Move every job whose score is at or before the current time from a sorted
 * set onto the end of a queue list.
 *
 * The lookup, the removal, and the push all happen inside one transaction on
 * the queue connection. Nothing is written when no job is due.
 *
 * @param  string  $from  Key of the sorted set to drain (this class passes the ":delayed" and ":reserved" keys).
 * @param  string  $to  Key of the list that receives the jobs.
 * @return void
 */
public function migrateExpiredJobs($from, $to)
{
// Options handed to the connection's transaction() call; the source key is
// the one being watched. NOTE(review): presumably Predis check-and-set mode
// with up to 10 retries on a conflicting write — confirm against Predis docs.
$options = ['cas' => true, 'watch' => $from, 'retry' => 10];
$this->getConnection()->transaction($options, function ($transaction) use ($from, $to) {
// Read the due jobs first. The timestamp is captured in $time here so the
// removal below uses exactly the same upper bound as this read.
$jobs = $this->getExpiredJobs(
$transaction, $from, $time = $this->getTime()
);
// Only touch the sets when something is due. removeExpiredJobs() is the
// call that issues multi(), so it must run before the push; the order of
// these two statements matters.
if (count($jobs) > 0) {
$this->removeExpiredJobs($transaction, $from, $time);
$this->pushExpiredJobsOntoNewQueue($transaction, $to, $jobs);
}
});
}
/**
 * Fetch every entry of a sorted set whose score is at most the given time.
 *
 * @param  \Predis\Transaction\MultiExec  $transaction
 * @param  string  $from
 * @param  int  $time
 * @return array
 */
protected function getExpiredJobs($transaction, $from, $time)
{
    $expired = $transaction->zrangebyscore($from, '-inf', $time);

    return $expired;
}
/**
 * Call multi() on the transaction, then delete every entry of the sorted set
 * whose score is at most the given time.
 *
 * @param  \Predis\Transaction\MultiExec  $transaction
 * @param  string  $from
 * @param  int  $time
 * @return void
 */
protected function removeExpiredJobs($transaction, $from, $time)
{
    // multi() has to be issued before the removal is sent.
    $transaction->multi();

    $transaction->zremrangebyscore($from, '-inf', $time);
}
/**
 * Append all of the given jobs to a queue list with a single rpush call.
 *
 * @param  \Predis\Transaction\MultiExec  $transaction
 * @param  string  $to
 * @param  array  $jobs
 * @return void
 */
protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
{
    // rpush takes the key first and then each job as a separate argument.
    $arguments = $jobs;
    array_unshift($arguments, $to);

    call_user_func_array([$transaction, 'rpush'], $arguments);
}
/**
 * Build the payload string and stamp it with a random ID and a first attempt count.
 *
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue  Not used by this method.
 * @return string
 */
protected function createPayload($job, $data = '', $queue = null)
{
    $withId = $this->setMeta(
        parent::createPayload($job, $data), 'id', $this->getRandomId()
    );

    return $this->setMeta($withId, 'attempts', 1);
}
/**
 * Generate a 32-character random string to use as a job ID.
 *
 * @return string
 */
protected function getRandomId()
{
    $length = 32;

    return Str::random($length);
}
/**
 * Build the "queues:"-prefixed key for a queue name, using the default
 * queue when no name is given.
 *
 * @param  string|null  $queue
 * @return string
 */
protected function getQueue($queue)
{
    $name = $queue ?: $this->default;

    return 'queues:'.$name;
}
/**
 * Resolve the connection this queue is configured to use.
 *
 * @return \Predis\ClientInterface
 */
protected function getConnection()
{
    $name = $this->connection;

    return $this->redis->connection($name);
}
/**
 * Expose the Redis instance held by this queue.
 *
 * @return \Illuminate\Redis\Database
 */
public function getRedis()
{
    $redis = $this->redis;

    return $redis;
}
/**
 * Read the currently configured expiration time, in seconds.
 *
 * @return int|null
 */
public function getExpire()
{
    $seconds = $this->expire;

    return $seconds;
}
/**
 * Change the expiration time.
 *
 * pop() skips the migration of delayed and reserved jobs while this is null.
 *
 * @param  int|null  $seconds
 * @return void
 */
public function setExpire($seconds)
{
    $this->expire = $seconds;
}
}
QueueServiceProvider.php 0000666 00000015650 13436752360 0011421 0 ustar 00 registerManager();
$this->registerWorker();
$this->registerListener();
$this->registerFailedJobServices();
$this->registerQueueClosure();
}
/**
 * Bind the queue manager and the default queue connection into the container.
 *
 * @return void
 */
protected function registerManager()
{
    // The manager receives all of the connector resolvers as soon as it is
    // built, so any configured driver can be resolved from it.
    $managerFactory = function ($app) {
        $manager = new QueueManager($app);

        $this->registerConnectors($manager);

        return $manager;
    };

    $connectionFactory = function ($app) {
        return $app['queue']->connection();
    };

    $this->app->singleton('queue', $managerFactory);
    $this->app->singleton('queue.connection', $connectionFactory);
}
/**
 * Register the work and restart commands and bind the queue worker.
 *
 * @return void
 */
protected function registerWorker()
{
    $this->registerWorkCommand();
    $this->registerRestartCommand();

    $workerFactory = function ($app) {
        return new Worker($app['queue'], $app['queue.failer'], $app['events']);
    };

    $this->app->singleton('queue.worker', $workerFactory);
}
/**
 * Bind the "queue:work" command and register it with the console.
 *
 * @return void
 */
protected function registerWorkCommand()
{
    $binding = 'command.queue.work';

    $this->app->singleton($binding, function ($app) {
        return new WorkCommand($app['queue.worker']);
    });

    $this->commands($binding);
}
/**
 * Register the listen command and bind the queue listener.
 *
 * @return void
 */
protected function registerListener()
{
    $this->registerListenCommand();

    $listenerFactory = function ($app) {
        return new Listener($app->basePath());
    };

    $this->app->singleton('queue.listener', $listenerFactory);
}
/**
 * Bind the "queue:listen" command and register it with the console.
 *
 * @return void
 */
protected function registerListenCommand()
{
    $binding = 'command.queue.listen';

    $this->app->singleton($binding, function ($app) {
        return new ListenCommand($app['queue.listener']);
    });

    $this->commands($binding);
}
/**
 * Bind the "queue:restart" command and register it with the console.
 *
 * @return void
 */
public function registerRestartCommand()
{
    $binding = 'command.queue.restart';

    $this->app->singleton($binding, function () {
        return new RestartCommand;
    });

    $this->commands($binding);
}
/**
 * Add every built-in connector to the queue manager.
 *
 * Each driver name maps to a register<Name>Connector() method on this class.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
public function registerConnectors($manager)
{
    $drivers = ['Null', 'Sync', 'Database', 'Beanstalkd', 'Redis', 'Sqs'];

    foreach ($drivers as $driver) {
        $method = 'register'.$driver.'Connector';

        $this->{$method}($manager);
    }
}
/**
 * Add the resolver for the "null" driver to the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerNullConnector($manager)
{
    $resolver = function () {
        return new NullConnector;
    };

    $manager->addConnector('null', $resolver);
}
/**
 * Add the resolver for the "sync" driver to the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerSyncConnector($manager)
{
    $resolver = function () {
        return new SyncConnector;
    };

    $manager->addConnector('sync', $resolver);
}
/**
 * Add the resolver for the "beanstalkd" driver to the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerBeanstalkdConnector($manager)
{
    $resolver = function () {
        return new BeanstalkdConnector;
    };

    $manager->addConnector('beanstalkd', $resolver);
}
/**
 * Add the resolver for the "database" driver to the manager.
 *
 * The connector is built with the container's "db" binding.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerDatabaseConnector($manager)
{
    $resolver = function () {
        return new DatabaseConnector($this->app['db']);
    };

    $manager->addConnector('database', $resolver);
}
/**
 * Add the resolver for the "redis" driver to the manager.
 *
 * The connector is built with the container's "redis" binding.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerRedisConnector($manager)
{
    // The container is captured now rather than read from $this later.
    $container = $this->app;

    $resolver = function () use ($container) {
        return new RedisConnector($container['redis']);
    };

    $manager->addConnector('redis', $resolver);
}
/**
 * Add the resolver for the "sqs" driver to the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerSqsConnector($manager)
{
    $resolver = function () {
        return new SqsConnector;
    };

    $manager->addConnector('sqs', $resolver);
}
/**
 * Bind the failed job provider.
 *
 * A database-backed provider is used when "queue.failed" names a table;
 * otherwise the null provider is bound.
 *
 * @return void
 */
protected function registerFailedJobServices()
{
    $this->app->singleton('queue.failer', function ($app) {
        $settings = $app['config']['queue.failed'];

        if (! isset($settings['table'])) {
            return new NullFailedJobProvider;
        }

        return new DatabaseFailedJobProvider($app['db'], $settings['database'], $settings['table']);
    });
}
/**
 * Bind the "IlluminateQueueClosure" job, built with the container's encrypter.
 *
 * @return void
 */
protected function registerQueueClosure()
{
    $factory = function ($app) {
        return new IlluminateQueueClosure($app['encrypter']);
    };

    $this->app->singleton('IlluminateQueueClosure', $factory);
}
/**
 * List the container bindings this provider offers.
 *
 * @return array
 */
public function provides()
{
    $services = [
        'queue',
        'queue.worker',
        'queue.listener',
        'queue.failer',
        'command.queue.work',
        'command.queue.listen',
        'command.queue.restart',
        'queue.connection',
    ];

    return $services;
}
}
InteractsWithQueue.php 0000666 00000002476 13436752360 0011100 0 ustar 00 job ? $this->job->attempts() : 1;
}
/**
 * Delete the underlying job, if one has been set.
 *
 * @return mixed  Whatever the job's delete() returns, or null without a job.
 */
public function delete()
{
    return $this->job ? $this->job->delete() : null;
}
/**
 * Call failed() on the underlying job, if one has been set.
 *
 * @return mixed  Whatever the job's failed() returns, or null without a job.
 */
public function failed()
{
    return $this->job ? $this->job->failed() : null;
}
/**
 * Release the underlying job back onto the queue, if one has been set.
 *
 * @param  int  $delay
 * @return mixed  Whatever the job's release() returns, or null without a job.
 */
public function release($delay = 0)
{
    return $this->job ? $this->job->release($delay) : null;
}
/**
 * Attach the queue job that the other methods here delegate to.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @return $this
 */
public function setJob(JobContract $job)
{
    $instance = $this;

    $instance->job = $job;

    return $instance;
}
}
README.md 0000666 00000002312 13436752360 0006036 0 ustar 00 ## Illuminate Queue
The Laravel Queue component provides a unified API across a variety of different queue services. Queues allow you to defer the processing of a time-consuming task, such as sending an e-mail, until a later time, thus drastically speeding up the web requests to your application.
### Usage Instructions
First, create a new Queue `Capsule` manager instance. Similar to the "Capsule" provided for the Eloquent ORM, the queue Capsule aims to make configuring the library for usage outside of the Laravel framework as easy as possible.
```PHP
use Illuminate\Queue\Capsule\Manager as Queue;
$queue = new Queue;
$queue->addConnection([
'driver' => 'beanstalkd',
'host' => 'localhost',
'queue' => 'default',
]);
// Make this Capsule instance available globally via static methods... (optional)
$queue->setAsGlobal();
```
Once the Capsule instance has been registered, you may use it like so:
```PHP
// As an instance...
$queue->push('SendEmail', array('message' => $message));
// If setAsGlobal has been called...
Queue::push('SendEmail', array('message' => $message));
```
For further documentation on using the queue, consult the [Laravel framework documentation](http://laravel.com/docs).
Events/WorkerStopping.php 0000666 00000000113 13436752360 0011526 0 ustar 00 job = $job;
$this->data = $data;
$this->connectionName = $connectionName;
}
}
Events/JobProcessing.php 0000666 00000001354 13436752360 0011310 0 ustar 00 job = $job;
$this->data = $data;
$this->connectionName = $connectionName;
}
}
Events/JobExceptionOccurred.php 0000666 00000001661 13436752360 0012622 0 ustar 00 job = $job;
$this->data = $data;
$this->exception = $exception;
$this->connectionName = $connectionName;
}
}
Events/JobFailed.php 0000666 00000001672 13436752360 0010363 0 ustar 00 job = $job;
$this->data = $data;
$this->failedId = $failedId;
$this->connectionName = $connectionName;
}
}
Connectors/SqsConnector.php 0000666 00000002106 13436752360 0012027 0 ustar 00 getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret']);
}
return new SqsQueue(
new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
);
}
/**
 * Fill in the SQS defaults for any top-level keys the given config lacks.
 *
 * Values in $config replace the defaults key by key; the nested "http"
 * array is replaced as a whole, not merged.
 *
 * @param  array  $config
 * @return array
 */
protected function getDefaultConfiguration(array $config)
{
    $defaults = [
        'version' => 'latest',
        'http' => [
            'timeout' => 60,
            'connect_timeout' => 60,
        ],
    ];

    return array_merge($defaults, $config);
}
}
Connectors/ConnectorInterface.php 0000666 00000000407 13436752360 0013163 0 ustar 00 redis = $redis;
$this->connection = $connection;
}
/**
 * Build a Redis queue from a connection configuration.
 *
 * The config's "connection" entry overrides the connector's own connection
 * name, and "expire" defaults to 60 when absent.
 *
 * @param  array  $config
 * @return \Illuminate\Contracts\Queue\Queue
 */
public function connect(array $config)
{
    $connection = Arr::get($config, 'connection', $this->connection);

    $expire = Arr::get($config, 'expire', 60);

    $queue = new RedisQueue($this->redis, $config['queue'], $connection);

    $queue->setExpire($expire);

    return $queue;
}
}
Connectors/DatabaseConnector.php 0000666 00000002064 13436752360 0012770 0 ustar 00 connections = $connections;
}
/**
 * Build a database queue from a connection configuration.
 *
 * The database connection is looked up by the config's optional
 * "connection" entry, and "expire" defaults to 60 when absent.
 *
 * @param  array  $config
 * @return \Illuminate\Contracts\Queue\Queue
 */
public function connect(array $config)
{
    $database = $this->connections->connection(Arr::get($config, 'connection'));

    $table = $config['table'];
    $queue = $config['queue'];
    $expire = Arr::get($config, 'expire', 60);

    return new DatabaseQueue($database, $table, $queue, $expire);
}
}
Connectors/SyncConnector.php 0000666 00000000546 13436752360 0012203 0 ustar 00 app = $app;
}
/**
 * Subscribe a callback to the JobProcessing event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function before($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobProcessing::class, $callback);
}
/**
 * Subscribe a callback to the JobProcessed event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function after($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobProcessed::class, $callback);
}
/**
 * Subscribe a callback to the JobExceptionOccurred event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function exceptionOccurred($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobExceptionOccurred::class, $callback);
}
/**
 * Subscribe a callback to the "illuminate.queue.looping" event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function looping($callback)
{
    $events = $this->app['events'];

    $events->listen('illuminate.queue.looping', $callback);
}
/**
 * Subscribe a callback to the JobFailed event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function failing($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobFailed::class, $callback);
}
/**
 * Subscribe a callback to the WorkerStopping event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function stopping($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\WorkerStopping::class, $callback);
}
/**
 * Check whether a connection has already been resolved.
 *
 * @param  string  $name  Connection name; the default driver is used when empty.
 * @return bool
 */
public function connected($name = null)
{
    $key = $name ?: $this->getDefaultDriver();

    return isset($this->connections[$key]);
}
/**
 * Fetch a queue connection, resolving and caching it on first use.
 *
 * @param  string  $name  Connection name; the default driver is used when empty.
 * @return \Illuminate\Contracts\Queue\Queue
 */
public function connection($name = null)
{
    $name = $name ?: $this->getDefaultDriver();

    if (isset($this->connections[$name])) {
        return $this->connections[$name];
    }

    // Connections are only built when first requested. A new connection is
    // cached first and then given the container and the encrypter.
    $queue = $this->connections[$name] = $this->resolve($name);

    $queue->setContainer($this->app);
    $queue->setEncrypter($this->app['encrypter']);

    return $queue;
}
/**
 * Build a queue connection from its configuration using the connector for
 * the configured driver.
 *
 * @param  string  $name
 * @return \Illuminate\Contracts\Queue\Queue
 */
protected function resolve($name)
{
    $config = $this->getConfig($name);

    $connector = $this->getConnector($config['driver']);

    return $connector->connect($config);
}
/**
 * Invoke the resolver registered for a driver and return its connector.
 *
 * @param  string  $driver
 * @return \Illuminate\Queue\Connectors\ConnectorInterface
 *
 * @throws \InvalidArgumentException  When no resolver is registered for the driver.
 */
protected function getConnector($driver)
{
    if (! isset($this->connectors[$driver])) {
        throw new InvalidArgumentException("No connector for [$driver]");
    }

    $resolver = $this->connectors[$driver];

    return call_user_func($resolver);
}
/**
 * Register a resolver for a driver; delegates to addConnector().
 *
 * @param  string  $driver
 * @param  \Closure  $resolver
 * @return void
 */
public function extend($driver, Closure $resolver)
{
    $result = $this->addConnector($driver, $resolver);

    return $result;
}
/**
 * Store the resolver for a driver, replacing any earlier one.
 *
 * @param  string  $driver
 * @param  \Closure  $resolver
 * @return void
 */
public function addConnector($driver, Closure $resolver)
{
    $connectors = &$this->connectors;

    $connectors[$driver] = $resolver;
}
/**
 * Look up the configuration array for a connection.
 *
 * A null name, or the string "null", always yields the null driver.
 *
 * @param  string  $name
 * @return array
 */
protected function getConfig($name)
{
    if (is_null($name) || $name === 'null') {
        return ['driver' => 'null'];
    }

    $key = "queue.connections.{$name}";

    return $this->app['config'][$key];
}
/**
 * Read the default connection name from the "queue.default" config value.
 *
 * @return string
 */
public function getDefaultDriver()
{
    $config = $this->app['config'];

    return $config['queue.default'];
}
/**
 * Write a new default connection name to the "queue.default" config value.
 *
 * @param  string  $name
 * @return void
 */
public function setDefaultDriver($name)
{
    $config = $this->app['config'];

    $config['queue.default'] = $name;
}
/**
* Get the full name for the given connection.
*
* @param string $connection
* @return string
*/
public function getName($connection = null)
{
    // Any truthy name is used verbatim; otherwise fall back to the default.
    if ($connection) {
        return $connection;
    }

    return $this->getDefaultDriver();
}
/**
* Determine if the application is in maintenance mode.
*
* @return bool
*/
public function isDownForMaintenance()
{
// Pure delegation to the application instance held by the manager.
return $this->app->isDownForMaintenance();
}
/**
* Dynamically pass calls to the default connection.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public function __call($method, $parameters)
{
    // Unknown methods are forwarded to the default connection instance.
    return call_user_func_array([$this->connection(), $method], $parameters);
}
}
Queue.php 0000666 00000013121 13436752360 0006354 0 ustar 00 push($job, $data, $queue);
}
/**
* Push a new job onto the queue after a delay.
*
* @param string $queue
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @return mixed
*/
public function laterOn($queue, $delay, $job, $data = '')
{
// Same as later(), but takes the queue name first; the arguments are
// reordered to match later($delay, $job, $data, $queue).
return $this->later($delay, $job, $data, $queue);
}
/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function bulk($jobs, $data = '', $queue = null)
{
    // A single job is accepted as well: the cast wraps it into a list.
    $pending = (array) $jobs;

    // Every job is pushed individually with the same data and queue.
    foreach ($pending as $pendingJob) {
        $this->push($pendingJob, $data, $queue);
    }
}
/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return string
*/
protected function createPayload($job, $data = '', $queue = null)
{
    if ($job instanceof Closure) {
        // Closures get their own serialized, encrypted payload.
        $payload = $this->createClosurePayload($job, $data);
    } elseif (is_object($job)) {
        // Any other object is treated as a self-contained command: a clone of
        // it is serialized and dispatched through the queued handler.
        $payload = [
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'data' => ['commandName' => get_class($job), 'command' => serialize(clone $job)],
        ];
    } else {
        // Plain "Class@method" style job names.
        $payload = $this->createPlainPayload($job, $data);
    }

    return json_encode($payload);
}
/**
* Create a typical, "plain" queue payload array.
*
* @param string $job
* @param mixed $data
* @return array
*/
protected function createPlainPayload($job, $data)
{
    // Queueable entities inside the data are swapped for storable identifiers.
    $prepared = $this->prepareQueueableEntities($data);

    return ['job' => $job, 'data' => $prepared];
}
/**
* Prepare any queueable entities for storage in the queue.
*
* @param mixed $data
* @return mixed
*/
protected function prepareQueueableEntities($data)
{
    // A lone entity is converted directly.
    if ($data instanceof QueueableEntity) {
        return $this->prepareQueueableEntity($data);
    }

    // Scalars and non-entity objects pass through untouched.
    if (! is_array($data)) {
        return $data;
    }

    // Walk the array (keys preserved), recursing into nested arrays and
    // converting every leaf value.
    $prepared = [];

    foreach ($data as $key => $item) {
        $prepared[$key] = is_array($item)
            ? $this->prepareQueueableEntities($item)
            : $this->prepareQueueableEntity($item);
    }

    return $prepared;
}
/**
* Prepare a single queueable entity for storage on the queue.
*
* @param mixed $value
* @return mixed
*/
protected function prepareQueueableEntity($value)
{
    // Anything that is not a queueable entity is stored as-is.
    if (! $value instanceof QueueableEntity) {
        return $value;
    }

    // Entities are flattened to a "marker|class|id" string token.
    $marker = '::entity::|';

    return $marker.get_class($value).'|'.$value->getQueueableId();
}
/**
* Create a payload string for the given Closure job.
*
* @param \Closure $job
* @param mixed $data
* @return array
*/
protected function createClosurePayload($job, $data)
{
    // The encrypter is fetched first so a missing encrypter fails before any
    // serialization work happens; the closure is then serialized and encrypted.
    $encrypter = $this->getEncrypter();
    $serialized = (new Serializer)->serialize($job);

    return [
        'job' => 'IlluminateQueueClosure',
        'data' => ['closure' => $encrypter->encrypt($serialized)],
    ];
}
/**
* Set additional meta on a payload string.
*
* @param string $payload
* @param string $key
* @param string $value
* @return string
*/
protected function setMeta($payload, $key, $value)
{
    // Decode to an associative array so the key can be written into it.
    $payload = json_decode($payload, true);

    // Arr::set() modifies $payload by reference. Its return value must not be
    // encoded: for a dotted key it is only the nested sub-array that received
    // the value, which would drop the rest of the payload.
    Arr::set($payload, $key, $value);

    return json_encode($payload);
}
/**
* Calculate the number of seconds with the given delay.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getSeconds($delay)
{
    // Accept any date object (DateTime, DateTimeImmutable, Carbon, ...), not
    // just DateTime, and turn it into a non-negative offset from "now".
    if ($delay instanceof \DateTimeInterface) {
        return max(0, $delay->getTimestamp() - $this->getTime());
    }

    // Everything else is interpreted as a number of seconds.
    return (int) $delay;
}
/**
* Get the current UNIX timestamp.
*
* @return int
*/
protected function getTime()
{
// Thin wrapper around time(); kept as a method so callers in this class go
// through a single overridable seam.
return time();
}
/**
* Set the IoC container instance.
*
* @param \Illuminate\Container\Container $container
* @return void
*/
public function setContainer(Container $container)
{
// Stored on the queue instance for later use by the concrete queue drivers.
$this->container = $container;
}
/**
* Get the encrypter implementation.
*
* @return \Illuminate\Contracts\Encryption\Encrypter
*
* @throws \Exception
*/
protected function getEncrypter()
{
    // Happy path: an encrypter was injected via setEncrypter().
    if (! is_null($this->encrypter)) {
        return $this->encrypter;
    }

    throw new Exception('No encrypter has been set on the Queue.');
}
/**
* Set the encrypter implementation.
*
* @param \Illuminate\Contracts\Encryption\Encrypter $encrypter
* @return void
*/
public function setEncrypter(Encrypter $encrypter)
{
// getEncrypter() throws until this has been called with an encrypter.
$this->encrypter = $encrypter;
}
}
Capsule/Manager.php 0000666 00000011070 13436752360 0010237 0 ustar 00 setupContainer($container ?: new Container);
// Once we have the container setup, we will setup the default configuration
// options in the container "config" bindings. This just makes this queue
// manager behave correctly since all the correct bindings are in place.
$this->setupDefaultConfiguration();
$this->setupManager();
$this->registerConnectors();
}
/**
* Setup the default queue configuration options.
*
* @return void
*/
protected function setupDefaultConfiguration()
{
// 'default' matches the default $name of addConnection(), so a connection
// added without a name becomes the default one.
$this->container['config']['queue.default'] = 'default';
}
/**
* Build the queue manager instance.
*
* @return void
*/
protected function setupManager()
{
// The capsule's container is handed to the manager as its constructor argument.
$this->manager = new QueueManager($this->container);
}
/**
* Register the default connectors that the component ships with.
*
* @return void
*/
protected function registerConnectors()
{
    // Reuse the service provider's connector registration against our manager.
    (new QueueServiceProvider($this->container))->registerConnectors($this->manager);
}
/**
* Get a connection instance from the global manager.
*
* @param string $connection
* @return \Illuminate\Contracts\Queue\Queue
*/
public static function connection($connection = null)
{
// Resolves through the shared static::$instance.
// NOTE(review): the code that assigns static::$instance is not visible here;
// presumably the capsule must be made global before this is called — confirm.
return static::$instance->getConnection($connection);
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @param string $connection
* @return mixed
*/
public static function push($job, $data = '', $queue = null, $connection = null)
{
    // Pick the requested connection off the global instance, then push on it.
    $target = static::$instance->connection($connection);

    return $target->push($job, $data, $queue);
}
/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string $queue
* @param string $connection
* @return mixed
*/
public static function bulk($jobs, $data = '', $queue = null, $connection = null)
{
    // Pick the requested connection off the global instance, then bulk-push.
    $target = static::$instance->connection($connection);

    return $target->bulk($jobs, $data, $queue);
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @param string $connection
* @return mixed
*/
public static function later($delay, $job, $data = '', $queue = null, $connection = null)
{
    // Pick the requested connection off the global instance, then schedule.
    $target = static::$instance->connection($connection);

    return $target->later($delay, $job, $data, $queue);
}
/**
* Get a registered connection instance.
*
* @param string $name
* @return \Illuminate\Contracts\Queue\Queue
*/
public function getConnection($name = null)
{
// Instance-level counterpart of the static connection(); asks the manager.
return $this->manager->connection($name);
}
/**
* Register a connection with the manager.
*
* @param array $config
* @param string $name
* @return void
*/
public function addConnection(array $config, $name = 'default')
{
// Only the configuration is stored under "queue.connections.{name}"; no
// connection is opened by this call.
$this->container['config']["queue.connections.{$name}"] = $config;
}
/**
* Get the queue manager instance.
*
* @return \Illuminate\Queue\QueueManager
*/
public function getQueueManager()
{
// The manager built in setupManager().
return $this->manager;
}
/**
* Pass dynamic instance methods to the manager.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public function __call($method, $parameters)
{
    // Unknown instance methods are forwarded to the queue manager.
    $target = [$this->manager, $method];

    return call_user_func_array($target, $parameters);
}
/**
* Dynamically pass methods to the default connection.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public static function __callStatic($method, $parameters)
{
    // Unknown static methods are forwarded to the default connection.
    $target = [static::connection(), $method];

    return call_user_func_array($target, $parameters);
}
}
SerializesModels.php 0000666 00000005400 13436752360 0010547 0 ustar 00 getProperties();
foreach ($properties as $property) {
$property->setValue($this, $this->getSerializedPropertyValue(
$this->getPropertyValue($property)
));
}
return array_map(function ($p) {
return $p->getName();
}, $properties);
}
/**
* Restore the model after serialization.
*
* @return void
*/
public function __wakeup()
{
    $reflection = new ReflectionClass($this);

    // Run every property back through the restore step, which swaps stored
    // model identifiers for freshly loaded models.
    foreach ($reflection->getProperties() as $property) {
        $stored = $this->getPropertyValue($property);

        $property->setValue($this, $this->getRestoredPropertyValue($stored));
    }
}
/**
* Get the property value prepared for serialization.
*
* @param mixed $value
* @return mixed
*/
protected function getSerializedPropertyValue($value)
{
    // Only queueable entities are replaced; everything else serializes as-is.
    if (! $value instanceof QueueableEntity) {
        return $value;
    }

    // Store just the class name and queueable id instead of the entity itself.
    return new ModelIdentifier(get_class($value), $value->getQueueableId());
}
/**
* Get the restored property value after deserialization.
*
* @param mixed $value
* @return mixed
*/
protected function getRestoredPropertyValue($value)
{
    if ($value instanceof ModelIdentifier) {
        // An array of ids represents a whole collection of models.
        if (is_array($value->id)) {
            return $this->restoreCollection($value);
        }

        // A single id: reload the model through the write connection.
        return (new $value->class)->newQuery()->useWritePdo()->findOrFail($value->id);
    }

    // Plain values were never replaced, so they are returned untouched.
    return $value;
}
/**
* Restore a queueable collection instance.
*
* @param \Illuminate\Contracts\Database\ModelIdentifier $value
* @return \Illuminate\Database\Eloquent\Collection
*/
protected function restoreCollection($value)
{
    // Without a class or without ids there is nothing to query for.
    $hasModels = $value->class && count($value->id) !== 0;

    if (! $hasModels) {
        return new EloquentCollection;
    }

    $prototype = new $value->class;
    $query = $prototype->newQuery()->useWritePdo();

    // Fetch all stored ids in one query, matched on the model's key column.
    return $query->whereIn($prototype->getKeyName(), $value->id)->get();
}
/**
* Get the property value for the given property.
*
* @param \ReflectionProperty $property
* @return mixed
*/
protected function getPropertyValue(ReflectionProperty $property)
{
// Make non-public properties readable through reflection before reading.
$property->setAccessible(true);
return $property->getValue($this);
}
}
BeanstalkdQueue.php 0000666 00000006601 13436752360 0010352 0 ustar 00 default = $default;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
    // Build the JSON payload, then hand it to the raw push.
    $payload = $this->createPayload($job, $data);

    return $this->pushRaw($payload, $queue);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
    // $options is accepted for interface compatibility but not used here.
    $tube = $this->getQueue($queue);

    return $this->pheanstalk->useTube($tube)->put(
        $payload,
        Pheanstalk::DEFAULT_PRIORITY,
        Pheanstalk::DEFAULT_DELAY,
        $this->timeToRun
    );
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);
    $tube = $this->pheanstalk->useTube($this->getQueue($queue));

    // Same as an immediate push, except the delay is converted to seconds.
    return $tube->put(
        $payload,
        Pheanstalk::DEFAULT_PRIORITY,
        $this->getSeconds($delay),
        $this->timeToRun
    );
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
    $queue = $this->getQueue($queue);

    // reserve(0) is called with a zero timeout on the single watched tube.
    $reserved = $this->pheanstalk->watchOnly($queue)->reserve(0);

    // Nothing usable was reserved: return null.
    if (! $reserved instanceof PheanstalkJob) {
        return;
    }

    return new BeanstalkdJob($this->container, $this->pheanstalk, $reserved, $queue);
}
/**
* Delete a message from the Beanstalk queue.
*
* @param string $queue
* @param string $id
* @return void
*/
public function deleteMessage($queue, $id)
{
    // Select the tube for the (possibly defaulted) queue, then delete by id.
    $tube = $this->getQueue($queue);

    $this->pheanstalk->useTube($tube)->delete($id);
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
    // Any truthy name wins; otherwise use the configured default tube.
    if ($queue) {
        return $queue;
    }

    return $this->default;
}
/**
* Get the underlying Pheanstalk instance.
*
* @return \Pheanstalk\Pheanstalk
*/
public function getPheanstalk()
{
// Exposes the client this queue sends all of its tube commands through.
return $this->pheanstalk;
}
}
NullQueue.php 0000666 00000002260 13436752360 0007211 0 ustar 00 table = $table;
$this->resolver = $resolver;
$this->database = $database;
}
/**
* Log a failed job into storage.
*
* @param string $connection
* @param string $queue
* @param string $payload
* @return int|null
*/
public function log($connection, $queue, $payload)
{
    // One row per failure, stamped with the current time.
    $record = [
        'connection' => $connection,
        'queue' => $queue,
        'payload' => $payload,
        'failed_at' => Carbon::now(),
    ];

    return $this->getTable()->insertGetId($record);
}
/**
* Get a list of all of the failed jobs.
*
* @return array
*/
public function all()
{
    // Highest id first.
    $query = $this->getTable()->orderBy('id', 'desc');

    return $query->get();
}
/**
* Get a single failed job.
*
* @param mixed $id
* @return array
*/
public function find($id)
{
    // Primary-key lookup on the failed jobs table.
    $table = $this->getTable();

    return $table->find($id);
}
/**
* Delete a single failed job from storage.
*
* @param mixed $id
* @return bool
*/
public function forget($id)
{
    // delete() reports how many rows were removed; success means at least one.
    $removed = $this->getTable()->where('id', $id)->delete();

    return $removed > 0;
}
/**
* Flush all of the failed jobs from storage.
*
* @return void
*/
public function flush()
{
// No where-clause: the delete is issued against the whole table query.
$this->getTable()->delete();
}
/**
* Get a new query builder instance for the table.
*
* @return \Illuminate\Database\Query\Builder
*/
protected function getTable()
{
    // Resolve the configured database connection, then scope it to our table.
    $connection = $this->resolver->connection($this->database);

    return $connection->table($this->table);
}
}
Worker.php 0000666 00000027306 13436752360 0006553 0 ustar 00 failer = $failer;
$this->events = $events;
$this->manager = $manager;
}
/**
* Listen to the given queue in a loop.
*
* @param string $connectionName
* @param string $queue
* @param int $delay
* @param int $memory
* @param int $sleep
* @param int $maxTries
* @return array
*/
public function daemon($connectionName, $queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0)
{
    // Remember the restart marker as it was when this worker started.
    $lastRestart = $this->getTimestampOfLastQueueRestart();

    // Runs until stop() terminates the script.
    for (;;) {
        if (! $this->daemonShouldRun()) {
            // Paused (maintenance mode or vetoed by a listener): just idle.
            $this->sleep($sleep);
        } else {
            $this->runNextJobForDaemon(
                $connectionName, $queue, $delay, $sleep, $maxTries
            );
        }

        // Bail out when over the memory limit or when a restart was signalled.
        $mustStop = $this->memoryExceeded($memory) || $this->queueShouldRestart($lastRestart);

        if ($mustStop) {
            $this->stop();
        }
    }
}
/**
* Run the next job for the daemon worker.
*
* @param string $connectionName
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @return void
*/
protected function runNextJobForDaemon($connectionName, $queue, $delay, $sleep, $maxTries)
{
// Nothing thrown by pop() is allowed to escape: the daemon loop must keep
// running. Failures are reported to the exception handler when one is set and
// silently dropped otherwise.
try {
$this->pop($connectionName, $queue, $delay, $sleep, $maxTries);
} catch (Exception $e) {
if ($this->exceptions) {
$this->exceptions->report($e);
}
} catch (Throwable $e) {
// Throwables that are not Exceptions are wrapped in FatalThrowableError
// before being passed to the handler.
if ($this->exceptions) {
$this->exceptions->report(new FatalThrowableError($e));
}
}
}
/**
* Determine if the daemon should process on this iteration.
*
* @return bool
*/
protected function daemonShouldRun()
{
    // Never process jobs while the application is in maintenance mode.
    if ($this->manager->isDownForMaintenance()) {
        return false;
    }

    // Any listener may veto this iteration by returning false.
    return $this->events->until('illuminate.queue.looping') !== false;
}
/**
* Listen to the given queue.
*
* @param string $connectionName
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @return array
*/
public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
{
    try {
        $connection = $this->manager->connection($connectionName);
        $nextJob = $this->getNextJob($connection, $queue);

        // A job was available: run it and hand its result straight back,
        // skipping the idle sleep below.
        if ($nextJob !== null) {
            $name = $this->manager->getName($connectionName);

            return $this->process($name, $nextJob, $maxTries, $delay);
        }
    } catch (Exception $e) {
        // Report when a handler is configured; otherwise the exception is dropped.
        if ($this->exceptions) {
            $this->exceptions->report($e);
        }
    }

    // Nothing was processed (empty queue or a reported failure): idle a while.
    $this->sleep($sleep);

    return ['job' => null, 'failed' => false];
}
/**
* Get the next job from the queue connection.
*
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
protected function getNextJob($connection, $queue)
{
    // No queue given: let the connection pop from its own default.
    if ($queue === null) {
        return $connection->pop();
    }

    // A comma separated list is tried in the order given; the first queue
    // that yields a job wins. Null is returned when all of them are empty.
    foreach (explode(',', $queue) as $name) {
        $candidate = $connection->pop($name);

        if ($candidate !== null) {
            return $candidate;
        }
    }
}
/**
* Process a given job from the queue.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @param int $delay
* @return array|null
*
* @throws \Throwable
*/
public function process($connection, Job $job, $maxTries = 0, $delay = 0)
{
// A positive $maxTries enables the limit; the job is logged as failed once
// its attempt count is strictly greater than that limit.
if ($maxTries > 0 && $job->attempts() > $maxTries) {
return $this->logFailedJob($connection, $job);
}
try {
$this->raiseBeforeJobEvent($connection, $job);
// First we will fire off the job. Once it is done we will see if it will be
// automatically deleted after processing and if so we'll fire the delete
// method on the job. Otherwise, we will just keep on running our jobs.
$job->fire();
$this->raiseAfterJobEvent($connection, $job);
return ['job' => $job, 'failed' => false];
} catch (Exception $e) {
// handleJobException() always rethrows, so nothing is returned on this path.
$this->handleJobException($connection, $job, $delay, $e);
} catch (Throwable $e) {
// Same handling for throwables that are not Exceptions.
$this->handleJobException($connection, $job, $delay, $e);
}
}
/**
* Handle an exception that occurred while the job was running.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $delay
* @param \Throwable $e
* @return void
*
* @throws \Throwable
*/
protected function handleJobException($connection, Job $job, $delay, $e)
{
// If we catch an exception, we will attempt to release the job back onto
// the queue so it is not lost. This will let it be retried at a later
// time by another listener (or the same one). We will do that here.
try {
$this->raiseExceptionOccurredJobEvent(
$connection, $job, $e
);
} finally {
// Runs even if raising the event throws, so the job is still released
// unless it has already been deleted.
if (! $job->isDeleted()) {
$job->release($delay);
}
}
// Always propagate the original failure to the caller.
throw $e;
}
/**
* Raise the before queue job event.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent($connection, Job $job)
{
    // Without a dispatcher there is nobody to notify.
    if (! $this->events) {
        return;
    }

    // Listeners receive the decoded payload alongside the job itself.
    $payload = json_decode($job->getRawBody(), true);

    $this->events->fire(new Events\JobProcessing($connection, $job, $payload));
}
/**
* Raise the after queue job event.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent($connection, Job $job)
{
    // Without a dispatcher there is nobody to notify.
    if (! $this->events) {
        return;
    }

    // Listeners receive the decoded payload alongside the job itself.
    $payload = json_decode($job->getRawBody(), true);

    $this->events->fire(new Events\JobProcessed($connection, $job, $payload));
}
/**
* Raise the exception occurred queue job event.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $exception
* @return void
*/
protected function raiseExceptionOccurredJobEvent($connection, Job $job, $exception)
{
    // Without a dispatcher there is nobody to notify.
    if (! $this->events) {
        return;
    }

    // Listeners receive the decoded payload plus the throwable that occurred.
    $payload = json_decode($job->getRawBody(), true);

    $this->events->fire(new Events\JobExceptionOccurred($connection, $job, $payload, $exception));
}
/**
* Log a failed job into storage.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @return array
*/
protected function logFailedJob($connection, Job $job)
{
    // The result is the same whether or not a failed-job store is configured.
    $result = ['job' => $job, 'failed' => true];

    if (! $this->failer) {
        return $result;
    }

    // Record the failure, remove the job, let it run its own failure hook,
    // and finally announce the failure to listeners.
    $failedId = $this->failer->log($connection, $job->getQueue(), $job->getRawBody());

    $job->delete();
    $job->failed();

    $this->raiseFailedJobEvent($connection, $job, $failedId);

    return $result;
}
/**
* Raise the failed queue job event.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @param int|null $failedId
* @return void
*/
protected function raiseFailedJobEvent($connection, Job $job, $failedId)
{
    // Without a dispatcher there is nobody to notify.
    if (! $this->events) {
        return;
    }

    // Listeners receive the decoded payload and the id of the failure record.
    $payload = json_decode($job->getRawBody(), true);

    $this->events->fire(new Events\JobFailed($connection, $job, $payload, $failedId));
}
/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
    // memory_get_usage() reports bytes; the limit is compared in megabytes.
    $usedMegabytes = memory_get_usage() / 1024 / 1024;

    return $usedMegabytes >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* @return void
*/
public function stop()
{
    // The dispatcher is treated as optional by the other event-raising methods
    // of this class; guard it here too so that stopping a worker that has no
    // dispatcher exits cleanly instead of failing on a null method call.
    if ($this->events) {
        $this->events->fire(new Events\WorkerStopping);
    }

    // Terminate the script so a process manager can start a fresh worker.
    die;
}
/**
* Sleep the script for a given number of seconds.
*
* @param int $seconds
* @return void
*/
public function sleep($seconds)
{
// Blocks the worker process via PHP's built-in sleep().
sleep($seconds);
}
/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
    // Without a cache repository there is no restart signal: return null.
    if (! $this->cache) {
        return;
    }

    return $this->cache->get('illuminate:queue:restart');
}
/**
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
* @return bool
*/
protected function queueShouldRestart($lastRestart)
{
    // A restart is due as soon as the stored marker differs (loosely compared)
    // from the one captured when the worker started.
    $current = $this->getTimestampOfLastQueueRestart();

    return $current != $lastRestart;
}
/**
* Set the exception handler to use in Daemon mode.
*
* @param \Illuminate\Contracts\Debug\ExceptionHandler $handler
* @return void
*/
public function setDaemonExceptionHandler(ExceptionHandler $handler)
{
// When set, pop() and runNextJobForDaemon() report caught failures to it.
$this->exceptions = $handler;
}
/**
* Set the cache repository implementation.
*
* @param \Illuminate\Contracts\Cache\Repository $cache
* @return void
*/
public function setCache(CacheContract $cache)
{
// The cache is where the 'illuminate:queue:restart' marker is read from.
$this->cache = $cache;
}
/**
* Get the queue manager instance.
*
* @return \Illuminate\Queue\QueueManager
*/
public function getManager()
{
// The manager used to resolve connections and check maintenance mode.
return $this->manager;
}
/**
* Set the queue manager instance.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
public function setManager(QueueManager $manager)
{
// Replaces the manager that was supplied at construction time.
$this->manager = $manager;
}
}
SyncQueue.php 0000666 00000010157 13436752360 0007217 0 ustar 00 resolveJob($this->createPayload($job, $data, $queue));
try {
$this->raiseBeforeJobEvent($queueJob);
$queueJob->fire();
$this->raiseAfterJobEvent($queueJob);
} catch (Exception $e) {
$this->raiseExceptionOccurredJobEvent($queueJob, $e);
$this->handleFailedJob($queueJob);
throw $e;
} catch (Throwable $e) {
$this->raiseExceptionOccurredJobEvent($queueJob, $e);
$this->handleFailedJob($queueJob);
throw $e;
}
return 0;
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
// Intentionally empty: this method does nothing and returns null.
//
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
// $delay is ignored: the job is passed straight to push().
return $this->push($job, $data, $queue);
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
// Intentionally empty: this method does nothing and always returns null.
//
}
/**
* Resolve a Sync job instance.
*
* @param string $payload
* @return \Illuminate\Queue\Jobs\SyncJob
*/
protected function resolveJob($payload)
{
// Wraps the encoded payload in a job object bound to this queue's container.
return new SyncJob($this->container, $payload);
}
/**
* Raise the before queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent(Job $job)
{
    $payload = json_decode($job->getRawBody(), true);

    // Events are optional: skip when no dispatcher is bound in the container.
    if (! $this->container->bound('events')) {
        return;
    }

    $this->container['events']->fire(new Events\JobProcessing('sync', $job, $payload));
}
/**
* Raise the after queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent(Job $job)
{
    $payload = json_decode($job->getRawBody(), true);

    // Events are optional: skip when no dispatcher is bound in the container.
    if (! $this->container->bound('events')) {
        return;
    }

    $this->container['events']->fire(new Events\JobProcessed('sync', $job, $payload));
}
/**
* Raise the exception occurred queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $exception
* @return void
*/
protected function raiseExceptionOccurredJobEvent(Job $job, $exception)
{
    $payload = json_decode($job->getRawBody(), true);

    // Events are optional: skip when no dispatcher is bound in the container.
    if (! $this->container->bound('events')) {
        return;
    }

    $this->container['events']->fire(new Events\JobExceptionOccurred('sync', $job, $payload, $exception));
}
/**
* Handle the failed job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function handleFailedJob(Job $job)
{
// Let the job run its own failure hook first, then notify listeners.
// No value is returned.
$job->failed();
$this->raiseFailedJobEvent($job);
}
/**
* Raise the failed queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseFailedJobEvent(Job $job)
{
    $payload = json_decode($job->getRawBody(), true);

    // Events are optional: skip when no dispatcher is bound in the container.
    if (! $this->container->bound('events')) {
        return;
    }

    $this->container['events']->fire(new Events\JobFailed('sync', $job, $payload));
}
}
composer.json 0000666 00000002412 13436752360 0007302 0 ustar 00 {
"name": "illuminate/queue",
"description": "The Illuminate Queue package.",
"license": "MIT",
"homepage": "http://laravel.com",
"support": {
"issues": "https://github.com/laravel/framework/issues",
"source": "https://github.com/laravel/framework"
},
"authors": [
{
"name": "Taylor Otwell",
"email": "taylor@laravel.com"
}
],
"require": {
"php": ">=5.5.9",
"illuminate/console": "5.2.*",
"illuminate/container": "5.2.*",
"illuminate/contracts": "5.2.*",
"illuminate/support": "5.2.*",
"nesbot/carbon": "~1.20",
"symfony/debug": "2.8.*|3.0.*",
"symfony/process": "2.8.*|3.0.*"
},
"autoload": {
"psr-4": {
"Illuminate\\Queue\\": ""
},
"classmap": [
"IlluminateQueueClosure.php"
]
},
"extra": {
"branch-alias": {
"dev-master": "5.2-dev"
}
},
"suggest": {
"aws/aws-sdk-php": "Required to use the SQS queue driver (~3.0).",
"illuminate/redis": "Required to use the Redis queue driver (5.2.*).",
"pda/pheanstalk": "Required to use the Beanstalk queue driver (~3.0)."
},
"minimum-stability": "dev"
}
DatabaseQueue.php 0000666 00000020672 13436752360 0010012 0 ustar 00 table = $table;
$this->expire = $expire;
$this->default = $default;
$this->database = $database;
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
    // An immediate push is simply a database insert with a zero delay.
    $payload = $this->createPayload($job, $data);

    return $this->pushToDatabase(0, $queue, $payload);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
// The payload is stored as given with a zero delay; $options is not used.
return $this->pushToDatabase(0, $queue, $payload);
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
    // Same as push(), except the given delay is forwarded to the insert.
    $payload = $this->createPayload($job, $data);

    return $this->pushToDatabase($delay, $queue, $payload);
}
/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function bulk($jobs, $data = '', $queue = null)
{
    $queue = $this->getQueue($queue);

    // Every job in the batch shares the same "available at" timestamp.
    $availableAt = $this->getAvailableAt(0);

    // Build one row per job (input keys preserved), then insert them together.
    $records = [];

    foreach ((array) $jobs as $key => $job) {
        $payload = $this->createPayload($job, $data);

        $records[$key] = $this->buildDatabaseRecord($queue, $payload, $availableAt);
    }

    return $this->database->table($this->table)->insert($records);
}
/**
* Release a reserved job back onto the queue.
*
* @param string $queue
* @param \StdClass $job
* @param int $delay
* @return mixed
*/
public function release($queue, $job, $delay)
{
// Releasing inserts a new row that carries over the payload and the attempt
// count of the given job record; it does not modify the existing row.
return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
}
/**
* Push a raw payload to the database with a given delay.
*
* @param \DateTime|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
* @return mixed
*/
protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
{
    // Normalize the queue name and turn the delay into an absolute timestamp.
    $name = $this->getQueue($queue);
    $availableAt = $this->getAvailableAt($delay);

    $record = $this->buildDatabaseRecord($name, $payload, $availableAt, $attempts);

    // The id of the inserted row is handed back to the caller.
    return $this->database->table($this->table)->insertGetId($record);
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
    $queue = $this->getQueue($queue);

    // Selecting (with a row lock) and reserving the job happen inside one
    // transaction so that two workers cannot reserve the same row.
    $this->database->beginTransaction();

    try {
        $job = $this->getNextAvailableJob($queue);

        if ($job) {
            $job = $this->markJobAsReserved($job);
        }

        $this->database->commit();
    } catch (\Exception $e) {
        // Never leave the transaction open: a long-running worker would
        // otherwise keep it (and its row locks) alive across later pops.
        $this->database->rollBack();

        throw $e;
    } catch (\Throwable $e) {
        // Same clean-up for throwables that are not Exceptions.
        $this->database->rollBack();

        throw $e;
    }

    // Null is returned when no job was available.
    if ($job) {
        return new DatabaseJob(
            $this->container, $this, $job, $queue
        );
    }
}
/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
{
    // Lock the candidate row for update, restricted to the requested queue.
    $query = $this->database->table($this->table)
        ->lockForUpdate()
        ->where('queue', $this->getQueue($queue));

    // A row qualifies when it is available now OR its reservation has expired.
    $query->where(function ($nested) {
        $this->isAvailable($nested);
        $this->isReservedButExpired($nested);
    });

    // Lowest id first.
    $row = $query->orderBy('id', 'asc')->first();

    if (! $row) {
        return null;
    }

    return (object) $row;
}
/**
* Modify the query to check for available jobs.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isAvailable($query)
{
    // Grouped condition: not reserved AND already due.
    $query->where(function ($available) {
        $available
            ->where('reserved', 0)
            ->where('available_at', '<=', $this->getTime());
    });
}
/**
* Modify the query to check for jobs that are reserved but have expired.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isReservedButExpired($query)
{
    // Reservations made at or before this moment count as expired.
    $expiration = Carbon::now()->subSeconds($this->expire)->getTimestamp();

    // Grouped OR-condition: reserved AND reserved long enough ago.
    $query->orWhere(function ($expired) use ($expiration) {
        $expired
            ->where('reserved', 1)
            ->where('reserved_at', '<=', $expiration);
    });
}
/**
* Mark the given job ID as reserved.
*
* @param \stdClass $job
* @return \stdClass
*/
protected function markJobAsReserved($job)
{
    // Update the in-memory record first: flag it, count the attempt, and
    // stamp the reservation time.
    $job->reserved = 1;
    $job->attempts += 1;
    $job->reserved_at = $this->getTime();

    // Then persist exactly those three fields for the matching row.
    $changes = [
        'reserved' => $job->reserved,
        'reserved_at' => $job->reserved_at,
        'attempts' => $job->attempts,
    ];

    $this->database->table($this->table)->where('id', $job->id)->update($changes);

    return $job;
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $id
* @return void
*/
public function deleteReserved($queue, $id)
{
    // Lock the row before deleting it; both steps share one transaction.
    $this->database->beginTransaction();

    try {
        if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
            $this->database->table($this->table)->where('id', $id)->delete();
        }

        $this->database->commit();
    } catch (\Exception $e) {
        // Never leave the transaction open when the lookup or delete fails.
        $this->database->rollBack();

        throw $e;
    } catch (\Throwable $e) {
        // Same clean-up for throwables that are not Exceptions.
        $this->database->rollBack();

        throw $e;
    }
}
/**
* Get the "available at" UNIX timestamp.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getAvailableAt($delay)
{
    // Any date object (DateTime, DateTimeImmutable, Carbon, ...) is taken as
    // the absolute moment; anything else is a number of seconds from now.
    $availableAt = $delay instanceof \DateTimeInterface
        ? $delay
        : Carbon::now()->addSeconds($delay);

    return $availableAt->getTimestamp();
}
/**
* Create an array to insert for the given job.
*
* @param string|null $queue
* @param string $payload
* @param int $availableAt
* @param int $attempts
* @return array
*/
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
// Every new row starts unreserved; 'created_at' is the current timestamp from
// getTime(), while 'available_at' is whatever the caller computed.
return [
'queue' => $queue,
'attempts' => $attempts,
'reserved' => 0,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->getTime(),
'payload' => $payload,
];
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
{
    // Any truthy name wins; otherwise use the configured default queue.
    if ($queue) {
        return $queue;
    }

    return $this->default;
}
/**
* Get the underlying database instance.
*
* @return \Illuminate\Database\Connection
*/
public function getDatabase()
{
// The connection all queue table queries and transactions run on.
return $this->database;
}
/**
* Get the expiration time in seconds.
*
* @return int|null
*/
public function getExpire()
{
// Number of seconds subtracted from "now" in isReservedButExpired().
return $this->expire;
}
/**
* Set the expiration time in seconds.
*
* @param int|null $seconds
* @return void
*/
public function setExpire($seconds)
{
// Takes effect on the next pop(), where the expiry cut-off is recomputed.
$this->expire = $seconds;
}
}
Jobs/SqsJob.php 0000666 00000005531 13436752360 0007374 0 ustar 00 sqs = $sqs;
$this->job = $job;
$this->queue = $queue;
$this->container = $container;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
    // Decode the message body into an array and dispatch it to its handler.
    $payload = json_decode($this->getRawBody(), true);

    $this->resolveAndFire($payload);
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
// The 'Body' entry of the raw job array is returned unchanged.
return $this->job['Body'];
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
    // Let the parent update its state first, then remove the message remotely.
    parent::delete();

    $request = [
        'QueueUrl' => $this->queue,
        'ReceiptHandle' => $this->job['ReceiptHandle'],
    ];

    $this->sqs->deleteMessage($request);
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
    // Let the parent update its state first.
    parent::release($delay);

    // The delay is sent to SQS as the message's new visibility timeout.
    $request = [
        'QueueUrl' => $this->queue,
        'ReceiptHandle' => $this->job['ReceiptHandle'],
        'VisibilityTimeout' => $delay,
    ];

    $this->sqs->changeMessageVisibility($request);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
    // The attempt count is read from the message's receive counter.
    $attributes = $this->job['Attributes'];

    return (int) $attributes['ApproximateReceiveCount'];
}
/**
* Get the job identifier.
*
* Reads the 'MessageId' entry of the raw SQS message held by this job.
*
* @return string
*/
public function getJobId()
{
return $this->job['MessageId'];
}
/**
* Get the IoC container instance.
*
* Returns the value of the $container property.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Get the underlying SQS client instance.
*
* Returns the value of the $sqs property, the same object this job uses
* for its deleteMessage and changeMessageVisibility calls.
*
* @return \Aws\Sqs\SqsClient
*/
public function getSqs()
{
return $this->sqs;
}
/**
* Get the underlying raw SQS job.
*
* Returns the message array held in $job; other methods of this class read
* its 'Body', 'ReceiptHandle', 'MessageId' and 'Attributes' entries.
*
* @return array
*/
public function getSqsJob()
{
return $this->job;
}
}
Jobs/Job.php 0000666 00000014026 13436752360 0006704 0 ustar 00 deleted = true;
}
/**
* Determine if the job has been deleted.
*
* Returns the current value of the $deleted flag.
*
* @return bool
*/
public function isDeleted()
{
return $this->deleted;
}
/**
* Release the job back into the queue.
*
* This base implementation only flags the job as released; the $delay
* argument is accepted but not used here.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
$this->released = true;
}
/**
* Determine if the job was released back into the queue.
*
* Returns the current value of the $released flag.
*
* @return bool
*/
public function isReleased()
{
return $this->released;
}
/**
 * Determine if the job has been deleted or released.
 *
 * The released state is only consulted when the job has not been
 * deleted.
 *
 * @return bool
 */
public function isDeletedOrReleased()
{
    if ($this->isDeleted()) {
        return true;
    }

    return (bool) $this->isReleased();
}
/**
* Get the number of times the job has been attempted.
*
* Declared abstract: each concrete job class supplies its own way of
* determining the attempt count.
*
* @return int
*/
abstract public function attempts();
/**
* Get the raw body string for the job.
*
* Declared abstract. Within this class the returned string is passed to
* json_decode() and the decoded 'job' and 'data' entries are read.
*
* @return string
*/
abstract public function getRawBody();
/**
 * Resolve the job handler and invoke its handler method.
 *
 * The payload's 'job' entry names the handler (optionally with an
 * "@method" suffix) and its 'data' entry holds the arguments, in which
 * queueable entity markers are resolved before the call.
 *
 * @param  array  $payload
 * @return void
 */
protected function resolveAndFire(array $payload)
{
    list($handlerClass, $handlerMethod) = $this->parseJob($payload['job']);

    $this->instance = $this->resolve($handlerClass);

    $arguments = $this->resolveQueueableEntities($payload['data']);

    $this->instance->{$handlerMethod}($this, $arguments);
}
/**
 * Parse the job declaration into class and method.
 *
 * A declaration without an "@" separator defaults to the "fire"
 * method; otherwise every "@"-separated segment is returned as-is.
 *
 * @param  string  $job
 * @return array
 */
protected function parseJob($job)
{
    $parts = explode('@', $job);

    if (count($parts) <= 1) {
        return [$parts[0], 'fire'];
    }

    return $parts;
}
/**
* Resolve the given job handler.
*
* Asks the container to make an instance of the named class.
*
* @param string $class
* @return mixed
*/
protected function resolve($class)
{
return $this->container->make($class);
}
/**
 * Resolve all of the queueable entities in the given payload.
 *
 * Strings are resolved directly, arrays are walked recursively, and
 * any other value is returned unchanged.
 *
 * @param  mixed  $data
 * @return mixed
 */
protected function resolveQueueableEntities($data)
{
    if (is_string($data)) {
        return $this->resolveQueueableEntity($data);
    }

    if (! is_array($data)) {
        return $data;
    }

    return array_map(function ($item) {
        return is_array($item)
            ? $this->resolveQueueableEntities($item)
            : $this->resolveQueueableEntity($item);
    }, $data);
}
/**
 * Resolve a single queueable entity from the resolver.
 *
 * Only strings starting with the "::entity::" marker are resolved;
 * they are split on "|" into marker, type and id. Every other value
 * is returned untouched.
 *
 * @param  mixed  $value
 * @return \Illuminate\Contracts\Queue\QueueableEntity
 */
protected function resolveQueueableEntity($value)
{
    if (! is_string($value) || ! Str::startsWith($value, '::entity::')) {
        return $value;
    }

    // The first segment is just the marker and is not needed.
    list(, $type, $id) = explode('|', $value, 3);

    return $this->getEntityResolver()->resolve($type, $id);
}
/**
 * Call the failed method on the job instance.
 *
 * The handler is resolved from the raw body; if it defines a
 * "failed" method, that method receives the resolved payload data.
 *
 * @return void
 */
public function failed()
{
    $payload = json_decode($this->getRawBody(), true);

    // Only the handler class matters here; the method segment is ignored.
    list($handlerClass) = $this->parseJob($payload['job']);

    $this->instance = $this->resolve($handlerClass);

    if (! method_exists($this->instance, 'failed')) {
        return;
    }

    $this->instance->failed($this->resolveQueueableEntities($payload['data']));
}
/**
* Get an entity resolver instance.
*
* The resolver is requested from the container via its contract name on
* every call.
*
* @return \Illuminate\Contracts\Queue\EntityResolver
*/
protected function getEntityResolver()
{
return $this->container->make('Illuminate\Contracts\Queue\EntityResolver');
}
/**
 * Calculate the number of seconds with the given delay.
 *
 * A DateTime is converted to the number of seconds between now and
 * that moment (never negative); anything else is cast to an integer.
 *
 * @param  \DateTime|int  $delay
 * @return int
 */
protected function getSeconds($delay)
{
    if (! $delay instanceof DateTime) {
        return (int) $delay;
    }

    $remaining = $delay->getTimestamp() - $this->getTime();

    return max(0, $remaining);
}
/**
* Get the current system time.
*
* Thin wrapper around PHP's time(), i.e. the current Unix timestamp.
*
* @return int
*/
protected function getTime()
{
return time();
}
/**
 * Get the name of the queued job class.
 *
 * This is the 'job' entry of the JSON-decoded raw body.
 *
 * @return string
 */
public function getName()
{
    $payload = json_decode($this->getRawBody(), true);

    return $payload['job'];
}
/**
 * Get the resolved name of the queued job class.
 *
 * For the two CallQueuedHandler wrappers the name of the wrapped
 * command or event listener is reported; any other job name is
 * returned as-is.
 *
 * @return string
 */
public function resolveName()
{
    $name = $this->getName();

    $payload = json_decode($this->getRawBody(), true);

    $resolved = $name;

    if ($name === 'Illuminate\Queue\CallQueuedHandler@call') {
        // Queued commands: fall back to the handler name when no
        // command name was stored in the payload.
        $resolved = Arr::get($payload, 'data.commandName', $name);
    } elseif ($name === 'Illuminate\Events\CallQueuedHandler@call') {
        // Queued event listeners: report "Class@method" of the listener.
        $resolved = $payload['data']['class'].'@'.$payload['data']['method'];
    }

    return $resolved;
}
/**
* Get the name of the queue the job belongs to.
*
* Returns the value of the $queue property.
*
* @return string
*/
public function getQueue()
{
return $this->queue;
}
}
Jobs/SyncJob.php 0000666 00000002776 13436752360 0007552 0 ustar 00 payload = $payload;
$this->container = $container;
}
/**
 * Fire the job.
 *
 * Decodes the stored payload string as a JSON array and hands it to
 * the shared handler resolution logic.
 *
 * @return void
 */
public function fire()
{
    $decoded = json_decode($this->payload, true);

    $this->resolveAndFire($decoded);
}
/**
* Get the raw body string for the job.
*
* Returns the payload stored on this job unchanged.
*
* @return string
*/
public function getRawBody()
{
return $this->payload;
}
/**
* Release the job back into the queue.
*
* Delegates straight to the parent implementation; nothing else is done
* for this job type.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
}
/**
* Get the number of times the job has been attempted.
*
* Always reports 1 for this job type.
*
* @return int
*/
public function attempts()
{
return 1;
}
/**
* Get the job identifier.
*
* Always returns an empty string for this job type.
*
* @return string
*/
public function getJobId()
{
return '';
}
}
Jobs/RedisJob.php 0000666 00000005252 13436752360 0007674 0 ustar 00 job = $job;
$this->redis = $redis;
$this->queue = $queue;
$this->container = $container;
}
/**
 * Fire the job.
 *
 * Decodes the raw job string as a JSON array and hands it to the
 * shared handler resolution logic.
 *
 * @return void
 */
public function fire()
{
    $payload = json_decode($this->getRawBody(), true);

    $this->resolveAndFire($payload);
}
/**
* Get the raw body string for the job.
*
* Returns the job string held by this instance unchanged; other methods of
* this class decode it as JSON.
*
* @return string
*/
public function getRawBody()
{
return $this->job;
}
/**
* Delete the job from the queue.
*
* Runs the parent delete logic, then asks the queue driver to delete the
* reserved entry for this job on this job's queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this->job);
}
/**
 * Release the job back into the queue.
 *
 * The reserved entry is deleted first, then the job is handed back to
 * the queue driver with its attempt count incremented by one.
 *
 * @param  int  $delay
 * @return void
 */
public function release($delay = 0)
{
    parent::release($delay);

    $this->delete();

    $nextAttempt = $this->attempts() + 1;

    $this->redis->release($this->queue, $this->job, $delay, $nextAttempt);
}
/**
 * Get the number of times the job has been attempted.
 *
 * Reads the 'attempts' entry of the JSON-decoded job string.
 *
 * @return int
 */
public function attempts()
{
    $decoded = json_decode($this->job, true);

    return Arr::get($decoded, 'attempts');
}
/**
 * Get the job identifier.
 *
 * Reads the 'id' entry of the JSON-decoded job string.
 *
 * @return string
 */
public function getJobId()
{
    $decoded = json_decode($this->job, true);

    return Arr::get($decoded, 'id');
}
/**
* Get the IoC container instance.
*
* Returns the value of the $container property.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Get the underlying queue driver instance.
*
* Returns the value of the $redis property.
*
* NOTE(review): this class calls deleteReserved() and release() on the same
* object, which suggests it is the Redis queue driver rather than a raw
* Redis connection - confirm whether the @return type below is accurate.
*
* @return \Illuminate\Redis\Database
*/
public function getRedisQueue()
{
return $this->redis;
}
/**
* Get the underlying Redis job.
*
* Returns the same job string that getRawBody() exposes.
*
* @return string
*/
public function getRedisJob()
{
return $this->job;
}
}
Jobs/DatabaseJob.php 0000666 00000005217 13436752360 0010333 0 ustar 00 job = $job;
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
}
/**
 * Fire the job.
 *
 * Decodes the payload of the job record as a JSON array and hands it
 * to the shared handler resolution logic.
 *
 * @return void
 */
public function fire()
{
    $payload = json_decode($this->job->payload, true);

    $this->resolveAndFire($payload);
}
/**
* Delete the job from the queue.
*
* Runs the parent delete logic, then asks the queue driver to delete the
* reserved record with this job's id on this job's queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->database->deleteReserved($this->queue, $this->job->id);
}
/**
* Release the job back into the queue.
*
* Three steps, in this order: run the parent release logic, delete the
* current record via delete(), then hand the job record back to the queue
* driver with the requested delay.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->delete();
$this->database->release($this->queue, $this->job, $delay);
}
/**
* Get the number of times the job has been attempted.
*
* Reads the attempts property of the job record and casts it to int.
*
* @return int
*/
public function attempts()
{
return (int) $this->job->attempts;
}
/**
* Get the job identifier.
*
* Returns the id property of the job record.
*
* @return string
*/
public function getJobId()
{
return $this->job->id;
}
/**
* Get the raw body string for the job.
*
* Returns the payload property of the job record unchanged.
*
* @return string
*/
public function getRawBody()
{
return $this->job->payload;
}
/**
* Get the IoC container instance.
*
* Returns the value of the $container property.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Get the underlying queue driver instance.
*
* Returns the value of the $database property, the object this job calls
* deleteReserved() and release() on.
*
* @return \Illuminate\Queue\DatabaseQueue
*/
public function getDatabaseQueue()
{
return $this->database;
}
/**
* Get the underlying database job.
*
* Returns the job record object; this class reads its id, payload and
* attempts properties.
*
* @return \StdClass
*/
public function getDatabaseJob()
{
return $this->job;
}
}
Jobs/BeanstalkdJob.php 0000666 00000006026 13436752360 0010676 0 ustar 00 job = $job;
$this->queue = $queue;
$this->container = $container;
$this->pheanstalk = $pheanstalk;
}
/**
 * Fire the job.
 *
 * Decodes the raw job data as a JSON array and hands it to the shared
 * handler resolution logic.
 *
 * @return void
 */
public function fire()
{
    $payload = json_decode($this->getRawBody(), true);

    $this->resolveAndFire($payload);
}
/**
* Get the raw body string for the job.
*
* Returns whatever the underlying Pheanstalk job's getData() yields.
*
* @return string
*/
public function getRawBody()
{
return $this->job->getData();
}
/**
* Delete the job from the queue.
*
* Runs the parent delete logic, then asks Pheanstalk to delete the job.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->pheanstalk->delete($this->job);
}
/**
 * Release the job back into the queue.
 *
 * The job is always released with Pheanstalk's default priority.
 *
 * @param  int  $delay
 * @return void
 */
public function release($delay = 0)
{
    parent::release($delay);

    $this->pheanstalk->release($this->job, Pheanstalk::DEFAULT_PRIORITY, $delay);
}
/**
* Bury the job in the queue.
*
* Calls the parent release() (without a delay) before asking Pheanstalk to
* bury the job, so the parent's release bookkeeping is applied to buried
* jobs as well.
*
* @return void
*/
public function bury()
{
parent::release();
$this->pheanstalk->bury($this->job);
}
/**
 * Get the number of times the job has been attempted.
 *
 * Uses the "reserves" figure from the job statistics reported by
 * Pheanstalk, cast to an integer.
 *
 * @return int
 */
public function attempts()
{
    return (int) $this->pheanstalk->statsJob($this->job)->reserves;
}
/**
* Get the job identifier.
*
* Returns whatever the underlying Pheanstalk job's getId() yields.
*
* @return string
*/
public function getJobId()
{
return $this->job->getId();
}
/**
* Get the IoC container instance.
*
* Returns the value of the $container property.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Get the underlying Pheanstalk instance.
*
* Returns the value of the $pheanstalk property, the object this job uses
* for its delete, release, bury and statsJob calls.
*
* @return \Pheanstalk\Pheanstalk
*/
public function getPheanstalk()
{
return $this->pheanstalk;
}
/**
* Get the underlying Pheanstalk job.
*
* Returns the value of the $job property.
*
* @return \Pheanstalk\Job
*/
public function getPheanstalkJob()
{
return $this->job;
}
}
IlluminateQueueClosure.php 0000666 00000001445 13436752360 0011743 0 ustar 00 crypt = $crypt;
}
/**
 * Fire the Closure based queue job.
 *
 * The 'closure' entry of the data is decrypted, unserialized and the
 * resulting callable is invoked with the job.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  array  $data
 * @return void
 */
public function fire($job, $data)
{
    $serialized = $this->crypt->decrypt($data['closure']);

    $closure = unserialize($serialized);

    $closure($job);
}
}
CallQueuedHandler.php 0000666 00000003354 13436752360 0010621 0 ustar 00 dispatcher = $dispatcher;
}
/**
 * Handle the queued job.
 *
 * The serialized command is restored, given the job instance when it
 * supports that, and dispatched synchronously. Afterwards the job is
 * deleted unless the command already deleted or released it.
 *
 * NOTE(review): unserialize() runs on the queued payload, which is
 * only safe while the queue backend is trusted.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  array  $data
 * @return void
 */
public function call(Job $job, array $data)
{
    $instance = unserialize($data['command']);

    $command = $this->setJobInstanceIfNecessary($job, $instance);

    $this->dispatcher->dispatchNow($command);

    if ($job->isDeletedOrReleased()) {
        return;
    }

    $job->delete();
}
/**
 * Set the job instance of the given class if necessary.
 *
 * Only instances whose class (or any parent/trait) uses the
 * InteractsWithQueue trait receive the job.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  mixed  $instance
 * @return mixed
 */
protected function setJobInstanceIfNecessary(Job $job, $instance)
{
    $traits = class_uses_recursive(get_class($instance));

    if (in_array('Illuminate\Queue\InteractsWithQueue', $traits)) {
        $instance->setJob($job);
    }

    return $instance;
}
/**
 * Call the failed method on the job instance.
 *
 * The serialized command is restored and, when it defines a "failed"
 * method, that method is invoked without arguments.
 *
 * @param  array  $data
 * @return void
 */
public function failed(array $data)
{
    $command = unserialize($data['command']);

    if (! method_exists($command, 'failed')) {
        return;
    }

    $command->failed();
}
}
Listener.php 0000666 00000014402 13436752360 0007060 0 ustar 00 commandPath = $commandPath;
$this->workerCommand = $this->buildWorkerCommand();
}
/**
 * Build the environment specific worker command.
 *
 * The result is a sprintf() template: escaped PHP binary, artisan
 * script and the queue:work command with six "%s" placeholders for
 * connection, queue, delay, memory, sleep and tries.
 *
 * @return string
 */
protected function buildWorkerCommand()
{
    $finder = new PhpExecutableFinder;

    $binary = ProcessUtils::escapeArgument($finder->find(false));

    // HHVM needs an extra flag to behave like the PHP CLI.
    if (defined('HHVM_VERSION')) {
        $binary .= ' --php';
    }

    $artisan = defined('ARTISAN_BINARY')
        ? ProcessUtils::escapeArgument(ARTISAN_BINARY)
        : 'artisan';

    $options = 'queue:work %s --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';

    return $binary.' '.$artisan.' '.$options;
}
/**
 * Listen to the given queue connection.
 *
 * Builds the worker process once and then runs it over and over; the
 * loop itself never terminates.
 *
 * @param  string  $connection
 * @param  string  $queue
 * @param  string  $delay
 * @param  string  $memory
 * @param  int  $timeout
 * @return void
 */
public function listen($connection, $queue, $delay, $memory, $timeout = 60)
{
    $process = $this->makeProcess($connection, $queue, $delay, $memory, $timeout);

    do {
        $this->runProcess($process, $memory);
    } while (true);
}
/**
 * Run the given process.
 *
 * Worker output is forwarded to the output handler while the process
 * runs. Afterwards the listener stops itself if its own memory usage
 * has reached the limit.
 *
 * @param  \Symfony\Component\Process\Process  $process
 * @param  int  $memory
 * @return void
 */
public function runProcess(Process $process, $memory)
{
    $forwardOutput = function ($type, $line) {
        $this->handleWorkerOutput($type, $line);
    };

    $process->run($forwardOutput);

    // Below the limit there is nothing more to do for this run.
    if (! $this->memoryExceeded($memory)) {
        return;
    }

    // Over the limit: bail out so a process manager can restart the
    // listener with a clean slate of memory.
    $this->stop();
}
/**
 * Create a new Symfony process for the worker.
 *
 * The worker command template is filled in with the connection, queue,
 * delay, memory, sleep and tries values; connection and queue are shell
 * escaped. If an environment is set, an escaped "--env" option is added
 * at the end of the command.
 *
 * @param  string  $connection
 * @param  string  $queue
 * @param  int  $delay
 * @param  int  $memory
 * @param  int  $timeout
 * @return \Symfony\Component\Process\Process
 */
public function makeProcess($connection, $queue, $delay, $memory, $timeout)
{
    // Format the template first. Only the template itself may contain
    // sprintf() placeholders; values appended later must never be parsed
    // as part of the format string.
    $command = sprintf(
        $this->workerCommand,
        ProcessUtils::escapeArgument($connection),
        ProcessUtils::escapeArgument($queue),
        $delay,
        $memory,
        $this->sleep,
        $this->maxTries
    );

    // If the environment is set, we append it to the finished command so the
    // workers run under the specified environment. Appending after sprintf()
    // keeps a "%" inside the environment name from being treated as a
    // conversion specifier; the position in the command is unchanged.
    if (isset($this->environment)) {
        $command .= ' --env='.ProcessUtils::escapeArgument($this->environment);
    }

    return new Process($command, $this->commandPath, null, null, $timeout);
}
/**
 * Handle output from the worker process.
 *
 * Output is passed to the registered output handler; without a
 * handler it is silently dropped.
 *
 * @param  int  $type
 * @param  string  $line
 * @return void
 */
protected function handleWorkerOutput($type, $line)
{
    if (! isset($this->outputHandler)) {
        return;
    }

    call_user_func($this->outputHandler, $type, $line);
}
/**
 * Determine if the memory limit has been exceeded.
 *
 * The current usage of this script is converted from bytes to
 * megabytes and compared against the limit; reaching the limit
 * exactly already counts as exceeded.
 *
 * @param  int  $memoryLimit
 * @return bool
 */
public function memoryExceeded($memoryLimit)
{
    $usedMegabytes = memory_get_usage() / 1024 / 1024;

    return $usedMegabytes >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* Terminates the whole PHP script immediately via die.
*
* @return void
*/
public function stop()
{
die;
}
/**
* Set the output handler callback.
*
* The handler is later invoked with the output type and the output line
* for every chunk of worker output.
*
* @param \Closure $outputHandler
* @return void
*/
public function setOutputHandler(Closure $outputHandler)
{
$this->outputHandler = $outputHandler;
}
/**
* Get the current listener environment.
*
* Returns the value of the $environment property.
*
* @return string
*/
public function getEnvironment()
{
return $this->environment;
}
/**
* Set the current environment.
*
* When set, the value is passed to worker processes as an escaped --env
* option.
*
* @param string $environment
* @return void
*/
public function setEnvironment($environment)
{
$this->environment = $environment;
}
/**
* Get the amount of seconds to wait before polling the queue.
*
* Returns the value of the $sleep property.
*
* @return int
*/
public function getSleep()
{
return $this->sleep;
}
/**
* Set the amount of seconds to wait before polling the queue.
*
* The value is stored as-is and later inserted into the worker command's
* --sleep option.
*
* @param int $sleep
* @return void
*/
public function setSleep($sleep)
{
$this->sleep = $sleep;
}
/**
* Set the amount of times to try a job before logging it failed.
*
* The value is stored as-is and later inserted into the worker command's
* --tries option.
*
* @param int $tries
* @return void
*/
public function setMaxTries($tries)
{
$this->maxTries = $tries;
}
}
SqsQueue.php 0000666 00000007401 13436752360 0007047 0 ustar 00 sqs = $sqs;
$this->prefix = $prefix;
$this->default = $default;
}
/**
 * Push a new job onto the queue.
 *
 * Builds the payload for the job and data, then sends it as a raw
 * message.
 *
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return mixed
 */
public function push($job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    return $this->pushRaw($payload, $queue);
}
/**
 * Push a raw payload onto the queue.
 *
 * The payload becomes the message body; the id of the sent message is
 * returned. The $options argument is not used here.
 *
 * @param  string  $payload
 * @param  string  $queue
 * @param  array  $options
 * @return mixed
 */
public function pushRaw($payload, $queue = null, array $options = [])
{
    $message = [
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $payload,
    ];

    return $this->sqs->sendMessage($message)->get('MessageId');
}
/**
 * Push a new job onto the queue after a delay.
 *
 * The delay is converted to seconds and sent along with the message
 * as DelaySeconds; the id of the sent message is returned.
 *
 * @param  \DateTime|int  $delay
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return mixed
 */
public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $delay = $this->getSeconds($delay);

    $response = $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $payload,
        'DelaySeconds' => $delay,
    ]);

    return $response->get('MessageId');
}
/**
 * Pop the next job off of the queue.
 *
 * Asks SQS for a message (including its ApproximateReceiveCount
 * attribute). Returns null when the response carries no messages.
 * Otherwise the job is built by the custom job creator, if one was
 * registered, or as a plain SqsJob from the first message.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    $queue = $this->getQueue($queue);

    $response = $this->sqs->receiveMessage(
        ['QueueUrl' => $queue, 'AttributeNames' => ['ApproximateReceiveCount']]
    );

    // The response has no usable 'Messages' entry when the queue is empty.
    // Guard against null before counting: count(null) raises a warning on
    // PHP 7.2+ and a TypeError on PHP 8.
    if (is_null($response['Messages']) || count($response['Messages']) === 0) {
        return null;
    }

    // The custom creator receives the whole response, not just one message.
    if ($this->jobCreator) {
        return call_user_func($this->jobCreator, $this->container, $this->sqs, $queue, $response);
    }

    return new SqsJob($this->container, $this->sqs, $queue, $response['Messages'][0]);
}
/**
* Define the job creator callback for the connection.
*
* When set, pop() calls this callback with the container, the SQS client,
* the queue and the whole receiveMessage response instead of building an
* SqsJob itself.
*
* @param callable $callback
* @return $this
*/
public function createJobsUsing(callable $callback)
{
$this->jobCreator = $callback;
return $this;
}
/**
 * Get the queue or return the default.
 *
 * A falsy queue name is replaced by the default queue. A name that is
 * already a valid URL is returned as-is; anything else is appended to
 * the configured prefix, separated by exactly one slash.
 *
 * @param  string|null  $queue
 * @return string
 */
public function getQueue($queue)
{
    $name = $queue ?: $this->default;

    $isUrl = filter_var($name, FILTER_VALIDATE_URL) !== false;

    return $isUrl ? $name : rtrim($this->prefix, '/').'/'.$name;
}
/**
* Get the underlying SQS instance.
*
* Returns the value of the $sqs property, the client used for the
* sendMessage and receiveMessage calls in this class.
*
* @return \Aws\Sqs\SqsClient
*/
public function getSqs()
{
return $this->sqs;
}
}