Created
February 12, 2018 14:31
-
-
Save ryantxr/fd46991bbe5ceb01c092c15435386385 to your computer and use it in GitHub Desktop.
Use Beanstalk and PHP to make an asynchronous system
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BeanstalkClient extends AbstractBaseQueue{ | |
public $queue; | |
public $host; | |
public $port; | |
public $timeout; | |
function __construct($timeout=null) { | |
$this->loadClasses(); | |
$this->host = '127.0.0.1; | |
$this->port = BEANSTALK_PORT; | |
$this->timeout = 30; | |
$this->connect(); | |
} | |
public function connect(){ | |
$this->queue = new \Pheanstalk\Pheanstalk($this->host, $this->port); | |
} | |
public function publish($tube, $data, $delay){ | |
$payload = $this->encodeData($data); | |
$this->queue->useTube($tube)->put($payload, | |
\Pheanstalk\PheanstalkInterface::DEFAULT_PRIORITY, $delay); | |
} | |
public function waitForMessages($tube, $callback=null){ | |
if ( $this->timeout ) { | |
return $this->queue->watchOnly($tube)->reserve($this->timeout); | |
} | |
return $this->queue->watchOnly($tube)->reserve(); | |
} | |
public function delete($message){ | |
$this->queue->delete($message); | |
} | |
public function encodeData($data){ | |
$payload = json_encode($data); | |
return $payload; | |
} | |
public function decodeData($encodedData) { | |
return json_decode($encodedData, true); | |
} | |
public function getData($message){ | |
if ( is_string($message) ) { | |
throw new Exception('message is a string'); | |
} | |
return json_decode($message->getData(), true); | |
} | |
} | |
abstract class BaseQueueProcess { | |
protected $channelName = ''; // child class should set this | |
// The queue object | |
public $queue = null; | |
public $processId = null; // this is the system process id | |
public $name = null; | |
public $status = null; | |
public function initialize() { | |
$this->processId = getmypid(); | |
$this->name = get_called_class(); | |
$this->endTime = time() + (2 * 60 * 60); // restart every hour | |
// seconds to timeout when waiting for a message | |
// if the process isn't doing anything, timeout so they have a chance to do housekeeping. | |
$queueTimeout = 900; | |
if ( empty($this->queue) ) { | |
$this->queue = new BeanstalkClient($queueTimeout); | |
} | |
} | |
public function receiveMessage($queueMessage) { | |
$taskData = $this->queue->getData($queueMessage); | |
// debuglog(' Task Data = ' . print_r($taskData, true)); | |
if ( $this->validateTaskData($taskData) ) { | |
// process the message | |
$good = $this->didReceiveMessage($taskData); | |
if ( $good !== false ) { | |
// debuglog("Completing task {$this->taskId}"); | |
$this->completeTask($queueMessage); | |
} | |
else { | |
$this->failTask($queueMessage); | |
} | |
} | |
else { | |
// Handle bad message | |
$this->queue->delete($queueMessage); | |
} | |
} | |
public function run() { | |
$this->processName = $this->channelName; | |
// debuglog('Start ' . $this->processName); | |
// debuglog(print_r($this->params, true)); | |
while(1) { | |
$queueMessage = $this->queue->waitForMessages($this->channelName); | |
if ( ! empty($queueMessage) ) { | |
$this->receiveMessage($queueMessage); | |
} | |
else { | |
// empty message | |
// a timeout | |
// // debuglog("empty message " . get_called_class()); | |
} | |
$memory = memory_get_usage(); | |
if( $memory > 20000000 ) { | |
// debuglog('Exit '.get_called_class().' due to memory. Memory:'. ($memory/1024/1024).' MB'); | |
exit; | |
} | |
elseif ( time() > $this->endTime ) { | |
// debuglog('Exit '.get_called_class().' due to time.'); | |
exit; | |
} | |
usleep(10); | |
} | |
} | |
public function completeTask($queueMessage) { | |
// | |
$this->queue->delete($queueMessage); | |
} | |
public function failTask($queueMessage) { | |
// | |
$this->queue->delete($queueMessage); | |
} | |
} | |
class MyProcess extends BaseQueueProcess { | |
public function initialize() { | |
$this->channelName = 'Temperature'; | |
parent::initialize(); | |
} | |
public function didReceiveMessage($taskData) { | |
// debuglog(print_r($taskData, true)); | |
// process data here | |
// return false if something went wrong | |
return true; | |
} | |
} | |
Sender | |
class WorkSender { | |
const TubeName = 'Temperature'; | |
const TubeDelay = 0; // Set delay to 0, i.e. don't use a delay. | |
function send($data) { | |
$c = BeanstalkClient(); | |
$c->publish(self::TubeName, $data, self::TubeDelay); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment