Java并发编程基础之线程应用实战-数据库连接池

举报
李子捌 发表于 2021/10/19 14:27:40 2021/10/19
【摘要】 1、简单的数据库连接池简介:使用等待超时模式来构造一个简单的数据库连接池。数据库连接池支持如下功能普通获取连接无超时时间超时获取连接,超时时间之后返回null使用连接释放连接记录获取连接和为获取连接的次数,统计连接池的性能超时等待模式在实现数据库连接池功能之前,我们先来回顾一下上一章的等待/通知经典范式。即加锁、条件循环和处理逻辑三个步骤,但是常规的等待/通知无法做到超时等待,因此我们做一些...

1、简单的数据库连接池

简介:

使用等待超时模式来构造一个简单的数据库连接池。

数据库连接池支持如下功能

  1. 普通获取连接无超时时间
  2. 超时获取连接,超时时间之后返回null
  1. 使用连接
  2. 释放连接
  1. 记录获取连接和为获取连接的次数,统计连接池的性能


超时等待模式

在实现数据库连接池功能之前,我们先来回顾一下上一章的等待/通知经典范式。即加锁、条件循环和处理逻辑三个步骤,但是常规的等待/通知无法做到超时等待,因此我们做一些小改动来实现一个超时等待的等待/通知范式。

改动方式:

  1. 定义等待时间T
  2. 计算超时返回的时间now() + T
  1. 当结果不满足或者超时时间未到,wait()

伪代码:

// 当前对象加锁

public synchronized Object get(long mills) throws InterruptedException {
	long future = System.currentTimeMills + mills;
    long remaining = mills;
    // 当结果不满足并且超时时间大于0时继续等待(即使被唤醒)
    while((result == null) && remainig > 0) {
    	wait(remaining);
        remaining = future - System.currentTimeMills;
    }
    return result;
}


连接池代码及功能介绍

连接池初始大小为10,也可以指定大小。使用LinkedList队列来做连接池管理,队列尾部插入连接,队列头部获取连接。connection()/connection(long)支持普通获取连接无超时时间和超时等待获取连接,超时获取在指定时间内没有获取到连接将会返回null;releaseConnection(connection)释放连接后,连接归队并通知等待在该连接池上的线程获取连接。

package com.lizba.p3;

import java.sql.Connection;
import java.util.LinkedList;

/**
 * <p>
 *  数据库连接池
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/17 21:13
 */
public class ConnectionPool {

    /** 默认连接池大小 */
    private static final int DEFAULT_SIZE = 10;
    /** 连接池存储容器 */
    private LinkedList<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initialSize) {
        // 参数不正确默认初始化10
        initialSize = initialSize <= 0 ? DEFAULT_SIZE : initialSize;
        for (int i = 0; i < initialSize; i++) {
            pool.addLast(ConnectionDriver.createConnection());
        }
    }

    /**
     * 释放连接,重回连接池,并且通知等待中的消费者
     * @param connection
     */
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                pool.addLast(connection);
                // 通知消费者
                pool.notifyAll();
            }
        }
    }
    /**
     * 直接获取连接,不知道超时时间,则会一直等待
     *
     * @return
     * @throws InterruptedException
     */
    public Connection connection() throws InterruptedException {
       return connection(0);
    }

    /**
     * 指定获取连接时间
     *
     * @param mills
     * @return
     * @throws InterruptedException
     */
    public Connection connection(long mills) throws InterruptedException {
        synchronized (pool) {
            if (mills <= 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(mills);
                    remaining = future - System.currentTimeMillis();
                }
                return pool.isEmpty() ? null : pool.removeFirst();
            }
        }
    }
}

连接驱动模拟:

Connection是一个接口,我们通过动态代理来创建Connection,当执行Connection的commit方法时,通过 TimeUnit.MILLISECONDS.sleep(200);休眠线程来模拟执行事务提交。

package com.lizba.p3;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * 		数据库连接驱动动态代理模拟类
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/17 16:57
 */
public class ConnectionDriver {

    private static final String COMMIT_OP = "commit";

    static class ConnectionHandler implements InvocationHandler {

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // commit 时睡眠200ms
            if (method.getName().equals(COMMIT_OP)) {
                TimeUnit.MILLISECONDS.sleep(200);
            }
            return null;
        }
    }

    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
                new Class<?>[] {Connection.class}, new ConnectionHandler());
    }
}

客户端测试:

客户端使用多线程模拟对数据库发起多个连接,并通过统计获取和未获取的次数来计算在不同线程池大小和客户端连接数的情况下,客户端从线程池获取线程的成功和失败的次数。

package com.lizba.p3;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * <p>
 * 		连接池测试,使用CountdownLatch确保connectionThread能同时执行,并发获取连接
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/17 17:51
 */
public class PoolTest {

    /** 初始化线程池 */
    private static ConnectionPool pool = new ConnectionPool(10);
    /** 保证所有线程都一起执行 */
    private static CountDownLatch start = new CountDownLatch(1);
    /** 保证main线程执行在所有线程执行完之后再往后执行 */
    private static CountDownLatch end;

    public static void main(String[] args) throws InterruptedException {
        // 定义获取连接的线程数量
        int threadSize = 10;
        // 定义每个线程获取连接的次数
        int count = 50;
        end = new CountDownLatch(threadSize);
        AtomicInteger getConnectionCount = new AtomicInteger();
        AtomicInteger notGetConnectionCount = new AtomicInteger();
        for (int i = 0; i < threadSize; i++) {
            Thread t = new Thread(new Runner(count, getConnectionCount, notGetConnectionCount), "connectionThread");
            t.start();
        }
        start.countDown();
        end.await();
        System.out.println("执行次数" + (threadSize * count));
        System.out.println("获取连接次数" + getConnectionCount);
        System.out.println("未获取连接次数" + notGetConnectionCount);
    }

    static class Runner implements Runnable {

        private int count;
        private AtomicInteger getConnectionCount;
        private AtomicInteger notGetConnectionCount;

        public Runner(int count, AtomicInteger getConnectionCount, AtomicInteger notGetConnectionCount) {
            this.count = count;
            this.getConnectionCount = getConnectionCount;
            this.notGetConnectionCount = notGetConnectionCount;
        }

        @Override
        public void run() {
            try {
                // 等待知道主线程执行完start.countDown();
                start.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 循环获取连接
            while (count > 0) {
                try {
                    Connection connection = pool.connection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            // 释放连接
                            pool.releaseConnection(connection);
                            // 记录获取的数量
                            getConnectionCount.incrementAndGet();
                        }
                    } else {
                        // 记录未获取到的数量
                        notGetConnectionCount.incrementAndGet();
                    }
                } catch (InterruptedException | SQLException e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }

}

测试结果统计:

笔者CPU(AMD Ryzen 5 3600 6-Core Processor),内存16G。

线程数量

获取总次数

成功获取次数

失败获取次数

失败比率

10

500

500

0

0%

20

1000

891

109

10.9%

30

1500

1212

288

19.2%

40

2000

1482

518

25.9%


总结:

在数据量资源固定的情况下,随着并发连接数的上升,获取连接失败的比率会升高。此处超时连接的设计,在实际开发运用过程中,有利于减少程序阻塞时间,避免在连接一直不可用的情况下导致服务不可用,通过返回null,程序员在设计系统时可以做响应的容错措施或者服务降级等处理。



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。