From 9ed8ce836eec35562f10b261f08d23089dfcfd1e Mon Sep 17 00:00:00 2001 From: liangjf Date: Wed, 24 Oct 2018 19:12:54 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0zookeeper=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 14 ++ pom.xml | 6 +- .../uid/worker/ZookeeperWorkerIdAssigner.java | 106 +++++++++++++++ .../uid/ZookeeperCachedUidGeneratorTest.java | 127 ++++++++++++++++++ .../uid/ZookeeperDefaultUidGeneratorTest.java | 122 +++++++++++++++++ src/test/resources/uid/mysql.properties | 10 +- .../resources/uid/zoo-cached-uid-spring.xml | 36 +++++ .../resources/uid/zoo-default-uid-spring.xml | 24 ++++ 8 files changed, 441 insertions(+), 4 deletions(-) create mode 100644 .gitignore create mode 100644 src/main/java/com/baidu/fsg/uid/worker/ZookeeperWorkerIdAssigner.java create mode 100644 src/test/java/com/baidu/fsg/uid/ZookeeperCachedUidGeneratorTest.java create mode 100644 src/test/java/com/baidu/fsg/uid/ZookeeperDefaultUidGeneratorTest.java create mode 100644 src/test/resources/uid/zoo-cached-uid-spring.xml create mode 100644 src/test/resources/uid/zoo-default-uid-spring.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8ad10e5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +.settings +.project +*target/ +*/target/* +.classpath +.springBeans +.idea/ +*.iml +*.log +*.txt +*.xml.bak +*rebel.xml +*.DS_Store +*/logs/* diff --git a/pom.xml b/pom.xml index ceffec5..f5beaff 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,11 @@ commons-lang 2.6 - + + org.apache.curator + curator-framework + 2.12.0 + ch.qos.logback diff --git a/src/main/java/com/baidu/fsg/uid/worker/ZookeeperWorkerIdAssigner.java b/src/main/java/com/baidu/fsg/uid/worker/ZookeeperWorkerIdAssigner.java new file mode 100644 index 0000000..78703bf --- /dev/null +++ b/src/main/java/com/baidu/fsg/uid/worker/ZookeeperWorkerIdAssigner.java @@ -0,0 +1,106 @@ +/** + * hsjry.com Inc. + * Copyright (c) 2014-2018 All Rights Reserved. + */ +package com.baidu.fsg.uid.worker; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.data.Stat; + +import com.baidu.fsg.uid.exception.UidGenerateException; + +/** + * zookeeper模式WorkerID分配器 + * @author liangjf + * @version $Id: ZookeeperWorkerIdAssigner.java, v 1.0 2018年10月24日 下午3:26:50 liangjf Exp $ + * @since 1.0 + */ +public class ZookeeperWorkerIdAssigner implements WorkerIdAssigner { + + /** 服务器地址 */ + private String servers = "127.0.0.1:2181"; + /** 初试时间 */ + private int baseSleepTimeMs = 1000; + /** 重试次数 */ + private int maxRetries = 3; + /** 会话超时时间 */ + private int sessionTimeoutMs = 5000; + /** 结点路径 */ + private String path = "/sq"; + + private CuratorFramework client; + + public String getServers() { + return servers; + } + + public void setServers(String servers) { + this.servers = servers; + } + + public int getBaseSleepTimeMs() { + return baseSleepTimeMs; + } + + public void setBaseSleepTimeMs(int baseSleepTimeMs) { + this.baseSleepTimeMs = baseSleepTimeMs; + } + + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + public int getSessionTimeoutMs() { + return sessionTimeoutMs; + } + + public void setSessionTimeoutMs(int sessionTimeoutMs) { + this.sessionTimeoutMs = sessionTimeoutMs; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public void init() { + //重试策略,初试时间1秒,重试10次 + RetryPolicy policy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); + //通过工厂创建Curator + client = CuratorFrameworkFactory.builder().connectString(servers) + .sessionTimeoutMs(sessionTimeoutMs).retryPolicy(policy).build(); + //开启连接 + client.start(); + } + + /** + * @see com.baidu.fsg.uid.worker.WorkerIdAssigner#assignWorkerId() + */ + public long assignWorkerId() { + init(); + try { + Stat stat = client.checkExists().forPath(path); + if(stat == null) { + client.create().creatingParentsIfNeeded().forPath(path); + } + stat = client.setData().forPath(path,new byte[0]); + return stat.getVersion(); + } catch (Exception e) { + throw new UidGenerateException("创建zookeeper节点失败", e); + } + } + + public void close() { + client.close(); + } +} diff --git a/src/test/java/com/baidu/fsg/uid/ZookeeperCachedUidGeneratorTest.java b/src/test/java/com/baidu/fsg/uid/ZookeeperCachedUidGeneratorTest.java new file mode 100644 index 0000000..9229024 --- /dev/null +++ b/src/test/java/com/baidu/fsg/uid/ZookeeperCachedUidGeneratorTest.java @@ -0,0 +1,127 @@ +package com.baidu.fsg.uid; + +import com.baidu.fsg.uid.impl.CachedUidGenerator; +import org.apache.commons.lang.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import javax.annotation.Resource; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Test for {@link CachedUidGenerator} + * + * @author yutianbao + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(locations = { "classpath:uid/zoo-cached-uid-spring.xml" }) +public class ZookeeperCachedUidGeneratorTest { + private static final int SIZE = 7000000; // 700w + private static final boolean VERBOSE = false; + private static final int THREADS = Runtime.getRuntime().availableProcessors() << 1; + + @Resource + private UidGenerator uidGenerator; + + /** + * Test for serially generate + * + * @throws IOException + */ + @Test + public void testSerialGenerate() throws IOException { + // Generate UID serially + Set uidSet = new HashSet<>(SIZE); + for (int i = 0; i < SIZE; i++) { + doGenerate(uidSet, i); + } + + // Check UIDs are all unique + checkUniqueID(uidSet); + } + + /** + * Test for parallel generate + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void testParallelGenerate() throws InterruptedException, IOException { + AtomicInteger control = new AtomicInteger(-1); + Set uidSet = new ConcurrentSkipListSet<>(); + + // Initialize threads + List threadList = new ArrayList<>(THREADS); + for (int i = 0; i < THREADS; i++) { + Thread thread = new Thread(() -> workerRun(uidSet, control)); + thread.setName("UID-generator-" + i); + + threadList.add(thread); + thread.start(); + } + + // Wait for worker done + for (Thread thread : threadList) { + thread.join(); + } + + // Check generate 700w times + Assert.assertEquals(SIZE, control.get()); + + // Check UIDs are all unique + checkUniqueID(uidSet); + } + + /** + * Woker run + */ + private void workerRun(Set uidSet, AtomicInteger control) { + for (;;) { + int myPosition = control.updateAndGet(old -> (old == SIZE ? SIZE : old + 1)); + if (myPosition == SIZE) { + return; + } + + doGenerate(uidSet, myPosition); + } + } + + /** + * Do generating + */ + private void doGenerate(Set uidSet, int index) { + long uid = uidGenerator.getUID(); + String parsedInfo = uidGenerator.parseUID(uid); + boolean existed = !uidSet.add(uid); + if (existed) { + System.out.println("Found duplicate UID " + uid); + } + + // Check UID is positive, and can be parsed + Assert.assertTrue(uid > 0L); + Assert.assertTrue(StringUtils.isNotBlank(parsedInfo)); + + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + " No." + index + " >>> " + parsedInfo); + } + } + + /** + * Check UIDs are all unique + */ + private void checkUniqueID(Set uidSet) throws IOException { + System.out.println(uidSet.size()); + Assert.assertEquals(SIZE, uidSet.size()); + } + +} diff --git a/src/test/java/com/baidu/fsg/uid/ZookeeperDefaultUidGeneratorTest.java b/src/test/java/com/baidu/fsg/uid/ZookeeperDefaultUidGeneratorTest.java new file mode 100644 index 0000000..9d909a4 --- /dev/null +++ b/src/test/java/com/baidu/fsg/uid/ZookeeperDefaultUidGeneratorTest.java @@ -0,0 +1,122 @@ +package com.baidu.fsg.uid; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Resource; + +import org.apache.commons.lang.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import com.baidu.fsg.uid.impl.DefaultUidGenerator; + +/** + * Test for {@link DefaultUidGenerator} + * + * @author yutianbao + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(locations = { "classpath:uid/zoo-default-uid-spring.xml" }) +public class ZookeeperDefaultUidGeneratorTest { + private static final int SIZE = 100000; // 10w + private static final boolean VERBOSE = true; + private static final int THREADS = Runtime.getRuntime().availableProcessors() << 1; + + @Resource + private UidGenerator uidGenerator; + + /** + * Test for serially generate + */ + @Test + public void testSerialGenerate() { + // Generate UID serially + Set uidSet = new HashSet<>(SIZE); + for (int i = 0; i < SIZE; i++) { + doGenerate(uidSet, i); + } + + // Check UIDs are all unique + checkUniqueID(uidSet); + } + + /** + * Test for parallel generate + * + * @throws InterruptedException + */ + @Test + public void testParallelGenerate() throws InterruptedException { + AtomicInteger control = new AtomicInteger(-1); + Set uidSet = new ConcurrentSkipListSet<>(); + + // Initialize threads + List threadList = new ArrayList<>(THREADS); + for (int i = 0; i < THREADS; i++) { + Thread thread = new Thread(() -> workerRun(uidSet, control)); + thread.setName("UID-generator-" + i); + + threadList.add(thread); + thread.start(); + } + + // Wait for worker done + for (Thread thread : threadList) { + thread.join(); + } + + // Check generate 10w times + Assert.assertEquals(SIZE, control.get()); + + // Check UIDs are all unique + checkUniqueID(uidSet); + } + + /** + * Worker run + */ + private void workerRun(Set uidSet, AtomicInteger control) { + for (;;) { + int myPosition = control.updateAndGet(old -> (old == SIZE ? SIZE : old + 1)); + if (myPosition == SIZE) { + return; + } + + doGenerate(uidSet, myPosition); + } + } + + /** + * Do generating + */ + private void doGenerate(Set uidSet, int index) { + long uid = uidGenerator.getUID(); + String parsedInfo = uidGenerator.parseUID(uid); + uidSet.add(uid); + + // Check UID is positive, and can be parsed + Assert.assertTrue(uid > 0L); + Assert.assertTrue(StringUtils.isNotBlank(parsedInfo)); + + if (VERBOSE) { + System.out.println(Thread.currentThread().getName() + " No." + index + " >>> " + parsedInfo); + } + } + + /** + * Check UIDs are all unique + */ + private void checkUniqueID(Set uidSet) { + System.out.println(uidSet.size()); + Assert.assertEquals(SIZE, uidSet.size()); + } + +} diff --git a/src/test/resources/uid/mysql.properties b/src/test/resources/uid/mysql.properties index 01f1d6b..24f4ea0 100644 --- a/src/test/resources/uid/mysql.properties +++ b/src/test/resources/uid/mysql.properties @@ -1,8 +1,8 @@ #datasource db info mysql.driver=com.mysql.jdbc.Driver -jdbc.url=jdbc:mysql://localhost:xxxx/xxxx -jdbc.username=xxxx -jdbc.password=xxxx +jdbc.url=jdbc:mysql://localhost:3306/xxxx +jdbc.username=root +jdbc.password=root jdbc.maxActive=2 #datasource base @@ -21,3 +21,7 @@ datasource.removeAbandoned=true datasource.removeAbandonedTimeout=120 datasource.filters=stat +zk.servers=127.0.0.1:2181 +zk.baseSleepTimeMs=1000 +zk.maxRetries=3 +zk.sessionTimeoutMs=50000 diff --git a/src/test/resources/uid/zoo-cached-uid-spring.xml b/src/test/resources/uid/zoo-cached-uid-spring.xml new file mode 100644 index 0000000..a03d048 --- /dev/null +++ b/src/test/resources/uid/zoo-cached-uid-spring.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/test/resources/uid/zoo-default-uid-spring.xml b/src/test/resources/uid/zoo-default-uid-spring.xml new file mode 100644 index 0000000..253a655 --- /dev/null +++ b/src/test/resources/uid/zoo-default-uid-spring.xml @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + + + + + From fb5597d0fe39b2ba2b198a4cb4b4c6c1e60f2b61 Mon Sep 17 00:00:00 2001 From: liangjf Date: Wed, 24 Oct 2018 19:18:09 +0800 Subject: [PATCH 2/3] =?UTF-8?q?zookeeper=E8=B0=83=E6=95=B4=E4=B8=BA?= =?UTF-8?q?=E5=8F=AF=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f5beaff..0cdcbe8 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,7 @@ 1.8 4.2.5.RELEASE 1.7.7 + 2.12.0 @@ -69,7 +70,7 @@ org.apache.curator curator-framework - 2.12.0 + ${curator.version} From 0a1a126cf0cf5273b01732dfc4df49f33ba7fe51 Mon Sep 17 00:00:00 2001 From: liangjf Date: Thu, 25 Oct 2018 15:57:18 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=B6=E9=92=9F?= =?UTF-8?q?=E5=9B=9E=E6=8B=A8workerID=E8=87=AA=E5=8A=A8=E6=BC=82=E7=A7=BB?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fsg/uid/impl/DefaultUidGenerator.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/baidu/fsg/uid/impl/DefaultUidGenerator.java b/src/main/java/com/baidu/fsg/uid/impl/DefaultUidGenerator.java index 20f9a2a..9c60219 100644 --- a/src/main/java/com/baidu/fsg/uid/impl/DefaultUidGenerator.java +++ b/src/main/java/com/baidu/fsg/uid/impl/DefaultUidGenerator.java @@ -87,10 +87,7 @@ public void afterPropertiesSet() throws Exception { bitsAllocator = new BitsAllocator(timeBits, workerBits, seqBits); // initialize worker id - workerId = workerIdAssigner.assignWorkerId(); - if (workerId > bitsAllocator.getMaxWorkerId()) { - throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId()); - } + assignWorkerId(); LOGGER.info("Initialized bits(1, {}, {}, {}) for workerID:{}", timeBits, workerBits, seqBits, workerId); } @@ -138,7 +135,23 @@ protected synchronized long nextId() { // Clock moved backwards, refuse to generate uid if (currentSecond < lastSecond) { long refusedSeconds = lastSecond - currentSecond; - throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds); + LOGGER.warn("Clock moved backwards. Refusing for {} seconds", refusedSeconds); + if (refusedSeconds <= 5) { + try { + //时间偏差大小小于5ms,则等待两倍时间 + wait(refusedSeconds << 1);//wait + } catch (InterruptedException e) { + e.printStackTrace(); + } + currentSecond = getCurrentSecond(); + if (currentSecond < lastSecond) {//时钟回拨较大 + //获取新的workerId + assignWorkerId(); + } + }else {//时钟回拨较大 + //获取新的workerId + assignWorkerId(); + } } // At the same second, increase sequence @@ -183,6 +196,17 @@ private long getCurrentSecond() { return currentSecond; } + + /** + * initialize worker id + */ + private void assignWorkerId() { + // initialize worker id + workerId = workerIdAssigner.assignWorkerId(); + if (workerId > bitsAllocator.getMaxWorkerId()) { + throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId()); + } + } /** * Setters for spring property