博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java队列集合的性能测试
阅读量:5834 次
发布时间:2019-06-18

本文共 23123 字,大约阅读时间需要 77 分钟。

同时开10个线程存入和取出100万的数据,结论如下:

DoubleBufferedQueue < ConcurrentLinkedQueue < ArrayBlockingQueue < LinkedBlockingQueue

执行结果如下:

100万 DoubleBufferedQueue入队时间:9510 出队时间:10771

100万 DoubleBufferedQueue入队时间:8169 出队时间:9789
1000万 DoubleBufferedQueue入队时间:98285 出队时间:101088
1000万 DoubleBufferedQueue入队时间:101859 出队时间:105964

100万 ConcurrentLinkedQueue入队时间:10557 出队时间:13716

100万 ConcurrentLinkedQueue入队时间:25298 出队时间:25332
1000万 ConcurrentLinkedQueue队列时间:121868 出队时间:136116
1000万 ConcurrentLinkedQueue队列时间:134306 出队时间:147893

100万 ArrayBlockingQueue入队时间:21080 出队时间:22025

100万 ArrayBlockingQueue入队时间:17689 出队时间:19654
1000万 ArrayBlockingQueue入队时间:194400 出队时间:205968
1000万 ArrayBlockingQueue入队时间:192268 出队时间:197982

100万 LinkedBlockingQueue入队时间:38236 出队时间:52555

100万 LinkedBlockingQueue入队时间:30646 出队时间:38573
1000万 LinkedBlockingQueue入队时间:375669 出队时间:391976
1000万 LinkedBlockingQueue入队时间:701363 出队时间:711217

 

doubleBufferedQueue:

package test.MoreThread.d;import java.util.ArrayList;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import test.MoreThread.l.linkedBlockingQueue;import comrt.util.DoubleBufferedQueue;//DoubleBufferedQueue入队时间:9510  出队时间:10771//DoubleBufferedQueue入队时间:8169  出队时间:9789public class doubleBufferedQueue {    private static final Logger log = LoggerFactory            .getLogger(doubleBufferedQueue.class);    public final static int size1 = 1000000;    public static DoubleBufferedQueue queue = new DoubleBufferedQueue(            size1);    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {//        long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList
> results = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService .submit(new ExecDoubleBufferedQueue()); results.add(future); } long allTime = 0; for (Future
fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } doubleBufferedQueue.isOver = true; log.info("入队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList
> results_out = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService2 .submit(new ExecDoubleBufferedQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future
fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); }}class ExecDoubleBufferedQueue implements Callable
{ private static final Logger log = LoggerFactory .getLogger(doubleBufferedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < doubleBufferedQueue.size1; i++) { doubleBufferedQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}class ExecDoubleBufferedQueue_Out implements Callable
{ private static final Logger log = LoggerFactory .getLogger(doubleBufferedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!doubleBufferedQueue.isOver) { doubleBufferedQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}

 

concurrentLinkedQueue:

package test.MoreThread.c;import java.util.ArrayList;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//ConcurrentLinkedQueue入队时间:10557  出队时间:13716//ConcurrentLinkedQueue入队时间:25298  出队时间:25332public class concurrentLinkedQueue {    private static final Logger log = LoggerFactory            .getLogger(concurrentLinkedQueue.class);    public static ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();    public final static int size1 = 1000000;    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {        // long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList
> results = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService.submit(new Exec()); results.add(future); } long allTime = 0; for (Future
fs : results) { try { allTime += fs.get();// log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } concurrentLinkedQueue.isOver = true; log.info("队列总共执行时间:" + allTime); } }); thread1.start(); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList
> results_out = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService2 .submit(new Exec_Out()); results_out.add(future); } long allTime_out = 0; for (Future
fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); }}class Exec implements Callable
{ private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < concurrentLinkedQueue.size1; i++) { concurrentLinkedQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2); return time2; }}class Exec_Out implements Callable
{ private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!concurrentLinkedQueue.isOver) { concurrentLinkedQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}

 

arrayBlockingQueue:

