`
QING____
  • 浏览: 2234153 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Socket通讯与多线程处理-完整实例

    博客分类:
  • JAVA
 
阅读更多

开发一段程序来模拟Socket 通讯和数据传输,socket通讯需要注意的几个问题:

  1. Socket API的合理使用,以及关于socket option参数的设计和优化
  2. socket交互中,有关阻塞/异常处理问题
  3. socket通讯对于server端而言,多线程或者线程池的合理使用
  4. 数据流成帧技术的使用
本段程序,如果在production环境使用,还有很多需要优化和适配的地方,如下仅作实例展示:
  1. SocketTestMain.java:引导类
  2. SocketPacket.java:用于封装流数据,适用流数据解析和传输,我们期望以封装性/面向对象的方式解决问题
  3. SoketInput.java:输入流解析和封装,将会在read操作之后获得
  4. SoketOutput.java:输出流解析和封装,将会在write操作时候发送.
SocketTestMain.java
package com.test.socket;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTestMain {

	private static final int port = 30008;

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		Thread server = new ServerThread();
		server.start();
		Thread client = new ClientThread();
		client.start();
		Thread.sleep(2000);
	}

	static class ServerThread extends Thread {
		@Override
		public void run() {
			try {
				ServerSocket socket = new ServerSocket();
				try{
					//socket.setSoTimeout(20000);
					socket.setReuseAddress(true);
					socket.bind(new InetSocketAddress(port));
					Socket accept = null;
					while ((accept = socket.accept()) != null) {
						accept.setTcpNoDelay(true);
						System.out.println("Connected Server");
						Thread thread = new HandlerThread(accept);
						thread.start();
					}
				}catch (Exception e) {
					e.printStackTrace();
				}finally{
					socket.close();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	static class ClientThread extends Thread {
		@Override
		public void run() {
			try {
				Socket socket = new Socket();
				socket.setSoTimeout(20000);
				//socket.setTcpNoDelay(true);
				socket.connect(new InetSocketAddress(port));
				if (!socket.isConnected()) {
					System.out.println("Connect error!!");
				}
				System.out.println("Connected");
				SocketInput input = new SocketInput(socket.getInputStream());
				SocketOutput output = new SocketOutput(socket.getOutputStream());
				int i = 0;
				while (true) {
					if (i < 5) {
						SocketPacket sp = new SocketPacket();
						String send = ">>>>>>" + System.currentTimeMillis();
						sp.append(send.getBytes());
						output.write(sp);
					} else {
						SocketPacket packet = new SocketPacket();
						packet.append("quit".getBytes());
						output.write(packet);
						socket.close();
						Thread.sleep(1000);
						return;
					}
					SocketPacket rp = input.receive();
					System.out.println("Client receive data:"
							+ new String(rp.drawAll()));
					i++;
					Thread.sleep(2000);
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	static class HandlerThread extends Thread {
		private Socket socket;

		HandlerThread(Socket socket) {
			this.socket = socket;
		}

		@Override
		public void run() {
			try {
				SocketInput input = new SocketInput(socket.getInputStream());
				SocketOutput output = new SocketOutput(socket.getOutputStream());
				while (true) {
					SocketPacket rp = input.receive();
					String receive =  new String(rp.drawAll());
					System.out.println("Server receives data : " + receive);
					if (receive.equalsIgnoreCase("quit")) {
						socket.close();
						return;
					}
					rp.clear();
					String data = "<<<<<<<< : " + System.currentTimeMillis();
					rp.append(data.getBytes());
					output.write(rp);
					Thread.sleep(2000);
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
}
 
SocketPacket.java:
package com.test.socket;

import java.nio.ByteBuffer;

public class SocketPacket {

	public static final String HEADER = "[MAGIC_HEADER]";
	public static final int MAX_SIZE = 2048;
	public static final int HB_SIZE = 14;//HEADER.getBytes().length
	private ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
	
	
	public void append(byte[] segment){
		buffer.put(segment);
	}
	
	/**
	 * 返回buffer中所有的可用数据
	 * @return
	 */
	public byte[] drawAll(){
		buffer.flip();
		byte[] all = new byte[buffer.limit()];
		buffer.get(all);
		buffer.clear();
		return all;
	}
	
	/**
	 * 获取buffer中可用的数据
	 *
	 * @param dst
	 * @return 获取到的byte数
	 */
	public int draw(byte[] dst){
		buffer.flip();
		int remaining = buffer.remaining();
		if(remaining > 0){
			buffer.get(dst);
			buffer.compact();//压缩
			buffer.position(buffer.limit());//重置position
		}
		buffer.limit(buffer.capacity());//重置limit
		//after this ,the buffer is ready for put.
		return remaining > dst.length ? dst.length : remaining;
	}
	
	public void clear(){
		buffer.clear();
	}
}
 
SocketOutput.java
package com.test.socket;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Adler32;

public class SocketOutput implements Closeable {

	private DataOutputStream innerOutput;
	
	public SocketOutput(OutputStream stream){
		innerOutput = new DataOutputStream(new BufferedOutputStream(stream,256));
	}
	
	public void write(SocketPacket packet) throws Exception{
		innerOutput.write(SocketPacket.HEADER.getBytes());
		byte[] data = packet.drawAll();
		Adler32 check = new Adler32();
		check.update(data);
		long cs = check.getValue();
		innerOutput.writeLong(cs);
		innerOutput.writeInt(data.length);
		innerOutput.write(data);
		innerOutput.flush();
	}

	@Override
	public void close() throws IOException {
		innerOutput.close();
		
	}
}
 
SocketInput.java
package com.test.socket;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

public class SocketInput implements Closeable{

	private DataInputStream innerInput;
	
	public SocketInput(InputStream stream){
		innerInput = new DataInputStream(new BufferedInputStream(stream, 256));
	}
	
	public SocketPacket receive() throws Exception{
		byte[] hb = new byte[SocketPacket.HB_SIZE];
		//check header
		innerInput.readFully(hb);
		String ph = new String(hb);
		if(!ph.equals(SocketPacket.HEADER)){
			throw new IOException("Data is miss,please reconnect server!");
		}
		//check checksum
		long cs = innerInput.readLong();
		//check data 
		int dl = innerInput.readInt();//data length;
		//get data
		byte[] data = new byte[dl];
		innerInput.readFully(data);
		Adler32 check = new Adler32();
		check.update(data);
		long ccs = check.getValue();
		if(ccs != cs){
			throw new IOException("Data is lost or modified,please reconnect server!");
		}
		SocketPacket packet = new SocketPacket();
		packet.append(data);
		return packet;
	}

	@Override
	public void close() throws IOException {
		innerInput.close();
		
	}
}
 
 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics