Console/ListenCommand.php 0000666 00000006060 13436756000 0011426 0 ustar 00 setOutputHandler($this->listener = $listener);
}
/**
 * Run the queue listener for the requested connection.
 *
 * Reads the "connection" argument, resolves the queue name for it and
 * hands both, together with the gathered options, to the listener.
 *
 * @return void
 */
public function fire()
{
    $connection = $this->input->getArgument('connection');

    // The queue comes from the "queue" option or, failing that, from the
    // configuration entry of the connection being listened on.
    $queue = $this->getQueue($connection);

    $options = $this->gatherOptions();

    $this->listener->listen($connection, $queue, $options);
}
/**
 * Resolve the queue name to listen on for a connection.
 *
 * The "queue" option wins when it is set; otherwise the queue configured
 * for the connection is used, falling back to "default".
 *
 * @param  string  $connection
 * @return string
 */
protected function getQueue($connection)
{
    // An empty connection name means the configured default connection.
    if (! $connection) {
        $connection = $this->laravel['config']['queue.default'];
    }

    $requested = $this->input->getOption('queue');

    if ($requested) {
        return $requested;
    }

    return $this->laravel['config']->get(
        "queue.connections.{$connection}.queue", 'default'
    );
}
/**
 * Build the listener options object from the command's options.
 *
 * Reads the "env", "delay", "memory" and "timeout" options, in that order.
 *
 * @return \Illuminate\Queue\ListenerOptions
 */
protected function gatherOptions()
{
    $environment = $this->option('env');
    $delay = $this->option('delay');
    $memory = $this->option('memory');
    $timeout = $this->option('timeout');

    return new ListenerOptions($environment, $delay, $memory, $timeout);
}
/**
 * Set the output handler on the queue listener.
 *
 * The handler forwards every line it receives to this command's output.
 *
 * @param  \Illuminate\Queue\Listener  $listener
 * @return void
 */
protected function setOutputHandler(Listener $listener)
{
    // $type is accepted to match the handler signature but is not used.
    $forward = function ($type, $line) {
        $this->output->write($line);
    };

    $listener->setOutputHandler($forward);
}
}
Console/RestartCommand.php 0000666 00000001273 13436756000 0011615 0 ustar 00 laravel['cache']->forever('illuminate:queue:restart', Carbon::now()->getTimestamp());
$this->info('Broadcasting queue restart signal.');
}
}
Console/FailedTableCommand.php 0000666 00000004567 13436756000 0012336 0 ustar 00 files = $files;
$this->composer = $composer;
}
/**
 * Create the migration for the failed jobs table.
 *
 * The table name is read from the "queue.failed.table" configuration key.
 *
 * @return void
 */
public function fire()
{
    $table = $this->laravel['config']['queue.failed.table'];

    // Generate an empty migration file first, then overwrite it with the stub.
    $path = $this->createBaseMigration($table);

    $this->replaceMigration($path, $table, Str::studly($table));

    $this->info('Migration created successfully!');

    $this->composer->dumpAutoloads();
}
/**
 * Create the base migration file for the table.
 *
 * @param  string  $table
 * @return string  The value returned by the migration creator.
 */
protected function createBaseMigration($table = 'failed_jobs')
{
    $creator = $this->laravel['migration.creator'];

    $name = 'create_'.$table.'_table';
    $directory = $this->laravel->databasePath().'/migrations';

    return $creator->create($name, $directory);
}
/**
 * Overwrite the generated migration with the failed jobs table stub.
 *
 * The "{{table}}" and "{{tableClassName}}" placeholders in the stub are
 * replaced with the given values before the file is written.
 *
 * @param  string  $path
 * @param  string  $table
 * @param  string  $tableClassName
 * @return void
 */
protected function replaceMigration($path, $table, $tableClassName)
{
    $placeholders = ['{{table}}', '{{tableClassName}}'];
    $values = [$table, $tableClassName];

    $template = $this->files->get(__DIR__.'/stubs/failed_jobs.stub');

    $contents = str_replace($placeholders, $values, $template);

    $this->files->put($path, $contents);
}
}
Console/RetryCommand.php 0000666 00000004424 13436756000 0011277 0 ustar 00 getJobIds() as $id) {
$this->retryJob($id);
$this->info("The failed job [{$id}] has been pushed back onto the queue!");
$this->laravel['queue.failer']->forget($id);
}
}
/**
 * Get the IDs of the jobs that should be retried.
 *
 * When the only ID given is the word "all", the IDs of every failed job
 * known to the failer are returned instead.
 *
 * @return array
 */
protected function getJobIds()
{
    $requested = $this->argument('id');

    $wantsAll = count($requested) === 1 && $requested[0] === 'all';

    if (! $wantsAll) {
        return $requested;
    }

    return Arr::pluck($this->laravel['queue.failer']->all(), 'id');
}
/**
 * Push the failed job with the given ID back onto its queue.
 *
 * Writes an error line and stops when no failed job has that ID.
 *
 * @param  string  $id
 * @return void
 */
protected function retryJob($id)
{
    $failed = $this->laravel['queue.failer']->find($id);

    if (is_null($failed)) {
        return $this->error("No failed job matches the given ID [{$id}].");
    }

    // Cast to an object so the record's fields can be read as properties.
    $failed = (object) $failed;

    $connection = $this->laravel['queue']->connection($failed->connection);

    $connection->pushRaw(
        $this->resetAttempts($failed->payload), $failed->queue
    );
}
/**
 * Reset the payload attempts.
 *
 * Applicable to Redis jobs which store attempts in their payload.
 *
 * A payload that does not decode to an array is returned unchanged, so a
 * malformed payload is never replaced by the re-encoded result of a failed
 * decode (the literal string "null").
 *
 * @param  string  $payload
 * @return string
 */
protected function resetAttempts($payload)
{
    $decoded = json_decode($payload, true);

    // Nothing to reset when the payload is not a JSON object/array.
    if (! is_array($decoded)) {
        return $payload;
    }

    if (isset($decoded['attempts'])) {
        $decoded['attempts'] = 0;
    }

    return json_encode($decoded);
}
/**
 * Define the arguments accepted by the command.
 *
 * A single "id" argument is declared, which accepts multiple values.
 *
 * @return array
 */
protected function getArguments()
{
    $id = ['id', InputArgument::IS_ARRAY, 'The ID of the failed job'];

    return [$id];
}
}
Console/TableCommand.php 0000666 00000004524 13436756000 0011222 0 ustar 00 files = $files;
$this->composer = $composer;
}
/**
 * Create the migration for the queue jobs table.
 *
 * The table name is read from the "queue.connections.database.table"
 * configuration key.
 *
 * @return void
 */
public function fire()
{
    $table = $this->laravel['config']['queue.connections.database.table'];

    $migrationPath = $this->createBaseMigration($table);
    $className = Str::studly($table);

    // Overwrite the freshly generated migration with the jobs table stub.
    $this->replaceMigration($migrationPath, $table, $className);

    $this->info('Migration created successfully!');

    $this->composer->dumpAutoloads();
}
/**
 * Create the base migration file for the table.
 *
 * @param  string  $table
 * @return string  The value returned by the migration creator.
 */
protected function createBaseMigration($table = 'jobs')
{
    $creator = $this->laravel['migration.creator'];

    $migrationName = 'create_'.$table.'_table';
    $targetDirectory = $this->laravel->databasePath().'/migrations';

    return $creator->create($migrationName, $targetDirectory);
}
/**
 * Overwrite the generated migration with the jobs table stub.
 *
 * The "{{table}}" and "{{tableClassName}}" placeholders in the stub are
 * replaced with the given values before the file is written.
 *
 * @param  string  $path
 * @param  string  $table
 * @param  string  $tableClassName
 * @return void
 */
protected function replaceMigration($path, $table, $tableClassName)
{
    $search = ['{{table}}', '{{tableClassName}}'];
    $replace = [$table, $tableClassName];

    $stubContents = $this->files->get(__DIR__.'/stubs/jobs.stub');

    $migration = str_replace($search, $replace, $stubContents);

    $this->files->put($path, $migration);
}
}
Console/stubs/failed_jobs.stub 0000666 00000001426 13436756000 0012461 0 ustar 00 increments('id');
$table->text('connection');
$table->text('queue');
$table->longText('payload');
$table->longText('exception');
$table->timestamp('failed_at')->useCurrent();
});
}
/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::dropIfExists('{{table}}');
}
}
Console/stubs/jobs.stub 0000666 00000001645 13436756000 0011160 0 ustar 00 bigIncrements('id');
$table->string('queue');
$table->longText('payload');
$table->tinyInteger('attempts')->unsigned();
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->index(['queue', 'reserved_at']);
});
}
/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::dropIfExists('{{table}}');
}
}
Console/FlushFailedCommand.php 0000666 00000001147 13436756000 0012357 0 ustar 00 laravel['queue.failer']->flush();
$this->info('All failed jobs deleted successfully!');
}
}
Console/WorkCommand.php 0000666 00000013041 13436756000 0011107 0 ustar 00 worker = $worker;
}
/**
 * Run the queue worker for the requested connection.
 *
 * When the application is down for maintenance and the "once" option is
 * set, the worker only sleeps for the "sleep" option and nothing is run.
 *
 * @return void
 */
public function fire()
{
    $sleepOnly = $this->downForMaintenance() && $this->option('once');

    if ($sleepOnly) {
        return $this->worker->sleep($this->option('sleep'));
    }

    // Register the event listeners that report processed and failed jobs
    // on the console before any job is run.
    $this->listenForEvents();

    $connection = $this->argument('connection');

    if (! $connection) {
        $connection = $this->laravel['config']['queue.default'];
    }

    // The queue comes from the "queue" option or the connection's config.
    $queue = $this->getQueue($connection);

    $this->runWorker($connection, $queue);
}
/**
 * Run the worker instance.
 *
 * Calls the worker's "runNextJob" method when the "once" option is set
 * and its "daemon" method otherwise.
 *
 * @param  string  $connection
 * @param  string  $queue
 * @return array
 */
protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel['cache']->driver());

    $method = $this->option('once') ? 'runNextJob' : 'daemon';

    $options = $this->gatherWorkerOptions();

    return $this->worker->{$method}($connection, $queue, $options);
}
/**
 * Collect the worker-related command options into a single object.
 *
 * Reads "delay", "memory", "timeout", "sleep", "tries" and "force".
 *
 * @return \Illuminate\Queue\WorkerOptions
 */
protected function gatherWorkerOptions()
{
    $delay = $this->option('delay');
    $memory = $this->option('memory');
    $timeout = $this->option('timeout');
    $sleep = $this->option('sleep');
    $tries = $this->option('tries');
    $force = $this->option('force');

    return new WorkerOptions($delay, $memory, $timeout, $sleep, $tries, $force);
}
/**
 * Register the event listeners that keep the console output up to date.
 *
 * Processed jobs are written to the output; failed jobs are written to
 * the output and also logged through logFailedJob().
 *
 * @return void
 */
protected function listenForEvents()
{
    $events = $this->laravel['events'];

    $onProcessed = function ($event) {
        $this->writeOutput($event->job, false);
    };

    $onFailed = function ($event) {
        $this->writeOutput($event->job, true);

        $this->logFailedJob($event);
    };

    $events->listen(JobProcessed::class, $onProcessed);
    $events->listen(JobFailed::class, $onFailed);
}
/**
 * Write a timestamped status line for a job to the console.
 *
 * The line reads "[Y-m-d H:i:s] Failed: <name>" or
 * "[Y-m-d H:i:s] Processed: <name>" depending on $failed.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  bool  $failed
 * @return void
 */
protected function writeOutput(Job $job, $failed)
{
    $status = $failed ? 'Failed' : 'Processed';

    $timestamp = Carbon::now()->format('Y-m-d H:i:s');

    $this->output->writeln('['.$timestamp.'] '.$status.': '.$job->resolveName());
}
/**
 * Record a failed job through the failed job provider.
 *
 * Passes the connection name, the job's queue, its raw body and the
 * exception carried by the event to the failer's log() method.
 *
 * @param  JobFailed  $event
 * @return void
 */