package test.MoreThread.a;import java.util.ArrayList;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//ArrayBlockingQueue入队时间:21080  出队时间:22025//ArrayBlockingQueue入队时间:17689  出队时间:19654public class arrayBlockingQueue {    private static final Logger log = LoggerFactory            .getLogger(arrayBlockingQueue.class);    public final static int size1 = 1000000;    public static ArrayBlockingQueue queue = new ArrayBlockingQueue(            size1);    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {        // long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList
> results = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService .submit(new ExecArrayBlockingQueue()); results.add(future); } long allTime = 0; for (Future
fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } arrayBlockingQueue.isOver = true; log.info("队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList
> results_out = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService2 .submit(new ExecArrayBlockingQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future
fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); }}class ExecArrayBlockingQueue implements Callable
{ private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < arrayBlockingQueue.size1; i++) { arrayBlockingQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}class ExecArrayBlockingQueue_Out implements Callable
{ private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!arrayBlockingQueue.isOver) { arrayBlockingQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}

 

linkedBlockingQueue:

package test.MoreThread.l;import java.util.ArrayList;import java.util.Vector;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingQueue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//LinkedBlockingQueue入队时间:38236  出队时间:52555//LinkedBlockingQueue入队时间:30646  出队时间:38573public class linkedBlockingQueue {    private static final Logger log = LoggerFactory            .getLogger(linkedBlockingQueue.class);    public final static int size1 = 1000000;    public static LinkedBlockingQueue queue = new LinkedBlockingQueue(            size1);    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {        long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList
> results = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService .submit(new ExecLinkedBlockingQueue()); results.add(future); } long allTime = 0; for (Future
fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } linkedBlockingQueue.isOver = true; log.info("入队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));// System.out.println(linkedBlockingQueue.queue.size()); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList
> results_out = new ArrayList
>(); for (int i = 0; i < threadNumber; i++) { Future
future = executorService2 .submit(new ExecLinkedBlockingQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future
fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); }}class ExecLinkedBlockingQueue implements Callable
{ private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < linkedBlockingQueue.size1; i++) { linkedBlockingQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}class ExecLinkedBlockingQueue_Out implements Callable
{ private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!linkedBlockingQueue.isOver) { linkedBlockingQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}

 

DoubleBufferedQueue双缓冲队列

