Dynamically assigned UDP port scanning

 Occasionally, there is a situation where data transmission and reception is impossible due to a firewall in server-to-server communication. This is a simple Python program that can test whether UDP communication is possible.

I work for a VoIP product development company. The SIP protocol mainly used in VoIP (Voice Over IP) mainly uses UDP. In particular, RTP (RealTime Protocol) used to transmit and receive voice data requires many UDP ports. One or two UDP ports are used for one voice channel. Therefore, if the VoIP solution supports 1000 voice channels at the same time, 1000 ~ 2000 UDP ports can be used at the same time.

Therefore, it is very important to determine whether the UDP port is serviceable.

In general, nmap is often used to check whether a port is open. However, nmap is not appropriate in this case.

The reason is that in VoIP, a UDP port for voice transmission/reception is randomly selected from a specific range and used.

For example, if there is a VoIP product set to use UDP port 20000 ~ 30000, it does not always occupy the UDP port in this area, but searches for available ports in this band and uses it when a new voice channel is created. So you cannot use a solution like nmap.

Therefore, it is necessary to have a program that opens a specific port or port of a certain band so that a foot scan program such as nmap can scan the port.



Single Port Test

This is a simple udp echo server/client program.

import argparse
import socket
import time

bind_ip = ""

parser = argparse.ArgumentParser(description='UDP Test')
parser.add_argument('--mode', type=str, default="C")
parser.add_argument('--host', type=str, default='127.0.0.1')
parser.add_argument('--port', type=int, default=5001)

parser.add_argument('--lport', type=int, default=5001)
args = parser.parse_args()


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print('Binding [%s] [%d]'%(bind_ip, args.lport))
sock.bind((bind_ip, args.lport))
print('Binding Success')

msg = "Hello World"
if args.mode == "C" :
    sock.settimeout(10.0)
    print('Client Mode =>Send UDP packet')
    for x in range(2):
        print("Snd:" + msg + " To:%s:%d"%(args.host, args.port))
        try:
            size = sock.sendto(msg.encode(), (args.host, args.port))
            print("Snd %d bytes"%size , " To " , (args.host, args.port))
            data, addr = sock.recvfrom(1024)
            if data:
                try:
                    print("Rcv:" + data.decode())
                except UnicodeDecodeError:  #nmap sends 0xfe binary data 
                    print("Rcv Binary Data")

            time.sleep(1)
        except socket.timeout:
            print("TimeOut => exit")
            break
else:
    print('Server Mode =>Recv UDP packet')
    sock.settimeout(5.0)
    while(True):
        try:
            data, addr = sock.recvfrom(1024)
            print("Rcv:" + data.decode())
            print("Echo send to ", addr,"  ", msg )
            sock.sendto(data, addr)
        except socket.timeout:
            print("TimeOut => continue")
sock.close()

<udp_test.py>

Now run the program in server mode on the server. The test UDP port is 8888.

root@ubuntusrv:/usr/local/src/study/port_test# python3 udp_test.py --mode S --lport 8888
Binding [] [8888]
Binding Success
Server Mode =>Recv UDP packet
TimeOut => continue
TimeOut => continue
TimeOut => continue
TimeOut => continue
TimeOut => continue
TimeOut => continue
TimeOut => continue
TimeOut => continue
TimeOut => continue
Rcv:Hello World
Echo send to  ('192.168.150.1', 5001)    Hello World
Rcv:Hello World
Echo send to  ('192.168.150.1', 5001)    Hello World
TimeOut => continue

of the lower part

"

Rcv: Hello World

Echo send to ('192.168.150.1', 5001) Hello World

Rcv: Hello World

Echo send to ('192.168.150.1', 5001) Hello World

"

is receiving data from the echo client program and responding.


Now let's run the client program on another computer. When operating in client mode, the program sends the string "Hello World" twice and receives a response.

C:\lsh\study\python port test>python udp_test.py --mode C --host 192.168.150.128 --port 8888
Binding [] [5001]
Binding Success
Client Mode =>Send UDP packet
Snd:Hello World To:192.168.150.128:8888
Snd 11 bytes  To  ('192.168.150.128', 8888)
Rcv:Hello World
Snd:Hello World To:192.168.150.128:8888
Snd 11 bytes  To  ('192.168.150.128', 8888)
Rcv:Hello World

<udp_test.py working in client mode >


Actually, there is nothing special about this program. This is an ordinary program that is often introduced in echo server/client. 

What you really need is to open several ports at once and test the connection. In particular, VoIP-related programs should test in advance whether UDP ports of a specific band can be accessed from a remote client. If a specific port cannot receive data due to firewall settings, etc., an accident occurs in which voice is transmitted only in one direction.


Multiple port tests

To test ports in a specific band, all ports in the corresponding area should be opened and bind operation should be performed. If there is a port where the bind operation failed, take action after confirming that this port is already in use by another process.

Data must be sent and received using multiple sockets. The most efficient way to do this is to use an asynchronous socket. Let's implement an asynchronous socket using epoll, which has better performance than poll. Asynchronous socket implementation using epoll is quite efficient because one process can handle hundreds or thousands of sockets.


Server side echo server for multi ports

The following is an example of detecting data reception using epoll after binding in multiple sockets by designating a specific port area in the server.  

This program can test multiple ports at once by specifying the starting and ending port values. And if you press Ctrl+C after the test is over, the results such as whether or not the bind of ports in the relevant area succeeded and whether the client was connected are displayed.

#!/usr/bin/env python


import socket
import time
import select
import logging
import argparse

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())