protected function logFailedJob(JobFailed $event)
{
    $failer = $this->laravel['queue.failer'];

    $job = $event->job;

    $failer->log(
        $event->connectionName,
        $job->getQueue(),
        $job->getRawBody(),
        $event->exception
    );
}
/**
 * Resolve the queue name the worker should process.
 *
 * The "queue" option wins when it is set; otherwise the queue configured
 * for the connection is used, falling back to "default".
 *
 * @param  string  $connection
 * @return string
 */
protected function getQueue($connection)
{
    $requested = $this->option('queue');

    if ($requested) {
        return $requested;
    }

    return $this->laravel['config']->get(
        "queue.connections.{$connection}.queue", 'default'
    );
}
/**
 * Determine whether the worker should treat the application as down.
 *
 * The "force" option overrides maintenance mode: when it is set this
 * always returns false.
 *
 * @return bool
 */
protected function downForMaintenance()
{
    if ($this->option('force')) {
        return false;
    }

    return $this->laravel->isDownForMaintenance();
}
}
Console/ForgetFailedCommand.php 0000666 00000001767 13436756000 0012534 0 ustar 00 laravel['queue.failer']->forget($this->argument('id'))) {
$this->info('Failed job deleted successfully!');
} else {
$this->error('No failed job matches the given ID.');
}
}
/**
 * Define the arguments accepted by the command.
 *
 * A single required "id" argument is declared.
 *
 * @return array
 */
protected function getArguments()
{
    $id = ['id', InputArgument::REQUIRED, 'The ID of the failed job'];

    return [$id];
}
}
Console/ListFailedCommand.php 0000666 00000005147 13436756000 0012215 0 ustar 00 getFailedJobs()) == 0) {
return $this->info('No failed jobs!');
}
$this->displayFailedJobs($jobs);
}
/**
 * Compile the failed jobs into rows ready for display.
 *
 * Each record returned by the failer is cast to an array and parsed into
 * a row; the collection's default filter() is applied before returning.
 *
 * @return array
 */
protected function getFailedJobs()
{
    $records = $this->laravel['queue.failer']->all();

    $rows = collect($records)->map(function ($record) {
        return $this->parseFailedJob((array) $record);
    });

    return $rows->filter()->all();
}
/**
 * Parse a failed job record into a display row.
 *
 * The "payload" and "exception" fields are dropped, and the job name
 * extracted from the payload is inserted at index 3 of the row.
 *
 * @param  array  $failed
 * @return array
 */
protected function parseFailedJob(array $failed)
{
    $row = array_values(Arr::except($failed, ['payload', 'exception']));

    $name = $this->extractJobName($failed['payload']);

    // array_splice() casts a non-array replacement to an array, so a null
    // name would insert nothing and leave this row one cell short. Insert an
    // empty string instead so every row has the same number of cells.
    array_splice($row, 3, 0, is_null($name) ? '' : $name);

    return $row;
}
/**
 * Extract the failed job's name from its JSON payload.
 *
 * Returns null when the payload decodes to an empty/falsy value. When the
 * payload carries a "data.command" entry the name is matched from it;
 * otherwise the payload's "job" entry is used.
 *
 * @param  string  $payload
 * @return string|null
 */
private function extractJobName($payload)
{
    $decoded = json_decode($payload, true);

    if (! $decoded) {
        return null;
    }

    if (isset($decoded['data']['command'])) {
        return $this->matchJobName($decoded);
    }

    return Arr::get($decoded, 'job');
}
/**
 * Match the job name from the payload's "data.command" entry.
 *
 * The name is the first double-quoted run of characters in the command;
 * when none is found the payload's "job" entry is returned instead.
 *
 * @param  array  $payload
 * @return string
 */
protected function matchJobName($payload)
{
    preg_match('/"([^"]+)"/', $payload['data']['command'], $found);

    return isset($found[1]) ? $found[1] : Arr::get($payload, 'job');
}
/**
 * Render the failed jobs as a table using the command's headers.
 *
 * @param  array  $jobs
 * @return void
 */
protected function displayFailedJobs(array $jobs)
{
    $headers = $this->headers;

    $this->table($headers, $jobs);
}
}
RedisQueue.php 0000666 00000015357 13436756000 0007353 0 ustar 00 redis = $redis;
$this->default = $default;
$this->connection = $connection;
$this->retryAfter = $retryAfter;
}
/**
 * Get the size of the queue.
 *
 * Evaluates the size Lua script against the queue key and its ":delayed"
 * and ":reserved" companion keys.
 *
 * @param  string  $queue
 * @return int
 */
public function size($queue = null)
{
    $key = $this->getQueue($queue);

    $connection = $this->getConnection();

    return $connection->eval(
        LuaScripts::size(), 3, $key, $key.':delayed', $key.':reserved'
    );
}
/**
 * Push a new job onto the queue.
 *
 * Builds the payload for the job and pushes it with pushRaw().
 *
 * @param  object|string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return mixed
 */
public function push($job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    return $this->pushRaw($payload, $queue);
}
/**
 * Push a raw payload onto the queue.
 *
 * The payload is appended with RPUSH; the "id" entry of the decoded
 * payload is returned.
 *
 * @param  string  $payload
 * @param  string  $queue
 * @param  array  $options  Not used by this implementation.
 * @return mixed
 */
public function pushRaw($payload, $queue = null, array $options = [])
{
    $connection = $this->getConnection();

    $connection->rpush($this->getQueue($queue), $payload);

    $decoded = json_decode($payload, true);

    return Arr::get($decoded, 'id');
}
/**
 * Push a new job onto the queue after a delay.
 *
 * Builds the payload for the job and schedules it with laterRaw().
 *
 * @param  \DateTime|int  $delay
 * @param  object|string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return mixed
 */
public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    return $this->laterRaw($delay, $payload, $queue);
}
/**
 * Push a raw payload onto the queue after a delay.
 *
 * The payload is added with ZADD to the queue's ":delayed" key, scored by
 * availableAt($delay); the "id" entry of the decoded payload is returned.
 *
 * @param  \DateTime|int  $delay
 * @param  string  $payload
 * @param  string  $queue
 * @return mixed
 */
protected function laterRaw($delay, $payload, $queue = null)
{
    $connection = $this->getConnection();

    $delayedKey = $this->getQueue($queue).':delayed';

    $connection->zadd($delayedKey, $this->availableAt($delay), $payload);

    $decoded = json_decode($payload, true);

    return Arr::get($decoded, 'id');
}
/**
 * Create a payload array from the given job and data.
 *
 * Extends the parent's payload with a random "id" and an "attempts"
 * counter starting at zero.
 *
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return array
 */
protected function createPayloadArray($job, $data = '', $queue = null)
{
    $base = parent::createPayloadArray($job, $data, $queue);

    $extra = [
        'id' => $this->getRandomId(),
        'attempts' => 0,
    ];

    return array_merge($base, $extra);
}
/**
 * Pop the next job off of the queue.
 *
 * Delayed and expired jobs are migrated first; null is returned when no
 * job could be reserved.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    $prefixed = $this->getQueue($queue);

    $this->migrate($prefixed);

    list($job, $reserved) = $this->retrieveNextJob($prefixed);

    if (! $reserved) {
        return;
    }

    // The job is given the unprefixed queue name, not the Redis key.
    $name = $queue ?: $this->default;

    return new RedisJob(
        $this->container, $this, $job, $reserved, $this->connectionName, $name
    );
}
/**
 * Migrate any delayed or expired jobs onto the primary queue.
 *
 * Reserved jobs are only migrated when a retry-after value is configured.
 *
 * @param  string  $queue
 * @return void
 */
protected function migrate($queue)
{
    $this->migrateExpiredJobs($queue.':delayed', $queue);

    if ($this->retryAfter === null) {
        return;
    }

    $this->migrateExpiredJobs($queue.':reserved', $queue);
}
/**
 * Migrate the jobs that are ready from one key to another.
 *
 * Evaluates the migration Lua script with the two keys and the current
 * time as its argument.
 *
 * @param  string  $from
 * @param  string  $to
 * @return array
 */
public function migrateExpiredJobs($from, $to)
{
    $connection = $this->getConnection();

    $script = LuaScripts::migrateExpiredJobs();

    return $connection->eval($script, 2, $from, $to, $this->currentTime());
}
/**
 * Retrieve the next job from the queue.
 *
 * Evaluates the pop Lua script against the queue key and its ":reserved"
 * key, passing availableAt($this->retryAfter) as the script argument.
 *
 * @param  string  $queue
 * @return array
 */
protected function retrieveNextJob($queue)
{
    $connection = $this->getConnection();

    $script = LuaScripts::pop();

    $reservedKey = $queue.':reserved';

    return $connection->eval(
        $script, 2, $queue, $reservedKey, $this->availableAt($this->retryAfter)
    );
}
/**
 * Delete a reserved job from the queue.
 *
 * Removes the job's reserved payload from the queue's ":reserved" key.
 *
 * @param  string  $queue
 * @param  \Illuminate\Queue\Jobs\RedisJob  $job
 * @return void
 */
public function deleteReserved($queue, $job)
{
    $connection = $this->getConnection();

    $reservedKey = $this->getQueue($queue).':reserved';

    $connection->zrem($reservedKey, $job->getReservedJob());
}
/**
 * Delete a reserved job from the reserved queue and release it.
 *
 * Evaluates the release Lua script against the queue's ":delayed" and
 * ":reserved" keys with the reserved payload and availableAt($delay).
 *
 * @param  string  $queue
 * @param  \Illuminate\Queue\Jobs\RedisJob  $job
 * @param  int  $delay
 * @return void
 */
public function deleteAndRelease($queue, $job, $delay)
{
    $key = $this->getQueue($queue);

    $connection = $this->getConnection();

    $connection->eval(
        LuaScripts::release(),
        2,
        $key.':delayed',
        $key.':reserved',
        $job->getReservedJob(),
        $this->availableAt($delay)
    );
}
/**
 * Generate a random 32-character ID string.
 *
 * @return string
 */
protected function getRandomId()
{
    $id = Str::random(32);

    return $id;
}
/**
 * Get the Redis key for a queue, using the default queue when none is given.
 *
 * The key is the queue name prefixed with "queues:".
 *
 * @param  string|null  $queue
 * @return string
 */
protected function getQueue($queue)
{
    $name = $queue ?: $this->default;

    return 'queues:'.$name;
}
/**
 * Get the Redis connection used by the queue.
 *
 * @return \Predis\ClientInterface
 */
protected function getConnection()
{
    $name = $this->connection;

    return $this->redis->connection($name);
}
/**
 * Get the Redis instance this queue was constructed with.
 *
 * @return \Illuminate\Contracts\Redis\Factory
 */
public function getRedis()
{
    $redis = $this->redis;

    return $redis;
}
}
LuaScripts.php 0000666 00000005631 13436756000 0007363 0 ustar 00 registerManager();
$this->registerConnection();
$this->registerWorker();
$this->registerListener();
$this->registerFailedJobServices();
}
/**
 * Register the queue manager.
 *
 * Binds "queue" as a singleton whose factory creates the manager and
 * registers the queue connectors on it.
 *
 * @return void
 */
protected function registerManager()
{
    $this->app->singleton('queue', function ($app) {
        $manager = new QueueManager($app);

        // The connectors must be registered before the manager is handed
        // out, since they are what turn queue configs into queue instances.
        $this->registerConnectors($manager);

        return $manager;
    });
}
/**
 * Register the default queue connection binding.
 *
 * Binds "queue.connection" as a singleton resolved by calling
 * connection() with no name on the "queue" binding.
 *
 * @return void
 */
protected function registerConnection()
{
    $resolver = function ($app) {
        return $app['queue']->connection();
    };

    $this->app->singleton('queue.connection', $resolver);
}
/**
 * Register the connectors on the queue manager.
 *
 * Calls the register{Name}Connector() method for each bundled driver.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
public function registerConnectors($manager)
{
    $drivers = ['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'];

    foreach ($drivers as $driver) {
        $method = "register{$driver}Connector";

        $this->{$method}($manager);
    }
}
/**
 * Register the "null" queue connector on the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerNullConnector($manager)
{
    $factory = function () {
        return new NullConnector;
    };

    $manager->addConnector('null', $factory);
}
/**
 * Register the "sync" queue connector on the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerSyncConnector($manager)
{
    $factory = function () {
        return new SyncConnector;
    };

    $manager->addConnector('sync', $factory);
}
/**
 * Register the "database" queue connector on the manager.
 *
 * The connector is created with the application's "db" binding.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerDatabaseConnector($manager)
{
    $factory = function () {
        return new DatabaseConnector($this->app['db']);
    };

    $manager->addConnector('database', $factory);
}
/**
 * Register the "redis" queue connector on the manager.
 *
 * The connector is created with the application's "redis" binding.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerRedisConnector($manager)
{
    $factory = function () {
        return new RedisConnector($this->app['redis']);
    };

    $manager->addConnector('redis', $factory);
}
/**
 * Register the "beanstalkd" queue connector on the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerBeanstalkdConnector($manager)
{
    $factory = function () {
        return new BeanstalkdConnector;
    };

    $manager->addConnector('beanstalkd', $factory);
}
/**
 * Register the "sqs" (Amazon SQS) queue connector on the manager.
 *
 * @param  \Illuminate\Queue\QueueManager  $manager
 * @return void
 */
protected function registerSqsConnector($manager)
{
    $factory = function () {
        return new SqsConnector;
    };

    $manager->addConnector('sqs', $factory);
}
/**
 * Register the queue worker.
 *
 * Binds "queue.worker" as a singleton built from the "queue" and
 * "events" bindings and the bound exception handler.
 *
 * @return void
 */
protected function registerWorker()
{
    $this->app->singleton('queue.worker', function () {
        $manager = $this->app['queue'];
        $events = $this->app['events'];
        $handler = $this->app[ExceptionHandler::class];

        return new Worker($manager, $events, $handler);
    });
}
/**
 * Register the queue listener.
 *
 * Binds "queue.listener" as a singleton created with the application's
 * base path.
 *
 * @return void
 */
protected function registerListener()
{
    $factory = function () {
        return new Listener($this->app->basePath());
    };

    $this->app->singleton('queue.listener', $factory);
}
/**
 * Register the failed job services.
 *
 * Binds "queue.failer" as a singleton: a database provider when the
 * "queue.failed" configuration names a table, a null provider otherwise.
 *
 * @return void
 */
protected function registerFailedJobServices()
{
    $this->app->singleton('queue.failer', function () {
        $config = $this->app['config']['queue.failed'];

        if (isset($config['table'])) {
            return $this->databaseFailedJobProvider($config);
        }

        return new NullFailedJobProvider;
    });
}
/**
 * Create a new database failed job provider.
 *
 * Uses the "database" and "table" entries of the given configuration.
 *
 * @param  array  $config
 * @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider
 */
protected function databaseFailedJobProvider($config)
{
    $resolver = $this->app['db'];

    $database = $config['database'];
    $table = $config['table'];

    return new DatabaseFailedJobProvider($resolver, $database, $table);
}
/**
 * List the container bindings provided by this provider.
 *
 * @return array
 */
public function provides()
{
    $services = [
        'queue',
        'queue.worker',
        'queue.listener',
        'queue.failer',
        'queue.connection',
    ];

    return $services;
}
}
InvalidPayloadException.php 0000666 00000000567 13436756000 0012054 0 ustar 00 job ? $this->job->attempts() : 1;
}
/**
 * Delete the job from the queue.
 *
 * Does nothing when no underlying job has been set.
 *
 * @return void
 */
public function delete()
{
    if (! $this->job) {
        return;
    }

    return $this->job->delete();
}
/**
 * Fail the job from the queue.
 *
 * Does nothing when no underlying job has been set.
 *
 * @param  \Throwable  $exception
 * @return void
 */
public function fail($exception = null)
{
    if (! $this->job) {
        return;
    }

    $connectionName = $this->job->getConnectionName();

    FailingJob::handle($connectionName, $this->job, $exception);
}
/**
 * Release the job back into the queue.
 *
 * Does nothing when no underlying job has been set.
 *
 * @param  int  $delay
 * @return void
 */
public function release($delay = 0)
{
    if (! $this->job) {
        return;
    }

    return $this->job->release($delay);
}
/**
 * Set the underlying queue job instance.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @return $this
 */
public function setJob(JobContract $job)
{
    // Store the job and return the same object so calls can be chained.
    $this->job = $job;

    return $this;
}
}
README.md 0000666 00000002313 13436756000 0006032 0 ustar 00 ## Illuminate Queue
The Laravel Queue component provides a unified API across a variety of different queue services. Queues allow you to defer the processing of a time-consuming task, such as sending an e-mail, until a later time, thus drastically speeding up the web requests to your application.
### Usage Instructions
First, create a new Queue `Capsule` manager instance. Similar to the "Capsule" provided for the Eloquent ORM, the queue Capsule aims to make configuring the library for usage outside of the Laravel framework as easy as possible.
```PHP
use Illuminate\Queue\Capsule\Manager as Queue;
$queue = new Queue;
$queue->addConnection([
'driver' => 'beanstalkd',
'host' => 'localhost',
'queue' => 'default',
]);
// Make this Capsule instance available globally via static methods... (optional)
$queue->setAsGlobal();
```
Once the Capsule instance has been registered, you may use it like so:
```PHP
// As an instance...
$queue->push('SendEmail', array('message' => $message));
// If setAsGlobal has been called...
Queue::push('SendEmail', array('message' => $message));
```
For further documentation on using the queue, consult the [Laravel framework documentation](https://laravel.com/docs).
WorkerOptions.php 0000666 00000002522 13436756000 0010113 0 ustar 00 delay = $delay;
$this->sleep = $sleep;
$this->force = $force;
$this->memory = $memory;
$this->timeout = $timeout;
$this->maxTries = $maxTries;
}
}
Events/WorkerStopping.php 0000666 00000000113 13436756000 0011521 0 ustar 00 job = $job;
$this->connectionName = $connectionName;
}
}
Events/JobProcessing.php 0000666 00000001116 13436756000 0011277 0 ustar 00 job = $job;
$this->connectionName = $connectionName;
}
}
Events/Looping.php 0000666 00000000104 13436756000 0010133 0 ustar 00 job = $job;
$this->exception = $exception;
$this->connectionName = $connectionName;
}
}
Events/JobFailed.php 0000666 00000001433 13436756000 0010351 0 ustar 00 job = $job;
$this->exception = $exception;
$this->connectionName = $connectionName;
}
}
ListenerOptions.php 0000666 00000001361 13436756000 0010427 0 ustar 00 environment = $environment;
parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force);
}
}
Connectors/SqsConnector.php 0000666 00000002106 13436756000 0012022 0 ustar 00 getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret']);
}
return new SqsQueue(
new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
);
}
/**
 * Merge the given configuration over the SQS defaults.
 *
 * The defaults are version "latest" and 60-second HTTP "timeout" and
 * "connect_timeout" values; entries in $config take precedence.
 *
 * @param  array  $config
 * @return array
 */
protected function getDefaultConfiguration(array $config)
{
    $defaults = [
        'version' => 'latest',
        'http' => [
            'timeout' => 60,
            'connect_timeout' => 60,
        ],
    ];

    return array_merge($defaults, $config);
}
}
Connectors/ConnectorInterface.php 0000666 00000000407 13436756000 0013156 0 ustar 00 pheanstalk($config), $config['queue'], $retryAfter);
}
/**
 * Create a Pheanstalk instance.
 *
 * Uses the configured "host" and "port", falling back to Pheanstalk's
 * default port when none is configured.
 *
 * @param  array  $config
 * @return \Pheanstalk\Pheanstalk
 */
protected function pheanstalk(array $config)
{
    $port = Arr::get($config, 'port', PheanstalkInterface::DEFAULT_PORT);

    $host = $config['host'];

    return new Pheanstalk($host, $port);
}
}
Connectors/NullConnector.php 0000666 00000000546 13436756000 0012174 0 ustar 00 redis = $redis;
$this->connection = $connection;
}
/**
 * Establish a queue connection.
 *
 * The "connection" entry defaults to the connector's own connection and
 * "retry_after" defaults to 60.
 *
 * @param  array  $config
 * @return \Illuminate\Contracts\Queue\Queue
 */
public function connect(array $config)
{
    $queue = $config['queue'];

    $connection = Arr::get($config, 'connection', $this->connection);

    $retryAfter = Arr::get($config, 'retry_after', 60);

    return new RedisQueue($this->redis, $queue, $connection, $retryAfter);
}
}
Connectors/DatabaseConnector.php 0000666 00000002071 13436756000 0012761 0 ustar 00 connections = $connections;
}
/**
 * Establish a queue connection.
 *
 * Resolves the database connection named by the "connection" entry and
 * builds the queue from the "table", "queue" and "retry_after" entries
 * ("retry_after" defaults to 60).
 *
 * @param  array  $config
 * @return \Illuminate\Contracts\Queue\Queue
 */
public function connect(array $config)
{
    $database = $this->connections->connection(
        Arr::get($config, 'connection')
    );

    $table = $config['table'];
    $queue = $config['queue'];

    $retryAfter = Arr::get($config, 'retry_after', 60);

    return new DatabaseQueue($database, $table, $queue, $retryAfter);
}
}
Connectors/SyncConnector.php 0000666 00000000546 13436756000 0012176 0 ustar 00 app = $app;
}
/**
 * Register a callback for the JobProcessing event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function before($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobProcessing::class, $callback);
}
/**
 * Register a callback for the JobProcessed event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function after($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobProcessed::class, $callback);
}
/**
 * Register a callback for the JobExceptionOccurred event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function exceptionOccurred($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobExceptionOccurred::class, $callback);
}
/**
 * Register a callback for the Looping event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function looping($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\Looping::class, $callback);
}
/**
 * Register a callback for the JobFailed event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function failing($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\JobFailed::class, $callback);
}
/**
 * Register a callback for the WorkerStopping event.
 *
 * @param  mixed  $callback
 * @return void
 */
public function stopping($callback)
{
    $events = $this->app['events'];

    $events->listen(Events\WorkerStopping::class, $callback);
}
/**
 * Determine whether a connection has already been resolved.
 *
 * The default driver name is used when no name is given.
 *
 * @param  string  $name
 * @return bool
 */
public function connected($name = null)
{
    $key = $name ?: $this->getDefaultDriver();

    return isset($this->connections[$key]);
}
/**
 * Resolve a queue connection instance.
 *
 * Connections are resolved on first use and cached by name, so repeated
 * calls with the same name return the same instance.
 *
 * @param  string  $name
 * @return \Illuminate\Contracts\Queue\Queue
 */
public function connection($name = null)
{
    $name = $name ?: $this->getDefaultDriver();

    if (isset($this->connections[$name])) {
        return $this->connections[$name];
    }

    // First use of this connection: resolve it, cache it, and give it the
    // container before handing it out.
    $connection = $this->resolve($name);

    $this->connections[$name] = $connection;

    $connection->setContainer($this->app);

    return $this->connections[$name];
}
/**
 * Resolve a queue connection.
 *
 * Looks up the connection's configuration, connects through the
 * connector for its driver and names the resulting queue.
 *
 * @param  string  $name
 * @return \Illuminate\Contracts\Queue\Queue
 */
protected function resolve($name)
{
    $config = $this->getConfig($name);

    $connector = $this->getConnector($config['driver']);

    $queue = $connector->connect($config);

    return $queue->setConnectionName($name);
}
/**
 * Get the connector for a given driver.
 *
 * Invokes the resolver registered for the driver and returns its result.
 *
 * @param  string  $driver
 * @return \Illuminate\Queue\Connectors\ConnectorInterface
 *
 * @throws \InvalidArgumentException  When no resolver is registered for the driver.
 */
protected function getConnector($driver)
{
    if (isset($this->connectors[$driver])) {
        $resolver = $this->connectors[$driver];

        return $resolver();
    }

    throw new InvalidArgumentException("No connector for [$driver]");
}
/**
 * Add a queue connection resolver.
 *
 * Alias of addConnector().
 *
 * @param  string  $driver
 * @param  \Closure  $resolver
 * @return void
 */
public function extend($driver, Closure $resolver)
{
    // Delegates entirely to addConnector(); its result is passed through.
    return $this->addConnector($driver, $resolver);
}
/**
 * Add a queue connection resolver.
 *
 * Stores the resolver under the driver name, replacing any resolver
 * previously stored for that driver.
 *
 * @param  string  $driver
 * @param  \Closure  $resolver
 * @return void
 */
