Hello everyone, I am Laokou, and let’s learn how to implement a UDP server using Vert.x together.
Implementing UDP
Vert.x Core Address
Note
UDP is a connectionless protocol, which means there is no persistent connection established with the remote client.
Therefore, the data packets you send and receive must include the remote address.
Additionally, UDP is not as secure as TCP, which means there is no guarantee that the sent packets will be received by the corresponding endpoint. This can lead to packet loss.
UDP is best suited for applications that can tolerate packet loss (such as monitoring applications and live video streaming).
Implementation Process
View Source Code
The code is quite simple, so I won’t explain it in detail.
The code is quite simple, so I won’t explain it in detail.
The code is quite simple, so I won’t explain it in detail.
Server Side
Include Dependencies
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>5.0.0</version>
</dependency>
UdpServerProperties
/**
* @author laokou
*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.udp-server")
public class UdpServerProperties {
private String host = "0.0.0.0";
private Set<Integer> ports = new HashSet<>(0);
private boolean broadcast = false;
private boolean loopbackModeDisabled = true;
private String multicastNetworkInterface = null;
private boolean ipV6 = false;
}
VertxUdpServer
/**
* @author laokou
*/
@Slf4j
public final class VertxUdpServer extends AbstractVerticle {
private volatile Flux<DatagramSocket> datagramSocket;
private final UdpServerProperties udpServerProperties;
private boolean isClosed = false;
VertxUdpServer(Vertx vertx, UdpServerProperties udpServerProperties) {
this.udpServerProperties = udpServerProperties;
this.vertx = vertx;
}
@Override
public synchronized void start() {
datagramSocket = Flux.fromIterable(udpServerProperties.getPorts()).map(port -> {
DatagramSocket datagramSocket = vertx.createDatagramSocket(getDatagramSocketOption())
.handler(packet -> log.info("【Vertx-UDP-Server】 => Received packet: {}", packet.data()));
datagramSocket.listen(port, udpServerProperties.getHost()).onComplete(result -> {
if (isClosed) {
return;
}
if (result.succeeded()) {
log.info("【Vertx-UDP-Server】 => UDP service started successfully, port: {}", port);
}
else {
Throwable ex = result.cause();
log.error("【Vertx-UDP-Server】 => UDP service failed to start, error message: {}", ex.getMessage(), ex);
}
});
return datagramSocket;
});
datagramSocket.subscribeOn(Schedulers.boundedElastic()).subscribe();
}
@Override
public synchronized void stop() {
isClosed = true;
datagramSocket.doOnNext(socket -> socket.close().onComplete(result -> {
if (result.succeeded()) {
log.info("【Vertx-UDP-Server】 => UDP service stopped successfully");
}
else {
Throwable ex = result.cause();
log.error("【Vertx-UDP-Server】 => UDP service failed to stop, error message: {}", ex.getMessage(), ex);
}
})).subscribeOn(Schedulers.boundedElastic()).subscribe();
}
public void deploy() {
// Deploy service
vertx.deployVerticle(this);
// Stop service
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
private DatagramSocketOptions getDatagramSocketOption() {
DatagramSocketOptions datagramSocketOptions = new DatagramSocketOptions();
datagramSocketOptions.setBroadcast(udpServerProperties.isBroadcast());
datagramSocketOptions.setLoopbackModeDisabled(udpServerProperties.isLoopbackModeDisabled());
datagramSocketOptions.setMulticastNetworkInterface(udpServerProperties.getMulticastNetworkInterface());
datagramSocketOptions.setIpV6(udpServerProperties.isIpV6());
return datagramSocketOptions;
}
}
VertxUdpServerManager
/**
* @author laokou
*/
public final class VertxUdpServerManager {
private VertxUdpServerManager() {
}
public static void deploy(final Vertx vertx, final UdpServerProperties properties) {
new VertxUdpServer(vertx, properties).deploy();
}
}
Client Side [Testing]
/**
* @author laokou
*/
@Slf4j
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class UdpTest {
private final Vertx vertx;
@Test
void test() throws InterruptedException {
for (int i = 4880; i < 5000; i++) {
DatagramSocket datagramSocket = vertx.createDatagramSocket();
int finalI = i;
datagramSocket.send("Hello Vert.x", i, "127.0.0.1").onComplete(result -> {
if (result.succeeded()) {
log.info("【Vertx-UDP-Client】 => Send successful, port: {}", finalI);
}
else {
Throwable ex = result.cause();
log.error("【Vertx-UDP-Client】 => Send failed, port: {}, error message: {}", finalI, ex.getMessage(), ex);
}
});
Thread.sleep(2000);
Assertions.assertDoesNotThrow(datagramSocket::close);
}
}
}
This can meet the basic protocol development needs; feel free to modify it as needed!!!
I am Laokou, see you next time!