use Predis\Client;
use Predis\Transaction\MultiExec;
use Ramsey\Uuid\Uuid;
/**
* Class Semaphore
*
* A fair, race free implementation of a counting semaphore, based on the algorithm description in
* section 6.3 of "Redis in Action"
* (https://redislabs.com/ebook/part-2-core-concepts/chapter-6-application-components-in-redis/6-3-counting-semaphores/)
*
* @package CodeOrange\RedisCountingSemaphore
*/
class Semaphore {
private $client;
private $name;
private $limit;
private $timeout;
/** @var string $identifier Identifier for the acquired semaphore */
private $identifier = null;
/**
* Semaphore constructor.
* @param Client $client Predis client with an open connection
* @param string $name Name of the semaphore
* @param int $limit The amount of resources this semaphore protects
* @param int $timeout Timeout of an acquired semaphore, in seconds
*/
public function __construct(Client $client, $name, $limit = 1, $timeout = 10) {
$this->client = $client;
$this->name = $name;
$this->limit = $limit;
$this->timeout = $timeout;
}
/**
* @return string
*/
public function getIdentifier() {
return $this->identifier;
}
/**
* Try to acquire a semaphore
*
* @param float $sleep Number of seconds to sleep between retries. If null, this function will not retry but return immediately.
* @param int $retries Number of times to retry before giving up
* @return bool Whether or not the semaphore was acquired correctly
*/
public function acquire($sleep = null, $retries = null) {
if ($this->identifier) {
// We already have it
return true;
}
if ($sleep == null || $retries == null) {
$acquired = $this->acquire_fair_with_lock();
return $acquired;
}
while ($sleep > 0 && $retries > 0) {
$acquired = $this->acquire_fair_with_lock();
if ($acquired) {
return true;
}
$retries -= 1;
sleep($sleep);
}
return false;
}
/**
* Release this semaphore
*
* @return void
*/
public function release() {
if (!$this->identifier) {
// We didn't have it
return;
}
if (!$this->client->isConnected()) {
$this->client = get_predis_client();
}
$this->release_fair();
}
/**
* Refresh the semaphore
*
* @return bool Whether or not we still have the semaphore
*/
public function refresh() {
if (!$this->identifier) {
return false;
}
if (!$this->client->isConnected()) {
$this->client = get_predis_client();
}
return $this->refresh_fair();
}
//<editor-fold desc="Methods as built up to in the book">
private function acquire_unfair() {
$identifier = (string)Uuid::uuid4();
$now = time();
$transaction = $this->client->transaction();
// Time out old identifiers
$transaction->zremrangebyscore($this->name, '-inf', $now - $this->timeout);
// Try to acquire semaphore
$transaction->zadd($this->name, [$identifier => $now]);
// Check to see if we have it
$transaction->zrank($this->name, $identifier);
$result = $transaction->execute();
$rank = $result[count($result) - 1];
if ($rank < $this->limit) {
// We got it!
$this->identifier = $identifier;
return true;
}
// We didn't get it, remove the identifier from the table
$this->client->zrem($this->name, $identifier);
return false;
}
private function release_unfair() {
$id = $this->identifier;
$this->identifier = null;
return $this->client->zrem($this->name, $id);
}
private function acquire_fair() {
$identifier = (string)Uuid::uuid4();
$cszet = $this->name . ':owner';
$ctr = $this->name . ':counter';
$now = time();
$transaction = $this->client->transaction();
// Time out old entries
$transaction->zremrangebyscore($this->name, '-inf', $now - $this->timeout);
$transaction->zinterstore($cszet, [$cszet, $this->name], ['weights' => [1, 0]]);
// Get the counter
$transaction->incr($ctr);
$result = $transaction->execute();
$counter = $result[count($result) - 1];
// Try to acquire the semaphore
$transaction = $this->client->transaction();
$transaction->zadd($this->name, [$identifier => $now]);
$transaction->zadd($cszet, [$identifier => $counter]);
// Check the rank to determine if we got the semaphore
$transaction->zrank($cszet, $identifier);
$result = $transaction->execute();
$rank = $result[count($result) - 1];
if ($rank < $this->limit) {
// We got it!
$this->identifier = $identifier;
return true;
}
// We didn't get the semaphore, clean out the bad data
$transaction = $this->client->transaction();
$transaction->zrem($this->name, $identifier);
$transaction->zrem($cszet, $identifier);
$transaction->execute();
return false;
}
private function release_fair() {
$id = $this->identifier;
$this->identifier = null;
$transaction = $this->client->transaction();
$transaction->zrem($this->name, $id);
$transaction->zrem($this->name . ':owner', $id);
return $transaction->execute()[0];
}
private function refresh_fair() {
if ($this->client->zadd($this->name, [$this->identifier => time()])) {
// We lost it
$this->release_fair();
return false;
}
// We still have it
return true;
}
private function acquire_fair_with_lock() {
$identifier = $this->acquire_lock(0.01);
if ($identifier) {
try {
return $this->acquire_fair();
} finally {
$this->release_lock($identifier);
}
}
return false;
}
// From section 6.2 of the book
private function acquire_lock($acquire_timeout = 10) {
$identifier = (string)Uuid::uuid4();
$end = time() + $acquire_timeout;
while (time() < $end) {
$lock_name = 'lock:' . $this->name;
$res = $this->client->setnx($lock_name, $identifier);
if ($res) {
//设置10s的过期时间
$this->client->expire($lock_name, 10);
return $identifier;
}
sleep(0.001);
}
return false;
}
private function release_lock($id) {
$lock_name = 'lock:' . $this->name;
$res = $this->client->transaction(['watch' => $lock_name, 'cas' => true, 'retry' => 1000], function (MultiExec $t) use ($id, $lock_name) {
$value = $t->get($lock_name);
if ($value === $id) {
$t->multi();
$t->del([$lock_name]);
}
});
if ($res) {
return true;
} else {
// We didn't execute anything, so we've lost the lock
return false;
}
}
//</editor-fold>
}