public function addConnector($driver, Closure $resolver)
{
    // Keyed by driver name; getConnector() invokes the stored closure.
    $this->connectors[$driver] = $resolver;
}
/**
 * Get the queue connection configuration.
 *
 * A null name, or the literal name "null", yields the null driver's
 * configuration instead of a lookup in the application config.
 *
 * @param  string  $name
 * @return array
 */
protected function getConfig($name)
{
    if (is_null($name) || $name === 'null') {
        return ['driver' => 'null'];
    }

    return $this->app['config']["queue.connections.{$name}"];
}
/**
 * Get the name of the default queue connection.
 *
 * Read from the "queue.default" configuration key.
 *
 * @return string
 */
public function getDefaultDriver()
{
    $config = $this->app['config'];

    return $config['queue.default'];
}
/**
 * Set the name of the default queue connection.
 *
 * Writes the name to the "queue.default" configuration key, which is the
 * same key getDefaultDriver() reads.
 *
 * @param  string  $name
 * @return void
 */
public function setDefaultDriver($name)
{
// NOTE(review): this nested write only takes effect if the "config" entry
// is an object supporting array access — confirm against the bound config.
$this->app['config']['queue.default'] = $name;
}
/**
 * Get the full name for the given connection.
 *
 * Falls back to the default driver name when no connection is given.
 *
 * @param  string  $connection
 * @return string
 */
public function getName($connection = null)
{
    if ($connection) {
        return $connection;
    }

    return $this->getDefaultDriver();
}
/**
 * Determine if the application is in maintenance mode.
 *
 * Delegates to the application instance.
 *
 * @return bool
 */
public function isDownForMaintenance()
{
    $app = $this->app;

    return $app->isDownForMaintenance();
}
/**
 * Dynamically pass calls to the default connection.
 *
 * @param  string  $method
 * @param  array  $parameters
 * @return mixed
 */
public function __call($method, $parameters)
{
    $connection = $this->connection();

    return $connection->{$method}(...$parameters);
}
}
Queue.php 0000666 00000007623 13436756000 0006361 0 ustar 00 push($job, $data, $queue);
}
/**
 * Push a new job onto the given queue after a delay.
 *
 * Same as later(), with the queue name as the first argument.
 *
 * @param  string  $queue
 * @param  \DateTime|int  $delay
 * @param  string  $job
 * @param  mixed  $data
 * @return mixed
 */
public function laterOn($queue, $delay, $job, $data = '')
{
    $result = $this->later($delay, $job, $data, $queue);

    return $result;
}
/**
 * Push an array of jobs onto the queue.
 *
 * Each job is pushed individually with the same data and queue. Nothing
 * is returned.
 *
 * @param  array  $jobs
 * @param  mixed  $data
 * @param  string  $queue
 * @return mixed
 */
public function bulk($jobs, $data = '', $queue = null)
{
    // A single non-array job is treated as a one-element list.
    $list = (array) $jobs;

    foreach ($list as $item) {
        $this->push($item, $data, $queue);
    }
}
/**
 * Create a JSON payload string from the given job and data.
 *
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue
 * @return string
 *
 * @throws \Illuminate\Queue\InvalidPayloadException  When JSON encoding reports an error.
 */
protected function createPayload($job, $data = '', $queue = null)
{
    $encoded = json_encode($this->createPayloadArray($job, $data, $queue));

    if (json_last_error() === JSON_ERROR_NONE) {
        return $encoded;
    }

    throw new InvalidPayloadException;
}
/**
 * Create a payload array from the given job and data.
 *
 * Object jobs get an object payload; anything else gets a string-based
 * payload carrying the job and data as given.
 *
 * @param  string  $job
 * @param  mixed  $data
 * @param  string  $queue  Not used by this implementation.
 * @return array
 */
protected function createPayloadArray($job, $data = '', $queue = null)
{
    if (is_object($job)) {
        return $this->createObjectPayload($job);
    }

    return $this->createStringPayload($job, $data);
}
/**
 * Create a payload for an object-based queue handler.
 *
 * The job's "tries" and "timeout" properties are copied when set (null
 * otherwise), and a clone of the job is serialized into the payload.
 *
 * @param  mixed  $job
 * @return array
 */
protected function createObjectPayload($job)
{
    $maxTries = isset($job->tries) ? $job->tries : null;
    $timeout = isset($job->timeout) ? $job->timeout : null;

    $data = [
        'commandName' => get_class($job),
        'command' => serialize(clone $job),
    ];

    return [
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => $maxTries,
        'timeout' => $timeout,
        'data' => $data,
    ];
}
/**
 * Create a typical, string based queue payload array.
 *
 * The result has exactly two keys, "job" and "data", in that order.
 *
 * @param  string  $job
 * @param  mixed  $data
 * @return array
 */
protected function createStringPayload($job, $data)
{
    $payload = [];

    $payload['job'] = $job;
    $payload['data'] = $data;

    return $payload;
}
/**
 * Get the connection name for the queue.
 *
 * @return string
 */
public function getConnectionName()
{
    $name = $this->connectionName;

    return $name;
}
/**
 * Set the connection name for the queue.
 *
 * @param  string  $name
 * @return $this
 */
public function setConnectionName($name)
{
    // Store the name and return the same object so calls can be chained.
    $this->connectionName = $name;

    return $this;
}
/**
 * Set the IoC container instance used by the queue.
 *
 * @param  \Illuminate\Container\Container  $container
 * @return void
 */
public function setContainer(Container $container)
{
    // Replaces any container that was set previously.
    $this->container = $container;
}
}
Capsule/Manager.php 0000666 00000011016 13436756000 0010232 0 ustar 00 setupContainer($container ?: new Container);
// Once we have the container setup, we will setup the default configuration
// options in the container "config" bindings. This just makes this queue
// manager behave correctly since all the correct binding are in place.
$this->setupDefaultConfiguration();
$this->setupManager();
$this->registerConnectors();
}
/**
 * Seed the container's "config" binding with the default queue connection name.
 *
 * @return void
 */
protected function setupDefaultConfiguration()
{
    $defaultConnection = 'default';

    $this->container['config']['queue.default'] = $defaultConnection;
}
/**
 * Create the queue manager for this capsule's container.
 *
 * @return void
 */
protected function setupManager()
{
    $manager = new QueueManager($this->container);

    $this->manager = $manager;
}
/**
 * Register the connectors supplied by the queue service provider on the manager.
 *
 * @return void
 */
protected function registerConnectors()
{
    (new QueueServiceProvider($this->container))->registerConnectors($this->manager);
}
/**
 * Fetch a connection through the globally stored capsule instance.
 *
 * @param  string  $connection
 * @return \Illuminate\Contracts\Queue\Queue
 */
public static function connection($connection = null)
{
    $capsule = static::$instance;

    return $capsule->getConnection($connection);
}
/**
 * Push a job onto a queue of the given connection.
 *
 * @param  string  $job
 * @param  mixed   $data
 * @param  string  $queue
 * @param  string  $connection
 * @return mixed
 */
public static function push($job, $data = '', $queue = null, $connection = null)
{
    $target = static::$instance->connection($connection);

    return $target->push($job, $data, $queue);
}
/**
 * Push an array of jobs onto a queue of the given connection.
 *
 * @param  array   $jobs
 * @param  mixed   $data
 * @param  string  $queue
 * @param  string  $connection
 * @return mixed
 */
public static function bulk($jobs, $data = '', $queue = null, $connection = null)
{
    $target = static::$instance->connection($connection);

    return $target->bulk($jobs, $data, $queue);
}
/**
 * Push a job onto a queue of the given connection after a delay.
 *
 * @param  \DateTime|int  $delay
 * @param  string  $job
 * @param  mixed   $data
 * @param  string  $queue
 * @param  string  $connection
 * @return mixed
 */
public static function later($delay, $job, $data = '', $queue = null, $connection = null)
{
    $target = static::$instance->connection($connection);

    return $target->later($delay, $job, $data, $queue);
}
/**
 * Ask the queue manager for the named connection.
 *
 * @param  string  $name
 * @return \Illuminate\Contracts\Queue\Queue
 */
public function getConnection($name = null)
{
    $manager = $this->manager;

    return $manager->connection($name);
}
/**
 * Store a connection's configuration under "queue.connections.{name}".
 *
 * @param  array   $config
 * @param  string  $name
 * @return void
 */
public function addConnection(array $config, $name = 'default')
{
    $key = "queue.connections.{$name}";

    $this->container['config'][$key] = $config;
}
/**
 * Return the queue manager held by this capsule.
 *
 * @return \Illuminate\Queue\QueueManager
 */
public function getQueueManager()
{
    $manager = $this->manager;

    return $manager;
}
/**
 * Forward unknown instance method calls to the queue manager.
 *
 * @param  string  $method
 * @param  array   $parameters
 * @return mixed
 */
public function __call($method, $parameters)
{
    $manager = $this->manager;

    return $manager->{$method}(...$parameters);
}
/**
 * Forward unknown static method calls to the default connection.
 *
 * @param  string  $method
 * @param  array   $parameters
 * @return mixed
 */
public static function __callStatic($method, $parameters)
{
    $default = static::connection();

    return $default->{$method}(...$parameters);
}
}
SerializesModels.php 0000666 00000002461 13436756000 0010546 0 ustar 00 getProperties();
foreach ($properties as $property) {
$property->setValue($this, $this->getSerializedPropertyValue(
$this->getPropertyValue($property)
));
}
return array_map(function ($p) {
return $p->getName();
}, $properties);
}
/**
 * After unserialization, pass every property value through getRestoredPropertyValue().
 *
 * @return void
 */
public function __wakeup()
{
    $reflection = new ReflectionClass($this);

    foreach ($reflection->getProperties() as $prop) {
        $stored = $this->getPropertyValue($prop);

        $prop->setValue($this, $this->getRestoredPropertyValue($stored));
    }
}
/**
 * Read a property's current value from this object.
 *
 * @param  \ReflectionProperty  $property
 * @return mixed
 */
protected function getPropertyValue(ReflectionProperty $property)
{
    // Make the property readable even when it is not public.
    $property->setAccessible(true);

    $value = $property->getValue($this);

    return $value;
}
}
SerializesAndRestoresModelIdentifiers.php 0000666 00000004240 13436756000 0014720 0 ustar 00 getQueueableClass(), $value->getQueueableIds());
}
if ($value instanceof QueueableEntity) {
return new ModelIdentifier(get_class($value), $value->getQueueableId());
}
return $value;
}
/**
 * Turn a deserialized property value back into its original form.
 *
 * Values that are not model identifiers are returned unchanged. An identifier
 * with an array id is restored as a collection; otherwise the single model is
 * loaded with findOrFail() on the write PDO.
 *
 * @param  mixed  $value
 * @return mixed
 */
protected function getRestoredPropertyValue($value)
{
    if ($value instanceof ModelIdentifier) {
        if (is_array($value->id)) {
            return $this->restoreCollection($value);
        }

        $model = new $value->class;

        return $this->getQueryForModelRestoration($model)
            ->useWritePdo()
            ->findOrFail($value->id);
    }

    return $value;
}
/**
 * Reload the models referenced by a collection identifier.
 *
 * An identifier with no class or no ids yields an empty Eloquent collection.
 *
 * @param  \Illuminate\Contracts\Database\ModelIdentifier  $value
 * @return \Illuminate\Database\Eloquent\Collection
 */
protected function restoreCollection($value)
{
    if ($value->class && count($value->id) !== 0) {
        $model = new $value->class;

        $query = $this->getQueryForModelRestoration($model)->useWritePdo();

        return $query->whereIn($model->getQualifiedKeyName(), $value->id)->get();
    }

    return new EloquentCollection;
}
/**
 * Build the query used to reload a model: a new query without scopes.
 *
 * @param  \Illuminate\Database\Eloquent\Model  $model
 * @return \Illuminate\Database\Eloquent\Builder
 */
protected function getQueryForModelRestoration($model)
{
    $query = $model->newQueryWithoutScopes();

    return $query;
}
}
BeanstalkdQueue.php 0000666 00000007426 13436756000 0010353 0 ustar 00 default = $default;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
}
/**
 * Report the "total_jobs" statistic of the tube backing the queue.
 *
 * @param  string  $queue
 * @return int
 */
public function size($queue = null)
{
    $stats = $this->pheanstalk->statsTube($this->getQueue($queue));

    return (int) $stats->total_jobs;
}
/**
 * Build a payload for the job and push it onto the queue.
 *
 * @param  string  $job
 * @param  mixed   $data
 * @param  string  $queue
 * @return mixed
 */
