Dynamically assigned TCP port scanning
This time, we will create an echo server that can dynamically open a TCP port to check a connection. As with the previous UDP port detection, we will use epoll.
TCP works a little differently from UDP. In TCP, the client connects to the server's bound, listening socket. The server then calls the accept function, which creates a new socket, and communicates with the client through that new socket.
Multiple port tests
To test ports in a specific range, a socket must be opened and bound for every port in that range. If the bind operation fails for a port, that port is most likely already in use by another process, so confirm this before taking further action. The TCP example is a little more complicated than the UDP example because each socket has to go through listen and accept, and accept creates a new socket through which the actual communication takes place.
#!/usr/bin/env python
"""Echo server that listens on a range of TCP ports using epoll.

One listening socket is opened per port in [--sport, --eport].  Every
accepted client gets its data echoed back.  On Ctrl + C a per-port summary
(bind result, whether a client connected, whether data was echoed) is
logged before all sockets are closed.
"""
import argparse
import logging
import select
import socket
import time

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

# Maximum number of bytes read from a client per recv() call.
RECV_SIZE = 1024

# Listening sockets that were bound and registered successfully.
sock_list = []
# One status record (dict) per tested port, in port order.
info_list = []
# Accepted client connections: fd -> (socket, peer address, port record).
# Keyed by fd because several clients may be connected to the same port and
# because a closed socket no longer reports its fd.
connections = {}
# Poller used by remove_connection() when none is passed in; set by main().
epoll = None


def handle_client_data(sock, data, addr):
    """Echo ``data`` back to the client on ``sock`` and log what was sent.

    Args:
        sock: connected client socket.
        data: bytes received from the client; empty data is ignored.
        addr: peer address, used only for logging.
    """
    if not data:
        return
    try:
        snd = sock.send(data)
    except OSError as err:
        # Non-blocking sockets may refuse the write or the peer may be gone.
        logger.debug("%s send failed: %s", addr, err)
        return
    try:
        logger.debug("%sSND[%d]:%s", addr, snd, data.decode())
    except UnicodeDecodeError:
        # nmap sends 0xfe binary data
        logger.debug("%sSND[%d] bytes binary data", addr, snd)


def find_sock(fd):
    """Return the listening socket whose file descriptor is ``fd``, or None."""
    for sock in sock_list:
        if sock.fileno() == fd:
            return sock
    return None


def find_connection_sock(fd):
    """Return ``(socket, address)`` of the client connection with this fd.

    Returns ``(None, None)`` when ``fd`` is not a tracked client connection.
    """
    entry = connections.get(fd)
    if entry is None:
        return None, None
    return entry[0], entry[1]


def update_info(fd):
    """Mark the port served by client connection ``fd`` as echo-tested."""
    entry = connections.get(fd)
    if entry is not None and entry[2] is not None:
        entry[2]["test"] = 1


def update_connection(fd, conn_sock, address):
    """Record a client accepted on the listening socket with descriptor ``fd``.

    The port record keeps the most recent client for the summary, while
    ``connections`` tracks every client so none of them is lost when a
    second client connects to the same port.
    """
    info = next((rec for rec in info_list if rec["sockfd"] == fd), None)
    if info is not None:
        info["connection"] = 1
        info["connect_sock"] = conn_sock
        info["address"] = address
    connections[conn_sock.fileno()] = (conn_sock, address, info)


def remove_connection(fileno, poller=None):
    """Stop polling the client connection ``fileno`` and close it.

    Args:
        fileno: file descriptor of the client connection.
        poller: epoll object the connection is registered with; defaults to
            the module-level ``epoll``.
    """
    entry = connections.pop(fileno, None)
    if entry is None:
        return
    conn_sock, addr, _ = entry
    if poller is None:
        poller = epoll
    if poller is not None:
        try:
            poller.unregister(fileno)
        except (OSError, ValueError) as err:
            logger.debug("%s unregister failed: %s", addr, err)
    conn_sock.close()
    logger.debug("%s => disconnect", addr)


def output_info():
    """Log the per-port test summary."""
    for info in info_list:
        logger.debug(
            "port[%d] bind[%d] connection[%d] from [%s] test[%d]",
            info["port"], info["bind"], info["connection"],
            str(info["address"]), info["test"],
        )


def accept_clients(poller, listener):
    """Accept every pending client on ``listener`` and start polling them.

    Listening sockets are registered edge-triggered, so the backlog has to
    be drained completely; otherwise queued clients would never be reported
    again.
    """
    listen_fd = listener.fileno()
    while True:
        try:
            connection, address = listener.accept()
        except BlockingIOError:
            return  # backlog drained
        except ConnectionAbortedError:
            continue  # that client gave up; others may still be queued
        except OSError as err:
            logger.debug("accept failed on fd %d: %s", listen_fd, err)
            return
        connection.setblocking(False)
        poller.register(connection, select.EPOLLIN)
        update_connection(listen_fd, connection, address)
        logger.debug("%sconnected:", address)


