Console/ListenCommand.php 0000666 00000007504 13436756220 0011436 0 ustar 00 listener = $listener;
}
/**
* Execute the console command.
*
* @return void
*/
public function fire()
{
$this->setListenerOptions();
$delay = $this->input->getOption('delay');
// The memory limit is the amount of memory we will allow the script to occupy
// before killing it and letting a process manager restart it for us, which
// is to protect us against any memory leaks that will be in the scripts.
$memory = $this->input->getOption('memory');
$connection = $this->input->getArgument('connection');
$timeout = $this->input->getOption('timeout');
// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->getQueue($connection);
$this->listener->listen(
$connection, $queue, $delay, $memory, $timeout
);
}
/**
* Get the name of the queue connection to listen on.
*
* @param string $connection
* @return string
*/
protected function getQueue($connection)
{
if (is_null($connection)) {
$connection = $this->laravel['config']['queue.default'];
}
$queue = $this->laravel['config']->get("queue.connections.{$connection}.queue", 'default');
return $this->input->getOption('queue') ?: $queue;
}
/**
* Set the options on the queue listener.
*
* @return void
*/
protected function setListenerOptions()
{
$this->listener->setEnvironment($this->laravel->environment());
$this->listener->setSleep($this->option('sleep'));
$this->listener->setMaxTries($this->option('tries'));
$this->listener->setOutputHandler(function ($type, $line) {
$this->output->write($line);
});
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['connection', InputArgument::OPTIONAL, 'The name of connection'],
];
}
/**
* Get the console command options.
*
* @return array
*/
protected function getOptions()
{
return [
['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on', null],
['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0],
['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128],
['timeout', null, InputOption::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60],
['sleep', null, InputOption::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3],
['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0],
];
}
}
Console/RestartCommand.php 0000666 00000001221 13436756220 0011612 0 ustar 00 laravel['cache']->forever('illuminate:queue:restart', time());
$this->info('Broadcasting queue restart signal.');
}
}
Console/FailedTableCommand.php 0000666 00000004105 13436756220 0012326 0 ustar 00 files = $files;
$this->composer = $composer;
}
/**
* Execute the console command.
*
* @return void
*/
public function fire()
{
$table = $this->laravel['config']['queue.failed.table'];
$tableClassName = Str::studly($table);
$fullPath = $this->createBaseMigration($table);
$stub = str_replace(
['{{table}}', '{{tableClassName}}'], [$table, $tableClassName], $this->files->get(__DIR__.'/stubs/failed_jobs.stub')
);
$this->files->put($fullPath, $stub);
$this->info('Migration created successfully!');
$this->composer->dumpAutoloads();
}
/**
* Create a base migration file for the table.
*
* @param string $table
* @return string
*/
protected function createBaseMigration($table = 'failed_jobs')
{
$name = 'create_'.$table.'_table';
$path = $this->laravel->databasePath().'/migrations';
return $this->laravel['migration.creator']->create($name, $path);
}
}
Console/RetryCommand.php 0000666 00000004213 13436756220 0011277 0 ustar 00 argument('id');
if (count($ids) === 1 && $ids[0] === 'all') {
$ids = Arr::pluck($this->laravel['queue.failer']->all(), 'id');
}
foreach ($ids as $id) {
$this->retryJob($id);
}
}
/**
* Retry the queue job with the given ID.
*
* @param string $id
* @return void
*/
protected function retryJob($id)
{
$failed = $this->laravel['queue.failer']->find($id);
if (! is_null($failed)) {
$failed = (object) $failed;
$failed->payload = $this->resetAttempts($failed->payload);
$this->laravel['queue']->connection($failed->connection)
->pushRaw($failed->payload, $failed->queue);
$this->laravel['queue.failer']->forget($failed->id);
$this->info("The failed job [{$id}] has been pushed back onto the queue!");
} else {
$this->error("No failed job matches the given ID [{$id}].");
}
}
/**
* Reset the payload attempts.
*
* @param string $payload
* @return string
*/
protected function resetAttempts($payload)
{
$payload = json_decode($payload, true);
if (isset($payload['attempts'])) {
$payload['attempts'] = 1;
}
return json_encode($payload);
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['id', InputArgument::IS_ARRAY, 'The ID of the failed job'],
];
}
}
Console/TableCommand.php 0000666 00000004051 13436756220 0011221 0 ustar 00 files = $files;
$this->composer = $composer;
}
/**
* Execute the console command.
*
* @return void
*/
public function fire()
{
$table = $this->laravel['config']['queue.connections.database.table'];
$tableClassName = Str::studly($table);
$fullPath = $this->createBaseMigration($table);
$stub = str_replace(
['{{table}}', '{{tableClassName}}'], [$table, $tableClassName], $this->files->get(__DIR__.'/stubs/jobs.stub')
);
$this->files->put($fullPath, $stub);
$this->info('Migration created successfully!');
$this->composer->dumpAutoloads();
}
/**
* Create a base migration file for the table.
*
* @param string $table
* @return string
*/
protected function createBaseMigration($table = 'jobs')
{
$name = 'create_'.$table.'_table';
$path = $this->laravel->databasePath().'/migrations';
return $this->laravel['migration.creator']->create($name, $path);
}
}
Console/stubs/failed_jobs.stub 0000666 00000001416 13436756220 0012464 0 ustar 00 increments('id');
$table->text('connection');
$table->text('queue');
$table->longText('payload');
$table->longText('exception');
$table->timestamp('failed_at')->useCurrent();
});
}
/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::drop('{{table}}');
}
}
Console/stubs/jobs.stub 0000666 00000001634 13436756220 0011162 0 ustar 00 bigIncrements('id');
$table->string('queue');
$table->longText('payload');
$table->tinyInteger('attempts')->unsigned();
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->index(['queue', 'reserved_at']);
});
}
/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::drop('{{table}}');
}
}
Console/FlushFailedCommand.php 0000666 00000001147 13436756220 0012363 0 ustar 00 laravel['queue.failer']->flush();
$this->info('All failed jobs deleted successfully!');
}
}
Console/WorkCommand.php 0000666 00000014234 13436756220 0011120 0 ustar 00 worker = $worker;
}
/**
* Execute the console command.
*
* @return void
*/
public function fire()
{
if ($this->downForMaintenance() && $this->option('once')) {
return $this->worker->sleep($this->option('sleep'));
}
// We'll listen to the processed and failed events so we can write information
// to the console as jobs are processed, which will let the developer watch
// which jobs are coming through a queue and be informed on its progress.
$this->listenForEvents();
$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];
// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->option('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);
$response = $this->runWorker(
$connection, $queue
);
}
/**
* Run the worker instance.
*
* @param string $connection
* @param string $queue
* @return array
*/
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->laravel['cache']->driver());
$method = $this->option('once') ? 'runNextJob' : 'daemon';
return $this->worker->{$method}(
$connection, $queue, $this->gatherWorkerOptions()
);
}
/**
* Gather all of the queue worker options as a single object.
*
* @return \Illuminate\Queue\WorkerOptions
*/
protected function gatherWorkerOptions()
{
$timeout = $this->option('timeout');
if ($timeout && ! function_exists('pcntl_fork')) {
throw new RuntimeException('The pcntl extension is required in order to specify job timeouts.');
}
return new WorkerOptions(
$this->option('delay'), $this->option('memory'),
$timeout, $this->option('sleep'),
$this->option('tries')
);
}
/**
* Listen for the queue events in order to update the console output.
*
* @return void
*/
protected function listenForEvents()
{
$this->laravel['events']->listen(JobProcessed::class, function ($event) {
$this->writeOutput($event->job, false);
});
$this->laravel['events']->listen(JobFailed::class, function ($event) {
$this->writeOutput($event->job, true);
$this->logFailedJob($event);
});
}
/**
* Write the status output for the queue worker.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param bool $failed
* @return void
*/
protected function writeOutput(Job $job, $failed)
{
if ($failed) {
$this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Failed: '.$job->resolveName());
} else {
$this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Processed: '.$job->resolveName());
}
}
/**
* Store a failed job event.
*
* @param JobFailed $event
* @return void
*/
protected function logFailedJob(JobFailed $event)
{
$this->laravel['queue.failer']->log(
$event->connectionName, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
);
}
/**
* Determine if the worker should run in maintenance mode.
*
* @return bool
*/
protected function downForMaintenance()
{
return $this->option('force') ? false : $this->laravel->isDownForMaintenance();
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['connection', InputArgument::OPTIONAL, 'The name of connection', null],
];
}
/**
* Get the console command options.
*
* @return array
*/
protected function getOptions()
{
return [
['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on'],
['daemon', null, InputOption::VALUE_NONE, 'Run the worker in daemon mode (Deprecated)'],
['once', null, InputOption::VALUE_NONE, 'Only process the next job on the queue'],
['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0],
['force', null, InputOption::VALUE_NONE, 'Force the worker to run even in maintenance mode'],
['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128],
['sleep', null, InputOption::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3],
['timeout', null, InputOption::VALUE_OPTIONAL, 'The number of seconds a child process can run', 60],
['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0],
];
}
}
Console/ForgetFailedCommand.php 0000666 00000001767 13436756220 0012540 0 ustar 00 laravel['queue.failer']->forget($this->argument('id'))) {
$this->info('Failed job deleted successfully!');
} else {
$this->error('No failed job matches the given ID.');
}
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['id', InputArgument::REQUIRED, 'The ID of the failed job'],
];
}
}
Console/ListFailedCommand.php 0000666 00000004663 13436756220 0012223 0 ustar 00 getFailedJobs();
if (count($jobs) == 0) {
return $this->info('No failed jobs!');
}
$this->displayFailedJobs($jobs);
}
/**
* Compile the failed jobs into a displayable format.
*
* @return array
*/
protected function getFailedJobs()
{
$results = [];
foreach ($this->laravel['queue.failer']->all() as $failed) {
$results[] = $this->parseFailedJob((array) $failed);
}
return array_filter($results);
}
/**
* Parse the failed job row.
*
* @param array $failed
* @return array
*/
protected function parseFailedJob(array $failed)
{
$row = array_values(Arr::except($failed, ['payload', 'exception']));
array_splice($row, 3, 0, $this->extractJobName($failed['payload']));
return $row;
}
/**
* Extract the failed job name from payload.
*
* @param string $payload
* @return string|null
*/
private function extractJobName($payload)
{
$payload = json_decode($payload, true);
if ($payload && (! isset($payload['data']['command']))) {
return Arr::get($payload, 'job');
}
if ($payload && isset($payload['data']['command'])) {
preg_match('/"([^"]+)"/', $payload['data']['command'], $matches);
if (isset($matches[1])) {
return $matches[1];
} else {
return Arr::get($payload, 'job');
}
}
}
/**
* Display the failed jobs in the console.
*
* @param array $jobs
* @return void
*/
protected function displayFailedJobs(array $jobs)
{
$this->table($this->headers, $jobs);
}
}
ConsoleServiceProvider.php 0000666 00000002734 13436756220 0011735 0 ustar 00 app->singleton('command.queue.failed', function () {
return new ListFailedCommand;
});
$this->app->singleton('command.queue.retry', function () {
return new RetryCommand;
});
$this->app->singleton('command.queue.forget', function () {
return new ForgetFailedCommand;
});
$this->app->singleton('command.queue.flush', function () {
return new FlushFailedCommand;
});
$this->commands(
'command.queue.failed', 'command.queue.retry',
'command.queue.forget', 'command.queue.flush'
);
}
/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return [
'command.queue.failed', 'command.queue.retry',
'command.queue.forget', 'command.queue.flush',
];
}
}
RedisQueue.php 0000666 00000013503 13436756220 0007346 0 ustar 00 redis = $redis;
$this->expire = $expire;
$this->default = $default;
$this->connection = $connection;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
$queue = $this->getQueue($queue);
return $this->getConnection()->eval(LuaScripts::size(), 3, $queue, $queue.':delayed', $queue.':reserved');
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$this->getConnection()->rpush($this->getQueue($queue), $payload);
return Arr::get(json_decode($payload, true), 'id');
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$this->getConnection()->zadd(
$this->getQueue($queue).':delayed', $this->getTime() + $this->getSeconds($delay), $payload
);
return Arr::get(json_decode($payload, true), 'id');
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$original = $queue ?: $this->default;
$queue = $this->getQueue($queue);
$this->migrateExpiredJobs($queue.':delayed', $queue);
if (! is_null($this->expire)) {
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
list($job, $reserved) = $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
);
if ($reserved) {
return new RedisJob($this->container, $this, $job, $reserved, $original);
}
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->getConnection()->zrem($this->getQueue($queue).':reserved', $job);
}
/**
* Delete a reserved job from the reserved queue and release it.
*
* @param string $queue
* @param string $job
* @param int $delay
* @return void
*/
public function deleteAndRelease($queue, $job, $delay)
{
$queue = $this->getQueue($queue);
$this->getConnection()->eval(
LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved',
$job, $this->getTime() + $delay
);
}
/**
* Migrate the delayed jobs that are ready to the regular queue.
*
* @param string $from
* @param string $to
* @return void
*/
public function migrateExpiredJobs($from, $to)
{
$this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->getTime()
);
}
/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return string
*/
protected function createPayload($job, $data = '', $queue = null)
{
$payload = $this->setMeta(
parent::createPayload($job, $data), 'id', $this->getRandomId()
);
return $this->setMeta($payload, 'attempts', 1);
}
/**
* Get a random ID string.
*
* @return string
*/
protected function getRandomId()
{
return Str::random(32);
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
{
return 'queues:'.($queue ?: $this->default);
}
/**
* Get the connection for the queue.
*
* @return \Predis\ClientInterface
*/
protected function getConnection()
{
return $this->redis->connection($this->connection);
}
/**
* Get the underlying Redis instance.
*
* @return \Illuminate\Redis\Database
*/
public function getRedis()
{
return $this->redis;
}
}
LuaScripts.php 0000666 00000003033 13436756220 0007361 0 ustar 00 registerManager();
$this->registerWorker();
$this->registerListener();
$this->registerFailedJobServices();
}
/**
* Register the queue manager.
*
* @return void
*/
protected function registerManager()
{
$this->app->singleton('queue', function ($app) {
// Once we have an instance of the queue manager, we will register the various
// resolvers for the queue connectors. These connectors are responsible for
// creating the classes that accept queue configs and instantiate queues.
$manager = new QueueManager($app);
$this->registerConnectors($manager);
return $manager;
});
$this->app->singleton('queue.connection', function ($app) {
return $app['queue']->connection();
});
}
/**
* Register the queue worker.
*
* @return void
*/
protected function registerWorker()
{
$this->registerWorkCommand();
$this->registerRestartCommand();
$this->app->singleton('queue.worker', function ($app) {
return new Worker(
$app['queue'], $app['events'],
$app['Illuminate\Contracts\Debug\ExceptionHandler']
);
});
}
/**
* Register the queue worker console command.
*
* @return void
*/
protected function registerWorkCommand()
{
$this->app->singleton('command.queue.work', function ($app) {
return new WorkCommand($app['queue.worker']);
});
$this->commands('command.queue.work');
}
/**
* Register the queue listener.
*
* @return void
*/
protected function registerListener()
{
$this->registerListenCommand();
$this->app->singleton('queue.listener', function ($app) {
return new Listener($app->basePath());
});
}
/**
* Register the queue listener console command.
*
* @return void
*/
protected function registerListenCommand()
{
$this->app->singleton('command.queue.listen', function ($app) {
return new ListenCommand($app['queue.listener']);
});
$this->commands('command.queue.listen');
}
/**
* Register the queue restart console command.
*
* @return void
*/
public function registerRestartCommand()
{
$this->app->singleton('command.queue.restart', function () {
return new RestartCommand;
});
$this->commands('command.queue.restart');
}
/**
* Register the connectors on the queue manager.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
public function registerConnectors($manager)
{
foreach (['Null', 'Sync', 'Database', 'Beanstalkd', 'Redis', 'Sqs'] as $connector) {
$this->{"register{$connector}Connector"}($manager);
}
}
/**
* Register the Null queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerNullConnector($manager)
{
$manager->addConnector('null', function () {
return new NullConnector;
});
}
/**
* Register the Sync queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerSyncConnector($manager)
{
$manager->addConnector('sync', function () {
return new SyncConnector;
});
}
/**
* Register the Beanstalkd queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerBeanstalkdConnector($manager)
{
$manager->addConnector('beanstalkd', function () {
return new BeanstalkdConnector;
});
}
/**
* Register the database queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerDatabaseConnector($manager)
{
$manager->addConnector('database', function () {
return new DatabaseConnector($this->app['db']);
});
}
/**
* Register the Redis queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerRedisConnector($manager)
{
$app = $this->app;
$manager->addConnector('redis', function () use ($app) {
return new RedisConnector($app['redis']);
});
}
/**
* Register the Amazon SQS queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerSqsConnector($manager)
{
$manager->addConnector('sqs', function () {
return new SqsConnector;
});
}
/**
* Register the failed job services.
*
* @return void
*/
protected function registerFailedJobServices()
{
$this->app->singleton('queue.failer', function ($app) {
$config = $app['config']['queue.failed'];
if (isset($config['table'])) {
return new DatabaseFailedJobProvider($app['db'], $config['database'], $config['table']);
} else {
return new NullFailedJobProvider;
}
});
}
/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return [
'queue', 'queue.worker', 'queue.listener', 'queue.failer',
'command.queue.work', 'command.queue.listen',
'command.queue.restart', 'queue.connection',
];
}
}
InteractsWithQueue.php 0000666 00000002476 13436756220 0011077 0 ustar 00 job ? $this->job->attempts() : 1;
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
if ($this->job) {
return $this->job->delete();
}
}
/**
* Fail the job from the queue.
*
* @return void
*/
public function failed()
{
if ($this->job) {
return $this->job->failed();
}
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
if ($this->job) {
return $this->job->release($delay);
}
}
/**
* Set the base queue job instance.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return $this
*/
public function setJob(JobContract $job)
{
$this->job = $job;
return $this;
}
}
README.md 0000666 00000002313 13436756220 0006036 0 ustar 00 ## Illuminate Queue
The Laravel Queue component provides a unified API across a variety of different queue services. Queues allow you to defer the processing of a time consuming task, such as sending an e-mail, until a later time, thus drastically speeding up the web requests to your application.
### Usage Instructions
First, create a new Queue `Capsule` manager instance. Similar to the "Capsule" provided for the Eloquent ORM, the queue Capsule aims to make configuring the library for usage outside of the Laravel framework as easy as possible.
```PHP
use Illuminate\Queue\Capsule\Manager as Queue;
$queue = new Queue;
$queue->addConnection([
'driver' => 'beanstalkd',
'host' => 'localhost',
'queue' => 'default',
]);
// Make this Capsule instance available globally via static methods... (optional)
$queue->setAsGlobal();
```
Once the Capsule instance has been registered. You may use it like so:
```PHP
// As an instance...
$queue->push('SendEmail', array('message' => $message));
// If setAsGlobal has been called...
Queue::push('SendEmail', array('message' => $message));
```
For further documentation on using the queue, consult the [Laravel framework documentation](https://laravel.com/docs).
WorkerOptions.php 0000666 00000002214 13436756220 0010115 0 ustar 00 delay = $delay;
$this->sleep = $sleep;
$this->memory = $memory;
$this->timeout = $timeout;
$this->maxTries = $maxTries;
}
}
Events/WorkerStopping.php 0000666 00000000113 13436756220 0011525 0 ustar 00 job = $job;
$this->connectionName = $connectionName;
}
}
Events/JobProcessing.php 0000666 00000001116 13436756220 0011303 0 ustar 00 job = $job;
$this->connectionName = $connectionName;
}
}
Events/JobExceptionOccurred.php 0000666 00000001423 13436756220 0012615 0 ustar 00 job = $job;
$this->exception = $exception;
$this->connectionName = $connectionName;
}
}
Events/JobFailed.php 0000666 00000001433 13436756220 0010355 0 ustar 00 job = $job;
$this->exception = $exception;
$this->connectionName = $connectionName;
}
}
Connectors/SqsConnector.php 0000666 00000002106 13436756220 0012026 0 ustar 00 getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret']);
}
return new SqsQueue(
new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
);
}
/**
* Get the default configuration for SQS.
*
* @param array $config
* @return array
*/
protected function getDefaultConfiguration(array $config)
{
return array_merge([
'version' => 'latest',
'http' => [
'timeout' => 60,
'connect_timeout' => 60,
],
], $config);
}
}
Connectors/ConnectorInterface.php 0000666 00000000407 13436756220 0013162 0 ustar 00 redis = $redis;
$this->connection = $connection;
}
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new RedisQueue(
$this->redis, $config['queue'],
Arr::get($config, 'connection', $this->connection),
Arr::get($config, 'retry_after', 60)
);
}
}
Connectors/DatabaseConnector.php 0000666 00000002071 13436756220 0012765 0 ustar 00 connections = $connections;
}
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new DatabaseQueue(
$this->connections->connection(Arr::get($config, 'connection')),
$config['table'],
$config['queue'],
Arr::get($config, 'retry_after', 60)
);
}
}
Connectors/SyncConnector.php 0000666 00000000546 13436756220 0012202 0 ustar 00 app = $app;
}
/**
* Register an event listener for the before job event.
*
* @param mixed $callback
* @return void
*/
public function before($callback)
{
$this->app['events']->listen(Events\JobProcessing::class, $callback);
}
/**
* Register an event listener for the after job event.
*
* @param mixed $callback
* @return void
*/
public function after($callback)
{
$this->app['events']->listen(Events\JobProcessed::class, $callback);
}
/**
* Register an event listener for the exception occurred job event.
*
* @param mixed $callback
* @return void
*/
public function exceptionOccurred($callback)
{
$this->app['events']->listen(Events\JobExceptionOccurred::class, $callback);
}
/**
* Register an event listener for the daemon queue loop.
*
* @param mixed $callback
* @return void
*/
public function looping($callback)
{
$this->app['events']->listen('illuminate.queue.looping', $callback);
}
/**
* Register an event listener for the failed job event.
*
* @param mixed $callback
* @return void
*/
public function failing($callback)
{
$this->app['events']->listen(Events\JobFailed::class, $callback);
}
/**
* Register an event listener for the daemon queue stopping.
*
* @param mixed $callback
* @return void
*/
public function stopping($callback)
{
$this->app['events']->listen(Events\WorkerStopping::class, $callback);
}
/**
* Determine if the driver is connected.
*
* @param string $name
* @return bool
*/
public function connected($name = null)
{
return isset($this->connections[$name ?: $this->getDefaultDriver()]);
}
/**
* Resolve a queue connection instance.
*
* @param string $name
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connection($name = null)
{
$name = $name ?: $this->getDefaultDriver();
// If the connection has not been resolved yet we will resolve it now as all
// of the connections are resolved when they are actually needed so we do
// not make any unnecessary connection to the various queue end-points.
if (! isset($this->connections[$name])) {
$this->connections[$name] = $this->resolve($name);
$this->connections[$name]->setContainer($this->app);
}
return $this->connections[$name];
}
/**
* Resolve a queue connection.
*
* @param string $name
* @return \Illuminate\Contracts\Queue\Queue
*/
protected function resolve($name)
{
$config = $this->getConfig($name);
return $this->getConnector($config['driver'])->connect($config);
}
/**
* Get the connector for a given driver.
*
* @param string $driver
* @return \Illuminate\Queue\Connectors\ConnectorInterface
*
* @throws \InvalidArgumentException
*/
protected function getConnector($driver)
{
if (isset($this->connectors[$driver])) {
return call_user_func($this->connectors[$driver]);
}
throw new InvalidArgumentException("No connector for [$driver]");
}
/**
* Add a queue connection resolver.
*
* @param string $driver
* @param \Closure $resolver
* @return void
*/
public function extend($driver, Closure $resolver)
{
return $this->addConnector($driver, $resolver);
}
/**
* Add a queue connection resolver.
*
* @param string $driver
* @param \Closure $resolver
* @return void
*/
public function addConnector($driver, Closure $resolver)
{
$this->connectors[$driver] = $resolver;
}
/**
* Get the queue connection configuration.
*
* @param string $name
* @return array
*/
protected function getConfig($name)
{
if ($name === null || $name === 'null') {
return ['driver' => 'null'];
}
return $this->app['config']["queue.connections.{$name}"];
}
/**
* Get the name of the default queue connection.
*
* @return string
*/
public function getDefaultDriver()
{
return $this->app['config']['queue.default'];
}
/**
* Set the name of the default queue connection.
*
* @param string $name
* @return void
*/
public function setDefaultDriver($name)
{
$this->app['config']['queue.default'] = $name;
}
/**
* Get the full name for the given connection.
*
* @param string $connection
* @return string
*/
public function getName($connection = null)
{
return $connection ?: $this->getDefaultDriver();
}
/**
* Determine if the application is in maintenance mode.
*
* @return bool
*/
public function isDownForMaintenance()
{
return $this->app->isDownForMaintenance();
}
/**
* Dynamically pass calls to the default connection.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public function __call($method, $parameters)
{
return $this->connection()->$method(...$parameters);
}
}
Queue.php 0000666 00000006547 13436756220 0006371 0 ustar 00 push($job, $data, $queue);
}
/**
* Push a new job onto the queue after a delay.
*
* @param string $queue
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @return mixed
*/
public function laterOn($queue, $delay, $job, $data = '')
{
return $this->later($delay, $job, $data, $queue);
}
/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function bulk($jobs, $data = '', $queue = null)
{
foreach ((array) $jobs as $job) {
$this->push($job, $data, $queue);
}
}
/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return string
*/
protected function createPayload($job, $data = '', $queue = null)
{
if (is_object($job)) {
return json_encode([
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
]);
}
return json_encode($this->createPlainPayload($job, $data));
}
/**
* Create a typical, "plain" queue payload array.
*
* @param string $job
* @param mixed $data
* @return array
*/
protected function createPlainPayload($job, $data)
{
return ['job' => $job, 'data' => $data];
}
/**
* Set additional meta on a payload string.
*
* @param string $payload
* @param string $key
* @param string $value
* @return string
*/
protected function setMeta($payload, $key, $value)
{
$payload = json_decode($payload, true);
return json_encode(Arr::set($payload, $key, $value));
}
/**
* Calculate the number of seconds with the given delay.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getSeconds($delay)
{
if ($delay instanceof DateTime) {
return max(0, $delay->getTimestamp() - $this->getTime());
}
return (int) $delay;
}
/**
* Get the current UNIX timestamp.
*
* @return int
*/
protected function getTime()
{
return time();
}
/**
* Set the IoC container instance.
*
* @param \Illuminate\Container\Container $container
* @return void
*/
public function setContainer(Container $container)
{
$this->container = $container;
}
}
TimeoutException.php 0000666 00000000166 13436756220 0010601 0 ustar 00 setupContainer($container ?: new Container);
// Once we have the container setup, we will setup the default configuration
// options in the container "config" bindings. This just makes this queue
// manager behave correctly since all the correct binding are in place.
$this->setupDefaultConfiguration();
$this->setupManager();
$this->registerConnectors();
}
/**
* Setup the default queue configuration options.
*
* @return void
*/
protected function setupDefaultConfiguration()
{
$this->container['config']['queue.default'] = 'default';
}
/**
* Build the queue manager instance.
*
* @return void
*/
protected function setupManager()
{
$this->manager = new QueueManager($this->container);
}
/**
* Register the default connectors that the component ships with.
*
* @return void
*/
protected function registerConnectors()
{
$provider = new QueueServiceProvider($this->container);
$provider->registerConnectors($this->manager);
}
/**
* Get a connection instance from the global manager.
*
* @param string $connection
* @return \Illuminate\Contracts\Queue\Queue
*/
public static function connection($connection = null)
{
return static::$instance->getConnection($connection);
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @param string $connection
* @return mixed
*/
public static function push($job, $data = '', $queue = null, $connection = null)
{
return static::$instance->connection($connection)->push($job, $data, $queue);
}
/**
* Push a new an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string $queue
* @param string $connection
* @return mixed
*/
public static function bulk($jobs, $data = '', $queue = null, $connection = null)
{
return static::$instance->connection($connection)->bulk($jobs, $data, $queue);
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @param string $connection
* @return mixed
*/
public static function later($delay, $job, $data = '', $queue = null, $connection = null)
{
return static::$instance->connection($connection)->later($delay, $job, $data, $queue);
}
/**
* Get a registered connection instance.
*
* @param string $name
* @return \Illuminate\Contracts\Queue\Queue
*/
public function getConnection($name = null)
{
return $this->manager->connection($name);
}
/**
* Register a connection with the manager.
*
* @param array $config
* @param string $name
* @return void
*/
public function addConnection(array $config, $name = 'default')
{
$this->container['config']["queue.connections.{$name}"] = $config;
}
/**
* Get the queue manager instance.
*
* @return \Illuminate\Queue\QueueManager
*/
public function getQueueManager()
{
return $this->manager;
}
/**
* Pass dynamic instance methods to the manager.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public function __call($method, $parameters)
{
return $this->manager->$method(...$parameters);
}
/**
* Dynamically pass methods to the default connection.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public static function __callStatic($method, $parameters)
{
return static::connection()->$method(...$parameters);
}
}
SerializesModels.php 0000666 00000002461 13436756220 0010552 0 ustar 00 getProperties();
foreach ($properties as $property) {
$property->setValue($this, $this->getSerializedPropertyValue(
$this->getPropertyValue($property)
));
}
return array_map(function ($p) {
return $p->getName();
}, $properties);
}
/**
* Restore the model after serialization.
*
* @return void
*/
public function __wakeup()
{
foreach ((new ReflectionClass($this))->getProperties() as $property) {
$property->setValue($this, $this->getRestoredPropertyValue(
$this->getPropertyValue($property)
));
}
}
/**
* Get the property value for the given property.
*
* @param \ReflectionProperty $property
* @return mixed
*/
protected function getPropertyValue(ReflectionProperty $property)
{
$property->setAccessible(true);
return $property->getValue($this);
}
}
SerializesAndRestoresModelIdentifiers.php 0000666 00000003447 13436756220 0014734 0 ustar 00 getQueueableClass(), $value->getQueueableIds());
}
if ($value instanceof QueueableEntity) {
return new ModelIdentifier(get_class($value), $value->getQueueableId());
}
return $value;
}
/**
* Get the restored property value after deserialization.
*
* @param mixed $value
* @return mixed
*/
protected function getRestoredPropertyValue($value)
{
if (! $value instanceof ModelIdentifier) {
return $value;
}
return is_array($value->id)
? $this->restoreCollection($value)
: (new $value->class)->newQuery()->useWritePdo()->findOrFail($value->id);
}
/**
* Restore a queueable collection instance.
*
* @param \Illuminate\Contracts\Database\ModelIdentifier $value
* @return \Illuminate\Database\Eloquent\Collection
*/
protected function restoreCollection($value)
{
if (! $value->class || count($value->id) === 0) {
return new EloquentCollection;
}
$model = new $value->class;
return $model->newQuery()->useWritePdo()
->whereIn($model->getKeyName(), $value->id)->get();
}
}
BeanstalkdQueue.php 0000666 00000007222 13436756220 0010351 0 ustar 00 default = $default;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
$queue = $this->getQueue($queue);
return (int) $this->pheanstalk->statsTube($queue)->total_jobs;
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->pheanstalk->useTube($this->getQueue($queue))->put(
$payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun
);
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));
return $pheanstalk->put($payload, Pheanstalk::DEFAULT_PRIORITY, $this->getSeconds($delay), $this->timeToRun);
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$job = $this->pheanstalk->watchOnly($queue)->reserve(0);
if ($job instanceof PheanstalkJob) {
return new BeanstalkdJob($this->container, $this->pheanstalk, $job, $queue);
}
}
/**
* Delete a message from the Beanstalk queue.
*
* @param string $queue
* @param string $id
* @return void
*/
public function deleteMessage($queue, $id)
{
$this->pheanstalk->useTube($this->getQueue($queue))->delete($id);
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
return $queue ?: $this->default;
}
/**
* Get the underlying Pheanstalk instance.
*
* @return \Pheanstalk\Pheanstalk
*/
public function getPheanstalk()
{
return $this->pheanstalk;
}
}
NullQueue.php 0000666 00000002541 13436756220 0007212 0 ustar 00 table = $table;
$this->resolver = $resolver;
$this->database = $database;
}
/**
* Log a failed job into storage.
*
* @param string $connection
* @param string $queue
* @param string $payload
* @param \Exception $exception
* @return int|null
*/
public function log($connection, $queue, $payload, $exception)
{
$failed_at = Carbon::now();
$exception = (string) $exception;
return $this->getTable()->insertGetId(compact(
'connection', 'queue', 'payload', 'exception', 'failed_at'
));
}
/**
* Get a list of all of the failed jobs.
*
* @return array
*/
public function all()
{
return $this->getTable()->orderBy('id', 'desc')->get()->all();
}
/**
* Get a single failed job.
*
* @param mixed $id
* @return array
*/
public function find($id)
{
return $this->getTable()->find($id);
}
/**
* Delete a single failed job from storage.
*
* @param mixed $id
* @return bool
*/
public function forget($id)
{
return $this->getTable()->where('id', $id)->delete() > 0;
}
/**
* Flush all of the failed jobs from storage.
*
* @return void
*/
public function flush()
{
$this->getTable()->delete();
}
/**
* Get a new query builder instance for the table.
*
* @return \Illuminate\Database\Query\Builder
*/
protected function getTable()
{
return $this->resolver->connection($this->database)->table($this->table);
}
}
Worker.php 0000666 00000026762 13436756220 0006557 0 ustar 00 events = $events;
$this->manager = $manager;
$this->exceptions = $exceptions;
}
/**
* Listen to the given queue in a loop.
*
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
{
$lastRestart = $this->getTimestampOfLastQueueRestart();
while (true) {
if ($this->daemonShouldRun()) {
$this->runNextJobForDaemon($connectionName, $queue, $options);
} else {
$this->sleep($options->sleep);
}
if ($this->memoryExceeded($options->memory) ||
$this->queueShouldRestart($lastRestart)) {
$this->stop();
}
}
}
/**
* Determine if the daemon should process on this iteration.
*
* @return bool
*/
protected function daemonShouldRun()
{
return ! $this->manager->isDownForMaintenance();
}
/**
* Run the next job for the daemon worker.
*
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runNextJobForDaemon($connectionName, $queue, WorkerOptions $options)
{
if (! $options->timeout) {
$this->runNextJob($connectionName, $queue, $options);
} elseif ($processId = pcntl_fork()) {
$this->waitForChildProcess($processId, $options->timeout);
} else {
$this->runNextJob($connectionName, $queue, $options);
exit;
}
}
/**
* Wait for the given child process to finish.
*
* @param int $processId
* @param int $timeout
* @return void
*/
protected function waitForChildProcess($processId, $timeout)
{
declare(ticks=1) {
pcntl_signal(SIGALRM, function () use ($processId, $timeout) {
posix_kill($processId, SIGKILL);
$this->exceptions->report(new TimeoutException("Queue child process timed out after {$timeout} seconds."));
}, true);
pcntl_alarm($timeout);
pcntl_waitpid($processId, $status);
pcntl_alarm(0);
}
}
/**
* Process the next job on the queue.
*
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
try {
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
return $this->process(
$connectionName, $job, $options
);
}
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
$this->exceptions->report(new FatalThrowableError($e));
}
$this->sleep($options->sleep);
}
/**
* Get the next job from the queue connection.
*
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
protected function getNextJob($connection, $queue)
{
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
}
/**
* Process a given job from the queue.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connectionName, $job, WorkerOptions $options)
{
try {
$this->raiseBeforeJobEvent($connectionName, $job);
// Here we will fire off the job and let it process. We will catch any exceptions so
// they can be reported to the developers logs, etc. Once the job is finished the
// proper events will be fired to let any listeners know this job has finished.
$job->fire();
$this->raiseAfterJobEvent($connectionName, $job);
} catch (Exception $e) {
$this->handleJobException($connectionName, $job, $options, $e);
} catch (Throwable $e) {
$this->handleJobException(
$connectionName, $job, $options, new FatalThrowableError($e)
);
}
}
/**
* Handle an exception that occurred while the job was running.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @param \Exception $e
* @return void
*
* @throws \Exception
*/
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
// If we catch an exception, we will attempt to release the job back onto the queue
// so it is not lost entirely. This'll let the job be retried at a later time by
// another listener (or this same one). We will re-throw this exception after.
try {
$this->markJobAsFailedIfHasExceededMaxAttempts(
$connectionName, $job, $options->maxTries, $e
);
$this->raiseExceptionOccurredJobEvent(
$connectionName, $job, $e
);
} finally {
if (! $job->isDeleted()) {
$job->release($options->delay);
}
}
throw $e;
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @param \Exception $e
* @return void
*/
protected function markJobAsFailedIfHasExceededMaxAttempts(
$connectionName, $job, $maxTries, $e
) {
if ($maxTries === 0 || $job->attempts() < $maxTries) {
return;
}
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();
$job->failed($e);
$this->raiseFailedJobEvent($connectionName, $job, $e);
}
/**
* Raise the before queue job event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent($connectionName, $job)
{
$this->events->fire(new Events\JobProcessing(
$connectionName, $job
));
}
/**
* Raise the after queue job event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent($connectionName, $job)
{
$this->events->fire(new Events\JobProcessed(
$connectionName, $job
));
}
/**
* Raise the exception occurred queue job event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e)
{
$this->events->fire(new Events\JobExceptionOccurred(
$connectionName, $job, $e
));
}
/**
* Raise the failed queue job event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function raiseFailedJobEvent($connectionName, $job, $e)
{
$this->events->fire(new Events\JobFailed(
$connectionName, $job, $e
));
}
/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* @return void
*/
public function stop()
{
$this->events->fire(new Events\WorkerStopping);
die;
}
/**
* Sleep the script for a given number of seconds.
*
* @param int $seconds
* @return void
*/
public function sleep($seconds)
{
sleep($seconds);
}
/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}
/**
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
* @return bool
*/
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
/**
* Set the cache repository implementation.
*
* @param \Illuminate\Contracts\Cache\Repository $cache
* @return void
*/
public function setCache(CacheContract $cache)
{
$this->cache = $cache;
}
/**
* Get the queue manager instance.
*
* @return \Illuminate\Queue\QueueManager
*/
public function getManager()
{
return $this->manager;
}
/**
* Set the queue manager instance.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
public function setManager(QueueManager $manager)
{
$this->manager = $manager;
}
}
SyncQueue.php 0000666 00000010620 13436756220 0007211 0 ustar 00 resolveJob($this->createPayload($job, $data, $queue));
try {
$this->raiseBeforeJobEvent($queueJob);
$queueJob->fire();
$this->raiseAfterJobEvent($queueJob);
} catch (Exception $e) {
$this->handleSyncException($queueJob, $e);
} catch (Throwable $e) {
$this->handleSyncException($queueJob, new FatalThrowableError($e));
}
return 0;
}
/**
* Handle an exception that occured while processing a job.
*
* @param \Illuminate\Queue\Jobs\Job $queueJob
* @param \Exception $e
* @return void
*/
protected function handleSyncException($queueJob, $e)
{
$this->raiseExceptionOccurredJobEvent($queueJob, $e);
$this->handleFailedJob($queueJob, $e);
throw $e;
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
//
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->push($job, $data, $queue);
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
//
}
/**
* Resolve a Sync job instance.
*
* @param string $payload
* @return \Illuminate\Queue\Jobs\SyncJob
*/
protected function resolveJob($payload)
{
return new SyncJob($this->container, $payload);
}
/**
* Raise the before queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent(Job $job)
{
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobProcessing('sync', $job));
}
}
/**
* Raise the after queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent(Job $job)
{
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobProcessed('sync', $job));
}
}
/**
* Raise the exception occurred queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function raiseExceptionOccurredJobEvent(Job $job, $e)
{
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobExceptionOccurred('sync', $job, $e));
}
}
/**
* Handle the failed job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return array
*/
protected function handleFailedJob(Job $job, $e)
{
$job->failed($e);
$this->raiseFailedJobEvent($job, $e);
}
/**
* Raise the failed queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function raiseFailedJobEvent(Job $job, $e)
{
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobFailed('sync', $job, $e));
}
}
}
composer.json 0000666 00000002265 13436756220 0007307 0 ustar 00 {
"name": "illuminate/queue",
"description": "The Illuminate Queue package.",
"license": "MIT",
"homepage": "https://laravel.com",
"support": {
"issues": "https://github.com/laravel/framework/issues",
"source": "https://github.com/laravel/framework"
},
"authors": [
{
"name": "Taylor Otwell",
"email": "taylor@laravel.com"
}
],
"require": {
"php": ">=5.6.4",
"illuminate/console": "5.3.*",
"illuminate/container": "5.3.*",
"illuminate/contracts": "5.3.*",
"illuminate/support": "5.3.*",
"nesbot/carbon": "~1.20",
"symfony/debug": "3.1.*",
"symfony/process": "3.1.*"
},
"autoload": {
"psr-4": {
"Illuminate\\Queue\\": ""
}
},
"extra": {
"branch-alias": {
"dev-master": "5.3-dev"
}
},
"suggest": {
"aws/aws-sdk-php": "Required to use the SQS queue driver (~3.0).",
"illuminate/redis": "Required to use the Redis queue driver (5.3.*).",
"pda/pheanstalk": "Required to use the Beanstalk queue driver (~3.0)."
},
"minimum-stability": "dev"
}
DatabaseQueue.php 0000666 00000020355 13436756220 0010007 0 ustar 00 table = $table;
$this->expire = $expire;
$this->default = $default;
$this->database = $database;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->count();
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->pushToDatabase(0, $queue, $payload);
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return void
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
}
/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function bulk($jobs, $data = '', $queue = null)
{
$queue = $this->getQueue($queue);
$availableAt = $this->getAvailableAt(0);
$records = array_map(function ($job) use ($queue, $data, $availableAt) {
return $this->buildDatabaseRecord(
$queue, $this->createPayload($job, $data), $availableAt
);
}, (array) $jobs);
return $this->database->table($this->table)->insert($records);
}
/**
* Release a reserved job back onto the queue.
*
* @param string $queue
* @param \StdClass $job
* @param int $delay
* @return mixed
*/
public function release($queue, $job, $delay)
{
return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
}
/**
* Push a raw payload to the database with a given delay.
*
* @param \DateTime|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
* @return mixed
*/
protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
{
$attributes = $this->buildDatabaseRecord(
$this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts
);
return $this->database->table($this->table)->insertGetId($attributes);
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$this->database->beginTransaction();
if ($job = $this->getNextAvailableJob($queue)) {
$job = $this->markJobAsReserved($job);
$this->database->commit();
return new DatabaseJob(
$this->container, $this, $job, $queue
);
}
$this->database->commit();
}
/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->orderBy('id', 'asc')
->first();
return $job ? (object) $job : null;
}
/**
* Modify the query to check for available jobs.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isAvailable($query)
{
$query->where(function ($query) {
$query->whereNull('reserved_at');
$query->where('available_at', '<=', $this->getTime());
});
}
/**
* Modify the query to check for jobs that are reserved but have expired.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isReservedButExpired($query)
{
$expiration = Carbon::now()->subSeconds($this->expire)->getTimestamp();
$query->orWhere(function ($query) use ($expiration) {
$query->where('reserved_at', '<=', $expiration);
});
}
/**
* Mark the given job ID as reserved.
*
* @param \stdClass $job
* @return \stdClass
*/
protected function markJobAsReserved($job)
{
$job->attempts = $job->attempts + 1;
$job->reserved_at = $this->getTime();
$this->database->table($this->table)->where('id', $job->id)->update([
'reserved_at' => $job->reserved_at,
'attempts' => $job->attempts,
]);
return $job;
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $id
* @return void
*/
public function deleteReserved($queue, $id)
{
$this->database->beginTransaction();
if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
$this->database->table($this->table)->where('id', $id)->delete();
}
$this->database->commit();
}
/**
* Get the "available at" UNIX timestamp.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getAvailableAt($delay)
{
$availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
return $availableAt->getTimestamp();
}
/**
* Create an array to insert for the given job.
*
* @param string|null $queue
* @param string $payload
* @param int $availableAt
* @param int $attempts
* @return array
*/
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
return [
'queue' => $queue,
'attempts' => $attempts,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->getTime(),
'payload' => $payload,
];
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
{
return $queue ?: $this->default;
}
/**
* Get the underlying database instance.
*
* @return \Illuminate\Database\Connection
*/
public function getDatabase()
{
return $this->database;
}
}
Jobs/SqsJob.php 0000666 00000004752 13436756220 0007377 0 ustar 00 sqs = $sqs;
$this->job = $job;
$this->queue = $queue;
$this->container = $container;
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job['Body'];
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->sqs->deleteMessage([
'QueueUrl' => $this->queue, 'ReceiptHandle' => $this->job['ReceiptHandle'],
]);
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->sqs->changeMessageVisibility([
'QueueUrl' => $this->queue,
'ReceiptHandle' => $this->job['ReceiptHandle'],
'VisibilityTimeout' => $delay,
]);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
return (int) $this->job['Attributes']['ApproximateReceiveCount'];
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->job['MessageId'];
}
/**
* Get the underlying SQS client instance.
*
* @return \Aws\Sqs\SqsClient
*/
public function getSqs()
{
return $this->sqs;
}
/**
* Get the underlying raw SQS job.
*
* @return array
*/
public function getSqsJob()
{
return $this->job;
}
}
Jobs/Job.php 0000666 00000011457 13436756220 0006710 0 ustar 00 payload();
list($class, $method) = $this->parseJob($payload['job']);
$this->instance = $this->resolve($class);
$this->instance->{$method}($this, $payload['data']);
}
/**
* Resolve the given job handler.
*
* @param string $class
* @return mixed
*/
protected function resolve($class)
{
return $this->container->make($class);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
$this->deleted = true;
}
/**
* Determine if the job has been deleted.
*
* @return bool
*/
public function isDeleted()
{
return $this->deleted;
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
$this->released = true;
}
/**
* Determine if the job was released back into the queue.
*
* @return bool
*/
public function isReleased()
{
return $this->released;
}
/**
* Determine if the job has been deleted or released.
*
* @return bool
*/
public function isDeletedOrReleased()
{
return $this->isDeleted() || $this->isReleased();
}
/**
* Call the failed method on the job instance.
*
* @param \Exception $e
* @return void
*/
public function failed($e)
{
$payload = $this->payload();
list($class, $method) = $this->parseJob($payload['job']);
$this->instance = $this->resolve($class);
if (method_exists($this->instance, 'failed')) {
$this->instance->failed($payload['data'], $e);
}
}
/**
* Parse the job declaration into class and method.
*
* @param string $job
* @return array
*/
protected function parseJob($job)
{
$segments = explode('@', $job);
return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
}
/**
* Calculate the number of seconds with the given delay.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getSeconds($delay)
{
if ($delay instanceof DateTime) {
return max(0, $delay->getTimestamp() - $this->getTime());
}
return (int) $delay;
}
/**
* Get the current system time.
*
* @return int
*/
protected function getTime()
{
return time();
}
/**
* Get the name of the queued job class.
*
* @return string
*/
public function getName()
{
return $this->payload()['job'];
}
/**
* Get the resolved name of the queued job class.
*
* @return string
*/
public function resolveName()
{
$name = $this->getName();
$payload = $this->payload();
if ($name === 'Illuminate\Queue\CallQueuedHandler@call') {
return Arr::get($payload, 'data.commandName', $name);
}
if ($name === 'Illuminate\Events\CallQueuedHandler@call') {
return $payload['data']['class'].'@'.$payload['data']['method'];
}
return $name;
}
/**
* Get the decoded body of the job.
*
* @return array
*/
public function payload()
{
return json_decode($this->getRawBody(), true);
}
/**
* Get the name of the queue the job belongs to.
*
* @return string
*/
public function getQueue()
{
return $this->queue;
}
/**
* Get the service container instance.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
}
Jobs/SyncJob.php 0000666 00000002524 13436756220 0007540 0 ustar 00 payload = $payload;
$this->container = $container;
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->payload;
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
return 1;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return '';
}
}
Jobs/RedisJob.php 0000666 00000005147 13436756220 0007676 0 ustar 00 job = $job;
$this->redis = $redis;
$this->queue = $queue;
$this->reserved = $reserved;
$this->container = $container;
$this->decoded = $this->payload();
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job;
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this->reserved);
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->redis->deleteAndRelease($this->queue, $this->reserved, $delay);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
return Arr::get($this->decoded, 'attempts');
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return Arr::get($this->decoded, 'id');
}
/**
* Get the underlying queue driver instance.
*
* @return \Illuminate\Redis\Database
*/
public function getRedisQueue()
{
return $this->redis;
}
/**
* Get the underlying reserved Redis job.
*
* @return string
*/
public function getReservedJob()
{
return $this->reserved;
}
}
Jobs/DatabaseJob.php 0000666 00000004740 13436756220 0010332 0 ustar 00 job = $job;
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->database->deleteReserved($this->queue, $this->job->id);
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->delete();
$this->database->release($this->queue, $this->job, $delay);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
return (int) $this->job->attempts;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->job->id;
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job->payload;
}
/**
* Get the IoC container instance.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Get the underlying queue driver instance.
*
* @return \Illuminate\Queue\DatabaseQueue
*/
public function getDatabaseQueue()
{
return $this->database;
}
/**
* Get the underlying database job.
*
* @return \StdClass
*/
public function getDatabaseJob()
{
return $this->job;
}
}
Jobs/BeanstalkdJob.php 0000666 00000005251 13436756220 0010674 0 ustar 00 job = $job;
$this->queue = $queue;
$this->container = $container;
$this->pheanstalk = $pheanstalk;
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job->getData();
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->pheanstalk->delete($this->job);
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$priority = Pheanstalk::DEFAULT_PRIORITY;
$this->pheanstalk->release($this->job, $priority, $delay);
}
/**
* Bury the job in the queue.
*
* @return void
*/
public function bury()
{
parent::release();
$this->pheanstalk->bury($this->job);
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
$stats = $this->pheanstalk->statsJob($this->job);
return (int) $stats->reserves;
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->job->getId();
}
/**
* Get the underlying Pheanstalk instance.
*
* @return \Pheanstalk\Pheanstalk
*/
public function getPheanstalk()
{
return $this->pheanstalk;
}
/**
* Get the underlying Pheanstalk job.
*
* @return \Pheanstalk\Job
*/
public function getPheanstalkJob()
{
return $this->job;
}
}
CallQueuedHandler.php 0000666 00000003777 13436756220 0010631 0 ustar 00 dispatcher = $dispatcher;
}
/**
* Handle the queued job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function call(Job $job, array $data)
{
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
);
$handler = $this->dispatcher->getCommandHandler($command) ?: null;
if ($handler) {
$this->setJobInstanceIfNecessary($job, $handler);
}
$this->dispatcher->dispatchNow($command, $handler);
if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}
/**
* Set the job instance of the given class if necessary.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param mixed $instance
* @return mixed
*/
protected function setJobInstanceIfNecessary(Job $job, $instance)
{
if (in_array(InteractsWithQueue::class, class_uses_recursive(get_class($instance)))) {
$instance->setJob($job);
}
return $instance;
}
/**
* Call the failed method on the job instance.
*
* The exception that caused the failure will be passed.
*
* @param array $data
* @param \Exception $e
* @return void
*/
public function failed(array $data, $e)
{
$command = unserialize($data['command']);
if (method_exists($command, 'failed')) {
$command->failed($e);
}
}
}
Listener.php 0000666 00000014164 13436756220 0007064 0 ustar 00 commandPath = $commandPath;
$this->workerCommand = $this->buildWorkerCommand();
}
/**
* Build the environment specific worker command.
*
* @return string
*/
protected function buildWorkerCommand()
{
$binary = ProcessUtils::escapeArgument((new PhpExecutableFinder)->find(false));
$artisan = defined('ARTISAN_BINARY') ? ProcessUtils::escapeArgument(ARTISAN_BINARY) : 'artisan';
$command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';
return "{$binary} {$artisan} {$command}";
}
/**
* Listen to the given queue connection.
*
* @param string $connection
* @param string $queue
* @param string $delay
* @param string $memory
* @param int $timeout
* @return void
*/
public function listen($connection, $queue, $delay, $memory, $timeout = 60)
{
$process = $this->makeProcess($connection, $queue, $delay, $memory, $timeout);
while (true) {
$this->runProcess($process, $memory);
}
}
/**
* Run the given process.
*
* @param \Symfony\Component\Process\Process $process
* @param int $memory
* @return void
*/
public function runProcess(Process $process, $memory)
{
$process->run(function ($type, $line) {
$this->handleWorkerOutput($type, $line);
});
// Once we have run the job we'll go check if the memory limit has been
// exceeded for the script. If it has, we will kill this script so a
// process manager will restart this with a clean slate of memory.
if ($this->memoryExceeded($memory)) {
$this->stop();
}
}
/**
* Create a new Symfony process for the worker.
*
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $memory
* @param int $timeout
* @return \Symfony\Component\Process\Process
*/
public function makeProcess($connection, $queue, $delay, $memory, $timeout)
{
$string = $this->workerCommand;
// If the environment is set, we will append it to the command string so the
// workers will run under the specified environment. Otherwise, they will
// just run under the production environment which is not always right.
if (isset($this->environment)) {
$string .= ' --env='.ProcessUtils::escapeArgument($this->environment);
}
// Next, we will just format out the worker commands with all of the various
// options available for the command. This will produce the final command
// line that we will pass into a Symfony process object for processing.
$command = sprintf(
$string,
ProcessUtils::escapeArgument($connection),
ProcessUtils::escapeArgument($queue),
$delay,
$memory,
$this->sleep,
$this->maxTries
);
return new Process($command, $this->commandPath, null, null, $timeout);
}
/**
* Handle output from the worker process.
*
* @param int $type
* @param string $line
* @return void
*/
protected function handleWorkerOutput($type, $line)
{
if (isset($this->outputHandler)) {
call_user_func($this->outputHandler, $type, $line);
}
}
/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* @return void
*/
public function stop()
{
die;
}
/**
* Set the output handler callback.
*
* @param \Closure $outputHandler
* @return void
*/
public function setOutputHandler(Closure $outputHandler)
{
$this->outputHandler = $outputHandler;
}
/**
* Get the current listener environment.
*
* @return string
*/
public function getEnvironment()
{
return $this->environment;
}
/**
* Set the current environment.
*
* @param string $environment
* @return void
*/
public function setEnvironment($environment)
{
$this->environment = $environment;
}
/**
* Get the amount of seconds to wait before polling the queue.
*
* @return int
*/
public function getSleep()
{
return $this->sleep;
}
/**
* Set the amount of seconds to wait before polling the queue.
*
* @param int $sleep
* @return void
*/
public function setSleep($sleep)
{
$this->sleep = $sleep;
}
/**
* Set the amount of times to try a job before logging it failed.
*
* @param int $tries
* @return void
*/
public function setMaxTries($tries)
{
$this->maxTries = $tries;
}
}
SqsQueue.php 0000666 00000007102 13436756220 0007044 0 ustar 00 sqs = $sqs;
$this->prefix = $prefix;
$this->default = $default;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
return (int) $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
])->get('ApproximateNumberOfMessages');
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$response = $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
]);
return $response->get('MessageId');
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$delay = $this->getSeconds($delay);
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $payload,
'DelaySeconds' => $delay,
])->get('MessageId');
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue,
'AttributeNames' => ['ApproximateReceiveCount'],
]);
if (count($response['Messages']) > 0) {
return new SqsJob($this->container, $this->sqs, $queue, $response['Messages'][0]);
}
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
$queue = $queue ?: $this->default;
if (filter_var($queue, FILTER_VALIDATE_URL) !== false) {
return $queue;
}
return rtrim($this->prefix, '/').'/'.($queue);
}
/**
* Get the underlying SQS instance.
*
* @return \Aws\Sqs\SqsClient
*/
public function getSqs()
{
return $this->sqs;
}
}