def handle_client_data(sock, data, addr):
    """Handles a message read from a new or existing client."""
    if data:
        snd = sock.sendto(data, addr)
        try:
            logger.debug(str(addr) + "SND[%d]:"%(snd) + data.decode())
        except UnicodeDecodeError:  #nmap sends 0xfe binary data 
            logger.debug(str(addr) + "SND[%d] bytes binary  data"%(snd) )

def find_sock(fd):
    for sock in sock_list:
        if sock.fileno() == fd:
            return sock
    return None        

def update_info(fd):
    for info in info_list:
        if info["sockfd"] == fd:
            info["test"] = 1

def output_info():
    for info in info_list:
        logger.debug("port[%d] bind[%d] test[%d]"%(info["port"], info["bind"], info["test"]))

def wait_for_message_and_respond(epoll):
    """Waits for a message and processes it."""

    poll_result = epoll.poll()
    for fileno, event in poll_result:
        if event & select.EPOLLIN:
            sock = find_sock(fileno)
            if sock:
                data, addr = sock.recvfrom( 1024, socket.MSG_DONTWAIT ) 
                try:
                    update_info(fileno)
                    logger.debug(str(addr) +  "RCV:" + data.decode())
                    handle_client_data(sock, data, addr)
                except UnicodeDecodeError:  #nmap sends 0xfe binary data 
                    logger.debug(str(addr) + "RCV bytes binary  data")
                
sock_list = []
info_list = []

if __name__ == '__main__':
    """
    Creates a socket and epoll object for monitoring it,
    then handles messages on the socket forever.
    """

    parser = argparse.ArgumentParser(description='UDP Test')
    parser.add_argument('--bind', type=str, default='')     #default addr_any
    parser.add_argument('--sport', type=int, default=9080)  #start port 
    parser.add_argument('--eport', type=int, default=9090)  #end port
    args = parser.parse_args()

    logger.debug("UDP Port [%d ~ %d] test Started"%(args.sport, args.eport))
UDP_IP=args.bind epoll = select.epoll() for x in range(args.sport, args.eport + 1): # create a datagram socket sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) # UDP info = {} info["port"] = x info["test"] = 0 # bind the socket to a port, to allow people to send info to it try: sock.bind( (UDP_IP, x)) sock_list.append(sock) epoll.register(sock, select.EPOLLIN) info["bind"] = 1 info["sockfd"] = sock.fileno() except socket.error as msg: # bind Error print("Socket binding error: " + str(msg) + "\n" + "Retrying...") info["bind"] = 0 info["sockfd"] = 0 info_list.append(info) # use epoll to wait for read events while True: try: wait_for_message_and_respond(epoll) except KeyboardInterrupt: break output_info() for sock in sock_list: epoll.unregister(sock.fileno()) sock.close() epoll.close()

<epoll_server.py>

Now let's run the program like this: The test ports are from 9080 to 9100.  epoll_server.py creates 21 UDP sockets, binds ports 9080 to 9100, and provides echo service when the client sends data to the port.

root@ubuntusrv:/usr/local/src/study/port_test# python3 epoll_server.py --eport 9100
UDP Port [9080 ~ 9100] test Started


Port scanning with nmap

Now it's time to test if the server's UDP ports 9800 ~ 9100 are reachable. The previously used udp_test program can be used repeatedly, but if you want to test multiple ports at once, the nmap program is suitable. Install nmap for your computer. nmap can be installed on various OS such as Linux, Windows, and Mac.

I will test using nmap for Windows.

<nmap scanning result>


It was confirmed that the echo service was correctly provided for 21 ports. 

Now let's quit the server program by pressing Ctrl + C. 

('192.168.150.1', 56436)RCV:versionbind
('192.168.150.1', 56436)SND[30]:versionbind
('192.168.150.1', 56436)RCV:
('192.168.150.1', 56436)SND[12]:
('192.168.150.1', 56436)RCV bytes binary  data
('192.168.150.1', 56436)RCV:help


('192.168.150.1', 56436)SND[8]:help


^Cport[9080] bind[1] test[1]
port[9081] bind[1] test[1]
port[9082] bind[1] test[1]
port[9083] bind[1] test[1]
port[9084] bind[1] test[1]
port[9085] bind[1] test[1]
port[9086] bind[1] test[1]
port[9087] bind[1] test[1]
port[9088] bind[1] test[1]
port[9089] bind[1] test[1]
port[9090] bind[1] test[1]
port[9091] bind[1] test[1]
port[9092] bind[1] test[1]
port[9093] bind[1] test[1]
port[9094] bind[1] test[1]
port[9095] bind[1] test[1]
port[9096] bind[1] test[1]
port[9097] bind[1] test[1]
port[9098] bind[1] test[1]
port[9099] bind[1] test[1]
port[9100] bind[1] test[1]


epoll_server.py also provides test information for ports 9080 to 9100.

  • bind[1] :bind successful
  • bind[0] :bind failed
  • test[1] : Provide echo service
  • test[0] : echo service not provided (no connection)

You can see that binding succeeded for all ports and provided echo service to nmap.


Wrapping Up

A typical tcp server opens one port and then provides a service, so port scanning is simple.

However, in a service that dynamically allocates UDP ports, it is very difficult to test dynamically allocated ports at once. epoll_server.py is a simple and useful code that can check service by opening UDP port bands at once.









댓글

이 블로그의 인기 게시물

VSCode - Lua programming and debugging

Remote C/C++ Debugging with VSCode #1 - Installation and simple debugging

Remote Python Debugging with VSCode