public function push($job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    return $this->pushRaw($payload, $queue);
}
/**
 * Put an already-built payload into the queue's tube.
 *
 * The payload is stored with Pheanstalk's default priority and delay and this
 * queue's configured time-to-run. The $options argument is not used here.
 *
 * @param  string  $payload
 * @param  string  $queue
 * @param  array   $options
 * @return mixed
 */
public function pushRaw($payload, $queue = null, array $options = [])
{
    $tube = $this->pheanstalk->useTube($this->getQueue($queue));

    return $tube->put(
        $payload,
        Pheanstalk::DEFAULT_PRIORITY,
        Pheanstalk::DEFAULT_DELAY,
        $this->timeToRun
    );
}
/**
 * Put a job into the queue's tube so that it becomes available after a delay.
 *
 * The delay is converted with secondsUntil() before being handed to Pheanstalk.
 *
 * @param  \DateTime|int  $delay
 * @param  string  $job
 * @param  mixed   $data
 * @param  string  $queue
 * @return mixed
 */
public function later($delay, $job, $data = '', $queue = null)
{
    return $this->pheanstalk->useTube($this->getQueue($queue))->put(
        $this->createPayload($job, $data),
        Pheanstalk::DEFAULT_PRIORITY,
        $this->secondsUntil($delay),
        $this->timeToRun
    );
}
/**
 * Reserve the next job from the queue's tube, if one is available.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    $tube = $this->getQueue($queue);

    // A zero timeout is passed to reserve().
    $reserved = $this->pheanstalk->watchOnly($tube)->reserve(0);

    if (! $reserved instanceof PheanstalkJob) {
        return;
    }

    return new BeanstalkdJob(
        $this->container, $this->pheanstalk, $reserved, $this->connectionName, $tube
    );
}
/**
 * Delete the job with the given id from the queue's tube.
 *
 * @param  string  $queue
 * @param  string  $id
 * @return void
 */
public function deleteMessage($queue, $id)
{
    $tube = $this->pheanstalk->useTube($this->getQueue($queue));

    // Pheanstalk's delete() takes a job object, so wrap the id in one with an empty body.
    $tube->delete(new PheanstalkJob($id, ''));
}
/**
 * Resolve a queue name, falling back to the default when the given one is empty.
 *
 * @param  string|null  $queue
 * @return string
 */
public function getQueue($queue)
{
    if ($queue) {
        return $queue;
    }

    return $this->default;
}
/**
 * Return the Pheanstalk instance this queue talks to.
 *
 * @return \Pheanstalk\Pheanstalk
 */
public function getPheanstalk()
{
    $client = $this->pheanstalk;

    return $client;
}
}
NullQueue.php 0000666 00000002541 13436756000 0007206 0 ustar 00 table = $table;
$this->resolver = $resolver;
$this->database = $database;
}
/**
 * Insert a failed job record and return the id of the inserted row.
 *
 * The exception is stored as its string form, together with the current time.
 *
 * @param  string  $connection
 * @param  string  $queue
 * @param  string  $payload
 * @param  \Exception  $exception
 * @return int|null
 */
public function log($connection, $queue, $payload, $exception)
{
    $failedAt = Carbon::now();

    $record = [
        'connection' => $connection,
        'queue' => $queue,
        'payload' => $payload,
        'exception' => (string) $exception,
        'failed_at' => $failedAt,
    ];

    return $this->getTable()->insertGetId($record);
}
/**
 * Fetch every failed job, ordered by id descending.
 *
 * @return array
 */
public function all()
{
    $rows = $this->getTable()->orderBy('id', 'desc')->get();

    return $rows->all();
}
/**
 * Look up a single failed job by id.
 *
 * @param  mixed  $id
 * @return mixed  whatever the query builder's find() yields for the id
 */
public function find($id)
{
    $record = $this->getTable()->find($id);

    return $record;
}
/**
 * Delete a single failed job by id.
 *
 * @param  mixed  $id
 * @return bool   true when the delete call reported more than zero rows
 */
public function forget($id)
{
    $deleted = $this->getTable()->where('id', $id)->delete();

    return $deleted > 0;
}
/**
 * Delete every failed job record.
 *
 * @return void
 */
public function flush()
{
    $table = $this->getTable();

    $table->delete();
}
/**
 * Start a fresh query against the failed jobs table on the configured connection.
 *
 * @return \Illuminate\Database\Query\Builder
 */
protected function getTable()
{
    $connection = $this->resolver->connection($this->database);

    return $connection->table($this->table);
}
}
Worker.php 0000666 00000037313 13436756000 0006545 0 ustar 00 events = $events;
$this->manager = $manager;
$this->exceptions = $exceptions;
}
/**
 * Process jobs from the given queue in an endless loop.
 *
 * @param  string  $connectionName
 * @param  string  $queue
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 */
public function daemon($connectionName, $queue, WorkerOptions $options)
{
    $this->listenForSignals();

    $restartMarker = $this->getTimestampOfLastQueueRestart();

    for (;;) {
        // Fetch the next job (if any) and set up the timeout handler for it
        // before deciding whether it may run.
        $connection = $this->manager->connection($connectionName);

        $job = $this->getNextJob($connection, $queue);

        $this->registerTimeoutHandler($job, $options);

        // With no job, or when the daemon must not run on this iteration,
        // sleep for the configured interval instead of processing.
        if (! $job || ! $this->daemonShouldRun($options)) {
            $this->sleep($options->sleep);
        } else {
            $this->runJob($job, $connectionName, $options);
        }

        // Stop with status 12 when over the memory limit, or with the default
        // status when the restart timestamp has changed since the loop began.
        if ($this->memoryExceeded($options->memory)) {
            $this->stop(12);
        } elseif ($this->queueShouldRestart($restartMarker)) {
            $this->stop();
        }
    }
}
/**
 * Register the worker timeout handler (PHP 7.1+).
 *
 * The alarm is armed from the timeout that applies to this job (the job's
 * own timeout when it defines one, otherwise the worker option) plus the
 * sleep interval. A non-positive timeout cancels any pending alarm, so a job
 * that defines a timeout of zero is not killed after just the sleep interval,
 * and a job-level timeout is honoured even when the worker option is zero.
 *
 * @param  \Illuminate\Contracts\Queue\Job|null  $job
 * @param  WorkerOptions  $options
 * @return void
 */
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    if (! $this->supportsAsyncSignals()) {
        return;
    }

    // Kill this process when the alarm fires, i.e. the job ran too long.
    pcntl_signal(SIGALRM, function () {
        $this->kill(1);
    });

    $timeout = $this->timeoutForJob($job, $options);

    // pcntl_alarm(0) cancels a previously scheduled alarm instead of arming a new one.
    pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
}
/**
 * Pick the timeout for a job: its own when it defines one, else the worker option.
 *
 * @param  \Illuminate\Contracts\Queue\Job|null  $job
 * @param  WorkerOptions  $options
 * @return int
 */
protected function timeoutForJob($job, WorkerOptions $options)
{
    if ($job && ! is_null($job->timeout())) {
        return $job->timeout();
    }

    return $options->timeout;
}
/**
 * Decide whether the daemon may process a job on this iteration.
 *
 * Processing is blocked when the application is down for maintenance (unless
 * forced), when the worker is paused, or when the Looping event returns false.
 *
 * @param  WorkerOptions  $options
 * @return bool
 */
protected function daemonShouldRun(WorkerOptions $options)
{
    $blocked = ($this->manager->isDownForMaintenance() && ! $options->force)
        || $this->paused
        || $this->events->until(new Events\Looping) === false;

    if (! $blocked) {
        return true;
    }

    // Sleep for one second even if the configured sleep is zero, so a blocked
    // worker does not spin and max out the CPU.
    $this->sleep(1);

    return false;
}
/**
 * Process the next job on the queue, or sleep when there is none.
 *
 * @param  string  $connectionName
 * @param  string  $queue
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 */
public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
    $connection = $this->manager->connection($connectionName);

    $job = $this->getNextJob($connection, $queue);

    // Nothing to process: sleep for the configured number of seconds.
    if (! $job) {
        $this->sleep($options->sleep);

        return;
    }

    return $this->runJob($job, $connectionName, $options);
}
/**
 * Pop the first available job from a comma-separated list of queue names.
 *
 * Queues are tried in the order given. Anything thrown while popping is
 * reported to the exception handler, and the method then returns null.
 *
 * @param  \Illuminate\Contracts\Queue\Queue  $connection
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
protected function getNextJob($connection, $queue)
{
    try {
        $names = explode(',', $queue);

        foreach ($names as $name) {
            $job = $connection->pop($name);

            if (! is_null($job)) {
                return $job;
            }
        }
    } catch (Exception $exception) {
        $this->exceptions->report($exception);
    } catch (Throwable $error) {
        $this->exceptions->report(new FatalThrowableError($error));
    }
}
/**
 * Process a job, reporting anything it throws instead of letting it propagate.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  string  $connectionName
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 */
protected function runJob($job, $connectionName, WorkerOptions $options)
{
    try {
        $result = $this->process($connectionName, $job, $options);

        return $result;
    } catch (Exception $exception) {
        $this->exceptions->report($exception);
    } catch (Throwable $error) {
        $this->exceptions->report(new FatalThrowableError($error));
    }
}
/**
 * Fire a job from the queue, raising the before and after events around it.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 *
 * @throws \Throwable
 */
public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        // Raise the before-job event, then check whether the job has already run
        // past its maximum attempts. That can happen when a job keeps timing out
        // without ever throwing an exception itself.
        $this->raiseBeforeJobEvent($connectionName, $job);

        $attemptLimit = (int) $options->maxTries;

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $attemptLimit);

        // Run the job. Anything it throws is routed to the exception handling
        // below; on success the after-job event is raised for listeners.
        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $exception) {
        $this->handleJobException($connectionName, $job, $options, $exception);
    } catch (Throwable $error) {
        $wrapped = new FatalThrowableError($error);

        $this->handleJobException($connectionName, $job, $options, $wrapped);
    }
}
/**
 * Deal with an exception raised while a job was running, then rethrow it.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  \Exception  $e
 * @return void
 *
 * @throws \Exception
 */
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
    try {
        // Fail the job right away if its next run would exceed the allowed
        // attempts, so it does not have to be released again.
        $attemptLimit = (int) $options->maxTries;

        $this->markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $attemptLimit, $e);

        $this->raiseExceptionOccurredJobEvent($connectionName, $job, $e);
    } finally {
        // Unless the job was deleted, release it back onto the queue with the
        // configured delay so it is not lost and can be retried later.
        $alreadyDeleted = $job->isDeleted();

        if (! $alreadyDeleted) {
            $job->release($options->delay);
        }
    }

    throw $e;
}
/**
 * Fail the job and throw if it has already been attempted more often than allowed.
 *
 * This will likely be because the job previously exceeded a timeout.
 * A job-level maxTries, when not null, overrides the given limit; a limit
 * of zero disables the check.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  int  $maxTries
 * @return void
 */
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
    if (! is_null($job->maxTries())) {
        $maxTries = $job->maxTries();
    }

    if ($maxTries !== 0 && $job->attempts() > $maxTries) {
        $exception = new MaxAttemptsExceededException(
            'A queued job has been attempted too many times. The job may have previously timed out.'
        );

        $this->failJob($connectionName, $job, $exception);

        throw $exception;
    }
}
/**
 * Fail the job if it has reached the maximum number of attempts.
 *
 * A job-level maxTries, when not null, overrides the given limit; a limit
 * that is not greater than zero disables the check.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  int  $maxTries
 * @param  \Exception  $e
 * @return void
 */
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
    if (! is_null($job->maxTries())) {
        $maxTries = $job->maxTries();
    }

    if (! ($maxTries > 0) || $job->attempts() < $maxTries) {
        return;
    }

    $this->failJob($connectionName, $job, $e);
}
/**
 * Hand the job and its exception to FailingJob::handle().
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Exception  $e
 * @return void
 */
protected function failJob($connectionName, $job, $e)
{
    $result = FailingJob::handle($connectionName, $job, $e);

    return $result;
}
/**
 * Fire the JobProcessing event for the job.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @return void
 */
