Java线程应用复杂实例
关于线程的六大状态NEW、RUNNABLE、WAITING、TIME_WAITING、BLOCKED、TERMINATED之间的切换,线程的中断标记及相关容易混淆的isInterrupted和Thread.interrupted函数的区别、利用wait、notify系列函数实现的等待/通知机制等内容默认大家都已经掌握了,我们重点关注几个复杂的线程应用实例。
1.等待超时模式 开发人员经常会遇到这样的方法调用场景:调用一个方法时等待一段时间,如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。等待/通知的经典范式,即加锁
、条件循环
和处理逻辑
3个步骤,这种范式无法做到超时等待。而超时等待的加入,只需要对经典范式做出非常小的改动,改动内容如下所示。
假设超时时间段是T,那么可以推断出在当前时间now+T之后就会超时。定义如下变量。
等待持续时间:REMAINING=T。
超时时间:FUTURE=now+T。
这时仅需要wait(REMAINING)即可,在wait(REMAINING)返回之后会将执行:REMAINING=FUTURE–now。如果REMAINING小于等于0,表示已经超时,直接退出,否则将继续执行wait(REMAINING)。
上述描述等待超时模式的伪代码如下。
1 2 3 4 5 6 7 8 9 10 11 public synchronized Object get (long mills) throws InterruptedException { long future = System.currentTimeMillis() + mills; long remaining = mills; while ((result == null ) && remaining > 0 ) { wait(remaining); remaining = future - System.currentTimeMillis(); } return result; }
可以看出,等待超时模式就是在等待/通知范式基础上增加了超时控制
,这使得该模式相比原有范式更具有灵活性
,因为即使方法执行时间过长,也不会永久
阻塞调用者,而是会按照调用者的要求按时
返回。
2.简单数据库连接池 我们使用等待超时模式
来构造一个简单的数据库连接池,在示例中模拟从连接池中获取、使用和释放连接的过程,而客户端获取连接的过程被设定为等待超时的模式,也就是在1000毫秒内如果无法获取到可用连接,将会返回给客户端一个null。设定连接池的大小为10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。
首先看一下连接池的定义。它通过构造函数初始化连接的最大上限,通过一个双向队列
来维护连接,调用方需要先调用fetchConnection(long)
方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用releaseConnection(Connection)
方法将连接放回线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList <>(); public ConnectionPool (int initialSize) { if (initialSize > 0 ) { for (int i = 0 ; i < initialSize; i++) { pool.addLast(ConnectionDriver.createConnection()); } } } public void releaseConnection (Connection connection) { if (connection != null ) { synchronized (pool) { pool.addLast(connection); pool.notifyAll(); } } } public Connection fetchConnection (long mills) throws InterruptedException { synchronized (pool) { if (mills <= 0 ) { while (pool.isEmpty()) { pool.wait(); } return pool.removeFirst(); } else { long future = System.currentTimeMillis() + mills; long remaining = mills; while (pool.isEmpty() && remaining > 0 ) { pool.wait(remaining); remaining = future - System.currentTimeMillis(); } Connection connection = null ; if (!pool.isEmpty()) { connection = pool.removeFirst(); } return connection; } } } }
由于java.sql.Connection
是一个接口,最终的实现是由数据库驱动提供方来实现的,考虑到只是个示例,我们通过动态代理
构造了一个Connection,该Connection的代理实现仅仅是在commit()方法调用时休眠100毫秒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ConnectionDriver { static class ConnectionHandler implements InvocationHandler { @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if (method.getName().equals("commit" )) { TimeUnit.MILLISECONDS.sleep(100 ); } return null ; } } public static final Connection createConnection () { return (Connection) Proxy.newProxyInstance( ConnectionDriver.class.getClassLoader(), new Class []{Connection.class}, new ConnectionHandler ()); } }
下面通过一个示例来测试简易数据库连接池的工作情况,模拟客户端ConnectionRunner获取、使用、最后释放连接的过程,当它使用时连接将会增加获取到连接的数量,反之,将会增加未获取到连接的数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class ConnectionPoolTest { static ConnectionPool pool = new ConnectionPool (10 ); static CountDownLatch start = new CountDownLatch (1 ); static CountDownLatch end; public static void main (String[] args) throws InterruptedException { int threadCount = 10 ; end = new CountDownLatch (threadCount); int count = 20 ; AtomicInteger got = new AtomicInteger (); AtomicInteger notGot = new AtomicInteger (); for (int i = 0 ; i < threadCount; i++) { Thread thread = new Thread (new ConnectionRunner (count, got, notGot), "ConnectionRunnerThread" ); thread.start(); } start.countDown(); end.await(); System.out.println("total invoke: " + (threadCount * count)); System.out.println("got connection: " + got); System.out.println("not got connection " + notGot); } static class ConnectionRunner implements Runnable { int count; AtomicInteger got; AtomicInteger notGot; public ConnectionRunner (int count, AtomicInteger got, AtomicInteger notGot) { this .count = count; this .got = got; this .notGot = notGot; } @Override public void run () { try { start.await(); } catch (InterruptedException e) { } while (count > 0 ) { try { Connection connection = pool.fetchConnection(1000 ); if (connection != null ) { try { connection.createStatement(); connection.commit(); } finally { pool.releaseConnection(connection); got.incrementAndGet(); } } else { notGot.incrementAndGet(); } } catch (Exception e) { } finally { count--; } } end.countDown(); } } }
上述示例中使用了CountDownLatch来确保ConnectionRunnerThread能够同时开始执行,并且在全部结束之后,才使main线程从等待状态中返回。当前设定的场景是10个线程同时运行获取连接池(10个连接)中的连接,通过调节线程数量来观察未获取到连接的情况。线程数、总获取次数、获取到的数量、未获取到的数量以及未获取到的比率:
从表中的数据统计可以看出,在资源一定的情况下(连接池中的10个连接),随着客户端线程的逐步增加,客户端出现超时无法获取连接的比率不断升高
。虽然客户端线程在这种超时获取的模式下会出现连接无法获取的情况,但是它能够保证客户端线程不会一直挂在连接获取的操作上,而是按时
返回,并告知客户端连接获取出现问题,是系统的一种自我保护机制
。数据库连接池的设计也可以复用到其他的资源获取的场景,==针对昂贵资源(比如数据库连接)的获取都应该加以超时限制==。
3.线程池技术示例 对于服务端的程序,经常面对的是客户端传入的短小(执行时间短、工作内容较为单一)任务,需要服务端快速处理并返回结果。如果服务端每次接受到一个任务,创建一个线程,然后进行执行,这在原型阶段是个不错的选择,但是面对成千上万的任务递交进服务器时,如果还是采用一个任务一个线程的方式,那么将会创建数以万记的线程,这不是一个好的选择。因为这会使操作系统频繁的进行线程上下文切换,无故增加系统的负载,而线程的创建和消亡都是需要耗费系统资源的,也无疑浪费了系统资源。
线程池
技术能够很好地解决这个问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制
,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,==一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓的劣化。==
下面先看一个简单的线程池接口定义:
1 2 3 4 5 6 7 8 9 10 11 12 public interface ThreadPool <Job extends Runnable > { void execute (Job job) ; void shutdown () ; void addWorkers (int num) ; void removeWorker (int num) ; int getJobSize () ; }
客户端可以通过execute(Job)方法将Job提交入线程池执行,而客户端自身不用等待Job的执行完成。除了execute(Job)方法以外,线程池接口提供了增大/减少工作者线程以及关闭线程池的方法。这里工作者线程代表着一个重复执行Job的线程,而每个由客户端提交的Job都将进入到一个工作队列
中等待工作者线程的处理。
接下来是线程池接口的默认实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 public class DefaultThreadPool <Job extends Runnable > implements ThreadPool <Job> { private static final int MAX_WORKER_NUMBERS = 10 ; private static final int DEFAULT_WORKER_NUMBERS = 5 ; private static final int MIN_WORKER_NUMBERS = 1 ; private final LinkedList<Job> jobs = new LinkedList <Job>(); private final List<Worker> workers = Collections.synchronizedList(new ArrayList <Worker>()); private int workerNum = DEFAULT_WORKER_NUMBERS; private AtomicLong threadNum = new AtomicLong (); public DefaultThreadPool () { initializeWorkers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool (int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : Math.max(num, MIN_WORKER_NUMBERS); initializeWorkers(num); } @Override public void execute (Job job) { if (job != null ) { synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } @Override public void shutdown () { for (Worker worker : workers) { worker.shutdown(); } } @Override public void addWorkers (int num) { synchronized (jobs) { if (num + this .workerNum > MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this .workerNum; } initializeWorkers(num); this .workerNum += num; } } @Override public void removeWorker (int num) { synchronized (jobs) { if (num >= this .workerNum) { throw new IllegalArgumentException ("beyond workNum" ); } int count = 0 ; while (count < num) { Worker worker = workers.get(0 ); if (workers.remove(worker)) { worker.shutdown(); count++; } } this .workerNum -= count; } } @Override public int getJobSize () { return jobs.size(); } private void initializeWorkers (int num) { for (int i = 0 ; i < num; i++) { Worker worker = new Worker (); workers.add(worker); Thread thread = new Thread (worker, "ThreadPool-Worker-" + threadNum.incrementAndGet()); thread.start(); } } class Worker implements Runnable { private volatile boolean running = true ; @Override public void run () { while (running) { Job job = null ; synchronized (jobs) { while (jobs.isEmpty()) { try { jobs.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return ; } } job = jobs.removeFirst(); } if (job != null ) { try { job.run(); } catch (Exception e) { } } } } public void shutdown () { running = false ; } } }
从线程池的实现可以看到,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加Job,而每个工作者线程会不断地从jobs上取出一个Job进行执行,当jobs为空时,工作者线程进入等待状态。
添加一个Job后,对工作队列jobs调用了其notify()方法,而不是notifyAll()方法,因为能够确定有工作者线程被唤醒,这时使用notify()方法将会比notifyAll()方法获得更小的开销(避免将等待队列中的线程全部移动到同步队列中
)。
可以看到,==线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。==当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。
4.简单Web服务器 目前的浏览器都支持多线程访问,比如说在请求一个HTML页面的时候,页面中包含的图片资源、样式资源会被浏览器发起并发的获取,这样用户就不会遇到一直等到一个图片完全下载完成才能继续查看文字内容的尴尬情况。
如果Web服务器是单线程的,多线程的浏览器也没有用武之地,因为服务端还是一个请求一个请求的顺序处理。因此,大部分Web服务器都是支持并发访问
的。常用的Java Web服务器,如Tomcat、Jetty,在其处理请求的过程中都使用到了线程池技术
。
下面通过使用前一节中的线程池来构造一个简单的Web服务器,这个Web服务器用来处理HTTP请求,目前只能处理简单的文本和JPG图片内容。这个Web服务器使用main线程不断地接受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处理多个客户端请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 public class SimpleHttpServer { static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool <HttpRequestHandler>(1 ); static String basePath; static ServerSocket serverSocket; static int port = 8080 ; public static void setPort (int port) { if (port > 0 ) { SimpleHttpServer.port = port; } } public static void setBasePath (String basePath) { if (basePath != null && new File (basePath).exists() && new File (basePath). isDirectory()) { SimpleHttpServer.basePath = basePath; } } public static void start () throws Exception { serverSocket = new ServerSocket (port); Socket socket = null ; while ((socket = serverSocket.accept()) != null ) { threadPool.execute(new HttpRequestHandler (socket)); } serverSocket.close(); } static class HttpRequestHandler implements Runnable { private Socket socket; public HttpRequestHandler (Socket socket) { this .socket = socket; } @Override public void run () { String line = null ; BufferedReader br = null ; BufferedReader reader = null ; PrintWriter out = null ; InputStream in = null ; try { reader = new BufferedReader (new InputStreamReader (socket.getInputStream())); String header = reader.readLine(); String filePath = basePath + header.split(" " )[1 ]; out = new PrintWriter (socket.getOutputStream()); if (filePath.endsWith("jpg" ) || filePath.endsWith("ico" )) { in = new FileInputStream (filePath); ByteArrayOutputStream baos = new ByteArrayOutputStream (); int i = 0 ; while ((i = in.read()) != -1 ) { baos.write(i); } byte [] array = baos.toByteArray(); out.println("HTTP/1.1 200 OK" ); out.println("Server: Molly" ); out.println("Content-Type: image/jpeg" ); out.println("Content-Length: " + array.length); out.println("" ); socket.getOutputStream().write(array, 0 , array.length); } else { br = new BufferedReader (new InputStreamReader (new FileInputStream (filePath))); out = new PrintWriter (socket.getOutputStream()); out.println("HTTP/1.1 200 OK" ); out.println("Server: Molly" ); out.println("Content-Type: text/html; charset=UTF-8" ); out.println("" ); while ((line = br.readLine()) != null ) { out.println(line); } } out.flush(); } catch (Exception ex) { out.println("HTTP/1.1 500" ); out.println("" ); out.flush(); } finally { close(br, in, reader, out, socket); } } } private static void close (Closeable... closeables) { if (closeables != null ) { for (Closeable closeable : closeables) { try { closeable.close(); } catch (Exception ex) { } } } } }
SimpleHttpServer在建立了与客户端的连接之后,并不会处理客户端的请求,而是将其包装成HttpRequestHandler并交由线程池处理。在线程池中的Worker处理客户端请求的同时,SimpleHttpServer能够继续完成后续客户端连接的建立,不会阻塞后续客户端的请求。
接下来,通过一个测试对比来认识线程池技术带来服务器吞吐量的提高。我们准备了一个简单的HTML页面
1 2 3 4 5 6 7 8 9 10 11 12 13 <html > <head > <title > 测试页面</title > </head > <body > <h1 > 第一张图片</h1 > <img src ="1.jpg" /> <h1 > 第二张图片</h1 > <img src ="2.jpg" /> <h1 > 第三张图片</h1 > <img src ="3.jpg" /> </body > </html >
将SimpleHttpServer的根目录设定到该HTML页面所在目录,并启动SimpleHttpServer,通过Apache HTTP server benchmarking tool来测试不同线程数下,SimpleHttpServer的吞吐量表现。测试场景是5000次请求,分10个线程并发执行,测试内容主要考察响应时间(越小越好)和每秒查询的数量(越高越好):
可以看到,随着线程池中线程数量的增加,SimpleHttpServer的吞吐量不断增大,响应时间不断变小,线程池的作用非常明显。
但是,线程池中线程数量并不是越多越好
,==具体的数量需要评估每个任务的处理时间,以及当前计算机的处理器能力和数量==。使用的线程过少,无法发挥处理器的性能;使用的线程过多,将会增加系统的无故开销,起到相反的作用。