[tor-commits] [stem/master] ORPort directory requests cropped
atagar at torproject.org
atagar at torproject.org
Sun Jan 27 01:53:36 UTC 2019
commit 4be6695a38c96f5fcba58d08e7f86ea1f6647c2e
Author: Damian Johnson <atagar at torproject.org>
Date: Sat Jan 26 17:40:45 2019 -0800
ORPort directory requests cropped
Great catch from starlight that descriptors downloaded via an ORPort get
cropped if they exceed a certain size...
https://trac.torproject.org/projects/tor/ticket/28961
There were a couple issues here...
* Our socket handling was pretty screwed up. Calling the socket's recv() only
provides the data available at that moment, so if we haven't received a full
cell we should read more.
* Descriptors can be composed of multiple RELAY cells (the descriptor was
cropped because we stopped reading after the first). Directory requests
now keep reading cells until we receive the END signal.
---
stem/client/__init__.py | 125 ++++++++++++++++++++++++++++++++---------
stem/client/cell.py | 7 ++-
stem/descriptor/remote.py | 4 +-
test/task.py | 1 -
test/unit/descriptor/remote.py | 2 +-
5 files changed, 105 insertions(+), 34 deletions(-)
diff --git a/stem/client/__init__.py b/stem/client/__init__.py
index 1da008fb..76b5addc 100644
--- a/stem/client/__init__.py
+++ b/stem/client/__init__.py
@@ -33,7 +33,20 @@ import stem.client.cell
import stem.socket
import stem.util.connection
-from stem.client.datatype import ZERO, LinkProtocol, Address, KDF, split
+from stem.client.cell import (
+ CELL_TYPE_SIZE,
+ FIXED_PAYLOAD_LEN,
+ Cell,
+)
+
+from stem.client.datatype import (
+ ZERO,
+ Address,
+ KDF,
+ LinkProtocol,
+ RelayCommand,
+ split,
+)
__all__ = [
'cell',
@@ -51,8 +64,17 @@ class Relay(object):
"""
def __init__(self, orport, link_protocol):
+ # TODO: Python 3.x adds a getbuffer() method which
+ # lets us get the size...
+ #
+ # https://stackoverflow.com/questions/26827055/python-how-to-get-iobytes-allocated-memory-length
+ #
+ # When we drop python 2.x support we should replace
+ # self._orport_buffer with an io.BytesIO.
+
self.link_protocol = LinkProtocol(link_protocol)
self._orport = orport
+ self._orport_buffer = b'' # unread bytes
self._orport_lock = threading.RLock()
self._circuits = {}
@@ -130,6 +152,47 @@ class Relay(object):
return Relay(conn, link_protocol)
+ def _recv(self, raw = False):
+ """
+ Reads the next cell from our ORPort. If none is present this blocks
+ until one is available.
+
+ :param bool raw: provides bytes rather than parsing as a cell if **True**
+
+ :returns: next :class:`~stem.client.cell.Cell`
+ """
+
+ with self._orport_lock:
+ # cells begin with [circ_id][cell_type][...]
+
+ circ_id_size = self.link_protocol.circ_id_size.size
+
+ while len(self._orport_buffer) < (circ_id_size + CELL_TYPE_SIZE.size):
+ self._orport_buffer += self._orport.recv() # read until we know the cell type
+
+ cell_type = Cell.by_value(CELL_TYPE_SIZE.pop(self._orport_buffer[circ_id_size:])[0])
+
+ if cell_type.IS_FIXED_SIZE:
+ cell_size = circ_id_size + CELL_TYPE_SIZE.size + FIXED_PAYLOAD_LEN
+ else:
+ # variable length, our next field is the payload size
+
+ while len(self._orport_buffer) < (circ_id_size + CELL_TYPE_SIZE.size + FIXED_PAYLOAD_LEN.size):
+ self._orport_buffer += self._orport.recv() # read until we know the cell size
+
+ payload_len = FIXED_PAYLOAD_LEN.pop(self._orport_buffer[circ_id_size + CELL_TYPE_SIZE.size:])[0]
+ cell_size = circ_id_size + CELL_TYPE_SIZE.size + FIXED_PAYLOAD_LEN.size + payload_len
+
+ while len(self._orport_buffer) < cell_size:
+ self._orport_buffer += self._orport.recv() # read until we have the full cell
+
+ if raw:
+ content, self._orport_buffer = split(self._orport_buffer, cell_size)
+ return content
+ else:
+ cell, self._orport_buffer = Cell.pop(self._orport_buffer, self.link_protocol)
+ return cell
+
def _msg(self, cell):
"""
Sends a cell on the ORPort and provides the response we receive in reply.
@@ -263,41 +326,28 @@ class Circuit(object):
self.forward_key = Cipher(algorithms.AES(kdf.forward_key), ctr, default_backend()).encryptor()
self.backward_key = Cipher(algorithms.AES(kdf.backward_key), ctr, default_backend()).decryptor()
- def send(self, command, data = '', stream_id = 0):
+ def directory(self, request, stream_id = 0):
"""
- Sends a message over the circuit.
+ Request descriptors from the relay.
- :param stem.client.datatype.RelayCommand command: command to be issued
- :param bytes data: message payload
+ :param str request: directory request to make
:param int stream_id: specific stream this concerns
- :returns: **list** of :class:`~stem.client.cell.RelayCell` responses
+ :returns: **str** with the requested descriptor data
"""
with self.relay._orport_lock:
- # Encrypt and send the cell. Our digest/key only updates if the cell is
- # successfully sent.
+ self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id)
+ self._send(RelayCommand.DATA, request, stream_id = stream_id)
- cell = stem.client.cell.RelayCell(self.id, command, data, stream_id = stream_id)
- payload, forward_key, forward_digest = cell.encrypt(self.relay.link_protocol, self.forward_key, self.forward_digest)
- self.relay._orport.send(payload)
-
- self.forward_digest = forward_digest
- self.forward_key = forward_key
-
- # Decrypt relay cells received in response. Again, our digest/key only
- # updates when handled successfully.
-
- reply = self.relay._orport.recv()
- reply_cells = []
+ response = []
- while reply:
- reply_cmd = stem.client.datatype.Size.CHAR.pop(reply[self.relay.link_protocol.circ_id_size.size:])[0]
+ while True:
+ # Decrypt relay cells received in response. Our digest/key only
+ # updates when handled successfully.
- if reply_cmd != stem.client.cell.RelayCell.VALUE:
- raise stem.ProtocolError('Circuit response should be a series of RELAY cells, but received an unexpected %s (%i)' % (stem.client.cell.Cell.by_value(reply_cmd), reply_cmd))
+ encrypted_cell = self.relay._recv(raw = True)
- encrypted_cell, reply = split(reply, self.relay.link_protocol.fixed_cell_length)
decrypted_cell, backward_key, backward_digest = stem.client.cell.RelayCell.decrypt(self.relay.link_protocol, encrypted_cell, self.backward_key, self.backward_digest)
if self.id != decrypted_cell.circ_id:
@@ -306,9 +356,30 @@ class Circuit(object):
self.backward_digest = backward_digest
self.backward_key = backward_key
- reply_cells.append(decrypted_cell)
+ if decrypted_cell.command == RelayCommand.END:
+ return b''.join([cell.data for cell in response])
+ else:
+ response.append(decrypted_cell)
+
+ def _send(self, command, data = '', stream_id = 0):
+ """
+ Sends a message over the circuit.
+
+ :param stem.client.datatype.RelayCommand command: command to be issued
+ :param bytes data: message payload
+ :param int stream_id: specific stream this concerns
+ """
+
+ with self.relay._orport_lock:
+ # Encrypt and send the cell. Our digest/key only updates if the cell is
+ # successfully sent.
+
+ cell = stem.client.cell.RelayCell(self.id, command, data, stream_id = stream_id)
+ payload, forward_key, forward_digest = cell.encrypt(self.relay.link_protocol, self.forward_key, self.forward_digest)
+ self.relay._orport.send(payload)
- return reply_cells
+ self.forward_digest = forward_digest
+ self.forward_key = forward_key
def close(self):
with self.relay._orport_lock:
diff --git a/stem/client/cell.py b/stem/client/cell.py
index 7aae7fbb..94862b38 100644
--- a/stem/client/cell.py
+++ b/stem/client/cell.py
@@ -51,6 +51,9 @@ from stem.util import datetime_to_unix, str_tools
FIXED_PAYLOAD_LEN = 509 # PAYLOAD_LEN, per tor-spec section 0.2
AUTH_CHALLENGE_SIZE = 32
+
+CELL_TYPE_SIZE = Size.CHAR
+PAYLOAD_LEN_SIZE = Size.SHORT
RELAY_DIGEST_SIZE = Size.LONG
STREAM_ID_REQUIRED = (
@@ -169,13 +172,13 @@ class Cell(object):
link_protocol = LinkProtocol(link_protocol)
circ_id, content = link_protocol.circ_id_size.pop(content)
- command, content = Size.CHAR.pop(content)
+ command, content = CELL_TYPE_SIZE.pop(content)
cls = Cell.by_value(command)
if cls.IS_FIXED_SIZE:
payload_len = FIXED_PAYLOAD_LEN
else:
- payload_len, content = Size.SHORT.pop(content)
+ payload_len, content = PAYLOAD_LEN_SIZE.pop(content)
if len(content) < payload_len:
raise ValueError('%s cell should have a payload of %i bytes, but only had %i' % (cls.NAME, payload_len, len(content)))
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index 9eb639ad..6ff1e794 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -107,7 +107,6 @@ import stem.directory
import stem.prereq
import stem.util.enum
-from stem.client.datatype import RelayCommand
from stem.util import log, str_tools
try:
@@ -966,8 +965,7 @@ def _download_from_orport(endpoint, compression, resource):
'User-Agent: %s' % stem.USER_AGENT,
)) + '\r\n\r\n'
- circ.send(RelayCommand.BEGIN_DIR, stream_id = 1)
- response = b''.join([cell.data for cell in circ.send(RelayCommand.DATA, request, stream_id = 1)])
+ response = circ.directory(request, stream_id = 1)
first_line, data = response.split(b'\r\n', 1)
header_data, body_data = data.split(b'\r\n\r\n', 1)
diff --git a/test/task.py b/test/task.py
index 14626356..c527a0c7 100644
--- a/test/task.py
+++ b/test/task.py
@@ -86,7 +86,6 @@ def _check_tor_version(tor_path):
version = test.tor_version(tor_path)
version_str = str(version).split()[0]
-
if version.git_commit:
return '%s (commit %s)' % (version_str, version.git_commit[:8])
else:
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index d2688775..858ad702 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -95,7 +95,7 @@ def _orport_mock(data, encoding = 'identity', response_code_header = None):
connect_mock = MagicMock()
relay_mock = connect_mock().__enter__()
circ_mock = relay_mock.create_circuit().__enter__()
- circ_mock.send.return_value = cells
+ circ_mock.directory.return_value = data
return connect_mock
More information about the tor-commits
mailing list