Dynamically assigned UDP port scanning
Occasionally, there is a situation where data transmission and reception is impossible due to a firewall in server-to-server communication. This is a simple Python program that can test whether UDP communication is possible.
I work for a VoIP product development company. The SIP protocol mainly used in VoIP (Voice Over IP) mainly uses UDP. In particular, RTP (RealTime Protocol) used to transmit and receive voice data requires many UDP ports. One or two UDP ports are used for one voice channel. Therefore, if the VoIP solution supports 1000 voice channels at the same time, 1000 ~ 2000 UDP ports can be used at the same time.
Therefore, it is very important to determine whether the UDP port is serviceable.
In general, nmap is often used to check whether a port is open. However, nmap is not appropriate in this case.
The reason is that in VoIP, a UDP port for voice transmission/reception is randomly selected from a specific range and used.
For example, if there is a VoIP product set to use UDP port 20000 ~ 30000, it does not always occupy the UDP port in this area, but searches for available ports in this band and uses it when a new voice channel is created. So you cannot use a solution like nmap.
Therefore, it is necessary to have a program that opens a specific port or port of a certain band so that a foot scan program such as nmap can scan the port.
Single Port Test
This is a simple udp echo server/client program.
import argparse import socket import time bind_ip = "" parser = argparse.ArgumentParser(description='UDP Test') parser.add_argument('--mode', type=str, default="C") parser.add_argument('--host', type=str, default='127.0.0.1') parser.add_argument('--port', type=int, default=5001) parser.add_argument('--lport', type=int, default=5001) args = parser.parse_args() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) print('Binding [%s] [%d]'%(bind_ip, args.lport)) sock.bind((bind_ip, args.lport)) print('Binding Success') msg = "Hello World" if args.mode == "C" : sock.settimeout(10.0) print('Client Mode =>Send UDP packet') for x in range(2): print("Snd:" + msg + " To:%s:%d"%(args.host, args.port)) try: size = sock.sendto(msg.encode(), (args.host, args.port)) print("Snd %d bytes"%size , " To " , (args.host, args.port)) data, addr = sock.recvfrom(1024) if data: try: print("Rcv:" + data.decode()) except UnicodeDecodeError: #nmap sends 0xfe binary data print("Rcv Binary Data") time.sleep(1) except socket.timeout: print("TimeOut => exit") break else: print('Server Mode =>Recv UDP packet') sock.settimeout(5.0) while(True): try: data, addr = sock.recvfrom(1024) print("Rcv:" + data.decode()) print("Echo send to ", addr," ", msg ) sock.sendto(data, addr) except socket.timeout: print("TimeOut => continue") sock.close()
<udp_test.py>
Now run the program in server mode on the server. The test UDP port is 8888.
root@ubuntusrv:/usr/local/src/study/port_test# python3 udp_test.py --mode S --lport 8888 Binding [] [8888] Binding Success Server Mode =>Recv UDP packet TimeOut => continue TimeOut => continue TimeOut => continue TimeOut => continue TimeOut => continue TimeOut => continue TimeOut => continue TimeOut => continue TimeOut => continue Rcv:Hello World Echo send to ('192.168.150.1', 5001) Hello World Rcv:Hello World Echo send to ('192.168.150.1', 5001) Hello World TimeOut => continue
of the lower part
"
Rcv: Hello World
Echo send to ('192.168.150.1', 5001) Hello World
Rcv: Hello World
Echo send to ('192.168.150.1', 5001) Hello World
"
is receiving data from the echo client program and responding.
Now let's run the client program on another computer. When operating in client mode, the program sends the string "Hello World" twice and receives a response.
C:\lsh\study\python port test>python udp_test.py --mode C --host 192.168.150.128 --port 8888 Binding [] [5001] Binding Success Client Mode =>Send UDP packet Snd:Hello World To:192.168.150.128:8888 Snd 11 bytes To ('192.168.150.128', 8888) Rcv:Hello World Snd:Hello World To:192.168.150.128:8888 Snd 11 bytes To ('192.168.150.128', 8888) Rcv:Hello World
<udp_test.py working in client mode >
Actually, there is nothing special about this program. This is an ordinary program that is often introduced in echo server/client.
What you really need is to open several ports at once and test the connection. In particular, VoIP-related programs should test in advance whether UDP ports of a specific band can be accessed from a remote client. If a specific port cannot receive data due to firewall settings, etc., an accident occurs in which voice is transmitted only in one direction.
Multiple port tests
To test ports in a specific band, all ports in the corresponding area should be opened and bind operation should be performed. If there is a port where the bind operation failed, take action after confirming that this port is already in use by another process.
Data must be sent and received using multiple sockets. The most efficient way to do this is to use an asynchronous socket. Let's implement an asynchronous socket using epoll, which has better performance than poll. Asynchronous socket implementation using epoll is quite efficient because one process can handle hundreds or thousands of sockets.
Server side echo server for multi ports
The following is an example of detecting data reception using epoll after binding in multiple sockets by designating a specific port area in the server.
This program can test multiple ports at once by specifying the starting and ending port values. And if you press Ctrl+C after the test is over, the results such as whether or not the bind of ports in the relevant area succeeded and whether the client was connected are displayed.
#!/usr/bin/env python import socket import time import select import logging import argparse logger = logging.getLogger() logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) def handle_client_data(sock, data, addr): """Handles a message read from a new or existing client.""" if data: snd = sock.sendto(data, addr) try: logger.debug(str(addr) + "SND[%d]:"%(snd) + data.decode()) except UnicodeDecodeError: #nmap sends 0xfe binary data logger.debug(str(addr) + "SND[%d] bytes binary data"%(snd) ) def find_sock(fd): for sock in sock_list: if sock.fileno() == fd: return sock return None def update_info(fd): for info in info_list: if info["sockfd"] == fd: info["test"] = 1 def output_info(): for info in info_list: logger.debug("port[%d] bind[%d] test[%d]"%(info["port"], info["bind"], info["test"])) def wait_for_message_and_respond(epoll): """Waits for a message and processes it.""" poll_result = epoll.poll() for fileno, event in poll_result: if event & select.EPOLLIN: sock = find_sock(fileno) if sock: data, addr = sock.recvfrom( 1024, socket.MSG_DONTWAIT ) try: update_info(fileno) logger.debug(str(addr) + "RCV:" + data.decode()) handle_client_data(sock, data, addr) except UnicodeDecodeError: #nmap sends 0xfe binary data logger.debug(str(addr) + "RCV bytes binary data") sock_list = [] info_list = [] if __name__ == '__main__': """ Creates a socket and epoll object for monitoring it, then handles messages on the socket forever. """ parser = argparse.ArgumentParser(description='UDP Test') parser.add_argument('--bind', type=str, default='') #default addr_any parser.add_argument('--sport', type=int, default=9080) #start port parser.add_argument('--eport', type=int, default=9090) #end port args = parser.parse_args()logger.debug("UDP Port [%d ~ %d] test Started"%(args.sport, args.eport))UDP_IP=args.bind epoll = select.epoll() for x in range(args.sport, args.eport + 1): # create a datagram socket sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) # UDP info = {} info["port"] = x info["test"] = 0 # bind the socket to a port, to allow people to send info to it try: sock.bind( (UDP_IP, x)) sock_list.append(sock) epoll.register(sock, select.EPOLLIN) info["bind"] = 1 info["sockfd"] = sock.fileno() except socket.error as msg: # bind Error print("Socket binding error: " + str(msg) + "\n" + "Retrying...") info["bind"] = 0 info["sockfd"] = 0 info_list.append(info) # use epoll to wait for read events while True: try: wait_for_message_and_respond(epoll) except KeyboardInterrupt: break output_info() for sock in sock_list: epoll.unregister(sock.fileno()) sock.close() epoll.close()
<epoll_server.py>
Now let's run the program like this: The test ports are from 9080 to 9100. epoll_server.py creates 21 UDP sockets, binds ports 9080 to 9100, and provides echo service when the client sends data to the port.
root@ubuntusrv:/usr/local/src/study/port_test# python3 epoll_server.py --eport 9100 UDP Port [9080 ~ 9100] test Started
Port scanning with nmap
Now it's time to test if the server's UDP ports 9800 ~ 9100 are reachable. The previously used udp_test program can be used repeatedly, but if you want to test multiple ports at once, the nmap program is suitable. Install nmap for your computer. nmap can be installed on various OS such as Linux, Windows, and Mac.
I will test using nmap for Windows.
It was confirmed that the echo service was correctly provided for 21 ports.
Now let's quit the server program by pressing Ctrl + C.
('192.168.150.1', 56436)RCV:versionbind ('192.168.150.1', 56436)SND[30]:versionbind ('192.168.150.1', 56436)RCV: ('192.168.150.1', 56436)SND[12]: ('192.168.150.1', 56436)RCV bytes binary data ('192.168.150.1', 56436)RCV:help ('192.168.150.1', 56436)SND[8]:help ^Cport[9080] bind[1] test[1] port[9081] bind[1] test[1] port[9082] bind[1] test[1] port[9083] bind[1] test[1] port[9084] bind[1] test[1] port[9085] bind[1] test[1] port[9086] bind[1] test[1] port[9087] bind[1] test[1] port[9088] bind[1] test[1] port[9089] bind[1] test[1] port[9090] bind[1] test[1] port[9091] bind[1] test[1] port[9092] bind[1] test[1] port[9093] bind[1] test[1] port[9094] bind[1] test[1] port[9095] bind[1] test[1] port[9096] bind[1] test[1] port[9097] bind[1] test[1] port[9098] bind[1] test[1] port[9099] bind[1] test[1] port[9100] bind[1] test[1]
epoll_server.py also provides test information for ports 9080 to 9100.
- bind[1] :bind successful
- bind[0] :bind failed
- test[1] : Provide echo service
- test[0] : echo service not provided (no connection)
You can see that binding succeeded for all ports and provided echo service to nmap.
Wrapping Up
A typical tcp server opens one port and then provides a service, so port scanning is simple.
However, in a service that dynamically allocates UDP ports, it is very difficult to test dynamically allocated ports at once. epoll_server.py is a simple and useful code that can check service by opening UDP port bands at once.
댓글
댓글 쓰기