Dynamically assigned TCP port scanning

 This time, we will create an echo server that can dynamically open a TCP port to check a connection. As with the previous UDP port detection, we will use epoll. 

TCP works a little differently from UDP. In TCP, the client connects to a socket that the server has bound and put into the listening state. The server then calls accept on that socket, which returns a new socket, and all further communication with the client goes through this new socket.

tcp/ip socket communication process

Multiple port tests

To test a range of ports, the server must open a socket for every port in the range and bind each one. If a bind fails, the port is most likely already in use by another process, so confirm that before taking further action. The TCP example is a little more complicated than the UDP example because each bound socket must also go through listen and accept, and accept creates a new socket that carries the actual communication.

#!/usr/bin/env python

import socket
import time
import select
import logging
import argparse

# getLogger() without a name returns the root logger; the functions below use it for debug output.
logger = logging.getLogger()

def handle_client_data(sock, data, addr):
    """Echo a payload read from a client back to it and log what was sent.

    Args:
        sock: Connected client socket the payload is written back to.
        data: Bytes received from the client; a falsy value is ignored.
        addr: Client address, used only as the prefix of the log line.
    """
    if not data:
        return

    snd = sock.send(data)
    # Decoding is only needed for the log line, so a payload that is not valid
    # text must not be treated as an error: fall back to a size-only message.
    try:
        logger.debug(str(addr) + "SND[%d]:"%(snd) + data.decode())
    except UnicodeDecodeError:  # nmap sends 0xfe binary data
        logger.debug(str(addr) + "SND[%d] bytes binary  data"%(snd) )

def find_sock(fd):
    """Return the first socket in ``sock_list`` whose descriptor is *fd*, else None."""
    matching = (candidate for candidate in sock_list if candidate.fileno() == fd)
    return next(matching, None)

def find_connection_sock(fd):
    """Look up the accepted client connection that owns descriptor *fd*.

    Args:
        fd: File descriptor reported by epoll.

    Returns:
        A ``(socket, address)`` pair taken from the first matching entry of
        ``info_list``, or ``(None, None)`` when no entry matches.
    """
    for info in info_list:
        conn_sock = info["connect_sock"]
        # Entries without a client hold None here; skip them instead of
        # calling fileno() on None (same guard update_info applies).
        if conn_sock is not None and conn_sock.fileno() == fd:
            return conn_sock, info["address"]
    return None, None

def update_info(fd):
    """Set ``test`` to 1 on every ``info_list`` entry whose client socket has descriptor *fd*."""
    for record in info_list:
        peer = record["connect_sock"]
        if not peer:
            # No client socket stored for this entry.
            continue
        if peer.fileno() == fd:
            record["test"] = 1

def update_connection(fd, conn_sock, address):
    """Store an accepted connection on every entry whose ``sockfd`` equals *fd*.

    Sets ``connection`` to 1 and records *conn_sock* and *address* on the entry.
    """
    for record in info_list:
        if record["sockfd"] != fd:
            continue
        record.update(connection=1, connect_sock=conn_sock, address=address)

def remove_connection(fileno):
    """Log a disconnect for the client connection that owns *fileno*.

    Does nothing when no connection is found for the descriptor.
    NOTE(review): despite the name, nothing is unregistered or closed here;
    confirm whether the caller is expected to do that cleanup.
    """
    conn_sock, addr = find_connection_sock(fileno)
    if not conn_sock:
        return
    logger.debug(str(addr) +  " => disconnect")

def output_info():
    """Write one debug line per ``info_list`` entry summarising its port status."""
    template = "port[%d] bind[%d] connection[%d] from [%s] test[%d]"
    for record in info_list:
        fields = (
            record["port"],
            record["bind"],
            record["connection"],
            str(record["address"]),
            record["test"],
        )
        logger.debug(template % fields)

def _close_connection(epoll, fileno, conn_sock):
    """Stop polling a client connection and close its socket."""
    try:
        epoll.unregister(fileno)
    except (OSError, ValueError):
        # Already unregistered, or the epoll object itself is closed;
        # either way there is nothing left to undo before closing.
        pass
    conn_sock.close()


