Decode input incrementally
and add a test for valid and invalid UTF-8 input
Markus Gerstel
5 years ago
1 | 1 | |
2 | 2 | from __future__ import absolute_import, division, print_function |
3 | 3 | |
4 | import codecs | |
4 | 5 | import copy |
5 | 6 | import logging |
6 | 7 | import os |
66 | 67 | self._buffer = '' |
67 | 68 | self._print = print_line |
68 | 69 | self._callback = callback |
70 | self._decoder = codecs.getincrementaldecoder('utf-8')('replace') | |
69 | 71 | def add(self, data): |
70 | 72 | '''Add a single character to buffer. If one or more full lines are found, |
71 | 73 | print them (if desired) and pass to callback function.''' |
72 | data = data.decode('utf-8') | |
74 | data = self._decoder.decode(data) | |
75 | if not data: return | |
73 | 76 | self._buffer += data |
74 | if "\n" in data: | |
77 | if '\n' in data: | |
75 | 78 | to_print, remainder = self._buffer.rsplit('\n') |
76 | 79 | if self._print: |
77 | 80 | print(to_print) |
80 | 83 | self._buffer = remainder |
81 | 84 | def flush(self): |
82 | 85 | '''Print/send any remaining data to callback function.''' |
86 | self._buffer += self._decoder.decode(b'', final=True) | |
83 | 87 | if self._buffer: |
84 | 88 | if self._print: |
85 | 89 | print(self._buffer) |
3 | 3 | import sys |
4 | 4 | |
5 | 5 | import procrunner |
6 | import pytest | |
6 | 7 | |
7 | 8 | def test_simple_command_invocation(): |
8 | 9 | if os.name == 'nt': |
16 | 17 | assert result['stdout'] == b'hello' + os.linesep.encode('utf-8') |
17 | 18 | assert result['stderr'] == b'' |
18 | 19 | |
19 | def test_input_encoding(): | |
20 | def test_decode_invalid_utf8_input(capsys): | |
20 | 21 | command = [sys.executable, '-c', 'import sys; sys.stdout.write("".join(chr(x) for x in ' |
21 | '(0x74,0x65,0x73,0x74,0xa0,0x50,0x73,0x74,0x72,0x69,0x6e,0x67,0x0a)' | |
22 | '(0x74,0x65,0x73,0x74,0xa0,0x73,0x74,0x72,0x69,0x6e,0x67,0x0a)' | |
22 | 23 | '))'] |
23 | 24 | result = procrunner.run(command) |
25 | assert result['exitcode'] == 0 | |
26 | assert not result['stderr'] | |
27 | assert result['stdout'] == b'test\xa0string\n' | |
28 | out, err = capsys.readouterr() | |
29 | assert out == u'test\ufffdstring\n' | |
30 | assert err == u'' | |
24 | 31 | |
32 | def test_running_wget(tmpdir): | |
33 | tmpdir.chdir() | |
34 | command = ['wget', 'https://www.google.com', '-O', '-'] | |
35 | try: | |
36 | result = procrunner.run(command) | |
37 | except OSError as e: | |
38 | if e.errno == 2: | |
39 | pytest.skip('wget not available') | |
40 | raise | |
25 | 41 | assert result['exitcode'] == 0 |
42 | assert b'http' in result['stderr'] | |
43 | assert b'google' in result['stdout'] |