Java并发编程基础之线程应用实战-基于线程池的简单web服务器

举报
李子捌 发表于 2021/10/19 14:28:29 2021/10/19
【摘要】 简介:目前浏览器作为web服务的客户端访问者,都支持并发多线程的访问。例如在浏览器访问一个web服务器上的HTML页面,此时HTML页面中的各种资源(图片、样式)会被浏览器并发的获取,这种并发访问使得用户不至于等待图片加载的同时也不能看到文字内容。客户端既然是多线程并发访问,那么如果服务端仅仅是单线程处理客户端的请求,那么客户端的并发访问将会变得毫无意义。因此,大部分的web服务器也是支持并...

简介:

目前浏览器作为web服务的客户端访问者,都支持并发多线程的访问。例如在浏览器访问一个web服务器上的HTML页面,此时HTML页面中的各种资源(图片、样式)会被浏览器并发的获取,这种并发访问使得用户不至于等待图片加载的同时也不能看到文字内容。

客户端既然是多线程并发访问,那么如果服务端仅仅是单线程处理客户端的请求,那么客户端的并发访问将会变得毫无意义。因此,大部分的web服务器也是支持并发访问的。常见的Java web服务器有Tomcat\Netty等等。

接下来我们通过结合线程池来写一个简单的web服务器,支持访问html(文本、图片)资源。


1、线程池实现:(有需要的看我上一章)

线程池接口定义

package com.lizba.p3.threadpool;

/**
 * <p>
 *      线程池接口
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/17 22:28
 */
public interface ThreadPool<Job extends Runnable> {

    /**
     * 执行一个Job,这个Job需要实现Runnable
     * @param job
     */
    void execute(Job job);

    /**
     * 关闭线程池
     */
    void shutdown();

    /**
     * 增加工作者线程
     * @param num
     */
    void addWorkers(int num);

    /**
     * 减少工作者线程
     * @param num
     */
    void removeWorkers(int num);

    /**
     * 得到正在等待执行的任务数量
     * @return
     */
    int getJobSize();
}

线程池具体实现:

package com.lizba.p3.threadpool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <p>
 *
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/17 22:34
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

    /** 线程池最大工作者线程数量 */
    private static final int MAX_WORKER_SIZE = 20;
    /** 线程池默认工作者线程数量 */
    private static final int DEFAULT_WORKER_SIZE = 5;
    /** 线程池最小工作者线程数量 */
    private static final int MIN_WORKER_SIZE = 5;
    /** 工作队列,也称任务队列,用来存放客户端提交的任务 */
    private final LinkedList<Job> jobs = new LinkedList<>();
    /** 工作者列表,需要具有同步性质,支持并发操作 */
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    /** 工作线程的数量 */
    private int workerNum = DEFAULT_WORKER_SIZE;
    /** 线程编号生成器 */
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initWorker(DEFAULT_WORKER_SIZE);
    }

    public DefaultThreadPool(int size) {
        initWorker(size);
    }

    /**
     * 初始化线程工作者,并启动
     *
     * @param size  初始化工作着大小
     */
    private void initWorker(int size) {
        for (int i = 0; i < size; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    @Override
    public void execute(Job job) {
        if (job != null) {
            // 添加一个任务,然后通知等待在jobs上的worker
            synchronized (jobs) {
                jobs.add(job);
                jobs.notifyAll();
            }
        }
    }

    @Override
    public void shutdown() {
        workers.forEach(worker -> worker.shutdown());
    }

    @Override
    public void addWorkers(int num) {
        // 此处要锁住jobs,因为worker会从jobs获取任务,需要jobs通知等待中的worker
        synchronized (jobs) {
            // 不允许工作者线程数操作最大值
            if (num + this.workerNum > MAX_WORKER_SIZE) {
                num = MAX_WORKER_SIZE - this.workerNum;
            }
            initWorker(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorkers(int num) {
        synchronized (jobs) {
            if (num > this.workerNum) {
                throw new IllegalArgumentException("超出工作者数目!");
            }

            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count);
                // 如果移除成功则关闭工作者,工作者将不会继续获取任务执行
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
                this.workerNum -= count;
            }
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }



    /**
     * <p>
     *      工作者-负责消费客户端提交的任务
     * </p>
     *
     * @Author: Liziba
     * @Date: 2021/6/17 22:41
     */
    class Worker implements Runnable {

        /** 是否工作 */
        private volatile boolean running = Boolean.TRUE;

        @Override
        public void run() {
            while (running) {
                Job job = null;
                synchronized (jobs) {
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            // 如果感应到外部的中断通知,则自己主动中断返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 取出任务队列的第一个任务
                    job = jobs.removeFirst();
                }
                // 执行任务
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * 关闭worker,全部关闭意味着线程池关闭
         */
        public void shutdown() {
            running = false;
        }
    }
}


2、web服务器实现

其主要功能和实现如下

  1. 服务端监听客户端的socket连接
  2. 接收到的socket连接封装到HttpRequestHandler线程中,当成任务提交给线程池去调度执行
  1. HttpRequestHandler线程的run方法主要包含静态资源jpg图片的读取和输出(字节流),HTML文本读取和输出(字符流),关流等操作
package com.lizba.p3.http;



import com.lizba.p3.threadpool.DefaultThreadPool;
import com.lizba.p3.threadpool.ThreadPool;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * <p>
 * 简单HTTP服务器
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/18 11:47
 */
public class SimplerHttpServer {
	
    /** 设置线程池的默认大小 */
    private static ThreadPool<HttpRequestHandler> pool = new DefaultThreadPool<>(1);
    /** SimplerHttpServer根路径  */
    private static String basePath;
    /** 端口 */
    private static int port = 8888;
    /** serverSocket */
    private static ServerSocket serverSocket;

    public SimplerHttpServer(int port) {
        if (port < 0) {
            return;
        }
        SimplerHttpServer.port = port;
    }

    /**
     * 设置资源根路径
     *
     * @param basePath
     */
    public static void setBasePath(String basePath) {
        if (basePath == null || "".equals(basePath)) {
            return;
        }
        if (new File(basePath).exists() && new File(basePath).isDirectory()) {
            SimplerHttpServer.basePath = basePath;
        }
    }


    /**
     * 启动web服务
     *
     * @throws IOException
     */
    public static void start() throws IOException {
        serverSocket = new ServerSocket(port);

        Socket socket = null;
        while ((socket = serverSocket.accept()) != null) {
            pool.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }


    /**
     * 将socket请求封装成一个HttpRequestHandler线程任务,将任务提交给线程池
     *
     */
    static class HttpRequestHandler implements Runnable {

        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {

            BufferedReader reader = null;
            BufferedReader br = null;
            PrintWriter out = null;
            InputStream in = null;
            String line;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                // 计算绝对路径
                String absolutePath = basePath + header.split(" ")[1];
                out = new PrintWriter(socket.getOutputStream());
                // 图片资源处理,此处只支持jpg
                if (absolutePath.endsWith("jpg")) {
                    in = new FileInputStream(absolutePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int len = 0;
                    while ((len = in.read()) != -1) {
                        baos.write(len);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Liziba");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-Length: " + array.length);
                    out.println("");

                    socket.getOutputStream().write(array, 0, array.length);
                } else {
                    // 其他资源例如HTML文本等资源(此处仅支持HTML文本资源)
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(absolutePath)));
                    out = new PrintWriter(socket.getOutputStream());
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: Liziba");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    while ((line = br.readLine()) != null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (IOException e) {
                // 错误提示
                out.println("HTTP/1.1 500");
                out.println("");
                out.flush();
            } finally {
                close(br, in, reader, out, socket);
            }
        }

        /**
         * 关闭流
         * @param closeables
         */
        private static void close(Closeable... closeables) {
            if (closeables != null) {
                for (Closeable c : closeables) {
                    try {
                        if (c != null)
                            c.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

}

3、启动web服务

在启动服务前我们需要在本地提前准备资源,并将资源根目录指给SimplerHttpServer服务,我在D盘放了2张图和一个html文件。

HTML代码

<html>
	<head>web服务测试页面</head>
	<body align="center">
		<h1>图片1</h1>
		<img src="a.jpg" align="middle" />
		<h1>图片2</h1>
		<img src="b.jpg" align="middle" />
	</body>
</html>

浏览器访问效果(后来换成了纯文字,内容比较多,ab请求对HTML中的图片不加载,浏览器是可以加载的)


文件大概250KB


启动服务代码

package com.lizba.p3.http;

import java.io.IOException;

/**
 * <p>
 *      启动服务
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/6/18 21:40
 */
public class TestHttpServer {

    public static void main(String[] args) throws IOException {
        SimplerHttpServer.setBasePath("D:\\test");
        SimplerHttpServer.start();
    }

}


4、web服务用户请求时序图


5、测试

测试工具

Apache HTTP server benchmarking tool(ab),简单说明一下这个测试工具。ab是一个Apache Http服务器基准测试工具。它可以测试HTTP服务器每秒最多可以处理多少请求。如果测试的是web应用服务,这个结果可以装换成整个应用每秒可以满足多少请求。它的缺点是用途比较有限,只能针对单个URL进行尽可能快的压力测试。


测试内容

使用ab分10个线程发起5000请求,每次测试结束后改变线程池的大小,初始大小为1,测试主要观察的是SimplerHttpServer的响应时间和每秒完成的查询数量,笔者的机器(CPU(AMD Ryzen 5 3600 6-Core Processor),内存16G)。



请求指令(具体参数说明请看我的ab工具使用章节)

ab -c 10 -n 10000 http://localhost:8888/test.html

这个表示同时处理10个线程的并发请求,一共请求10000次


测试结果

通过修改线程池的大小,执行相同的测试语句来测试,web服务器的响应情况

线程池线程数量

1

5

10

20

响应时间(ms)

0.990

0.297

0.272

0.290

每秒查询数量

1010.08

3367.34

3677.18

3442.95

测试完成时间(s)

9.900

2.970

2.719

2.904

总结:

在上述测试结果中,可以发现随着线程池的线程数目的增加,SimpleHttpServer的吞吐量不断增加,响应时间不断减小,因此线程池的实际作用是十分明显的,但是我们看到线程池中的线程由10改变为20的时候,SimpleHttpServer的响应时间没有减少反而有些变大了,因此线程池中的线程数目也不是越多也好的,线程池中的线程过多,反而会给系统增加无故开销,适得其反。在实际开发中,我们要根据业务具体需求,硬件资源等情况来设置线程池的大小,必要的时候也可以实现线程池的动态伸缩。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。