def serve_client(poller, fileno):
    """Read from the client connection ``fileno`` and echo the data back."""
    conn_sock, addr = find_connection_sock(fileno)
    if conn_sock is None:
        # Unknown descriptor: stop polling it so it cannot spin the loop.
        try:
            poller.unregister(fileno)
        except (OSError, ValueError) as err:
            logger.debug("fd %d unregister failed: %s", fileno, err)
        return
    try:
        data = conn_sock.recv(RECV_SIZE)
    except BlockingIOError:
        return  # spurious wake-up, nothing to read yet
    except OSError as err:
        logger.debug("%s recv failed: %s", addr, err)
        remove_connection(fileno, poller)
        return
    if not data:
        logger.debug("%sRCV:0 =>disconnected", addr)
        remove_connection(fileno, poller)
        return
    update_info(fileno)
    try:
        logger.debug("%sRCV:%s", addr, data.decode())
    except UnicodeDecodeError:
        # nmap sends 0xfe binary data
        logger.debug("%sRCV bytes binary data", addr)
    # Echo regardless of whether the payload could be decoded for logging.
    handle_client_data(conn_sock, data, addr)


def wait_for_message_and_respond(epoll):
    """Wait for epoll events and process each of them once."""
    hangup = select.EPOLLRDHUP | select.EPOLLHUP | select.EPOLLERR
    for fileno, event in epoll.poll():
        listener = find_sock(fileno)
        if listener is not None:
            # Event on a listening socket => client(s) trying to connect.
            accept_clients(epoll, listener)
        elif event & select.EPOLLIN:
            # Event on a client connection socket.
            serve_client(epoll, fileno)
        elif event & hangup:
            # Peer shut down, hang-up or socket error.
            remove_connection(fileno, epoll)


def open_listeners(poller, bind_ip, start_port, end_port):
    """Open one listening TCP socket per port and record the bind result.

    A record is appended to ``info_list`` for every port, whether or not
    the bind succeeded; successfully bound sockets go to ``sock_list``.
    """
    for port in range(start_port, end_port + 1):
        info = {
            "port": port,
            "bind": 0,
            "sockfd": -1,
            "connection": 0,
            "connect_sock": None,
            "address": None,
            "test": 0,
        }
        # create a TCP (stream) socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((bind_ip, port))
            sock.listen(1)
            sock.setblocking(False)
            poller.register(sock, select.EPOLLIN | select.EPOLLET)
        except OSError as msg:
            # Typically the port is already in use by another process.
            print("TCP Socket binding error on port %d: %s" % (port, msg))
            sock.close()
        else:
            sock_list.append(sock)
            info["bind"] = 1
            info["sockfd"] = sock.fileno()
        info_list.append(info)


def main():
    """Open the listening sockets and serve echo requests until Ctrl + C."""
    global epoll

    parser = argparse.ArgumentParser(description='TCP Port Test')
    parser.add_argument('--bind', type=str, default='')  # default addr_any
    parser.add_argument('--sport', type=int, default=9080)  # start port
    parser.add_argument('--eport', type=int, default=9090)  # end port
    args = parser.parse_args()
    logger.debug("TCP Port [%d ~ %d] test Started", args.sport, args.eport)

    epoll = select.epoll()
    try:
        open_listeners(epoll, args.bind, args.sport, args.eport)
        # use epoll to wait for read events
        while True:
            wait_for_message_and_respond(epoll)
    except KeyboardInterrupt:
        output_info()
    finally:
        for fd in list(connections):
            remove_connection(fd, epoll)
        for sock in sock_list:
            epoll.unregister(sock.fileno())
            sock.close()
        del sock_list[:]
        epoll.close()


if __name__ == '__main__':
    main()