protected function raiseBeforeJobEvent($connectionName, $job)
{
    $dispatcher = $this->events;

    $dispatcher->fire(new Events\JobProcessing($connectionName, $job));
}
/**
 * Fire the JobProcessed event for the job.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @return void
 */
protected function raiseAfterJobEvent($connectionName, $job)
{
    $dispatcher = $this->events;

    $dispatcher->fire(new Events\JobProcessed($connectionName, $job));
}
/**
 * Fire the JobExceptionOccurred event for the job and its exception.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Exception  $e
 * @return void
 */
protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e)
{
    $dispatcher = $this->events;

    $dispatcher->fire(new Events\JobExceptionOccurred($connectionName, $job, $e));
}
/**
 * Fire the JobFailed event for the job and its exception.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Exception  $e
 * @return void
 */
protected function raiseFailedJobEvent($connectionName, $job, $e)
{
    $dispatcher = $this->events;

    $dispatcher->fire(new Events\JobFailed($connectionName, $job, $e));
}
/**
 * Tell whether the restart timestamp differs from the one seen earlier.
 *
 * @param  int|null  $lastRestart
 * @return bool
 */
protected function queueShouldRestart($lastRestart)
{
    $current = $this->getTimestampOfLastQueueRestart();

    return $current != $lastRestart;
}
/**
 * Read the "illuminate:queue:restart" cache entry, or return null without a cache.
 *
 * @return int|null
 */
protected function getTimestampOfLastQueueRestart()
{
    if (! $this->cache) {
        return;
    }

    return $this->cache->get('illuminate:queue:restart');
}
/**
 * Turn on async signal handling and register the pause and resume handlers.
 *
 * SIGUSR2 sets the paused flag and SIGCONT clears it. Nothing is registered
 * when async signals are not supported.
 *
 * @return void
 */
protected function listenForSignals()
{
    if (! $this->supportsAsyncSignals()) {
        return;
    }

    pcntl_async_signals(true);

    pcntl_signal(SIGUSR2, function () {
        $this->paused = true;
    });

    pcntl_signal(SIGCONT, function () {
        $this->paused = false;
    });
}
/**
 * Check for PHP 7.1.0 or newer with the pcntl extension loaded.
 *
 * @return bool
 */
protected function supportsAsyncSignals()
{
    return version_compare(PHP_VERSION, '7.1.0', '>=')
        && extension_loaded('pcntl');
}
/**
 * Compare current memory usage, in megabytes, against the given limit.
 *
 * @param  int  $memoryLimit
 * @return bool  true when usage is at or above the limit
 */
public function memoryExceeded($memoryLimit)
{
    $usedMegabytes = memory_get_usage() / (1024 * 1024);

    return $usedMegabytes >= $memoryLimit;
}
/**
 * Fire the WorkerStopping event and exit the script with the given status.
 *
 * @param  int  $status
 * @return void
 */
public function stop($status = 0)
{
    $dispatcher = $this->events;

    $dispatcher->fire(new Events\WorkerStopping);

    exit($status);
}
/**
 * Kill the current process.
 *
 * With the posix extension loaded, SIGKILL is sent to this process first;
 * the script then exits with the given status.
 *
 * @param  int  $status
 * @return void
 */
public function kill($status = 0)
{
    if (extension_loaded('posix')) {
        $pid = getmypid();

        posix_kill($pid, SIGKILL);
    }

    exit($status);
}
/**
 * Pause the script for the given number of seconds.
 *
 * @param  int  $seconds
 * @return void
 */
public function sleep($seconds)
{
    // Delegates to PHP's global sleep() function.
    sleep($seconds);
}
/**
 * Store the cache repository on the worker.
 *
 * @param  \Illuminate\Contracts\Cache\Repository  $cache
 * @return void
 */
public function setCache(CacheContract $cache)
{
    $worker = $this;

    $worker->cache = $cache;
}
}
SyncQueue.php 0000666 00000007615 13436756000 0007217 0 ustar 00 resolveJob($this->createPayload($job, $data, $queue), $queue);
try {
$this->raiseBeforeJobEvent($queueJob);
$queueJob->fire();
$this->raiseAfterJobEvent($queueJob);
} catch (Exception $e) {
$this->handleException($queueJob, $e);
} catch (Throwable $e) {
$this->handleException($queueJob, new FatalThrowableError($e));
}
return 0;
}
/**
 * Wrap a payload in a SyncJob bound to this queue's container and connection name.
 *
 * @param  string  $payload
 * @param  string  $queue
 * @return \Illuminate\Queue\Jobs\SyncJob
 */
protected function resolveJob($payload, $queue)
{
    $syncJob = new SyncJob($this->container, $payload, $this->connectionName, $queue);

    return $syncJob;
}
/**
 * Fire the JobProcessing event, when an "events" binding exists in the container.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @return void
 */
protected function raiseBeforeJobEvent(Job $job)
{
    if (! $this->container->bound('events')) {
        return;
    }

    $dispatcher = $this->container['events'];

    $dispatcher->fire(new Events\JobProcessing($this->connectionName, $job));
}
/**
 * Fire the JobProcessed event, when an "events" binding exists in the container.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @return void
 */
protected function raiseAfterJobEvent(Job $job)
{
    if (! $this->container->bound('events')) {
        return;
    }

    $dispatcher = $this->container['events'];

    $dispatcher->fire(new Events\JobProcessed($this->connectionName, $job));
}
/**
 * Fire the JobExceptionOccurred event, when an "events" binding exists in the container.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Exception  $e
 * @return void
 */
protected function raiseExceptionOccurredJobEvent(Job $job, $e)
{
    if (! $this->container->bound('events')) {
        return;
    }

    $dispatcher = $this->container['events'];

    $dispatcher->fire(new Events\JobExceptionOccurred($this->connectionName, $job, $e));
}
/**
 * React to an exception thrown while a job was being processed.
 *
 * The exception-occurred event is raised, the job is passed to
 * FailingJob::handle(), and the exception is then rethrown.
 *
 * @param  \Illuminate\Queue\Jobs\Job  $queueJob
 * @param  \Exception  $e
 * @return void
 *
 * @throws \Exception
 */
protected function handleException($queueJob, $e)
{
    $this->raiseExceptionOccurredJobEvent($queueJob, $e);

    $connection = $this->connectionName;

    FailingJob::handle($connection, $queueJob, $e);

    throw $e;
}
/**
 * Push a raw payload onto the queue.
 *
 * This implementation has an empty body and returns nothing.
 *
 * @param  string  $payload
 * @param  string  $queue
 * @param  array   $options
 * @return mixed
 */
public function pushRaw($payload, $queue = null, array $options = [])
{
    // Intentionally empty.
}
/**
 * Push a job "after a delay"; this implementation ignores the delay and pushes at once.
 *
 * @param  \DateTime|int  $delay
 * @param  string  $job
 * @param  mixed   $data
 * @param  string  $queue
 * @return mixed
 */
public function later($delay, $job, $data = '', $queue = null)
{
    $result = $this->push($job, $data, $queue);

    return $result;
}
/**
 * Pop the next job off of the queue.
 *
 * This implementation has an empty body, so it always yields null.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    // Intentionally empty.
}
}
ManuallyFailedException.php 0000666 00000000175 13436756000 0012036 0 ustar 00 =5.6.4",
"illuminate/console": "5.4.*",
"illuminate/container": "5.4.*",
"illuminate/contracts": "5.4.*",
"illuminate/support": "5.4.*",
"nesbot/carbon": "~1.20",
"symfony/debug": "~3.2",
"symfony/process": "~3.2"
},
"autoload": {
"psr-4": {
"Illuminate\\Queue\\": ""
}
},
"extra": {
"branch-alias": {
"dev-master": "5.4-dev"
}
},
"suggest": {
"aws/aws-sdk-php": "Required to use the SQS queue driver (~3.0).",
"illuminate/redis": "Required to use the Redis queue driver (5.4.*).",
"pda/pheanstalk": "Required to use the Beanstalk queue driver (~3.0)."
},
"config": {
"sort-packages": true
},
"minimum-stability": "dev"
}
FailingJob.php 0000666 00000002510 13436756000 0007267 0 ustar 00 markAsFailed();
if ($job->isDeleted()) {
return;
}
try {
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();
$job->failed($e);
} finally {
static::events()->fire(new JobFailed(
$connectionName, $job, $e ?: new ManuallyFailedException
));
}
}
/**
 * Resolve the event dispatcher from the global container instance.
 *
 * @return \Illuminate\Contracts\Events\Dispatcher
 */
protected static function events()
{
    $container = Container::getInstance();

    return $container->make(Dispatcher::class);
}
}
DatabaseQueue.php 0000666 00000020462 13436756000 0010002 0 ustar 00 table = $table;
$this->default = $default;
$this->database = $database;
$this->retryAfter = $retryAfter;
}
/**
 * Count the rows in the jobs table that belong to the queue.
 *
 * @param  string  $queue
 * @return int
 */
public function size($queue = null)
{
    $query = $this->database->table($this->table)
        ->where('queue', $this->getQueue($queue));

    return $query->count();
}
/**
 * Build a payload for the job and store it on the queue.
 *
 * @param  string  $job
 * @param  mixed   $data
 * @param  string  $queue
 * @return mixed
 */
public function push($job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    return $this->pushToDatabase($queue, $payload);
}
/**
 * Store an already-built payload on the queue. The $options argument is not used here.
 *
 * @param  string  $payload
 * @param  string  $queue
 * @param  array   $options
 * @return mixed
 */
public function pushRaw($payload, $queue = null, array $options = [])
{
    $insertedId = $this->pushToDatabase($queue, $payload);

    return $insertedId;
}
/**
 * Build a payload for the job and store it on the queue with a delay.
 *
 * @param  \DateTime|int  $delay
 * @param  string  $job
 * @param  mixed   $data
 * @param  string  $queue
 * @return mixed   the result of pushToDatabase()
 */
public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    return $this->pushToDatabase($queue, $payload, $delay);
}
/**
 * Insert several jobs onto the queue with a single insert call.
 *
 * All records share the same queue name and availability time.
 *
 * @param  array   $jobs
 * @param  mixed   $data
 * @param  string  $queue
 * @return mixed
 */
public function bulk($jobs, $data = '', $queue = null)
{
    $queue = $this->getQueue($queue);

    $availableAt = $this->availableAt();

    $table = $this->database->table($this->table);

    // Keys of the given job list are kept on the record list.
    $records = [];

    foreach ((array) $jobs as $key => $job) {
        $records[$key] = $this->buildDatabaseRecord(
            $queue, $this->createPayload($job, $data), $availableAt
        );
    }

    return $table->insert($records);
}
/**
 * Put a reserved job back onto the queue as a new record.
 *
 * The record reuses the job's payload and attempt count.
 *
 * @param  string  $queue
 * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
 * @param  int  $delay
 * @return mixed
 */
public function release($queue, $job, $delay)
{
    $payload = $job->payload;
    $attempts = $job->attempts;

    return $this->pushToDatabase($queue, $payload, $delay, $attempts);
}
/**
 * Insert a payload into the jobs table and return the inserted id.
 *
 * @param  string|null  $queue
 * @param  string  $payload
 * @param  \DateTime|int  $delay
 * @param  int  $attempts
 * @return mixed
 */
protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
{
    $table = $this->database->table($this->table);

    $record = $this->buildDatabaseRecord(
        $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
    );

    return $table->insertGetId($record);
}
/**
 * Assemble the row to insert for a job.
 *
 * New rows are unreserved ("reserved_at" is null) and stamped with the
 * current time as "created_at".
 *
 * @param  string|null  $queue
 * @param  string  $payload
 * @param  int  $availableAt
 * @param  int  $attempts
 * @return array
 */
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
    $createdAt = $this->currentTime();

    return [
        'queue' => $queue,
        'payload' => $payload,
        'attempts' => $attempts,
        'reserved_at' => null,
        'available_at' => $availableAt,
        'created_at' => $createdAt,
    ];
}
/**
 * Pop the next job off of the queue.
 *
 * The lookup and reservation run inside a database transaction. When a
 * job is found, marshalJob() reserves it and commits; otherwise the
 * transaction is committed here. If anything throws in between, the
 * transaction is rolled back before the exception is rethrown so a
 * long-running worker is not left with an open transaction.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 *
 * @throws \Exception
 * @throws \Throwable
 */
