Codebase list fenrir / scrub-obsolete/main play zone / terminalManagement
scrub-obsolete/main

Tree @scrub-obsolete/main (Download .tar.gz)

terminalManagement @scrub-obsolete/mainraw · history · blame

import os
import struct
import sys
import pty
import tty
import termios
import shlex
import signal
import select
import pyte
import time

class Terminal:
    def __init__(self, columns, lines, p_in):
        self.screen = pyte.HistoryScreen(columns, lines)
        self.screen.set_mode(pyte.modes.LNM)
        self.screen.write_process_input = \
            lambda data: p_in.write(data.encode())
        self.stream = pyte.ByteStream()
        self.stream.attach(self.screen)
    def feed(self, data):
        self.stream.feed(data)
    def dump(self):
        cursor = self.screen.cursor
        lines = []
        for y in self.screen.dirty:
            line = self.screen.buffer[y]
            data = [(char.data, char.reverse, char.fg, char.bg)
                    for char in (line[x] for x in range(self.screen.columns))]
            lines.append((y, data))
        self.screen.dirty.clear()
        return {"c": (cursor.x, cursor.y), "lines": lines}


def open_terminal(command="bash", columns=80, lines=24):
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        argv = shlex.split(command)
        env = os.environ.copy()
        env["TERM"] = 'vt100'
        os.execvpe(argv[0], argv, env)
    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines, p_out), p_pid, p_out

def HandleTerminal():
    debug = False
    running = True 
    try:
        old_attr = termios.tcgetattr(sys.stdin)    
        tty.setraw(0)
        terminal, p_pid, p_out = open_terminal()
        std_out = os.fdopen(sys.stdout.fileno(), "w+b", 0)
        while running:
            r, w, x = select.select([sys.stdin, p_out],[],[])
            if r == []:
                continue
            if p_out in r:
                if debug:
                    print('pre p_out')                                            
                try:
                    msgBytes = read_all(p_out.fileno())
                except (EOFError, OSError):
                    running = False
                    break    
                terminal.feed(msgBytes)                                
                os.write(sys.stdout.fileno(), msgBytes)
                if debug:
                    print('after p_out')                    
            if sys.stdin in r:
                if debug:
                    print('pre stdin')       
                try:
                    msgBytes = read_all(sys.stdin.fileno())
                except (EOFError, OSError):
                    running = False                    
                    break
                terminal.feed(msgBytes)                                
                os.write(p_out.fileno(), msgBytes)
                if debug:
                    print('after stdin')                
    except Exception as e:  # Process died?
        print(e)
        running = False
    finally:
        os.kill(p_pid, signal.SIGTERM)
        p_out.close()    
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
        sys.exit(0)

def get_terminal_size(fd):
    s = struct.pack('HHHH', 0, 0, 0, 0)
    rows, cols, _, _ = struct.unpack('HHHH', fcntl.ioctl(fd, termios.TIOCGWINSZ, s))
    return rows, cols

def resize_terminal(fd):
    s = struct.pack('HHHH', 0, 0, 0, 0)
    s = fcntl.ioctl(0, termios.TIOCGWINSZ, s)
    fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
    rows, cols, _, _ = struct.unpack('hhhh', s)
    return rows, cols

def read_all(fd):
    bytes = os.read(fd, 65536)
    if bytes == b'':
        raise EOFError
    while has_more(fd):
        data = os.read(fd, 65536)
        if data == b'':
            raise EOFError
        bytes += data
    return bytes

def has_more(fd):
    r, w, e = select.select([fd], [], [], 0)
    return (fd in r)

if __name__ == "__main__":
    HandleTerminal()