`
QING____
  • 浏览: 2234132 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JAVA-NIO程序设计完整实例

 
阅读更多

NIO-Socket通讯,为我们解决了server端多线程设计方面的性能/吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端高并发问题..NIO并不能显著的提升Client-server的通讯性能(其中包括全局性耗时总和,Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应的并发量情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理的使用了平台(OS/VM/Http协议)的特性并提供了高效的便捷的编程级别的API.

 

为了展示,NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负载client的数据传输和处理.此程序实例并没有太多的关注异常处理和业务性处理,也没有使用线程池作为server端socket句柄管理,不过你可以简单的修改代码也实现它.

  1. TestMain.java:引导类
  2. ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄.
  3. Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性.
  4. ServerControllor.java:server端连接处理类,负责接收连接和数据处理
  5. ServerHandler.java:server端连接维护类.

TestMain.java:

 

package com.test.web;


public class TestMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		int port = 30008;
		ServerControllor sc = new ServerControllor(port);
		sc.start();
		Thread.sleep(2000);
		ClientControllor cc = new ClientControllor("127.0.0.1", port);
		cc.start();
		Packet p1 = Packet.wrap("Hello,I am first!");
		cc.put(p1);
		Packet p2 = Packet.wrap("Hello,I am second!");
		cc.put(p2);
		Packet p3 = Packet.wrap("Hello,I am thread!");
		cc.put(p3);

	}

}

 

 

ClientControllor.java

 

 

package com.test.web;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

public class ClientControllor {

	private BlockingQueue<Packet> inner = new LinkedBlockingQueue<Packet>(100);//no any more
	private Object lock = new Object();
	private InetSocketAddress remote;
	private Thread thread = new ClientThread(remote);
	public ClientControllor(String host,int port){
		remote = new InetSocketAddress(host, port);
	}
	
	public void start(){
		if(thread.isAlive() || remote == null){
			return;
		}
		synchronized (lock) {
			thread.start();
		}
			
		
	}
	public boolean put(Packet packet){
		return inner.offer(packet);
	}
	
	public void clear(){
		inner.clear();
	}
	
	class ClientThread extends Thread {
		SocketAddress remote;
		SocketChannel channel;
		ClientThread(SocketAddress remote){
			this.remote = remote;
		}
		@Override
		public void run(){
			try{
				try{
					channel = SocketChannel.open();
					channel.configureBlocking(true);
					boolean isSuccess = channel.connect(new InetSocketAddress(30008));
					if(!isSuccess){
						while(!channel.finishConnect()){
							System.out.println("Client is connecting...");
						}
					}
					System.out.println("Client is connected.");
//					Selector selector = Selector.open();
//					channel.register(selector, SelectionKey.OP_WRITE);
//					while(selector.isOpen()){
//						selector.select();
//						Iterator<SelectionKey> it = selector.selectedKeys().iterator();
//						while(it.hasNext()){
//							SelectionKey key = it.next();
//							it.remove();
//							if(!key.isValid()){
//								continue;
//							}
//							if(key.isWritable()){
//								write();
//							}
//						}
//					}
					while(channel.isOpen()){
						write();
					}
				}catch(Exception e){
					e.printStackTrace();
				}finally{
					if(channel != null){
						try{
							channel.close();
						}catch(Exception ex){
							ex.printStackTrace();
						}
					}
				}
			}catch(Exception e){
				e.printStackTrace();
				inner.clear();
			}
		}
		
		private void write() throws Exception{
			Packet packet = inner.take();
			synchronized (lock) {
				ByteBuffer body = packet.getBuffer();//
				ByteBuffer head = ByteBuffer.allocate(4);
				head.putInt(body.limit());
				head.flip();
				while(head.hasRemaining()){
					channel.write(head);
				}
				Checksum checksum = new Adler32();
				while(body.hasRemaining()){
					checksum.update(body.get());
				}
				body.rewind();
				while(body.hasRemaining()){
					channel.write(body);
				}
				long cks = checksum.getValue();
				ByteBuffer tail = ByteBuffer.allocate(8);
				tail.putLong(cks);
				tail.flip();
				while(tail.hasRemaining()){
					channel.write(tail);
				}
			}
			
		}
	}
}

 

 

Handler.java(接口,面向设计):

 

package com.test.web;

import java.nio.channels.SocketChannel;

public interface Handler {

	public void handle(SocketChannel channel);
}

 

 

Packet.java

 

package com.test.web;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class Packet implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 7719389291885063462L;
	
	private ByteBuffer buffer;
	
	private static Charset charset = Charset.defaultCharset();
	
	private Packet(ByteBuffer buffer){
		this.buffer = buffer;
	}
	
	public String getDataAsString(){
		return charset.decode(buffer).toString();
	}
	
	public byte[] getData(){
		return buffer.array();
	}
	
	public ByteBuffer getBuffer(){
		return this.buffer;
	}
	
	
	public static Packet wrap(ByteBuffer buffer){
		return new Packet(buffer);
	}
	
	public static Packet wrap(String data){
		ByteBuffer source = charset.encode(data);
		return new Packet(source);
	}
}

 

 

ServerControllor.java

 

package com.test.web;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ServerControllor {
	private int port;
	private Thread thread = new ServerThread();;
	private Object lock = new Object();
	public ServerControllor(){
		this(0);
	}
	public ServerControllor(int port){
		this.port = port;
	}
	
	public void start(){
		if(thread.isAlive()){
			return;
		}
		synchronized (lock) {
			thread.start();
			System.out.println("Server starting....");
		}
	}
	
	
	class ServerThread extends Thread {
		private static final int TIMEOUT = 3000;
		private ServerHandler handler = new ServerHandler();
		@Override
		public void run(){
			try{
				ServerSocketChannel channel = null;
				try{
					channel = ServerSocketChannel.open();
					channel.configureBlocking(false);
					channel.socket().setReuseAddress(true);
					channel.socket().bind(new InetSocketAddress(port));
					Selector selector = Selector.open();
					channel.register(selector, SelectionKey.OP_ACCEPT);
					while(selector.isOpen()){
						System.out.println("Server is running,port:" + channel.socket().getLocalPort());
						if(selector.select(TIMEOUT) == 0){
							continue;
						}
						Iterator<SelectionKey> it = selector.selectedKeys().iterator();
						while(it.hasNext()){
							SelectionKey key = it.next();
							it.remove();
							if(!key.isValid()){
								continue;
							}
							if(key.isAcceptable()){
								accept(key);
							}else if(key.isReadable()){
								read(key);
							}
						}
					}
				}catch(Exception e){
					e.printStackTrace();
				}finally{
					if(channel != null){
						try{
							channel.close();
						}catch(Exception ex){
							ex.printStackTrace();
						}
					}
				}
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		
		private void accept(SelectionKey key) throws Exception{
			SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
			socketChannel.configureBlocking(true);
			//socketChannel.register(key.selector(), SelectionKey.OP_READ);
			handler.handle(socketChannel);
		}
		
		private void read(SelectionKey key) throws Exception{
			SocketChannel channel = (SocketChannel)key.channel();
			//handler.handle(channel);
		}
	}
}

 

 

ServerHandler.java

 

 

package com.test.web;

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

class ServerHandler implements Handler {

	private static Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() + 1);
	
	private static Map<SocketChannel,Thread> holder = new HashMap<SocketChannel,Thread>(32);
	
	@Override
	public void handle(SocketChannel channel) {
		synchronized (holder) {
			if(holder.containsKey(channel)){
				return;
			}
			Thread t = new ReadThread(channel);
			holder.put(channel, t);
			t.start();
		}
	}
	
	
	static class ReadThread extends Thread{
		SocketChannel channel;
		ReadThread(SocketChannel channel){
			this.channel = channel;
		}
		@Override
		public void run(){
			try{
				semaphore.acquire();
				boolean eof = false;
				while(channel.isOpen()){
					//ByteBuffer byteBuffer = new ByteBuffer(1024);
					ByteBuffer head = ByteBuffer.allocate(4);//int for data-size
					while(true){
						int cb = channel.read(head);
						if(cb == -1){
							throw new RuntimeException("EOF error,data lost!");
						}
						if(isFull(head)){
							break;
						}
					}
					head.flip();
					int dataSize = head.getInt();
					if(dataSize <= 0){
						throw new RuntimeException("Data format error,something lost???");
					}
					ByteBuffer body = ByteBuffer.allocate(dataSize);
					while(true){
						int cb = channel.read(body);
						if(cb == -1){
							throw new RuntimeException("EOF error,data lost!");
						}else if(cb == 0 && this.isFull(body)){
							break;
						}
					}
					ByteBuffer tail = ByteBuffer.allocate(8);//int for data-size
					while(true){
						int cb = channel.read(tail);
						if(cb == -1){
							eof = true;
						}
						if(isFull(tail)){
							break;
						}
					}
					tail.flip();
					long sck = tail.getLong();
					Checksum checksum = new Adler32();
					checksum.update(body.array(), 0, dataSize);
					long cck = checksum.getValue();
					if(sck != cck){
						throw new RuntimeException("Sorry,some data lost or be modified,please check!");
					}
					body.flip();
					Packet packet = Packet.wrap(body);
					System.out.println(packet.getDataAsString());
					if(eof){
						break;
					}
				}
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				if(channel != null){
					try{
						channel.close();
					}catch(Exception ex){
						ex.printStackTrace();
					}
				}
				holder.remove(channel);
				semaphore.release();
			}
		}
		
		private boolean isFull(ByteBuffer byteBuffer){
			return byteBuffer.position() == byteBuffer.capacity() ? true : false;
		}
	}

}

 

 

--End--

  • web.rar (4.1 KB)
  • 下载次数: 21
分享到:
评论
2 楼 QING____ 2013-09-05  
qifan.yang 写道
文章写得不错,看了就要赞一个,希望能和博主交流交流

OK,欢迎交流,可以私信给我奥.
1 楼 qifan.yang 2013-09-03  
文章写得不错,看了就要赞一个,希望能和博主交流交流

相关推荐

    JAVA上百实例源码以及开源项目源代码

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    JAVA上百实例源码以及开源项目

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    java开源包10

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包1

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包4

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包101

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包6

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包11

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包9

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包8

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包5

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包3

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    netty-demo实例

    Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序dsf。 也就是说,Netty 是一个基于NIO的客户,服务器端编程框架...

    java开源包2

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包7

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    Java思维导图xmind文件+导出图片

    Java客户端实现Kafka生产者与消费者实例 kafka的副本机制及选举原理剖析 基于kafka实现应用日志实时上报统计分析 RabbitMQ 初步认识RabbitMQ及高可用集群部署 详解RabbitMQ消息分发机制及主题消息分发 ...

    Java资源包01

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    Java SE实践教程 源代码 下载

    1.2.2 体验Java程序开发 21 1.2.3 J2SE 5.0新特性实践 26 1.3 小结 35 第2章 对象无处不在——面向对象的基本概念 37 2.1 讲解 38 2.1.1 什么是面向对象 38 2.1.2 面向对象的基本概念 38 2.1.3 Java对面向...

Global site tag (gtag.js) - Google Analytics