SpringBoot整合Socket

前言

前段时间公司一个物联网项目需要通过TCP连接设备收发消息,现在我把代码整理出来,分享一下。

源代码已发布在GitHub

创建Socket

使用ServerSocket绑定IP和端口,

TcpSocket实现Java的Runnable的类,在run方法中使用Accept监听端口是否有客户端发送连接请求,如果有连接来了就创建SocketReceive对象然后将他扔给线程池执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Administrator
*/
@Component
@Slf4j
public class TcpSocket implements Runnable {
public Integer port;
private ServerSocket server;
private ExecutorService threadPool;

public TcpSocket() {
try {
port = 8081;
threadPool = Executors.newCachedThreadPool();
server = new ServerSocket(port);
} catch (Exception e) {
log.error(e.getMessage());
}
}

@Override
public void run() {
while (true) {
try {
Socket socket = server.accept();
if (socket != null) {
SocketReceive socketReceive = new SocketReceive(socket);
threadPool.submit(socketReceive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

在线连接

AllClientsMap类存放了所有的在线连接,通过hostAddresskeySocketValue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.example.socket_demo.socket;


import lombok.extern.slf4j.Slf4j;

import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Slf4j
public class AllClientsMap {

/**
* 所有已连接设备
*/
private static final ConcurrentMap<String, Socket> ALLCLIENTS = new ConcurrentHashMap<>();

/**
* 返回设备列表
*
* @return
*/
public static ConcurrentMap<String, Socket> getAllClients() {
return ALLCLIENTS;
}

/**
* 通过key获取客户端
*
* @return
*/
public static Socket getSocketByKey(String key) {
return ALLCLIENTS.get(key);
}

/**
* 添加设备到列表
*
* @param key
* @param socket
*/
public static void put(String key, Socket socket) {
ALLCLIENTS.put(key, socket);
log.info("设备Key:{}========ip:{}已加入列表", key, socket.getInetAddress().getHostAddress());
}

/**
* 移除设备
*
* @param key
*/
public static void remove(String key) {
ALLCLIENTS.remove(key);
log.info("已移除设备Key:{}", key);
}

/**
* 返回已连接设备数量
*
* @return
*/
public static int size() {
log.info("当前设备数:{}", ALLCLIENTS.size());
return ALLCLIENTS.size();
}

/**
* 打印信息
*
* @return
*/
public static void print() {
log.info("当前设备列表信息:长度:{}", ALLCLIENTS.size());
ALLCLIENTS.forEach((key, socket) -> {
log.info("设备Key:{}========ip:{}", key, socket.getInetAddress().getHostAddress());
});
}

/**
* 是否包含
*
* @param key
* @return
*/
public static boolean contains(String key) {
return ALLCLIENTS.containsKey(key);
}
}

创建SocketReceive

SocketReceive类中,我们可以执行相关的接收消息,以及业务操作;在第64行代码的位置,可以通过ApplicationContext获取Spring Bean执行业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

@Slf4j
public class SocketReceive implements Runnable {
private Socket socket;

public SocketReceive() {
}

public SocketReceive(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
while (true) {
if (null == socket) {
log.info("socket为空");
return;
}
boolean isClosed = socket.isClosed();
String hostAddress = socket.getInetAddress().getHostAddress();
if (isClosed) {
log.info("socket检测到关闭了");
if (AllClientsMap.contains(hostAddress)) {
AllClientsMap.remove(hostAddress);
AllClientsMap.print();
}
return;
}
String hostAddress = socket.getInetAddress().getHostAddress();
try {
//建立客户端信息输入流
DataInputStream in = new DataInputStream(socket.getInputStream());
//定义字节数组读取数据
byte[] bytes = new byte[1024];
int len = in.read(bytes);
if (len == -1) {
return;
}
//定义一个新数组copy,解决读取出来的数据字节不够全是0的问题
byte[] bytes1 = new byte[len];
System.arraycopy(bytes, 0, bytes1, 0, len);
log.info("客户端传的byte字节数组:" + printBytesByStringBuilder(bytes1));
String s = new String(bytes1);
log.info("客户端传的byte字节数组转换成字符串打印:" + s);
//转换hex数据
String data = byteArrayToHex(bytes1);
log.info("接收的16进制数据:" + data);
//如果服务端没有保存该socket
if(!AllClientsMap.contains(hostAddress)){
AllClientsMap.put(hostAddress, socket);
AllClientsMap.print();
}
log.debug("客户端" + hostAddress + "发送数据:{}", data);
//执行业务
System.out.println("执行业务");

//从map中获取客户端发送消息
response(AllClientsMap.getSocketByKey(hostAddress), data);

} catch (IOException e) {
if (AllClientsMap.contains(hostAddress)) {
AllClientsMap.remove(hostAddress);
AllClientsMap.print();
}
try {
socket.close();
log.error("{}断开连接", hostAddress);
return;
} catch (IOException ioException) {
log.error(ioException.getMessage());
}
}
}
}


/**
* 根据字节数组,输出对应的格式化字符串
*
* @param bytes 字节数组
* @return 字节数组字符串
*/
public static String printBytesByStringBuilder(byte[] bytes) {
StringBuilder stringBuilder = new StringBuilder();
for (byte aByte : bytes) {
stringBuilder.append(byte2String(aByte));
}
return stringBuilder.toString();
}

public static String byte2String(byte b) {
return String.format("%02x ", b);
}

/**
* 向socket发送消息
*
* @param socket 对应socket
* @param msg 消息
*/
public static void response(Socket socket, String msg) {
log.debug("向设备IP:{}发送消息:{}", socket.getInetAddress().getHostAddress(), msg);
OutputStream outputStream = null;
try {
outputStream = socket.getOutputStream();
outputStream.write(hexStringToByteArray(msg));
} catch (IOException e) {
try {
socket.close();
} catch (IOException ioException) {
log.error(ioException.getMessage());
}
log.error(e.getMessage());
}
}

/**
* 字节数组转字符串
*
* @param bytes
* @return
*/
public static String byteArrayToHex(byte[] bytes) {
StringBuilder result = new StringBuilder();
for (int index = 0, len = bytes.length; index <= len - 1; index += 1) {
int char1 = ((bytes[index] >> 4) & 0xF);
char chara1 = Character.forDigit(char1, 16);
int char2 = ((bytes[index]) & 0xF);
char chara2 = Character.forDigit(char2, 16);
result.append(chara1);
result.append(chara2);
}
return result.toString();
}


/**
* 16进制表示的字符串转换为字节数组
*
* @param hexString 16进制表示的字符串
* @return byte[] 字节数组
*/
public static byte[] hexStringToByteArray(String hexString) {
hexString = hexString.replaceAll(" ", "");
int len = hexString.length();
byte[] bytes = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
// 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节
bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character
.digit(hexString.charAt(i + 1), 16));
}
return bytes;
}
}

启动

通过继承SpringInitializingBean类,重写afterPropertiesSet方法,这个方法将在所有的属性被初始化后调用。
然后会创建一个线程执行ServerSocket的监听,初始化我们的TcpSocket对象,一旦Server接收到了连接请求后,会创建一个SocketReceive对象将其扔给线程池执行,在线程池中的SocketReceive对象可以通过ApplicationContext获取Spring Bean执行业务代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.example.socket_demo.socket;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SpringFinishedListener implements InitializingBean {
@Autowired
private TcpSocket tcpsocket;

@Override
public void afterPropertiesSet() {
Thread serverThread = new Thread(tcpsocket);
serverThread.start();
}
}

测试工具

这里推荐一个测试工具,还挺好用的。下载连接


SpringBoot整合Socket
https://www.songhaozhi.com/2022/10/27/SpringBoot整合Socket/
Beitragsautor
宋浩志
Veröffentlicht am
October 27, 2022
Urheberrechtshinweis