Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
403 views
in Technique[技术] by (71.8m points)

10000 concurrent connection using Java NIO

I wrote a Server(similar to one here) and Client code using Java nio.

I am trying to achieve as many connections as possible. From previous suggestions I slowed down the process of Client creation giving OS(Windows 8) enough time to handle the requests.

I ran the Client code on different machine so that Server has all available space for running.

When I try to create 10,000 connections around 8500 are getting connected and rest are refused for connection and the refusal of connection for clients(threads in client code) happens more which are created later (for loop in Client code ).

My CPU and Memory usage go drastically high.I profiled to see most(48% of total CPU consumption) is consumed by select method (rest mostly by gui events) . Is it due to so many clients ? also I saw some people complaining about this bug in JRE7 and suggesting to use JRE6 .

Memory Usage are 2000+ MB for javaw.exe processes.(I noticed 1 process which was using low memory but had major CPU usage ).Overall usage we around 98% when all 8500 or so clients were connected. The system hanged too for many times but continued to service.I saw Non-page pooled memory usage increased during the process from 178 MB to 310 MB (what is the max limit ?).Is it because when we write to sockets Non-page pooled memory is used ?

Can anybody please tell which limits I might be hitting so the 10,000 successful connections are not possible ? (Socket per process limit ?)(Non-paged memory ?)(Backlog Queue again ?) Tweaks that might be able to allow limits to be pushed ? (Windows machine)

I am using Windows 8 on a 4GB system.

`

public class Server implements Runnable  {

public final static String ADDRESS = "192.168.2.14";

public final static int PORT = 8511;

public final static long TIMEOUT = 10000;

public int clients;

ByteBuffer readBuffer = ByteBuffer.allocate(1024);

private ServerSocketChannel serverChannel;

private Selector selector;

private Map<SocketChannel,byte[]> dataTracking = new HashMap<SocketChannel, byte[]>();

public Server(){
    init();
}

private void init(){
    System.out.println("initializing server");

    if (selector != null) return;
    if (serverChannel != null) return;

    try {
        selector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(ADDRESS, PORT));
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void run() {
    System.out.println("Now accepting connections...");
    try{
        while (!Thread.currentThread().isInterrupted()){

            int ready = selector.select();
            if(ready==0)
                continue;
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

            while (keys.hasNext()){
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid()){
                    continue;
                }

                if (key.isAcceptable()){
                    System.out.println("Accepting connection");
                    accept(key);
                }

                if (key.isWritable()){
                    System.out.println("Writing...");
                    write(key);
                }

                if (key.isReadable()){
                    System.out.println("Reading connection");
                    read(key);
                }
            }
        }
    } catch (IOException e){
        e.printStackTrace();
    } finally{
        closeConnection();
    }

}

private void write(SelectionKey key) throws IOException{

    SocketChannel channel = (SocketChannel) key.channel();
    byte[] data = dataTracking.get(channel);
    dataTracking.remove(channel);
    **int count = channel.write(ByteBuffer.wrap(data));
    if(count == 0)
    {
        key.interestOps(SelectionKey.OP_WRITE);
        return;
    }
    else if(count > 0)
    {
        key.interestOps(0);
        key.interestOps(SelectionKey.OP_READ);  
    }** 
}

private void closeConnection(){

    System.out.println("Closing server down");
    if (selector != null){
        try {
            selector.close();
            serverChannel.socket().close();
            serverChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

private void accept(SelectionKey key) throws IOException{
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel == null)
    {
        throw new IOException();
    }
    socketChannel.configureBlocking(false);
     clients++;
    **//socketChannel.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ);
    SelectionKey skey = socketChannel.register(selector, SelectionKey.OP_READ);**

    byte[] hello = new String("Hello from server").getBytes();
    dataTracking.put(socketChannel, hello);
}

private void read(SelectionKey key) throws IOException{
    SocketChannel channel = (SocketChannel) key.channel();
    readBuffer.clear();
    int length;
    try {
        length = channel.read(readBuffer);
    } catch (IOException e) {
        System.out.println("Reading problem, closing connection");
        System.out.println("No of clients :"+clients);
        key.cancel();
        channel.close();
        return;
    }
    if (length == -1){
        System.out.println("Nothing was there to be read, closing connection");
        channel.close();
        key.cancel();
        return;
    }

    readBuffer.flip();
    byte[] data = new byte[1000];
    readBuffer.get(data, 0, length);
    String fromclient = new String(data,0,length,"UTF-8");
    System.out.println("Received: "+fromclient);
    String dat = fromclient+channel.getRemoteAddress();
    data= dat.getBytes();
    echo(key,data);
}

private void echo(SelectionKey key, byte[] data) throws IOException{
    SocketChannel socketChannel = (SocketChannel) key.channel();
    dataTracking.put(socketChannel, data);
    **//key.interestOps(SelectionKey.OP_WRITE);
    try
    {
        write(key);
    }
    catch(IOException e)
    {
        System.out.println("Problem in echo"+e);
        e.printStackTrace();
    }
}
public static void main(String [] args)
{
    Thread serv = new Thread(new Server());
    serv.start();
}

}

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
socketChannel.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ);

This is incorrect usage. Your selector will spin, as OP_WRITE is almost always ready, except at the rare times when the socket send buffer is full. This is why you're not processing OP_ACCEPT as fast as you could. You're busy processing OP_WRITE at times when you have nothing to write.

The correct way to use OP_WRITE is as follows:

  • Register a newly accepted channel for OP_READ only
  • When you have something to write to the channel, just write it
  • If that write returns zero, register the channel for OP_WRITE, save the ByteBuffer you were trying to write, and return to the select loop
  • When OP_WRITE fires on the channel, call write() with the same buffer
  • if that write succeeds and doesn't return zero, register OP_READ again, or at least remove OP_WRITE from the interestOps.

NB Closing a channel cancels its key. You don't need the cancel.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...