public function pop($queue = null)
{
    $queue = $this->getQueue($queue);

    $this->database->beginTransaction();

    try {
        if ($job = $this->getNextAvailableJob($queue)) {
            return $this->marshalJob($queue, $job);
        }

        $this->database->commit();
    } catch (\Exception $e) {
        $this->database->rollBack();

        throw $e;
    } catch (\Throwable $e) {
        // PHP 7+ errors that are not exceptions; this branch never matches on PHP 5.
        $this->database->rollBack();

        throw $e;
    }
}
/**
 * Find the lowest-id job on the queue that can be processed now.
 *
 * A row qualifies when it is available or when its reservation has expired.
 * The row is selected with a lock for update.
 *
 * @param  string|null  $queue
 * @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
 */
protected function getNextAvailableJob($queue)
{
    $query = $this->database->table($this->table)
        ->lockForUpdate()
        ->where('queue', $this->getQueue($queue))
        ->where(function ($constraints) {
            $this->isAvailable($constraints);
            $this->isReservedButExpired($constraints);
        })
        ->orderBy('id', 'asc');

    $row = $query->first();

    if (! $row) {
        return null;
    }

    return new DatabaseJobRecord((object) $row);
}
/**
 * Constrain the query to unreserved jobs whose availability time has arrived.
 *
 * @param  \Illuminate\Database\Query\Builder  $query
 * @return void
 */
protected function isAvailable($query)
{
    $query->where(function ($nested) {
        $nested->whereNull('reserved_at');
        $nested->where('available_at', '<=', $this->currentTime());
    });
}
/**
 * Add an OR constraint matching jobs reserved at least "retryAfter" seconds ago.
 *
 * @param  \Illuminate\Database\Query\Builder  $query
 * @return void
 */
protected function isReservedButExpired($query)
{
    $cutoff = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

    $query->orWhere(function ($nested) use ($cutoff) {
        $nested->where('reserved_at', '<=', $cutoff);
    });
}
/**
 * Reserve the job record, commit, and wrap the record in a DatabaseJob.
 *
 * @param  string  $queue
 * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
 * @return \Illuminate\Queue\Jobs\DatabaseJob
 */
protected function marshalJob($queue, $job)
{
    $reserved = $this->markJobAsReserved($job);

    $this->database->commit();

    return new DatabaseJob(
        $this->container, $this, $reserved, $this->connectionName, $queue
    );
}
/**
 * Write a fresh reservation timestamp and an incremented attempt count for the job.
 *
 * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
 * @return \Illuminate\Queue\Jobs\DatabaseJobRecord  the same record, updated in place
 */
protected function markJobAsReserved($job)
{
    $row = $this->database->table($this->table)->where('id', $job->id);

    $changes = [
        'reserved_at' => $job->touch(),
        'attempts' => $job->increment(),
    ];

    $row->update($changes);

    return $job;
}
/**
 * Delete a reserved job from the queue.
 *
 * The row is looked up with a lock for update and deleted inside one
 * transaction. The connection's transaction() wrapper is used so the
 * transaction is rolled back if either query throws, instead of being
 * left open.
 *
 * @param  string  $queue  not used by this implementation
 * @param  string  $id
 * @return void
 */
public function deleteReserved($queue, $id)
{
    $this->database->transaction(function () use ($id) {
        if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
            $this->database->table($this->table)->where('id', $id)->delete();
        }
    });
}
/**
 * Resolve a queue name, falling back to the default when the given one is empty.
 *
 * @param  string|null  $queue
 * @return string
 */
protected function getQueue($queue)
{
    if ($queue) {
        return $queue;
    }

    return $this->default;
}
/**
 * Return the database connection this queue uses.
 *
 * @return \Illuminate\Database\Connection
 */
public function getDatabase()
{
    $connection = $this->database;

    return $connection;
}
}
Jobs/DatabaseJobRecord.php 0000666 00000002146 13436756000 0011463 0 ustar 00 record = $record;
}
/**
 * Add one to the record's attempt count and return the new count.
 *
 * @return int
 */
public function increment()
{
    return ++$this->record->attempts;
}
/**
 * Set the record's "reserved_at" value to the current time and return it.
 *
 * @return int
 */
public function touch()
{
    $now = $this->currentTime();

    $this->record->reserved_at = $now;

    return $this->record->reserved_at;
}
/**
 * Read a property from the wrapped record.
 *
 * @param  string  $key
 * @return mixed
 */
public function __get($key)
{
    $record = $this->record;

    return $record->{$key};
}
}
Jobs/SqsJob.php 0000666 00000004763 13436756000 0007375 0 ustar 00 sqs = $sqs;
$this->job = $job;
$this->queue = $queue;
$this->container = $container;
$this->connectionName = $connectionName;
}
/**
 * Release the job back into the queue.
 *
 * After the parent release() runs, the message's visibility timeout is set
 * to the given delay through the SQS client.
 *
 * @param  int  $delay
 * @return void
 */