def wait_for_message_and_respond(epoll):
    """Wait for epoll activity and service every ready descriptor once.

    Listening sockets (those found by ``find_sock``) accept the pending
    client and register the new connection for read events. Accepted
    connections have their data read and echoed back through
    ``handle_client_data``; a zero-length read or a hang-up closes them.

    Args:
        epoll: ``select.epoll`` object the listening sockets are registered on.
    """
    for fileno, event in epoll.poll():
        listener = find_sock(fileno)
        if listener:    # event on a listening socket => a client tries to connect
            try:
                connection, address = listener.accept()
            except socket.error as err:
                # The client may have gone away between the event and accept().
                logger.debug("accept failed on fd %d: %s" % (fileno, err))
                continue
            epoll.register(connection, select.EPOLLIN)
            update_connection(fileno, connection, address)
            logger.debug(str(address) +  "connected:")
        elif event & select.EPOLLIN:    # event occurs on a connection socket
            conn_sock, addr = find_connection_sock(fileno)
            if not conn_sock:
                # update_connection() overwrites the stored socket when a port
                # is connected to again, so this descriptor is no longer
                # tracked. Unregister it, otherwise a readable descriptor
                # would wake poll() forever.
                try:
                    epoll.unregister(fileno)
                except (OSError, ValueError):
                    pass  # already gone
                continue
            try:
                data = conn_sock.recv(1024)
            except socket.error as err:
                logger.debug(str(addr) + "RCV failed: %s" % err)
                _close_connection(epoll, fileno, conn_sock)
                continue
            if len(data) == 0:
                # A zero-length read means the peer closed the connection.
                logger.debug(str(addr) +  "RCV:0 =>disconnected")
                _close_connection(epoll, fileno, conn_sock)
                continue
            # Decoding is for the log only; binary payloads are still echoed.
            try:
                logger.debug(str(addr) +  "RCV:" + data.decode())
            except UnicodeDecodeError:  # nmap sends 0xfe binary data
                logger.debug(str(addr) + "RCV bytes binary  data")
            try:
                handle_client_data(conn_sock, data, addr)
            except socket.error as err:
                logger.debug(str(addr) + "SND failed: %s" % err)
                _close_connection(epoll, fileno, conn_sock)
                continue
            update_info(fileno)     # this port has now served an echo
        elif event & (select.EPOLLRDHUP | select.EPOLLHUP):
            # Client socket shutdown or hang-up without pending data.
            conn_sock, addr = find_connection_sock(fileno)
            if conn_sock:
                logger.debug(str(addr) + " => disconnect")
                _close_connection(epoll, fileno, conn_sock)

sock_list = []  # listening sockets, one per port that was bound successfully
info_list = []  # one status record (dict) per port in the tested range

if __name__ == '__main__':
    # Creates one listening socket per port plus an epoll object for
    # monitoring them, then handles messages on the sockets until Ctrl + C.

    # Every status line goes through logger.debug(), so without a DEBUG-level
    # handler the program would print nothing.
    logging.basicConfig(level=logging.DEBUG, format="%(message)s")

    parser = argparse.ArgumentParser(description='TCP Port Test')
    parser.add_argument('--bind', type=str, default='')     # default addr_any
    parser.add_argument('--sport', type=int, default=9080)  # start port
    parser.add_argument('--eport', type=int, default=9090)  # end port (inclusive)
    args = parser.parse_args()

    logger.debug("TCP Port [%d ~ %d] test Started"%(args.sport, args.eport))
    epoll = select.epoll()

    for x in range(args.sport, args.eport + 1):
        # create a stream (TCP) socket for this port
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        info = {
            "port": x,
            "connection": 0,
            "connect_sock": None,
            "address": None,
            "test": 0,
        }
        # bind the socket to the port and start listening so clients can connect
        try:
            sock.bind((args.bind, x))
            sock.listen(5)
            # Level-triggered on purpose: one accept() is done per event, and
            # an edge-triggered listener could leave queued clients unaccepted.
            epoll.register(sock, select.EPOLLIN)
            info["bind"] = 1
            info["sockfd"] = sock.fileno()
            sock_list.append(sock)
        except socket.error as msg:     # bind error, e.g. port already in use
            # No retry is attempted; the port is reported as bind[0].
            print("TCP Socket binding error on port %d: %s" % (x, msg))
            info["bind"] = 0
            info["sockfd"] = 0
            sock.close()
        info_list.append(info)

    # use epoll to wait for read events
    try:
        while True:
            wait_for_message_and_respond(epoll)
    except KeyboardInterrupt:
        output_info()   # print the per-port summary on Ctrl + C
    finally:
        for sock in sock_list:
            sock.close()
        epoll.close()


