# collect all stdout and stderr until we are done # use separate threads to not block ourself
self._stderr = []
self._stdout = [] if self.proc.stderr:
self.stderr_thread = Thread(target=read_output, args=(self.proc.stderr, self._stderr))
self.stderr_thread.start() if self.proc.stdout:
self.stdout_thread = Thread(target=read_output, args=(self.proc.stdout, self._stdout))
self.stdout_thread.start() return self.proc
def stutter_check(self, chunks: [str], stutter: datetime.timedelta): ifnot self.proc:
self.start() for chunk in chunks:
self.send(chunk)
time.sleep(stutter.total_seconds())
recv_out, recv_err = self.close() # assert we got everything back assert"".join(chunks) == "".join(recv_out) # now the tricky part: check *when* we got everything back
recv_times = [] for line in"".join(recv_err).split('\n'):
m = re.match(r'^\s*(\d+:\d+:\d+(\.\d+)?) <= Recv data, (\d+) bytes.*', line) if m and int(m.group(3)) > 0:
recv_times.append(datetime.time.fromisoformat(m.group(1))) # received as many chunks as we sent assert len(chunks) == len(recv_times), "received response not in {0} chunks, but {1}".format(
len(chunks), len(recv_times))
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.