public function release($delay = 0)
{
    parent::release($delay);

    $request = [
        'QueueUrl' => $this->queue,
        'ReceiptHandle' => $this->job['ReceiptHandle'],
        'VisibilityTimeout' => $delay,
    ];

    $this->sqs->changeMessageVisibility($request);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
    parent::delete();

    // Identify the message to remove by queue URL and receipt handle.
    $message = [
        'QueueUrl' => $this->queue,
        'ReceiptHandle' => $this->job['ReceiptHandle'],
    ];

    $this->sqs->deleteMessage($message);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
    // The attempt count is taken from the message's receive-count attribute.
    $receiveCount = $this->job['Attributes']['ApproximateReceiveCount'];

    return (int) $receiveCount;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
    // The identifier is the "MessageId" entry of the raw job array.
    $messageId = $this->job['MessageId'];

    return $messageId;
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
    // The body is the "Body" entry of the raw job array.
    $body = $this->job['Body'];

    return $body;
}
/**
* Get the underlying SQS client instance.
*
* @return \Aws\Sqs\SqsClient
*/
public function getSqs()
{
    // Expose the client instance stored on this job.
    $client = $this->sqs;

    return $client;
}
/**
* Get the underlying raw SQS job.
*
* @return array
*/
public function getSqsJob()
{
    // Expose the raw job data exactly as stored on this instance.
    $rawJob = $this->job;

    return $rawJob;
}
}
Jobs/Job.php 0000666 00000011174 13436756000 0006700 0 ustar 00 payload();
list($class, $method) = JobName::parse($payload['job']);
with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
// Only records the flag on this instance; isDeleted() reports it back.
$this->deleted = true;
}
/**
* Determine if the job has been deleted.
*
* @return bool
*/
public function isDeleted()
{
    // Report the flag set by delete().
    $deleted = $this->deleted;

    return $deleted;
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
// Only records the flag on this instance; $delay is not used here.
$this->released = true;
}
/**
* Determine if the job was released back into the queue.
*
* @return bool
*/
public function isReleased()
{
    // Report the flag set by release().
    $released = $this->released;

    return $released;
}
/**
* Determine if the job has been deleted or released.
*
* @return bool
*/
public function isDeletedOrReleased()
{
    // A deleted job short-circuits the check, so isReleased() is not consulted.
    if ($this->isDeleted()) {
        return true;
    }

    // Cast so the result is a boolean whatever isReleased() returns.
    return (bool) $this->isReleased();
}
/**
* Determine if the job has been marked as a failure.
*
* @return bool
*/
public function hasFailed()
{
    // Report the flag set by markAsFailed().
    $failed = $this->failed;

    return $failed;
}
/**
* Mark the job as "failed".
*
* @return void
*/
public function markAsFailed()
{
// Only records the flag on this instance; hasFailed() reports it back.
$this->failed = true;
}
/**
* Process an exception that caused the job to fail.
*
* @param \Exception $e
* @return void
*/
public function failed($e)
{
    $this->markAsFailed();

    $payload = $this->payload();

    // Only the class part of the parsed job name is needed here: the
    // method invoked on the handler below is always "failed".
    list($class) = JobName::parse($payload['job']);

    // Resolve the handler and keep it on the job instance.
    $this->instance = $this->resolve($class);

    // Handlers without a "failed" method are skipped.
    if (method_exists($this->instance, 'failed')) {
        $this->instance->failed($payload['data'], $e);
    }
}
/**
* Resolve the given class.
*
* @param string $class
* @return mixed
*/
protected function resolve($class)
{
    // Build the instance through the container stored on this job.
    $container = $this->container;

    return $container->make($class);
}
/**
* Get the decoded body of the job.
*
* @return array
*/
public function payload()
{
    $rawBody = $this->getRawBody();

    // Decode with the associative flag so JSON objects become arrays.
    return json_decode($rawBody, true);
}
/**
* The number of times to attempt a job.
*
* @return int|null
*/
public function maxTries()
{
    // Read the "maxTries" entry from the decoded payload.
    $payload = $this->payload();

    return array_get($payload, 'maxTries');
}
/**
* The number of seconds the job can run.
*
* @return int|null
*/
public function timeout()
{
    // Read the "timeout" entry from the decoded payload.
    $payload = $this->payload();

    return array_get($payload, 'timeout');
}
/**
* Get the name of the queued job class.
*
* @return string
*/
public function getName()
{
    // The name is the "job" entry of the decoded payload.
    $payload = $this->payload();

    return $payload['job'];
}
/**
* Get the resolved name of the queued job class.
*
* Resolves the name of "wrapped" jobs such as class-based handlers.
*
* @return string
*/
public function resolveName()
{
    // Gather both inputs first, then let JobName work out the final name.
    $name = $this->getName();
    $payload = $this->payload();

    return JobName::resolve($name, $payload);
}
/**
* Get the name of the connection the job belongs to.
*
* @return string
*/
public function getConnectionName()
{
    // Report the connection name stored on this job.
    $name = $this->connectionName;

    return $name;
}
/**
* Get the name of the queue the job belongs to.
*
* @return string
*/
public function getQueue()
{
    // Report the queue name stored on this job.
    $name = $this->queue;

    return $name;
}
/**
* Get the service container instance.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
    // Expose the container stored on this job.
    $container = $this->container;

    return $container;
}
}
Jobs/SyncJob.php 0000666 00000003247 13436756000 0007537 0 ustar 00 queue = $queue;
$this->payload = $payload;
$this->container = $container;
$this->connectionName = $connectionName;
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
// Defers entirely to the parent implementation; nothing else is done here.
parent::release($delay);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
// This implementation always reports a single attempt.
return 1;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
// This implementation always returns an empty string as the identifier.
return '';
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
    // The raw body is the payload stored on this job, returned unchanged.
    $body = $this->payload;

    return $body;
}
/**
* Get the name of the queue the job belongs to.
*
* @return string
*/
public function getQueue()
{
// The name is hard-coded; the queue property is not consulted here.
return 'sync';
}
}
Jobs/RedisJob.php 0000666 00000005764 13436756000 0007677 0 ustar 00 job = $job;
$this->redis = $redis;
$this->queue = $queue;
$this->reserved = $reserved;
$this->container = $container;
$this->connectionName = $connectionName;
$this->decoded = $this->payload();
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
    // The raw body is the job value stored on this instance, returned unchanged.
    $body = $this->job;

    return $body;
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
    parent::delete();

    // Pass this job object itself to the queue so it can drop the reservation.
    $queueName = $this->queue;

    $this->redis->deleteReserved($queueName, $this);
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
    parent::release($delay);

    // Pass this job object and the delay to the queue in a single call.
    $queueName = $this->queue;

    $this->redis->deleteAndRelease($queueName, $this, $delay);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
    // The decoded payload's "attempts" value is reported plus one.
    $recorded = Arr::get($this->decoded, 'attempts');

    return $recorded + 1;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
    // The identifier is the "id" entry of the decoded payload.
    $decoded = $this->decoded;

    return Arr::get($decoded, 'id');
}
/**
* Get the underlying Redis factory implementation.
*
* @return \Illuminate\Contracts\Redis\Factory
*/
public function getRedisQueue()
{
    // Expose the redis object stored on this job.
    $redis = $this->redis;

    return $redis;
}
/**
* Get the underlying reserved Redis job.
*
* @return string
*/
public function getReservedJob()
{
    // Expose the reserved value stored on this job.
    $reserved = $this->reserved;

    return $reserved;
}
}
Jobs/JobName.php 0000666 00000001577 13436756000 0007507 0 ustar 00 job = $job;
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
$this->connectionName = $connectionName;
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
    parent::release($delay);

    // Remove the current record first, then re-queue the job with the delay.
    $this->delete();

    $queueName = $this->queue;
    $record = $this->job;

    $this->database->release($queueName, $record, $delay);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
    parent::delete();

    // The queue removes the reserved row identified by the job's id.
    $jobId = $this->job->id;

    $this->database->deleteReserved($this->queue, $jobId);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
    // Cast the record's "attempts" value to an integer.
    $attempts = $this->job->attempts;

    return (int) $attempts;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
    // The identifier is the "id" property of the job record.
    $record = $this->job;

    return $record->id;
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
    // The raw body is the "payload" property of the job record.
    $record = $this->job;

    return $record->payload;
}
}
Jobs/BeanstalkdJob.php 0000666 00000005262 13436756000 0010672 0 ustar 00 job = $job;
$this->queue = $queue;
$this->container = $container;
$this->pheanstalk = $pheanstalk;
$this->connectionName = $connectionName;
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
    parent::release($delay);

    // Released jobs always use Pheanstalk's default priority.
    $this->pheanstalk->release(
        $this->job, Pheanstalk::DEFAULT_PRIORITY, $delay
    );
}
/**
* Bury the job in the queue.
*
* @return void
*/
public function bury()
{
    // The parent release() is invoked before the job is buried.
    parent::release();

    $buried = $this->job;

    $this->pheanstalk->bury($buried);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
    parent::delete();

    // Ask the Pheanstalk instance to delete the underlying job.
    $target = $this->job;

    $this->pheanstalk->delete($target);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
    // The attempt count is the "reserves" value from the job's statistics.
    return (int) $this->pheanstalk->statsJob($this->job)->reserves;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
    // Ask the underlying job object for its identifier.
    $underlying = $this->job;

    return $underlying->getId();
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
    // Ask the underlying job object for its data.
    $underlying = $this->job;

    return $underlying->getData();
}
/**
* Get the underlying Pheanstalk instance.
*
* @return \Pheanstalk\Pheanstalk
*/
public function getPheanstalk()
{
    // Expose the Pheanstalk instance stored on this job.
    $client = $this->pheanstalk;

    return $client;
}
/**
* Get the underlying Pheanstalk job.
*
* @return \Pheanstalk\Job
*/
public function getPheanstalkJob()
{
    // Expose the underlying job object stored on this instance.
    $underlying = $this->job;

    return $underlying;
}
}
InteractsWithTime.php 0000666 00000002042 13436756000 0010672 0 ustar 00 getTimestamp() - $this->currentTime())
: (int) $delay;
}
/**
* Get the "available at" UNIX timestamp.
*
* @param \DateTimeInterface|int $delay
* @return int
*/
protected function availableAt($delay = 0)
{
    // A date object already names the moment; use its timestamp directly.
    if ($delay instanceof DateTimeInterface) {
        return $delay->getTimestamp();
    }

    // Anything else is treated as a number of seconds from now.
    return Carbon::now()->addSeconds($delay)->getTimestamp();
}
/**
* Get the current system time as a UNIX timestamp.
*
* @return int
*/
protected function currentTime()
{
    // Take the timestamp of Carbon's "now".
    $now = Carbon::now();

    return $now->getTimestamp();
}
}
MaxAttemptsExceededException.php 0000666 00000000202 13436756000 0013034 0 ustar 00 dispatcher = $dispatcher;
}
/**
* Handle the queued job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function call(Job $job, array $data)
{
    // NOTE(review): unserialize() can instantiate arbitrary classes, so this
    // is only safe while queue payloads cannot be written by untrusted parties.
    $command = $this->setJobInstanceIfNecessary(
        $job, unserialize($data['command'])
    );

    // The resolved handler is only needed as an argument, so it is passed
    // straight through instead of being kept in an unused local.
    $this->dispatcher->dispatchNow(
        $command, $this->resolveHandler($job, $command)
    );

    // Delete the job unless it was already deleted or released during dispatch.
    if (! $job->isDeletedOrReleased()) {
        $job->delete();
    }
}
/**
* Resolve the handler for the given command.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param mixed $command
* @return mixed
*/
protected function resolveHandler($job, $command)
{
    $resolved = $this->dispatcher->getCommandHandler($command);

    // Any falsy result from the dispatcher is normalised to null.
    if (! $resolved) {
        return null;
    }

    // Give the handler the job instance when it supports it.
    $this->setJobInstanceIfNecessary($job, $resolved);

    return $resolved;
}
/**
* Set the job instance of the given class if necessary.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param mixed $instance
* @return mixed
*/
protected function setJobInstanceIfNecessary(Job $job, $instance)
{
    $traits = class_uses_recursive(get_class($instance));

    // Strict comparison: trait names are strings, so no type juggling is wanted.
    if (in_array(InteractsWithQueue::class, $traits, true)) {
        $instance->setJob($job);
    }

    // The instance is returned in both cases so calls can be nested.
    return $instance;
}
/**
* Call the failed method on the job instance.
*
* The exception that caused the failure will be passed.
*
* @param array $data
* @param \Exception $e
* @return void
*/
public function failed(array $data, $e)
{
    $command = unserialize($data['command']);

    // Commands that do not define a "failed" method are left alone.
    if (! method_exists($command, 'failed')) {
        return;
    }

    $command->failed($e);
}
}
Listener.php 0000666 00000014344 13436756000 0007060 0 ustar 00 commandPath = $commandPath;
$this->workerCommand = $this->buildCommandTemplate();
}
/**
* Build the environment specific worker command.
*
* @return string
*/
protected function buildCommandTemplate()
{
    // The %s placeholders are left in place for formatCommand() to fill later.
    $arguments = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';

    return $this->phpBinary().' '.$this->artisanBinary().' '.$arguments;
}
/**
* Get the PHP binary.
*
* @return string
*/
protected function phpBinary()
{
    // Locate the PHP executable, then escape it for use as a command argument.
    $finder = new PhpExecutableFinder;

    $binary = $finder->find(false);

    return ProcessUtils::escapeArgument($binary);
}
/**
* Get the Artisan binary.
*
* @return string
*/
protected function artisanBinary()
{
    // Without the ARTISAN_BINARY constant, fall back to the plain script name.
    if (! defined('ARTISAN_BINARY')) {
        return 'artisan';
    }

    return ProcessUtils::escapeArgument(ARTISAN_BINARY);
}
/**
* Listen to the given queue connection.
*
* @param string $connection
* @param string $queue
* @param \Illuminate\Queue\ListenerOptions $options
* @return void
*/
public function listen($connection, $queue, ListenerOptions $options)
{
    // The process object is built once and re-run on every iteration.
    $worker = $this->makeProcess($connection, $queue, $options);

    // This loop has no exit condition of its own.
    for (;;) {
        $this->runProcess($worker, $options->memory);
    }
}
/**
* Create a new Symfony process for the worker.
*
* @param string $connection
* @param string $queue
* @param \Illuminate\Queue\ListenerOptions $options
* @return \Symfony\Component\Process\Process
*/
public function makeProcess($connection, $queue, ListenerOptions $options)
{
    // Start from the prepared template and add the --env option only when
    // an environment was set on the options.
    $template = isset($options->environment)
        ? $this->addEnvironment($this->workerCommand, $options)
        : $this->workerCommand;

    // Fill the template's placeholders to get the final command line.
    $commandLine = $this->formatCommand(
        $template, $connection, $queue, $options
    );

    // The process runs from the command path using the configured timeout.
    return new Process(
        $commandLine, $this->commandPath, null, null, $options->timeout
    );
}
/**
* Add the environment option to the given command.
*
* @param string $command
* @param \Illuminate\Queue\ListenerOptions $options
* @return string
*/
protected function addEnvironment($command, ListenerOptions $options)
{
    // Escape the environment name before appending it as an option.
    $environment = ProcessUtils::escapeArgument($options->environment);

    return "{$command} --env={$environment}";
}
/**
* Format the given command with the listener options.
*
* @param string $command
* @param string $connection
* @param string $queue
* @param \Illuminate\Queue\ListenerOptions $options
* @return string
*/
protected function formatCommand($command, $connection, $queue, ListenerOptions $options)
{
    // Values are listed in the order the placeholders appear in the command;
    // only the connection and queue names are escaped.
    $values = [
        ProcessUtils::escapeArgument($connection),
        ProcessUtils::escapeArgument($queue),
        $options->delay,
        $options->memory,
        $options->sleep,
        $options->maxTries,
    ];

    return vsprintf($command, $values);
}
/**
* Run the given process.
*
* @param \Symfony\Component\Process\Process $process
* @param int $memory
* @return void
*/
public function runProcess(Process $process, $memory)
{
    // Pass everything the worker prints to the output handling method.
    $forwardOutput = function ($type, $line) {
        $this->handleWorkerOutput($type, $line);
    };

    $process->run($forwardOutput);

    // After the run, stop this listener only when its own memory use has
    // reached the limit.
    if (! $this->memoryExceeded($memory)) {
        return;
    }

    $this->stop();
}
/**
* Handle output from the worker process.
*
* @param int $type
* @param string $line
* @return void
*/
protected function handleWorkerOutput($type, $line)
{
    // With no handler set, the output is dropped.
    if (! isset($this->outputHandler)) {
        return;
    }

    call_user_func($this->outputHandler, $type, $line);
}
/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
    // memory_get_usage() reports bytes; dividing by 1024 twice gives megabytes.
    $usedMegabytes = memory_get_usage() / 1024 / 1024;

    return $usedMegabytes >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* @return void
*/
public function stop()
{
    // Terminate the current script.
    exit;
}
/**
* Set the output handler callback.
*
* @param \Closure $outputHandler
* @return void
*/
public function setOutputHandler(Closure $outputHandler)
{
// Stored on the listener; handleWorkerOutput() invokes it with each chunk.
$this->outputHandler = $outputHandler;
}
}
SqsQueue.php 0000666 00000007017 13436756000 0007045 0 ustar 00 sqs = $sqs;
$this->prefix = $prefix;
$this->default = $default;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
    // SQS only returns the attributes that are explicitly requested, and it
    // nests them under the "Attributes" key of the result.
    $response = $this->sqs->getQueueAttributes([
        'QueueUrl' => $this->getQueue($queue),
        'AttributeNames' => ['ApproximateNumberOfMessages'],
    ]);

    $attributes = $response->get('Attributes');

    // Report zero when the attribute is missing from the response.
    return isset($attributes['ApproximateNumberOfMessages'])
        ? (int) $attributes['ApproximateNumberOfMessages']
        : 0;
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
    // Build the payload, then hand it to the raw push.
    $payload = $this->createPayload($job, $data);

    return $this->pushRaw($payload, $queue);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
    // $options is accepted but not used by this implementation.
    $message = [
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $payload,
    ];

    $result = $this->sqs->sendMessage($message);

    return $result->get('MessageId');
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
    // The delay is converted to a number of seconds for "DelaySeconds".
    $message = [
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $this->createPayload($job, $data),
        'DelaySeconds' => $this->secondsUntil($delay),
    ];

    $result = $this->sqs->sendMessage($message);

    return $result->get('MessageId');
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
    $response = $this->sqs->receiveMessage([
        'QueueUrl' => $queue = $this->getQueue($queue),
        'AttributeNames' => ['ApproximateReceiveCount'],
    ]);

    $messages = $response['Messages'];

    // "Messages" may be missing from the response; calling count() on null
    // warns on PHP 7.2+ and throws on PHP 8, so check for null first.
    if (! is_null($messages) && count($messages) > 0) {
        return new SqsJob(
            $this->container, $this->sqs, $messages[0],
            $this->connectionName, $queue
        );
    }

    // No message was available.
    return null;
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
    $name = $queue ?: $this->default;

    // A value that already validates as a URL is used as-is.
    if (filter_var($name, FILTER_VALIDATE_URL) !== false) {
        return $name;
    }

    // Otherwise join it to the prefix with exactly one slash in between.
    return rtrim($this->prefix, '/').'/'.$name;
}
/**
* Get the underlying SQS instance.
*
* @return \Aws\Sqs\SqsClient
*/
public function getSqs()
{
    // Expose the client instance stored on this queue.
    $client = $this->sqs;

    return $client;
}
}