前言
前段时间公司一个物联网项目需要通过TCP连接设备收发消息,现在我把代码整理出来,分享一下。
源代码已发布在GitHub
创建Socket
使用ServerSocket
绑定IP
和端口,
TcpSocket
实现Java的Runnable
的类,在run
方法中使用Accept
监听端口是否有客户端发送连接请求,如果有连接来了就创建SocketReceive
对象然后将他扔给线程池执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.example.socket_demo.socket;
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
@Component @Slf4j public class TcpSocket implements Runnable { public Integer port; private ServerSocket server; private ExecutorService threadPool;
public TcpSocket() { try { port = 8081; threadPool = Executors.newCachedThreadPool(); server = new ServerSocket(port); } catch (Exception e) { log.error(e.getMessage()); } }
@Override public void run() { while (true) { try { Socket socket = server.accept(); if (socket != null) { SocketReceive socketReceive = new SocketReceive(socket); threadPool.submit(socketReceive); } } catch (IOException e) { e.printStackTrace(); } } } }
|
在线连接
AllClientsMap
类存放了所有的在线连接,通过hostAddress
为key
,Socket
为Value
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| package com.example.socket_demo.socket;
import lombok.extern.slf4j.Slf4j;
import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap;
@Slf4j public class AllClientsMap {
private static final ConcurrentMap<String, Socket> ALLCLIENTS = new ConcurrentHashMap<>();
public static ConcurrentMap<String, Socket> getAllClients() { return ALLCLIENTS; }
public static Socket getSocketByKey(String key) { return ALLCLIENTS.get(key); }
public static void put(String key, Socket socket) { ALLCLIENTS.put(key, socket); log.info("设备Key:{}========ip:{}已加入列表", key, socket.getInetAddress().getHostAddress()); }
public static void remove(String key) { ALLCLIENTS.remove(key); log.info("已移除设备Key:{}", key); }
public static int size() { log.info("当前设备数:{}", ALLCLIENTS.size()); return ALLCLIENTS.size(); }
public static void print() { log.info("当前设备列表信息:长度:{}", ALLCLIENTS.size()); ALLCLIENTS.forEach((key, socket) -> { log.info("设备Key:{}========ip:{}", key, socket.getInetAddress().getHostAddress()); }); }
public static boolean contains(String key) { return ALLCLIENTS.containsKey(key); } }
|
创建SocketReceive
在SocketReceive
类中,我们可以执行相关的接收消息,以及业务操作;在第64
行代码的位置,可以通过ApplicationContext
获取Spring Bean
执行业务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
| package com.example.socket_demo.socket;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInputStream; import java.io.IOException; import java.io.OutputStream; import java.net.Socket;
@Slf4j public class SocketReceive implements Runnable { private Socket socket;
public SocketReceive() { }
public SocketReceive(Socket socket) { this.socket = socket; }
@Override public void run() { while (true) { if (null == socket) { log.info("socket为空"); return; } boolean isClosed = socket.isClosed(); String hostAddress = socket.getInetAddress().getHostAddress(); if (isClosed) { log.info("socket检测到关闭了"); if (AllClientsMap.contains(hostAddress)) { AllClientsMap.remove(hostAddress); AllClientsMap.print(); } return; } String hostAddress = socket.getInetAddress().getHostAddress(); try { DataInputStream in = new DataInputStream(socket.getInputStream()); byte[] bytes = new byte[1024]; int len = in.read(bytes); if (len == -1) { return; } byte[] bytes1 = new byte[len]; System.arraycopy(bytes, 0, bytes1, 0, len); log.info("客户端传的byte字节数组:" + printBytesByStringBuilder(bytes1)); String s = new String(bytes1); log.info("客户端传的byte字节数组转换成字符串打印:" + s); String data = byteArrayToHex(bytes1); log.info("接收的16进制数据:" + data); if(!AllClientsMap.contains(hostAddress)){ AllClientsMap.put(hostAddress, socket); AllClientsMap.print(); } log.debug("客户端" + hostAddress + "发送数据:{}", data); System.out.println("执行业务");
response(AllClientsMap.getSocketByKey(hostAddress), data);
} catch (IOException e) { if (AllClientsMap.contains(hostAddress)) { AllClientsMap.remove(hostAddress); AllClientsMap.print(); } try { socket.close(); log.error("{}断开连接", hostAddress); return; } catch (IOException ioException) { log.error(ioException.getMessage()); } } } }
public static String printBytesByStringBuilder(byte[] bytes) { StringBuilder stringBuilder = new StringBuilder(); for (byte aByte : bytes) { stringBuilder.append(byte2String(aByte)); } return stringBuilder.toString(); }
public static String byte2String(byte b) { return String.format("%02x ", b); }
public static void response(Socket socket, String msg) { log.debug("向设备IP:{}发送消息:{}", socket.getInetAddress().getHostAddress(), msg); OutputStream outputStream = null; try { outputStream = socket.getOutputStream(); outputStream.write(hexStringToByteArray(msg)); } catch (IOException e) { try { socket.close(); } catch (IOException ioException) { log.error(ioException.getMessage()); } log.error(e.getMessage()); } }
public static String byteArrayToHex(byte[] bytes) { StringBuilder result = new StringBuilder(); for (int index = 0, len = bytes.length; index <= len - 1; index += 1) { int char1 = ((bytes[index] >> 4) & 0xF); char chara1 = Character.forDigit(char1, 16); int char2 = ((bytes[index]) & 0xF); char chara2 = Character.forDigit(char2, 16); result.append(chara1); result.append(chara2); } return result.toString(); }
public static byte[] hexStringToByteArray(String hexString) { hexString = hexString.replaceAll(" ", ""); int len = hexString.length(); byte[] bytes = new byte[len / 2]; for (int i = 0; i < len; i += 2) { bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character .digit(hexString.charAt(i + 1), 16)); } return bytes; } }
|
启动
通过继承Spring
的InitializingBean
类,重写afterPropertiesSet
方法,这个方法将在所有的属性被初始化后调用。
然后会创建一个线程执行ServerSocket
的监听,初始化我们的TcpSocket
对象,一旦Server
接收到了连接请求后,会创建一个SocketReceive
对象将其扔给线程池执行,在线程池中的SocketReceive
对象可以通过ApplicationContext
获取Spring Bean
执行业务代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.example.socket_demo.socket;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class SpringFinishedListener implements InitializingBean { @Autowired private TcpSocket tcpsocket;
@Override public void afterPropertiesSet() { Thread serverThread = new Thread(tcpsocket); serverThread.start(); } }
|
测试工具
这里推荐一个测试工具,还挺好用的。下载连接