Java7NIO2高性能Web服务器并发连接的处理

本文来自DoubleH的BlogJava博客,原文标题为《基于JDK7 NIO2的高性能web服务器实践之二》。该主题的第一篇博文可在这里阅读。

推荐专题:Java 7 下一代Java开发技术详解

前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点。

不过JDK7中又一次做了简单的实现,不支持同时投递多个AcceptEx请求,只支持一次一个,返回后再投递。这样,客户端连接的接受速度必然大打折扣。不知道为什么sun会做这样的实现,WSASend()/WSAReceive()一次只允许一个还是可以理解,毕竟简化了编程,不用考虑封包乱序问题。

也降低了内存耗尽的风险。AcceptEx却没有这样的理由了。

于是再一次为了性能,我增加了同时投递多个的支持。

另外,在JDK7的默认实现中,AcceptEx返回后,为了设置远程和本地InetSocketAddress也采用了效率很低的方法。4次通过JNI调用getsockname,2次为了取sockaddr,2次为了取port. 这些操作本人采用GetAcceptExSockaddrs一次完成,进一步提高效率。

先看Java部分的代码,框架跟JDK7的一样,细节处理不一样:

 
 
 
  1. /**  
  2.  *   
  3.  */ 
  4. package sun.nio.ch;  
  5.  
  6. import java.io.IOException;  
  7. import java.lang.reflect.Field;  
  8. import java.lang.reflect.Method;  
  9. import java.net.InetAddress;  
  10. import java.net.InetSocketAddress;  
  11. import java.nio.channels.AcceptPendingException;  
  12. import java.nio.channels.AsynchronousCloseException;  
  13. import java.nio.channels.AsynchronousServerSocketChannel;  
  14. import java.nio.channels.AsynchronousSocketChannel;  
  15. import java.nio.channels.ClosedChannelException;  
  16. import java.nio.channels.CompletionHandler;  
  17. import java.nio.channels.NotYetBoundException;  
  18. import java.nio.channels.ShutdownChannelGroupException;  
  19. import java.security.AccessControlContext;  
  20. import java.security.AccessController;  
  21. import java.security.PrivilegedAction;  
  22. import java.util.Queue;  
  23. import java.util.concurrent.ConcurrentLinkedQueue;  
  24. import java.util.concurrent.Future;  
  25. import java.util.concurrent.atomic.AtomicBoolean;  
  26. import java.util.concurrent.atomic.AtomicInteger;  
  27.  
  28. import sun.misc.Unsafe;  
  29.  
  30. /**  
  31.  * This class enable multiple 'AcceptEx' post on the completion port, hence improve the concurrent connection number.  
  32.  * @author Yvon  
  33.  *  
  34.  */ 
  35. public class WindowsMultiAcceptSupport {  
  36.  
  37.     WindowsAsynchronousServerSocketChannelImpl schannel;  
  38.  
  39.     private static final Unsafe unsafe = Unsafe.getUnsafe();  
  40.  
  41.     // 2 * (sizeof(SOCKET_ADDRESS) + 16)  
  42.     private static final int ONE_DATA_BUFFER_SIZE = 88;  
  43.  
  44.     private long handle;  
  45.     private Iocp iocp;  
  46.  
  47.     // typically there will be zero, or one I/O operations pending. In rare  
  48.     // cases there may be more. These rare cases arise when a sequence of accept  
  49.     // operations complete immediately and handled by the initiating thread.  
  50.     // The corresponding OVERLAPPED cannot be reused/released until the completion  
  51.     // event has been posted.  
  52.     private PendingIoCache ioCache;  
  53.  
  54.     private Queue dataBuffers;  
  55.     // the data buffer to receive the local/remote socket address  
  56.     //        private final long dataBuffer;  
  57.  
  58.     private AtomicInteger pendingAccept;  
  59.     private int maxPending;  
  60.  
  61.     Method updateAcceptContextM;  
  62.     Method acceptM;  
  63.  
  64.     WindowsMultiAcceptSupport() {  
  65.         //dummy for JNI code.  
  66.     }  
  67.  
  68.     public void close() throws IOException {  
  69.  
  70.         schannel.close();  
  71.  
  72.         for (int i = 0; i < maxPending + 1; i++)//assert there is maxPending+1 buffer in the queue  
  73.         {  
  74.             long addr = dataBuffers.poll();  
  75.             // release  resources  
  76.             unsafe.freeMemory(addr);  
  77.         }  
  78.  
  79.     }  
  80.  
  81.     /**  
  82.      *   
  83.      */ 
  84.     public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {  
  85.         if (maxPost <= 0 || maxPost > 1024)  
  86.             throw new IllegalStateException("maxPost can't less than 1 and greater than 1024");  
  87.         this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;  
  88.         maxPending = maxPost;  
  89.         dataBuffers = new ConcurrentLinkedQueue();  
  90.         for (int i = 0; i < maxPending + 1; i++) {  
  91.             dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));  
  92.         }  
  93.  
  94.         pendingAccept = new AtomicInteger(0);  
  95.         try {  
  96.             Field f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");  
  97.             f.setAccessible(true);  
  98.             handle = f.getLong(schannel);  
  99.  
  100.  
  101.             f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");  
  102.             f.setAccessible(true);  
  103.             iocp = (Iocp) f.get(schannel);  
  104.  
  105.             f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");  
  106.             f.setAccessible(true);  
  107.             ioCache = (PendingIoCache) f.get(schannel);  
  108.  
  109.             f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");  
  110.             f.setAccessible(true);  
  111.             AtomicBoolean accepting = (AtomicBoolean) f.get(schannel);  
  112.  
  113.             accepting.set(true);//disable accepting by origin channel.  
  114.  
  115.         } catch (Exception e) {  
  116.             e.printStackTrace();  
  117.         }  
  118.  
  119.     }  
  120.  
  121.     @SuppressWarnings("unchecked")  
  122.     public final  void accept(A attachment,  
  123.         CompletionHandler handler) {  
  124.         if (handler == null)  
  125.             throw new NullPointerException("'handler' is null");  
  126.         implAccept(attachment, (CompletionHandler) handler);  
  127.     }  
  128.  
  129.     /**  
  130.      * Task to initiate accept operation and to handle result.  
  131.      */ 
  132.     private class AcceptTask implements Runnable, Iocp.ResultHandler {  
  133.  
  134.         private final WindowsAsynchronousSocketChannelImpl channel;  
  135.         private final AccessControlContext acc;  
  136.         private final PendingFuture result;  
  137.         private final long dataBuffer;  
  138.  
  139.         AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,  
  140.             long dataBuffer, PendingFuture result) {  
  141.             this.channel = channel;  
  142.             this.acc = acc;  
  143.             this.result = result;  
  144.             this.dataBuffer = dataBuffer;  
  145.         }  
  146.  
  147.         void enableAccept() {  
  148.             pendingAccept.decrementAndGet();  
  149.             dataBuffers.add(dataBuffer);  
  150.         }  
  151.  
  152.         void closeChildChannel() {  
  153.             try {  
  154.                 channel.close();  
  155.             } catch (IOException ignore) {  
  156.             }  
  157.         }  
  158.  
  159.         // caller must have acquired read lock for the listener and child channel.  
  160.         void finishAccept() throws IOException {  
  161.             /**  
  162.              * JDK7 use 4 calls to getsockname  to setup  
  163.              * local& remote address, this is very inefficient.  
  164.              *   
  165.              * I change this to use GetAcceptExSockaddrs  
  166.              */ 
  167.  
  168.             InetAddress[] socks = new InetAddress[2];  
  169.             int[] ports = new int[2];  
  170.             updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);  
  171.             InetSocketAddress local = new InetSocketAddress(socks[0], ports[0]);  
  172.             final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);  
  173.             channel.setConnected(local, remote);  
  174.  
  175.             // permission check (in context of initiating thread)  
  176.             if (acc != null) {  
  177.                 AccessController.doPrivileged(new PrivilegedAction() {  
  178.  
  179.                     public Void run() {  
  180.                         SecurityManager sm = System.getSecurityManager();  
  181.                         sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());  
  182.  
  183.                         return null;  
  184.                     }  
  185.                 }, acc);  
  186.             }  
  187.         }  
  188.  
  189.         /**  
  190.          * Initiates the accept operation.  
  191.          */ 
  192.         @Override 
  193.         public void run() {  
  194.             long overlapped = 0L;  
  195.  
  196.             try {  
  197.                 // begin usage of listener socket  
  198.                 schannel.begin();  
  199.                 try {  
  200.                     // begin usage of child socket (as it is registered with  
  201.                     // completion port and so may be closed in the event that  
  202.                     // the group is forcefully closed).  
  203.                     channel.begin();  
  204.  
  205.                     synchronized (result) {  
  206.                         overlapped = ioCache.add(result);  
  207.  
  208.                         
  209.                         int n = accept0(handle, channel.handle(), overlapped, dataBuffer);//Be careful for the buffer address  
  210.                         if (n == IOStatus.UNAVAILABLE) {  
  211.                             return;  
  212.                         }  
  213.  
  214.                         // connection accepted immediately  
  215.                         finishAccept();  
  216.  
  217.                         // allow another accept before the result is set  
  218.                         enableAccept();  
  219.                         result.setResult(channel);  
  220.                     }  
  221.                 } finally {  
  222.                     // end usage on child socket  
  223.                     channel.end();  
  224.                 }  
  225.             } catch (Throwable x) {  
  226.                 // failed to initiate accept so release resources  
  227.                 if (overlapped != 0L)  
  228.                     ioCache.remove(overlapped);  
  229.                 closeChildChannel();  
  230.                 if (x instanceof ClosedChannelException)  
  231.                     x = new AsynchronousCloseException();  
  232.                 if (!(x instanceof IOException) && !(x instanceof SecurityException))  
  233.                     x = new IOException(x);  
  234.                 enableAccept();  
  235.                 result.setFailure(x);  
  236.             } finally {  
  237.                 // end of usage of listener socket  
  238.                 schannel.end();  
  239.             }  
  240.  
  241.             // accept completed immediately but may not have executed on  
  242.             // initiating thread in which case the operation may have been  
  243.             // cancelled.  
  244.             if (result.isCancelled()) {  
  245.                 closeChildChannel();  
  246.             }  
  247.  
  248.             // invoke completion handler  
  249.             Invoker.invokeIndirectly(result);  
  250.         }  
  251.  
  252.         /**  
  253.          * Executed when the I/O has completed  
  254.          */ 
  255.         @Override 
  256.         public void completed(int bytesTransferred, boolean canInvokeDirect) {  
  257.             try {  
  258.                 // connection accept after group has shutdown  
  259.                 if (iocp.isShutdown()) {  
  260.                     throw new IOException(new ShutdownChannelGroupException());  
  261.                 }  
  262.  
  263.                 // finish the accept  
  264.                 try {  
  265.                     schannel.begin();  
  266.                     try {  
  267.                         channel.begin();  
  268.                         finishAccept();  
  269.                     } finally {  
  270.                         channel.end();  
  271.                     }  
  272.                 } finally {  
  273.                     schannel.end();  
  274.                 }  
  275.  
  276.                 // allow another accept before the result is set  
  277.                 enableAccept();  
  278.                 result.setResult(channel);  
  279.             } catch (Throwable x) {  
  280.                 enableAccept();  
  281.                 closeChildChannel();  
  282.                 if (x instanceof ClosedChannelException)  
  283.                     x = new AsynchronousCloseException();  
  284.                 if (!(x instanceof IOException) && !(x instanceof SecurityException))  
  285.                     x = new IOException(x);  
  286.                 result.setFailure(x);  
  287.             }  
  288.  
  289.             // if an async cancel has already cancelled the operation then  
  290.             // close the new channel so as to free resources  
  291.             if (result.isCancelled()) {  
  292.                 closeChildChannel();  
  293.             }  
  294.  
  295.             // invoke handler (but not directly)  
  296.             Invoker.invokeIndirectly(result);  
  297.         }  
  298.  
  299.         @Override 
  300.         public void failed(int error, IOException x) {  
  301.             enableAccept();  
  302.             closeChildChannel();  
  303.  
  304.             // release waiters  
  305.             if (schannel.isOpen()) {  
  306.                 result.setFailure(x);  
  307.             } else {  
  308.                 result.setFailure(new AsynchronousCloseException());  
  309.             }  
  310.             Invoker.invokeIndirectly(result);  
  311.         }  
  312.     }  
  313.  
  314.     Future implAccept(Object attachment,  
  315.         final CompletionHandler handler) {  
  316.         if (!schannel.isOpen()) {  
  317.             Throwable exc = new ClosedChannelException();  
  318.             if (handler == null)  
  319.                 return CompletedFuture.withFailure(exc);  
  320.             Invoker.invokeIndirectly(schannel, handler, attachment, null, exc);  
  321.             return null;  
  322.         }  
  323.         if (schannel.isAcceptKilled())  
  324.             throw new RuntimeException("Accept not allowed due to cancellation");  
  325.  
  326.         // ensure channel is bound to local address  
  327.         if (schannel.localAddress == null)  
  328.             throw new NotYetBoundException();  
  329.  
  330.         // create the socket that will be accepted. The creation of the socket  
  331.         // is enclosed by a begin/end for the listener socket to ensure that  
  332.         // we check that the listener is open and also to prevent the I/O  
  333.         // port from being closed as the new socket is registered.  
  334.         WindowsAsynchronousSocketChannelImpl ch = null;  
  335.         IOException ioe = null;  
  336.         try {  
  337.             schannel.begin();  
  338.             ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);  
  339.         } catch (IOException x) {  
  340.             ioe = x;  
  341.         } finally {  
  342.             schannel.end();  
  343.         }  
  344.         if (ioe != null) {  
  345.             if (handler == null)  
  346.                 return CompletedFuture.withFailure(ioe);  
  347.             Invoker.invokeIndirectly(this.schannel, handler, attachment, null, ioe);  
  348.             return null;  
  349.         }  
  350.  
  351.         // need calling context when there is security manager as  
  352.         // permission check may be done in a different thread without  
  353.         // any application call frames on the stack  
  354.         AccessControlContext acc =  
  355.             (System.getSecurityManager() == null) ? null : AccessController.getContext();  
  356.  
  357.         PendingFuture result =  
  358.             new PendingFuture(schannel, handler, attachment);  
  359.  
  360.         // check and set flag to prevent concurrent accepting  
  361.         if (pendingAccept.get() >= maxPending)  
  362.             throw new AcceptPendingException();  
  363.         pendingAccept.incrementAndGet();  
  364.         AcceptTask task = new AcceptTask(ch, acc, dataBuffers.poll(), result);  
  365.         result.setContext(task);  
  366.  
  367.         // initiate I/O  
  368.         if (Iocp.supportsThreadAgnosticIo()) {  
  369.             task.run();  
  370.         } else {  
  371.             Invoker.invokeOnThreadInThreadPool(this.schannel, task);  
  372.         }  
  373.         return result;  
  374.     }  
  375.  
  376.     //    //reimplements for performance  
  377.     static native void updateAcceptContext(long listenSocket, long acceptSocket,  
  378.         InetAddress[] addresses, int[] ports, long dataBuffer) throws IOException;  
  379.  
  380.     static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);  
  381.  
  382. }  
  383.  

对应的CPP代码如下:

 
 
 
  1. /*  
  2.  * Class:     sun_nio_ch_WindowsMultiAcceptSupport  
  3.  * Method:    updateAcceptContext  
  4.  * Signature: (JJ[Ljava/net/InetAddress;[IJ)V  
  5.  */ 
  6. JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext  
  7. (JNIEnv *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)  
  8. {  
  9.     SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);  
  10.     SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);  
  11.     PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);  
  12.     INT iLocalAddrLen=0;  
  13.     INT iRemoteAddrLen=0;  
  14.     SOCKETADDRESS* lpLocalAddr;  
  15.     SOCKETADDRESS* lpRemoteAddr;  
  16.     jobject localAddr;  
  17.     jobject remoteAddr;  
  18.     jint ports[2]={0};  
  19.  
  20.       
  21.  
  22.     setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&s1, sizeof(s1));  
  23.  
  24.     (lpGetAcceptExSockaddrs)(outputBuffer,  
  25.         0,  
  26.         sizeof(SOCKETADDRESS)+16,  
  27.         sizeof(SOCKETADDRESS)+16,  
  28.         (LPSOCKADDR*)&lpLocalAddr,  
  29.         &iLocalAddrLen,  
  30.         (LPSOCKADDR*)&lpRemoteAddr,  
  31.         &iRemoteAddrLen);  
  32.  
  33.     localAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpLocalAddr,(int *)ports);  
  34.     remoteAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpRemoteAddr,(int *)(ports+1));  
  35.  
  36.     env->SetObjectArrayElement(sockArray,0,localAddr);  
  37.     env->SetObjectArrayElement(sockArray,1,remoteAddr);  
  38.     env->SetIntArrayRegion(portArray,0,2,ports);  
  39.  
  40. }  
  41.  
  42. /*  
  43.  * Class:     sun_nio_ch_WindowsMultiAcceptSupport  
  44.  * Method:    accept0  
  45.  * Signature: (JJJJ)I  
  46.  */ 
  47. jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0  
  48.   (JNIEnv *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)  
  49. {  
  50.  
  51.     BOOL res;  
  52.     SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);  
  53.     SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);  
  54.     PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);  
  55.  
  56.     DWORD nread = 0;  
  57.     OVERLAPPED* lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);  
  58.     ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));  
  59.  
  60.       
  61.  
  62.     //why use SOCKETADDRESS?  
  63.     //because client may use IPv6 to connect to server.  
  64.     res = (lpAcceptEx)(s1,  
  65.         s2,  
  66.         outputBuffer,  
  67.         0,  
  68.         sizeof(SOCKETADDRESS)+16,  
  69.         sizeof(SOCKETADDRESS)+16,  
  70.         &nread,  
  71.       

    分享名称:Java7NIO2高性能Web服务器并发连接的处理
    标题来源:http://www.csdahua.cn/qtweb/news1/354201.html

    网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

    广告

    声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网