引言
对象池模式(Object Pool Pattern)是一种创建一组可重用对象的设计模式。它通过维护一个预分配的对象集合,避免了频繁地创建和销毁对象所带来的性能开销。在需要使用对象时,可以直接从池中获取,而不需要每次都创建新的对象;当对象不再使用时,可以将其归还到池中,而不是直接销毁。
对象池模式的主要优点是减少了对象的创建和销毁的开销,提高了程序的性能。此外,它还有助于控制资源的使用,避免资源的浪费。然而,对象池模式也有一些缺点,如增加了代码的复杂性,以及可能导致内存占用过高。
对象池模式并不是GoF中的23种设计模式
定义及实现
定义
When objects are expensive to create and they are needed only for short periods of time it is advantageous to utilize the Object Pool pattern. The Object Pool provides a cache for instantiated objects tracking which ones are in use and which are available.
当对象的创建成本很高并且只在很短的周期内使用,那么对象池模式就很有优势。对象池提供一个对象示例的缓存来跟踪那个对象正在使用,哪个对象是可用的。
结构
$2ObjectPool T create():T checkIn(T):void checkOut():T Oliphaunt doSomething() OliphauntObjectPool create():Oliphaunt checkIn(Oliphaunt):void checkOut():Oliphaunt
代码实现
ObjectPool.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf4j public abstract class ObjectPool <T> { private final Deque<T> available = new ArrayDeque <>(); private final Deque<T> using = new ArrayDeque <>(); protected abstract T create () ; public synchronized T checkOut () { if (available.isEmpty()) { T obj = this .create(); using.addLast(obj); return obj; } T obj = available.poll(); using.addLast(obj); return obj; } public synchronized void checkIn (T t) { using.remove(t); available.addLast(t); } public void printPoolInfo () { log.info("available: {}, using: {}" , available.size(), using.size()); } }
Oliphaunt.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j public class Oliphaunt { private final Integer sno; public Oliphaunt (Integer sno) { this .sno = sno; } public void doSomething () { log.info("sno: {}, do something" , this .sno); } }
OliphauntObjectPool.java 1 2 3 4 5 6 7 8 9 10 public class OliphauntObjectPool extends ObjectPool <Oliphaunt> { private final AtomicInteger count = new AtomicInteger (1 ); @Override public Oliphaunt create () { return new Oliphaunt (count.getAndIncrement()); } }
测试对象池的使用
Main.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Main { public static void main (String[] args) { ObjectPool<Oliphaunt> oliphauntObjectPool = new OliphauntObjectPool (); Oliphaunt oliphaunt = oliphauntObjectPool.checkOut(); Oliphaunt oliphaunt2 = oliphauntObjectPool.checkOut(); oliphaunt.doSomething(); oliphaunt2.doSomething(); oliphauntObjectPool.checkIn(oliphaunt); oliphauntObjectPool.checkIn(oliphaunt2); Oliphaunt oliphaunt3 = oliphauntObjectPool.checkOut(); oliphaunt3.doSomething(); oliphauntObjectPool.printPoolInfo(); } }
输出结果
1 2 3 4 org.depsea .design .pattern .creation .objectpool .Oliphaunt -- sno: 1 , do something org.depsea .design .pattern .creation .objectpool .Oliphaunt -- sno: 2 , do something org.depsea .design .pattern .creation .objectpool .Oliphaunt -- sno: 1 , do something org.depsea .design .pattern .creation .objectpool .ObjectPool -- available: 1 , using: 1
存在的问题
以上实现有一个使用起来不太方便的地方,每次使用完后都需要通过对象池的 checkIn
方法归还对象。但是我们在使用连接池获取连接,使用完毕后好像并没有这个操作,而是直接调用连接的 close
方法即可。这是如何实现的呢?这里提供一个思路。
使Oliphaut
实现Closeable
并提供一个关闭函数close
,并在 Oliphaunt
中提供一个钩子函数,用于在Oliphaunt
创建时,创建者可以注入一个钩子,这个钩子函数的目的就是将对象返还到连接池中。然后Oliphaut
在关闭函数中调用这个钩子,就可以达到回收对象的目的。
我们需要对 Oliphaut
和 OliphauntObjectPool
稍加改造。
Oliphaut.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j public class Oliphaunt implements Closeable { private final Integer sno; @Setter private Consumer<Oliphaunt> closeHook; public Oliphaunt (Integer sno) { this .sno = sno; } public void doSomething () { log.info("sno: {}, do something" , this .sno); } @Override public void close () { if (closeHook != null ) { closeHook.accept(this ); } } }
OliphauntObjectPool.java 1 2 3 4 5 6 7 8 9 10 11 12 public class OliphauntObjectPool extends ObjectPool <Oliphaunt> { private final AtomicInteger count = new AtomicInteger (1 ); @Override public Oliphaunt create () { Oliphaunt oliphaunt = new Oliphaunt (count.getAndIncrement()); oliphaunt.setCloseHook(this ::checkIn); return oliphaunt; } }
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Main { public static void main (String[] args) { ObjectPool<Oliphaunt> oliphauntObjectPool = new OliphauntObjectPool (); Oliphaunt oliphaunt = oliphauntObjectPool.checkOut(); oliphaunt.doSomething(); oliphaunt.close(); Oliphaunt oliphaunt2 = oliphauntObjectPool.checkOut(); oliphaunt2.doSomething(); oliphauntObjectPool.printPoolInfo(); } }
输出结果
1 2 3 org.depsea .design .pattern .creation .objectpool .Oliphaunt -- sno: 1 , do something org.depsea .design .pattern .creation .objectpool .Oliphaunt -- sno: 1 , do something org.depsea .design .pattern .creation .objectpool .ObjectPool -- available: 0 , using: 1
这样依然会有问题,因为我们占用了原对象的close
方法,如果原对象需要通过 close
方法来释放资源,会出现 “鱼与熊掌不可兼得”的情况。要么通过close
回收对象,要么通过close
释放资源。有没有其他方法解决呢?
那这里就需要用到代理模式(Proxy Pattern),我们创建一个 Oliphaunt
的代理类。通过代理类来回收对象。在需要关闭资源时,通过代理对象获取到真实的对象,然后调用真实对象的close
方法来释放资源。
其类结构也会后些许变化,Oliphaunt
不能再是一个具体类,而应是一个接口。具体如下:
$2ObjectPool T available: Deque<T> using: Deque<T> create():T checkIn(T):void checkOut():T Oliphaunt doSomething() close() OliphauntObjectPool create():Oliphaunt checkIn(Oliphaunt):void checkOut():Oliphaunt shutdown() OliphauntImpl doSomething() close() OliphauntProxy objectPool: OliphauntObjectPool realOliphaunt: Oliphaunt doSomething() close() getRealOliphaunt(): Oliphaunt
ObjectPool
的代码不动,其他类都有修改,具体如下:
Oliphaunt.java 1 2 3 public interface Oliphaunt extends Closeable { void doSomething () ; }
OliphauntImpl.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j public class OliphauntImpl implements Oliphaunt , Closeable { private final Integer sno; public OliphauntImpl (Integer sno) { this .sno = sno; } public void doSomething () { log.info("sno: {}, do something" , this .sno); } @Override public void close () { log.info("sno: {}, 释放资源" , this .sno); } }
OliphauntProxy.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class OliphauntProxy implements Oliphaunt { @Getter private final Oliphaunt realOliphaunt; private final ObjectPool<Oliphaunt> objectPool; public OliphauntProxy (Oliphaunt realOliphaunt, ObjectPool<Oliphaunt> objectPool) { this .realOliphaunt = realOliphaunt; this .objectPool = objectPool; } @Override public void doSomething () { this .realOliphaunt.doSomething(); } @Override public void close () throws IOException { log.info("回收资源" ); this .objectPool.checkIn(this ); } }
OliphauntObjectPool.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class OliphauntObjectPool extends ObjectPool <Oliphaunt> { private final AtomicInteger count = new AtomicInteger (1 ); @Override public Oliphaunt create () { Oliphaunt oliphaunt = new OliphauntImpl (count.getAndIncrement()); return new OliphauntProxy (oliphaunt, this ); } public void shutdown () throws IOException { while (!this .available.isEmpty()) { OliphauntProxy oliphauntProxy = (OliphauntProxy) this .available.poll(); oliphauntProxy.getRealOliphaunt().close(); } while (!this .using.isEmpty()) { OliphauntProxy oliphauntProxy = (OliphauntProxy) this .using.poll(); oliphauntProxy.getRealOliphaunt().close(); } this .available.clear(); this .using.clear(); } }
测试代码
Main.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Main { public static void main (String[] args) throws IOException { OliphauntObjectPool oliphauntObjectPool = new OliphauntObjectPool (); Oliphaunt oliphaunt = oliphauntObjectPool.checkOut(); oliphaunt.doSomething(); oliphaunt.close(); Oliphaunt oliphaunt2 = oliphauntObjectPool.checkOut(); oliphaunt2.doSomething(); oliphauntObjectPool.printPoolInfo(); oliphaunt2.close(); oliphauntObjectPool.shutdown(); } }
输出
1 2 3 4 5 6 org.depsea .design .pattern .creation .objectpool .OliphauntImpl -- sno: 1 , do something org.depsea .design .pattern .creation .objectpool .OliphauntProxy -- 回收资源 org.depsea .design .pattern .creation .objectpool .OliphauntImpl -- sno: 1 , do something org.depsea .design .pattern .creation .objectpool .ObjectPool -- available: 0 , using: 1 org.depsea .design .pattern .creation .objectpool .OliphauntProxy -- 回收资源 org.depsea .design .pattern .creation .objectpool .OliphauntImpl -- sno: 1 , 释放资源
在本文的例子中ObjectPool
是一个抽象类,但是最好定义成接口,并提供一个AbstractObjectPool
类抽象公共业务。并且 shutdown
方法应该上移至顶层接口,及ObjectPool
中。类图如下:
$2ObjectPool T checkIn(T):void checkOut():T shutdown() AbstractObjectPool T available: Deque<T> using: Deque<T> create():T checkIn(T):void checkOut():T Oliphaunt doSomething() close() OliphauntObjectPool create():Oliphaunt checkIn(Oliphaunt):void checkOut():Oliphaunt shutdown() OliphauntImpl doSomething() close() OliphauntProxy objectPool: OliphauntObjectPool realOliphaunt: Oliphaunt doSomething() close() getRealOliphaunt(): Oliphaunt Client
这种方法也是连接池常用的一种方法,无论是Druid
还是 HikariCP
处理连接回收的逻辑基本与上文中的一致。
实际应用
对象池最常见的应用就是连接池。以HikariCP
为例,简单介绍下对象池模式在连接池中的应用。
核心类方法:
com.zaxxer.hikari.HikariDataSource
com.zaxxer.hikari.pool.HikariPool
com.zaxxer.hikari.util.ConcurrentBag
com.zaxxer.hikari.pool.PoolEntry
com.zaxxer.hikari.HikariDataSource#getConnection()
com.zaxxer.hikari.pool.HikariPool#getConnection()
com.zaxxer.hikari.util.ConcurrentBag#borrow
com.zaxxer.hikari.pool.PoolEntry#createProxyConnection
com.zaxxer.hikari.pool.ProxyConnection#close
com.zaxxer.hikari.pool.PoolEntry#recycle
com.zaxxer.hikari.pool.HikariPool#recycle
连接获取
com.zaxxer.hikari.HikariDataSource
实际上是com.zaxxer.hikari.pool.HikariPool
的一个代理类,核心逻辑都在com.zaxxer.hikari.pool.HikariPool
中。当我们调用com.zaxxer.hikari.HikariDataSource#getConnection()
方法时,实际上调用的是com.zaxxer.hikari.pool.HikariPool#getConnection()
。
而HikariCP
使用com.zaxxer.hikari.util.ConcurrentBag
来管理对象。ConcurrentBag
并没有使用两个集合来维护可用和在用的对象列表,而是使用com.zaxxer.hikari.pool.PoolEntry
中的状态(state
)字段来维护。
com.zaxxer.hikari.pool.HikariPool
初始化时会创建一个 com.zaxxer.hikari.pool.PoolEntry
对象并添加到 com.zaxxer.hikari.util.ConcurrentBag
中。
com.zaxxer.hikari.pool.HikariPool#HikariPool 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public HikariPool (final HikariConfig config) { super (config); this .connectionBag = new ConcurrentBag <>(this ); this .suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock () : SuspendResumeLock.FAUX_LOCK; this .houseKeepingExecutorService = initializeHouseKeepingExecutorService(); checkFailFast(); if (config.getMetricsTrackerFactory() != null ) { setMetricsTrackerFactory(config.getMetricsTrackerFactory()); } else { setMetricRegistry(config.getMetricRegistry()); } ... }
checkFailFast
方法会创建一个 PoolEntry
并添加到connectionBag
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void checkFailFast () { final var initializationTimeout = config.getInitializationFailTimeout(); if (initializationTimeout < 0 ) { return ; } final var startTime = currentTime(); do { final var poolEntry = createPoolEntry(); if (poolEntry != null ) { if (config.getMinimumIdle() > 0 ) { connectionBag.add(poolEntry); logger.info("{} - Added connection {}" , poolName, poolEntry.connection); } else { quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)" ); } return ; } if (getLastConnectionFailure() instanceof ConnectionSetupException) { throwPoolInitializationException(getLastConnectionFailure().getCause()); } quietlySleep(SECONDS.toMillis(1 )); } while (elapsedMillis(startTime) < initializationTimeout); if (initializationTimeout > 0 ) { throwPoolInitializationException(getLastConnectionFailure()); } }
当调用com.zaxxer.hikari.pool.HikariPool#getConnection(long)
时会调用
com.zaxxer.hikari.util.ConcurrentBag#borrow
获取一个PoolEntry
,最终通过poolEntry.createProxyConnection
创建一个代理连接。代码如下:
com.zaxxer.hikari.pool.HikariPool#getConnection(long) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public Connection getConnection (final long hardTimeout) throws SQLException{ suspendResumeLock.acquire(); final var startTime = currentTime(); try { var timeout = hardTimeout; do { var poolEntry = connectionBag.borrow(timeout, MILLISECONDS); if (poolEntry == null ) { break ; } final var now = currentTime(); if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && isConnectionDead(poolEntry.connection))) { closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE); timeout = hardTimeout - elapsedMillis(startTime); } else { metricsTracker.recordBorrowStats(poolEntry, startTime); return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry)); } } while (timeout > 0L ); metricsTracker.recordBorrowTimeoutStats(startTime); throw createTimeoutException(startTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SQLException (poolName + " - Interrupted during connection acquisition" , e); } finally { suspendResumeLock.release(); } }
com.zaxxer.hikari.util.ConcurrentBag#borrow 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public T borrow (long timeout, final TimeUnit timeUnit) throws InterruptedException{ final var list = threadList.get(); for (int i = list.size() - 1 ; i >= 0 ; i--) { final var entry = list.remove(i); @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } } final int waiting = waiters.incrementAndGet(); try { for (T bagEntry : sharedList) { if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { if (waiting > 1 ) { listener.addBagItem(waiting - 1 ); } return bagEntry; } } listener.addBagItem(waiting); timeout = timeUnit.toNanos(timeout); do { final var start = currentTime(); final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000 ); return null ; } finally { waiters.decrementAndGet(); } }
上面代码中的listener
就是com.zaxxer.hikari.pool.HikariPool
,相当于是调用的com.zaxxer.hikari.pool.HikariPool#addBagItem
方法创建PoolEntry
。
1 2 3 4 5 6 @Override public void addBagItem (final int waiting) { if (waiting > addConnectionExecutor.getQueue().size()) addConnectionExecutor.submit(poolEntryCreator); }
以上就是对象创建的全部核心逻辑。部分代码未展示,有兴趣的可以去阅读HikariCP
源码。
连接回收
HikariPool.getConnection()
返回的连接对象实际为com.zaxxer.hikari.pool.HikariProxyConnection
,而com.zaxxer.hikari.pool.HikariProxyConnection
又继承自com.zaxxer.hikari.pool.ProxyConnection
。
当调用com.zaxxer.hikari.pool.ProxyConnection#close
方法时,最终调用了com.zaxxer.hikari.pool.PoolEntry#recycle
方法。
com.zaxxer.hikari.pool.ProxyConnection#close 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Override public final void close () throws SQLException{ closeStatements(); if (delegate != ClosedConnection.CLOSED_CONNECTION) { leakTask.cancel(); try { if (isCommitStateDirty && !isAutoCommit) { delegate.rollback(); LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close()." , poolEntry.getPoolName(), delegate); } if (dirtyBits != 0 ) { poolEntry.resetConnectionState(this , dirtyBits); } delegate.clearWarnings(); } catch (SQLException e) { if (!poolEntry.isMarkedEvicted()) { throw checkException(e); } } finally { delegate = ClosedConnection.CLOSED_CONNECTION; poolEntry.recycle(); } } }
其中的com.zaxxer.hikari.pool.ProxyConnection#delegate
就是真实的连接对象。
com.zaxxer.hikari.pool.PoolEntry#recycle 1 2 3 4 5 6 7 void recycle () { if (connection != null ) { this .lastAccessed = currentTime(); hikariPool.recycle(this ); } }
java com.zaxxer.hikari.pool.PoolEntry#recycle
又调用 com.zaxxer.hikari.pool.HikariPool#recycle
方法,最终将对象回收到connectionBag
中。
总结
对象池其实有点类似于工厂,工厂生产的对象交个客户端后就由客户端来维护其生命周期了。而对象池生产的对象再客户端用完之后还需要还给对象池,对象池中对象的生命周期是由对象池来维护的。
对象池的核心操作
获取对象:从池中获取一个可用对象,可能会触发对象池的创建动作,创建出一个新的对象。
对象归还:在使用完成后归还对象