<epoll_tcpserver.py>
Port Scanning with nmap
As with the UDP example, the client for port scanning will use nmap.
Now let's run the program. With its default arguments, epoll_tcpserver.py creates 11 TCP sockets, binds and listens on ports 9080 to 9090, and echoes back whatever a client sends to any of these ports. The nmap scan below covers ports 9080 to 9100, so ports 9091 to 9100 serve as a closed-port comparison.
root@ubuntusrv:/usr/local/src/study/port_test# python3 epoll_tcpserver.py TCP Port [9080 ~ 9090] test Started
Now it's time to test whether the server's TCP ports 9080 ~ 9100 are reachable.
C:\Users\spypi>nmap -p 9080-9100 -T4 -A -v 9090 192.168.150.128Starting Nmap 7.92 ( https://nmap.org ) at 2022-04-12 23:08 Korea Standard TimeNSE: Loaded 155 scripts for scanning.NSE: Script Pre-scanning.Initiating NSE at 23:08Completed NSE at 23:08, 0.01s elapsedInitiating NSE at 23:08Completed NSE at 23:08, 0.00s elapsedInitiating NSE at 23:08Completed NSE at 23:08, 0.00s elapsedFailed to resolve "9090".Initiating ARP Ping Scan at 23:08Scanning 192.168.150.128 [1 port]Completed ARP Ping Scan at 23:08, 0.06s elapsed (1 total hosts)Initiating Parallel DNS resolution of 1 host. at 23:08Completed Parallel DNS resolution of 1 host. at 23:08, 0.01s elapsedInitiating SYN Stealth Scan at 23:08Scanning 192.168.150.128 [21 ports]Discovered open port 9084/tcp on 192.168.150.128Discovered open port 9080/tcp on 192.168.150.128Discovered open port 9087/tcp on 192.168.150.128Discovered open port 9086/tcp on 192.168.150.128Discovered open port 9083/tcp on 192.168.150.128Discovered open port 9089/tcp on 192.168.150.128Discovered open port 9088/tcp on 192.168.150.128Discovered open port 9090/tcp on 192.168.150.128Discovered open port 9081/tcp on 192.168.150.128Discovered open port 9082/tcp on 192.168.150.128Discovered open port 9085/tcp on 192.168.150.128Completed SYN Stealth Scan at 23:08, 0.00s elapsed (21 total ports)Initiating Service scan at 23:08Scanning 11 services on 192.168.150.128Service scan Timing: About 9.09% done; ETC: 23:15 (0:06:50 remaining)Completed Service scan at 23:09, 71.04s elapsed (11 services on 1 host)Initiating OS detection (try #1) against 192.168.150.128NSE: Script scanning 192.168.150.128.Initiating NSE at 23:09Completed NSE at 23:09, 30.06s elapsedInitiating NSE at 23:09Completed NSE at 23:09, 0.00s elapsedInitiating NSE at 23:09Completed NSE at 23:09, 0.00s elapsedNmap scan report for 192.168.150.128Host is up (0.00029s latency).PORT STATE SERVICE VERSION9080/tcp open echo9081/tcp open echo9082/tcp open echo9083/tcp open echo9084/tcp open 
echo9085/tcp open echo9086/tcp open echo9087/tcp open echo9088/tcp open echo9089/tcp open echo9090/tcp open echo9091/tcp closed xmltec-xmlmail9092/tcp closed XmlIpcRegSvc9093/tcp closed copycat9094/tcp closed unknown9095/tcp closed unknown9096/tcp closed unknown9097/tcp closed unknown9098/tcp closed unknown9099/tcp closed unknown9100/tcp closed jetdirectMAC Address: 00:0C:29:B4:6E:4D (VMware)Device type: general purposeRunning: Linux 4.X|5.XOS CPE: cpe:/o:linux:linux_kernel:4 cpe:/o:linux:linux_kernel:5OS details: Linux 4.15 - 5.6Uptime guess: 38.132 days (since Sat Mar 5 20:00:09 2022)Network Distance: 1 hopTCP Sequence Prediction: Difficulty=258 (Good luck!)IP ID Sequence Generation: All zerosTRACEROUTEHOP RTT ADDRESS1 0.29 ms 192.168.150.128NSE: Script Post-scanning.Initiating NSE at 23:09Completed NSE at 23:09, 0.00s elapsedInitiating NSE at 23:09Completed NSE at 23:09, 0.00s elapsedInitiating NSE at 23:09Completed NSE at 23:09, 0.00s elapsedRead data files from: C:\Program Files (x86)\NmapOS and Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .Nmap done: 1 IP address (1 host up) scanned in 105.49 secondsRaw packets sent: 44 (2.730KB) | Rcvd: 55 (4.074KB)
<nmap scanning result>
The scan confirms that the echo service is correctly provided on 11 ports (9080 ~ 9090), and that ports 9091 ~ 9100 are closed and cannot be connected to.
Now let's quit the server program by pressing Ctrl + C.
root@ubuntusrv:/usr/local/src/study/port_test# python3 epoll_tcpserver.py TCP Port [9080 ~ 9090] test Started ('192.168.150.1', 2092)connected: ('192.168.150.1', 2093)connected: ('192.168.150.1', 2094)connected: ('192.168.150.1', 2095)connected: ('192.168.150.1', 2096)connected: ('192.168.150.1', 2097)connected: ('192.168.150.1', 2098)connected: ('192.168.150.1', 2099)connected: ('192.168.150.1', 2100)connected: ('192.168.150.1', 2101)connected: ('192.168.150.1', 2102)connected:
port[9080] bind[1] connection[1] from [('192.168.150.1', 12837)] test[1] port[9081] bind[1] connection[1] from [('192.168.150.1', 12838)] test[1] port[9082] bind[1] connection[1] from [('192.168.150.1', 12839)] test[1] port[9083] bind[1] connection[1] from [('192.168.150.1', 12840)] test[1] port[9084] bind[1] connection[1] from [('192.168.150.1', 12844)] test[1] port[9085] bind[1] connection[1] from [('192.168.150.1', 12847)] test[1] port[9086] bind[1] connection[1] from [('192.168.150.1', 12848)] test[1] port[9087] bind[1] connection[1] from [('192.168.150.1', 12849)] test[1] port[9088] bind[1] connection[1] from [('192.168.150.1', 12856)] test[1] port[9089] bind[1] connection[1] from [('192.168.150.1', 12857)] test[1] port[9090] bind[1] connection[1] from [('192.168.150.1', 12868)] test[1]
epoll_tcpserver.py also provides test information for ports 9080 to 9090.
- bind[1] :bind successful
- bind[0] :bind failed
- connection[1] :client connection
- connection[0] :no client connection
- from["address"] : client address
- test[1] : Provide echo service
- test[0] : echo service not provided (no connection)
Port Scanning with Echo Client
Although it is possible to test with nmap, you can also test a range of ports using the following echo client program.
#!/usr/bin/env python
"""TCP echo client that probes a range of ports on one host.

For every port in [--sport, --eport] the client connects, sends a short
message twice and records how many bytes were sent and echoed back.  A
summary for all probed ports is printed at the end.
"""
import argparse
import socket
import sys
import time

bind_ip = ""
info_list = []  # one result record (dict) per probed port
CONN_TIMEOUT = 5.0  # connect timeout, sec
IO_TIMEOUT = 10.0  # send/recv timeout once connected, sec
SEND_COUNT = 2  # number of echo round trips per port
msg = "Hello World"


def build_parser():
    """Return the command line parser for the echo client."""
    parser = argparse.ArgumentParser(description='TCP echo client Test')
    parser.add_argument('--host', type=str, default='127.0.0.1')
    parser.add_argument('--sport', type=int, default=9080)  # start port
    parser.add_argument('--eport', type=int, default=9090)  # end port
    return parser


def port_range(sport, eport):
    """Return every port from ``sport`` to ``eport``, both inclusive."""
    return range(sport, eport + 1)


def output_info():
    """Print the per-port summary collected in ``info_list``."""
    print("================ Summary ================")
    for info in info_list:
        print("port[%d] connection[%d] send[%d] recv[%d]" % (
            info["port"], info["connection"], info["send"], info["recv"]))


def probe_port(host, port):
    """Connect to ``host:port``, exchange echo messages and return the result.

    Returns:
        dict with keys ``port``, ``connection`` (1 if the connect
        succeeded), ``send`` (bytes sent by the last send) and ``recv``
        (bytes received by the last recv).
    """
    info = {"port": port, "connection": 0, "send": 0, "recv": 0}
    # A fresh socket per port: a socket cannot be reused once it has been
    # closed or after a failed connect.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(CONN_TIMEOUT)
        try:
            sock.connect((host, port))
        except socket.error as e:
            print("port[%d] Connect failed: %r" % (port, e))
            return info
        info["connection"] = 1
        sock.settimeout(IO_TIMEOUT)
        print('Client Mode =>Send TCP packet to port[%d]' % port)
        for _ in range(SEND_COUNT):
            print("Snd:" + msg + " To:%s:%d" % (host, port))
            try:
                size = sock.send(msg.encode())
                print("Snd %d bytes" % size, " To ", (host, port))
                if size > 0:
                    info["send"] = size
                data = sock.recv(1024)
                if data:
                    info["recv"] = len(data)
                    try:
                        print("Rcv:" + data.decode())
                    except UnicodeDecodeError:
                        print("Rcv Binary Data")
                time.sleep(1)
            except socket.timeout:
                print("TimeOut => exit")
            except OSError as e:
                # Connection reset/broken: further sends cannot succeed.
                print("port[%d] I/O failed: %r" % (port, e))
                break
    return info


def main():
    """Probe every port in the requested range and print the summary."""
    args = build_parser().parse_args()
    for port in port_range(args.sport, args.eport):
        try:
            info_list.append(probe_port(args.host, port))
        except KeyboardInterrupt:
            break
    output_info()


if __name__ == '__main__':
    main()
<echo_tcpclient.py>
Wrapping up
This article introduced a simple Python socket program that can dynamically open a range of ports to test firewalls. By modifying this program, you can also create highly scalable socket server applications.
댓글
댓글 쓰기