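First, the scenario. Our order service runs as a cluster. When two or more nodes receive a create instruction for the same order at the same time, they race, and the system creates the order twice. Other scenarios behave the same way. This is where a distributed shared lock comes in.
A shared lock is easy to implement inside a single process, but hard to implement across processes or across servers. ZooKeeper makes it straightforward. The underlying recipe is described on the official site and has been translated on other sites, so it is not repeated here.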
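To make the "easy inside one process" case concrete, here is a minimal sketch of a per-order lock that works only within a single JVM. The class name InProcessOrderLock and the method createIfAbsent are hypothetical and not part of the original code. The in-memory set stands in for the real order store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: protects order creation inside ONE JVM.
// It does nothing for a second server in the cluster.
final class InProcessOrderLock {
    // One lock per order id. Note that this map also grows without bound,
    // which is the in-process version of the node cleanup problem.
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // Stand-in for the real order store (an assumption for this sketch).
    private final Set<String> createdOrders = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that creates the given order. */
    boolean createIfAbsent(String orderId) {
        ReentrantLock lock = locks.computeIfAbsent(orderId, id -> new ReentrantLock());
        lock.lock();
        try {
            if (createdOrders.contains(orderId)) {
                return false; // duplicate create instruction, skip it
            }
            // ... the real order creation would happen here ...
            createdOrders.add(orderId);
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

Once the same instruction can reach two different servers, each JVM has its own map of locks, so the check and the create are no longer atomic across the cluster. That is the gap the ZooKeeper-based lock fills.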
Official documentation:
Chinese-language documentation:
See the Locks section for details.
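For readers who do not want to follow the links, the core decision in the ZooKeeper lock recipe can be sketched in a few lines: every client creates an ephemeral sequential child under the lock node, the child with the lowest sequence number holds the lock, and every other client watches the child just ahead of it. The sketch below covers only that ordering step, using plain strings instead of a live ZooKeeper. The class name and the node names of the form "lock-0000000001" are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the ordering step of the lock recipe; no ZooKeeper connection involved.
final class LockRecipeSketch {
    // The sequence number is the run of digits after the last '-'.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    /**
     * Returns null if ourNode has the lowest sequence number (we hold the lock),
     * otherwise the name of the node immediately ahead of us, which we should watch.
     */
    static String nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LockRecipeSketch::sequenceOf));
        int index = sorted.indexOf(ourNode);
        if (index < 0) {
            throw new IllegalArgumentException("our node is missing: " + ourNode);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }
}
```

Watching only the predecessor, rather than the lock node itself, means a release wakes a single waiter instead of all of them.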
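With the principle understood, a quick search shows that Apache already provides an implementation, so there is no need to reinvent the wheel; we can use it directly. During testing, however, we found that the nodes used for the shared lock are not reclaimed automatically. Only the ephemeral nodes at the last level are removed automatically when the lock is released or the session times out; none of the other nodes are ever reclaimed. We handle tens of thousands of orders a day, and on 618 and Double Eleven the daily volume exceeds 500,000. If these nodes were never reclaimed, ZooKeeper's performance would certainly suffer. So we decided to handle the cleanup ourselves. Here is the code:
First, create a Maven project and add the following dependencies to the pom file:
XML code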
- <dependencies>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.6</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>2.8.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.8.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>2.8.0</version>
- </dependency>
- <dependency>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils</artifactId>
- <version>1.9.2</version>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.2</version>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.6</version>
- </dependency>
- </dependencies>
The LockZookeeperClient interface:
Java code
- package com.XXX.framework.lock;
- import org.apache.curator.framework.CuratorFramework;
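- /**
- * Gives lock implementations access to the shared Curator client and
- * lets them register lock paths for garbage collection.
- *
- * @author Roadrunners
- * @version 1.0, 2015-07-09
- */
- public interface LockZookeeperClient {
- /**
- * Returns the Curator client used to talk to ZooKeeper.
- */
- CuratorFramework getCuratorFramework();
- /**
- * Returns the base path under which lock nodes are created.
- */
- String getBasePath();
- /**
- * Registers a path with the garbage collector, which deletes the node once it has no children.
- *
- * @param gcPath path of the lock node to collect
- */
- void gc(String gcPath);
- }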
LockZookeeperClientFactory, the implementation of the LockZookeeperClient interface:
Java code
- package com.XXX.framework.lock;
- import java.util.Date;
- import java.util.List;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.ConcurrentSkipListSet;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.lang.StringUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
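- /**
- * LockZookeeperClient implementation that owns the Curator client and
- * periodically deletes lock nodes that have no children.
- *
- * @author Roadrunners
- * @version 1.0, 2015-07-09
- */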
- public class LockZookeeperClientFactory implements LockZookeeperClient {
- private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);
- private boolean hasGc = true;
- private Timer gcTimer;
- private TimerTask gcTimerTask;
- private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
- private int gcIntervalSecond = 60;
- private CuratorFramework curatorFramework;
- private String zookeeperIpPort = "localhost:2181";
- private int sessionTimeoutMs = 10000;
- private int connectionTimeoutMs = 10000;
- private String basePath = "/locks";
- public void setHasGc(boolean hasGc) {
- this.hasGc = hasGc;
- }
- public void setGcIntervalSecond(int gcIntervalSecond) {
- this.gcIntervalSecond = gcIntervalSecond;
- }
- public void setZookeeperIpPort(String zookeeperIpPort) {
- this.zookeeperIpPort = zookeeperIpPort;
- }
- public void setSessionTimeoutMs(int sessionTimeoutMs) {
- this.sessionTimeoutMs = sessionTimeoutMs;
- }
- public void setConnectionTimeoutMs(int connectionTimeoutMs) {
- this.connectionTimeoutMs = connectionTimeoutMs;
- }
- public void setBasePath(String basePath) {
- basePath = basePath.trim();
- if (basePath.endsWith("/")) {
- basePath = basePath.substring(0, basePath.length() - 1);
- }
- this.basePath = basePath;
- }
- public void init() {
- if (StringUtils.isBlank(zookeeperIpPort)) {
- throw new IllegalArgumentException("zookeeperIpPort must not be blank");
- }
- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
- curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
- curatorFramework.start();
- LOG.info("CuratorFramework initialise succeed.");
- if (hasGc) {
- gc();
- }
- }
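- public synchronized void destroy() {
- // Stop the GC timer first so that no sweep runs against a closed client.
- gcStop();
- // Clear but keep the set, so a late gc(String) call cannot hit a null reference.
- gcPaths.clear();
- if (null != curatorFramework) {
- curatorFramework.close();
- curatorFramework = null;
- }
- }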
- @Override
- public void gc(String gcPath) {
- if (hasGc && StringUtils.isNotBlank(gcPath)) {
- gcPaths.add(gcPath.trim());
- }
- }
- @Override
- public CuratorFramework getCuratorFramework() {
- return this.curatorFramework;
- }
- @Override
- public String getBasePath() {
- return this.basePath;
- }
- private synchronized void gc() {
- gcStop();
- try {
- scanningGCNodes();
- } catch (Throwable e) {
- LOG.warn("scanning gc nodes error.", e);
- }
- gcTimerTask = new TimerTask() {
- @Override
- public void run() {
- doingGc();
- }
- };
- Date begin = new Date();
- begin.setTime(begin.getTime() + (10 * 1000L));
- gcTimer = new Timer("lock-gc", true);
- gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);
- }
- private synchronized void gcStop() {
- if (null != gcTimer) {
- gcTimer.cancel();
- gcTimer = null;
- }
- if (null != gcTimerTask) {
- gcTimerTask.cancel();
- gcTimerTask = null;
- }
- }
- private synchronized void scanningGCNodes() throws Exception {
- if (null == curatorFramework.checkExists().forPath(basePath)) {
- return;
- }
- List<String> paths = curatorFramework.getChildren().forPath(basePath);
- if (CollectionUtils.isEmpty(paths)) {
- gcPaths.add(basePath);
- return;
- }
- for (String path : paths) {
- try{
- String tmpPath = basePath + "/" + path;
- if (null == curatorFramework.checkExists().forPath(tmpPath)) {
- continue;
- }
- gcPaths.add(tmpPath);
- } catch(Throwable e){
- LOG.warn("scanning gc nodes error.", e);
- }
- }
- }
- private synchronized void doingGc() {
- LOG.debug("GC beginning.");
- if (CollectionUtils.isNotEmpty(gcPaths)) {
- for (String path : gcPaths) {
- try {
- if (null != curatorFramework.checkExists().forPath(path)) {
- if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {
- curatorFramework.delete().forPath(path);
- gcPaths.remove(path);
- LOG.debug("GC " + path);
- }
- } else {
- gcPaths.remove(path);
- }
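- } catch (org.apache.zookeeper.KeeperException.NotEmptyException e) {
- // A child appeared between the check and the delete, so the lock is in use; keep the path for the next sweep.
- LOG.debug("GC skipped, node in use: " + path);
- } catch (Throwable e) {
- gcPaths.remove(path);
- LOG.warn("GC failed for " + path, e);
- }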
- }
- }
- LOG.debug("GC ended.");
- }
- }
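The sweep rule in doingGc() is easier to see without the ZooKeeper calls. The following is a minimal in-memory sketch, not the class above: a sorted set of path strings stands in for the ZooKeeper tree, and the names GcSweepSketch, nodes, hasChildren and sweep are hypothetical. A registered path is deleted only when it exists and has no children. A path that no longer exists is dropped from the list, and a path that still has children is kept for the next sweep.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// In-memory sketch of the sweep rule; a set of path strings stands in for ZooKeeper.
final class GcSweepSketch {
    // Every node path that currently "exists" in the simulated tree.
    final Set<String> nodes = new ConcurrentSkipListSet<>();
    // Paths registered for collection, playing the role of gcPaths in the factory.
    final Set<String> gcPaths = new ConcurrentSkipListSet<>();

    // A node has children if any existing path starts with "<path>/".
    boolean hasChildren(String path) {
        String prefix = path + "/";
        for (String node : nodes) {
            if (node.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    /** Runs one sweep and returns how many nodes were deleted. */
    int sweep() {
        int deleted = 0;
        // ConcurrentSkipListSet iterators tolerate removal during iteration.
        for (String path : gcPaths) {
            if (!nodes.contains(path)) {
                gcPaths.remove(path);      // already gone, stop tracking it
            } else if (!hasChildren(path)) {
                nodes.remove(path);        // empty lock node, reclaim it
                gcPaths.remove(path);
                deleted++;
            }
            // Otherwise the lock is still in use; leave it for the next sweep.
        }
        return deleted;
    }
}
```

In the factory the same rule runs on a Timer thread, first 10 seconds after init() and then every gcIntervalSecond seconds.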
SharedLock, the shared lock:
Java code
- package com.XXX.framework.lock.shared;
- import java.util.concurrent.TimeUnit;
- import org.apache.commons.lang.StringUtils;
- import org.apache.curator.framework.recipes.locks.InterProcessLock;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import com.XXX.framework.lock.LockZookeeperClient;
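- /**
- * Shared (inter-process) lock on one resource, backed by Curator's InterProcessMutex.
- * The lock path is registered for garbage collection when the lock is constructed.
- *
- * @author Roadrunners
- * @version 1.0, 2015-07-09
- */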
- public class SharedLock {
- private InterProcessLock interProcessLock;
- public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
- if (StringUtils.isBlank(resourceId)) {
- throw new IllegalArgumentException("resourceId must not be blank");
- }
- String path = lockZookeeperClient.getBasePath();
- path += ("/" + resourceId.trim());
- interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path);
- lockZookeeperClient.gc(path);
- }
- /**
- * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
- * to {@link #release()}
- *
- * @throws Exception ZK errors, connection interruptions
- */
- public void acquire() throws Exception {
- interProcessLock.acquire();
- }
- /**
- * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
- * to {@link #release()}
- *
- * @param time time to wait
- * @param unit time unit
- * @return true if the mutex was acquired, false if not
- * @throws Exception ZK errors, connection interruptions
- */
- public boolean acquire(long time, TimeUnit unit) throws Exception {
- return interProcessLock.acquire(time, unit);
- }
- /**
- * Perform one release of the mutex.
- *
- * @throws Exception ZK errors, interruptions, current thread does not own the lock
- */
- public void release() throws Exception {
- interProcessLock.release();
- }
- /**
- * Returns true if the mutex is acquired by a thread in this JVM
- *
- * @return true/false
- */
- public boolean isAcquiredInThisProcess() {
- return interProcessLock.isAcquiredInThisProcess();
- }
- }
That completes the code. Here is a simple demo:
Java code
- // LockZookeeperClientFactory is normally configured and injected through Spring. It is created by hand here only to keep the demo short; do not do this in real code.
- LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();
- lzc.setZookeeperIpPort("10.100.15.1:8900");
- lzc.setBasePath("/locks/sharedLock/");
- lzc.init();
- SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");
- boolean acquired = false;
- try {
- acquired = sharedLock.acquire(100, TimeUnit.MILLISECONDS);
- if (acquired) {
- System.out.println("sharedLock1 get");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // Release only if the lock was acquired; releasing a lock this thread does not own throws an exception.
- if (acquired) {
- try {
- sharedLock.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- lzc.destroy();
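With this in place, the system reclaims unused nodes once a minute (the default gcIntervalSecond is 60), with the first sweep running 10 seconds after init().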
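The acquire/release pairing in the demo is easy to get wrong, so it may be worth wrapping it once. Below is a minimal sketch of such a helper. TimedLock is a hypothetical interface that mirrors only the acquire(time, unit) and release() methods of SharedLock, and runLocked is likewise an illustrative name. Neither exists in the code above.

```java
import java.util.concurrent.TimeUnit;

// Sketch of a helper that releases the lock only when it was actually acquired.
final class LockGuardSketch {
    /** Hypothetical minimal view of a lock with the same method shapes as SharedLock. */
    interface TimedLock {
        boolean acquire(long time, TimeUnit unit) throws Exception;
        void release() throws Exception;
    }

    /**
     * Runs the action if the lock is acquired within the given time.
     * Returns true if the action ran, false if the lock could not be acquired.
     */
    static boolean runLocked(TimedLock lock, long time, TimeUnit unit, Runnable action) {
        boolean acquired;
        try {
            acquired = lock.acquire(time, unit);
        } catch (Exception e) {
            throw new IllegalStateException("acquire failed", e);
        }
        if (!acquired) {
            return false; // nothing to release
        }
        try {
            action.run();
            return true;
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                // Report instead of throwing, so an exception from the action is not masked.
                System.err.println("release failed: " + e);
            }
        }
    }
}
```

A SharedLock could be adapted to TimedLock with a small anonymous class that forwards both calls, which keeps the try/finally logic in one place instead of at every call site.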