package comrt.util;import java.util.AbstractQueue;import java.util.Collection;import java.util.Iterator;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//双缓冲队列,线程安全public class DoubleBufferedQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { private static final long serialVersionUID = 1011398447523020L; public static final int DEFAULT_QUEUE_CAPACITY = 5000000; public static final long DEFAULT_MAX_TIMEOUT = 0; public static final long DEFAULT_MAX_COUNT = 10; private Logger logger = LoggerFactory.getLogger(DoubleBufferedQueue.class.getName()); /** The queued items */ private ReentrantLock readLock; // 写锁 private ReentrantLock writeLock; // 是否满 private Condition notFull; private Condition awake; // 读写数组 private transient E[] writeArray; private transient E[] readArray; // 读写计数 private volatile int writeCount; private volatile int readCount; // 写数组下标指针 private int writeArrayTP; private int writeArrayHP; // 读数组下标指针 private int readArrayTP; private int readArrayHP; private int capacity; public DoubleBufferedQueue(int capacity) { // 默认 this.capacity = DEFAULT_QUEUE_CAPACITY; if (capacity > 0) { this.capacity = capacity; } readArray = (E[]) new Object[capacity]; writeArray = (E[]) new Object[capacity]; readLock = new ReentrantLock(); writeLock = new ReentrantLock(); notFull = writeLock.newCondition(); awake = writeLock.newCondition(); } private void insert(E e) { writeArray[writeArrayTP] = e; ++writeArrayTP; ++writeCount; } private E extract() { E e = readArray[readArrayHP]; readArray[readArrayHP] = null; ++readArrayHP; --readCount; return e; } /** * switch condition: read queue is empty && write queue is not empty * * Notice:This function can only be invoked after readLock is grabbed,or may * cause dead lock * * @param timeout * @param isInfinite * : whether need to wait forever until some other thread awake * it * @return * @throws InterruptedException */ private long queueSwap(long timeout, boolean isInfinite) throws InterruptedException { writeLock.lock(); try { if (writeCount <= 0) { // logger.debug("Write Count:" + writeCount // + ", Write Queue is empty, do not switch!"); try { // logger.debug("Queue is empty, need wait...."); if (isInfinite && timeout <= 0) { awake.await(); return -1; } else if (timeout > 0) { return awake.awaitNanos(timeout); } else { return 0; } } catch (InterruptedException ie) { awake.signal(); throw ie; } } else { E[] tmpArray = readArray; readArray = writeArray; writeArray = tmpArray; readCount = writeCount; readArrayHP = 0; readArrayTP = writeArrayTP; writeCount = 0; writeArrayHP = readArrayHP; writeArrayTP = 0; notFull.signal(); // logger.debug("Queue switch successfully!"); return 0; } } finally { writeLock.unlock(); } } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) { throw new NullPointerException(); } long nanoTime = 0; if (timeout > 0) { nanoTime = unit.toNanos(timeout); } writeLock.lockInterruptibly(); try { for (int i = 0; i < DEFAULT_MAX_COUNT; i++) { if (writeCount < writeArray.length) { insert(e); if (writeCount == 1) { awake.signal(); } return true; } // Time out if (nanoTime <= 0) { // logger.debug("offer wait time out!"); return false; } // keep waiting try { // logger.debug("Queue is full, need wait...."); nanoTime = notFull.awaitNanos(nanoTime); } catch (InterruptedException ie) { notFull.signal(); throw ie; } } } finally { writeLock.unlock(); } return false; } // 取 @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = 0; if (timeout > 0) { nanoTime = unit.toNanos(timeout); } readLock.lockInterruptibly(); try { if (nanoTime > 0) { for (int i = 0; i < DEFAULT_MAX_COUNT; i++) { if (readCount > 0) { return extract(); } if (nanoTime <= 0) { // logger.debug("poll time out!"); return null; } nanoTime = queueSwap(nanoTime, false); } } else { if (readCount > 0) { return extract(); } queueSwap(nanoTime, false); if (readCount > 0) { return extract(); } } } finally { readLock.unlock(); } return null; } // 等待500毫秒 @Override public E poll() { E ret = null; try { ret = poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } catch (Exception e) { ret = null; } return ret; } // 查看 @Override public E peek() { E e = null; readLock.lock(); try { if (readCount > 0) { e = readArray[readArrayHP]; } } finally { readLock.unlock(); } return e; } // 默认500毫秒 @Override public boolean offer(E e) { boolean ret = false; try { ret = offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } catch (Exception e2) { ret = false; } return ret; } @Override public void put(E e) throws InterruptedException { // never need to // block offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } @Override public E take() throws InterruptedException { return poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } @Override public int remainingCapacity() { return this.capacity; } @Override public int drainTo(Collection
c) { return 0; } @Override public int drainTo(Collection
c, int maxElements) { return 0; } @Override public Iterator
iterator() { return null; } // 当前读队列中还有多少个 @Override public int size() { int size = 0; readLock.lock(); try { size = readCount; } finally { readLock.unlock(); } return size; } /** * 当前已写入的队列大小 * */ public int WriteSize() { int size = 0; writeLock.lock(); try { size = writeCount; } finally { writeLock.unlock(); } return size; } public int unsafeReadSize() { return readCount; } public int unsafeWriteSize() { return writeCount; } public int capacity() { return capacity; } public String toMemString() { return "--read: " + readCount + "/" + capacity + "--write: " + writeCount + "/" + capacity; } // 清理 /* * public void clear() { readLock.lock(); writeLock.lock(); try { readCount * = 0; readArrayHP = 0; writeCount = 0; writeArrayTP = 0; * //logger.debug("Queue clear successfully!"); } finally { * writeLock.unlock(); readLock.unlock(); } } */}

 

转载地址:http://iuucx.baihongyu.com/

你可能感兴趣的文章
CollabNet_Subversion小结
查看>>
mysql定时备份自动上传
查看>>
17岁时少年决定把海洋洗干净,现在21岁的他做到了
查看>>
linux 启动oracle
查看>>
《写给大忙人看的java se 8》笔记
查看>>
倒计时:计算时间差
查看>>
Linux/windows P2V VMWare ESXi
查看>>
Windows XP倒计时到底意味着什么?
查看>>
运维工程师在干什么学些什么?【致菜鸟】
查看>>
Linux中iptables详解
查看>>
java中回调函数以及关于包装类的Demo
查看>>
maven异常:missing artifact jdk.tools:jar:1.6
查看>>
终端安全求生指南(五)-——日志管理
查看>>
Nginx 使用 openssl 的自签名证书
查看>>
创业维艰、守成不易
查看>>
PHP环境安装套件:快速安装LAMP环境
查看>>
CSS3
查看>>
ul下的li浮动,如何是ul有li的高度
查看>>
C++ primer plus
查看>>
python mysqlDB
查看>>