Port Scanning with nmap

As with the UDP example, the client for port scanning will use nmap.

Now let's run the program like this. By default the server tests ports 9080 to 9090: epoll_tcpserver.py creates 11 TCP sockets, binds them to ports 9080 through 9090, and echoes back whatever a client sends to one of those ports.

root@ubuntusrv:/usr/local/src/study/port_test# python3 epoll_tcpserver.py
TCP Port [9080 ~ 9090] test Started

Now it's time to test whether the server's TCP ports 9080 ~ 9100 are reachable.

C:\Users\spypi>nmap -p 9080-9100 -T4 -A -v 9090
Starting Nmap 7.92 ( https://nmap.org ) at 2022-04-12 23:08 Korea Standard Time
NSE: Loaded 155 scripts for scanning.
NSE: Script Pre-scanning.
Initiating NSE at 23:08
Completed NSE at 23:08, 0.01s elapsed
Initiating NSE at 23:08
Completed NSE at 23:08, 0.00s elapsed
Initiating NSE at 23:08
Completed NSE at 23:08, 0.00s elapsed
Failed to resolve "9090".
Initiating ARP Ping Scan at 23:08
Scanning [1 port]
Completed ARP Ping Scan at 23:08, 0.06s elapsed (1 total hosts)
Initiating Parallel DNS resolution of 1 host. at 23:08
Completed Parallel DNS resolution of 1 host. at 23:08, 0.01s elapsed
Initiating SYN Stealth Scan at 23:08
Scanning [21 ports]
Discovered open port 9084/tcp on
Discovered open port 9080/tcp on
Discovered open port 9087/tcp on
Discovered open port 9086/tcp on
Discovered open port 9083/tcp on
Discovered open port 9089/tcp on
Discovered open port 9088/tcp on
Discovered open port 9090/tcp on
Discovered open port 9081/tcp on
Discovered open port 9082/tcp on
Discovered open port 9085/tcp on
Completed SYN Stealth Scan at 23:08, 0.00s elapsed (21 total ports)
Initiating Service scan at 23:08
Scanning 11 services on
Service scan Timing: About 9.09% done; ETC: 23:15 (0:06:50 remaining)
Completed Service scan at 23:09, 71.04s elapsed (11 services on 1 host)
Initiating OS detection (try #1) against
NSE: Script scanning
Initiating NSE at 23:09
Completed NSE at 23:09, 30.06s elapsed
Initiating NSE at 23:09
Completed NSE at 23:09, 0.00s elapsed
Initiating NSE at 23:09
Completed NSE at 23:09, 0.00s elapsed
Nmap scan report for
Host is up (0.00029s latency).
9080/tcp open echo
9081/tcp open echo
9082/tcp open echo
9083/tcp open echo
9084/tcp open echo
9085/tcp open echo
9086/tcp open echo
9087/tcp open echo
9088/tcp open echo
9089/tcp open echo
9090/tcp open echo
9091/tcp closed xmltec-xmlmail
9092/tcp closed XmlIpcRegSvc
9093/tcp closed copycat
9094/tcp closed unknown
9095/tcp closed unknown
9096/tcp closed unknown
9097/tcp closed unknown
9098/tcp closed unknown
9099/tcp closed unknown
9100/tcp closed jetdirect
MAC Address: 00:0C:29:B4:6E:4D (VMware)
Device type: general purpose
Running: Linux 4.X|5.X
OS CPE: cpe:/o:linux:linux_kernel:4 cpe:/o:linux:linux_kernel:5
OS details: Linux 4.15 - 5.6
Uptime guess: 38.132 days (since Sat Mar 5 20:00:09 2022)
Network Distance: 1 hop
TCP Sequence Prediction: Difficulty=258 (Good luck!)
IP ID Sequence Generation: All zeros
1 0.29 ms
NSE: Script Post-scanning.
Initiating NSE at 23:09
Completed NSE at 23:09, 0.00s elapsed
Initiating NSE at 23:09
Completed NSE at 23:09, 0.00s elapsed
Initiating NSE at 23:09
Completed NSE at 23:09, 0.00s elapsed
Read data files from: C:\Program Files (x86)\Nmap
OS and Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
Nmap done: 1 IP address (1 host up) scanned in 105.49 seconds
Raw packets sent: 44 (2.730KB) | Rcvd: 55 (4.074KB)

nmap scanning result 

The scan confirms that the echo service is provided correctly on 11 ports (9080 ~ 9090), and that ports 9091 ~ 9100 are closed and cannot be connected to.

Now let's quit the server program by pressing Ctrl + C.

root@ubuntusrv:/usr/local/src/study/port_test# python3 epoll_tcpserver.py
TCP Port [9080 ~ 9090] test Started
('', 2092)connected:
('', 2093)connected:
('', 2094)connected:
('', 2095)connected:
('', 2096)connected:
('', 2097)connected:
('', 2098)connected:
('', 2099)connected:
('', 2100)connected:
('', 2101)connected:
('', 2102)connected:


port[9080] bind[1] connection[1] from [('', 12837)] test[1]
port[9081] bind[1] connection[1] from [('', 12838)] test[1]
port[9082] bind[1] connection[1] from [('', 12839)] test[1]
port[9083] bind[1] connection[1] from [('', 12840)] test[1]
port[9084] bind[1] connection[1] from [('', 12844)] test[1]
port[9085] bind[1] connection[1] from [('', 12847)] test[1]
port[9086] bind[1] connection[1] from [('', 12848)] test[1]
port[9087] bind[1] connection[1] from [('', 12849)] test[1]
port[9088] bind[1] connection[1] from [('', 12856)] test[1]
port[9089] bind[1] connection[1] from [('', 12857)] test[1]
port[9090] bind[1] connection[1] from [('', 12868)] test[1]

epoll_tcpserver.py also provides test information for ports 9080 to 9090.

  • bind[1] :bind successful
  • bind[0] :bind failed
  • connection[1] :client connection 
  • connection[0] :no client connection
  • from["address"] : client address
  • test[1] : Provide echo service
  • test[0] : echo service not provided (no connection)

Port Scanning with Echo Client

Although nmap is sufficient for this test, you can also test a range of ports with the following echo client program.

#!/usr/bin/env python
import argparse
import socket
import time
import sys

bind_ip = ""

parser = argparse.ArgumentParser(description='TCP echo client Test')
parser.add_argument('--host', type=str, default='')
parser.add_argument('--sport', type=int, default=9080)  # start port
parser.add_argument('--eport', type=int, default=9090)  # end port (inclusive)

args = parser.parse_args()
info_list = []      # one result record (dict) per tested port
CONN_TIMEOUT = 5.0  # connect/send/recv timeout, sec
msg = "Hello World"


def output_info():
    """Print one summary line per tested port."""
    print("================ Summary ================")
    for info in info_list:
        print("port[%d] connection[%d] send[%d] recv[%d]"%(info["port"], info["connection"], info["send"], info["recv"]))


def probe_port(port):
    """Connect to one port, send the test message twice and record the echoes.

    The result record is appended to ``info_list`` before any I/O so the port
    shows up in the summary even when the connection fails.
    """
    info = {"port": port, "connection": 0, "send": 0, "recv": 0}
    info_list.append(info)
    server_address = (args.host, port)

    # A TCP socket cannot be connected a second time, so each port gets its
    # own socket instead of sharing one across the whole range.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(CONN_TIMEOUT)
    try:
        try:
            sock.connect(server_address)
            info["connection"] = 1
        except socket.error as e:
            print("port[%d] Connect failed: %r" % (port, e))
            return

        print('Client Mode =>Send TCP packet to port[%d]'%port)
        for x in range(2):
            print("Snd:" + msg + " To:%s:%d"%(args.host, port))
            try:
                size = sock.send(msg.encode())
                print("Snd %d bytes"%size , " To " , (args.host, port))
                if size > 0:
                    info["send"] = size
                data = sock.recv(1024)
                if data:
                    info["recv"] = len(data)
                    try:
                        print("Rcv:" + data.decode())
                    except UnicodeDecodeError:  # echoed payload is not text
                        print("Rcv Binary Data")
            except socket.timeout:
                print("TimeOut => exit")
                break   # stop testing this port
            except socket.error as e:
                print("port[%d] Send/Recv failed: %r" % (port, e))
                break
    finally:
        sock.close()


try:
    # range(), not a 2-tuple: every port from sport to eport must be tested.
    for port in range(args.sport, args.eport + 1):
        probe_port(port)
except KeyboardInterrupt:
    pass    # fall through to the summary of what was tested so far

output_info()



Wrapping up

This post introduced a simple Python socket program that dynamically opens a range of ports to test firewalls. By modifying this program, you can also create highly scalable socket server applications.


