Skip to content

Commit

Permalink
sndrcv threaded mode: fix timeout (#4387)
Browse files Browse the repository at this point in the history
  • Loading branch information
gpotter2 committed May 12, 2024
1 parent 041f3ef commit ecfeb14
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 24 deletions.
10 changes: 8 additions & 2 deletions scapy/layers/l2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,8 +1002,13 @@ def show(self, *args, **kwargs):


@conf.commands.register
def arping(net, timeout=2, cache=0, verbose=None, **kargs):
# type: (str, int, int, Optional[int], **Any) -> Tuple[ARPingResult, PacketList] # noqa: E501
def arping(net: str,
timeout: int = 2,
cache: int = 0,
verbose: Optional[int] = None,
threaded: bool = True,
**kargs: Any,
) -> Tuple[ARPingResult, PacketList]:
"""
Send ARP who-has requests to determine which hosts are up::
Expand All @@ -1028,6 +1033,7 @@ def arping(net, timeout=2, cache=0, verbose=None, **kargs):
verbose=verbose,
filter="arp and arp[7] = 2",
timeout=timeout,
threaded=threaded,
iface_hint=net,
**kargs,
)
Expand Down
67 changes: 45 additions & 22 deletions scapy/sendrecv.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class debug:
:param prebuild: pre-build the packets before starting to send them.
Automatically enabled when a generator is passed as the packet
:param _flood:
:param threaded: if True, packets will be sent in an individual thread
:param threaded: if True, packets are sent in a thread and received in another.
defaults to False.
:param session: a flow decoder used to handle stream of packets
:param chainEX: if True, exceptions during send will be forwarded
:param stop_filter: Python function applied to each packet to determine if
Expand All @@ -109,9 +110,8 @@ class SndRcvHandler(object):
This matches the requests and answers.
Notes::
- threaded mode: enabling threaded mode will likely
break packet timestamps, but might result in a speedup
when sending a big amount of packets. Disabled by default
- threaded: if you're planning to send/receive many packets, it's likely
a good idea to use threaded mode.
- DEVS: store the outgoing timestamp right BEFORE sending the packet
to avoid races that could result in negative latency. We aren't Stadia
"""
Expand Down Expand Up @@ -156,6 +156,8 @@ def __init__(self,
self.notans = 0
self.noans = 0
self._flood = _flood
self.threaded = threaded
self.breakout = False
# Instantiate packet holders
if prebuild and not self._flood:
self.tobesent = list(pkt) # type: _PacketIterable
Expand All @@ -175,22 +177,35 @@ def __init__(self,

if threaded or self._flood:
# Send packets in thread.
# https://github.com/secdev/scapy/issues/1791
snd_thread = Thread(
target=self._sndrcv_snd
)
snd_thread.daemon = True

# Start routine with callback
self._sndrcv_rcv(snd_thread.start)
interrupted = None
try:
self._sndrcv_rcv(snd_thread.start)
except KeyboardInterrupt as ex:
interrupted = ex

self.breakout = True

# Ended. Let's close gracefully
if self._flood:
# Flood: stop send thread
self._flood.stop()
snd_thread.join()

if interrupted and self.chainCC:
raise interrupted
else:
self._sndrcv_rcv(self._sndrcv_snd)
# Send packets, then receive.
try:
self._sndrcv_rcv(self._sndrcv_snd)
except KeyboardInterrupt:
if self.chainCC:
raise

if multi:
remain = [
Expand Down Expand Up @@ -252,6 +267,8 @@ def _sndrcv_snd(self):
self.pks.send(p)
if self.inter:
time.sleep(self.inter)
if self.breakout:
break
i += 1
if self.verbose:
print("Finished sending %i packets." % i)
Expand All @@ -273,6 +290,15 @@ def _sndrcv_snd(self):
elif not self._send_done:
self.notans = i
self._send_done = True
# In threaded mode, timeout.
if self.threaded and self.timeout is not None and not self.breakout:
t = time.monotonic() + self.timeout
while time.monotonic() < t:
if self.breakout:
break
time.sleep(0.1)
if self.sniffer and self.sniffer.running:
self.sniffer.stop()

def _process_packet(self, r):
# type: (Packet) -> None
Expand Down Expand Up @@ -310,22 +336,19 @@ def _process_packet(self, r):
def _sndrcv_rcv(self, callback):
# type: (Callable[[], None]) -> None
"""Function used to receive packets and check their hashret"""
# This is blocking.
self.sniffer = None # type: Optional[AsyncSniffer]
try:
self.sniffer = AsyncSniffer()
self.sniffer._run(
prn=self._process_packet,
timeout=self.timeout,
store=False,
opened_socket=self.rcv_pks,
session=self.session,
stop_filter=self.stop_filter,
started_callback=callback,
chainCC=self.chainCC,
)
except KeyboardInterrupt:
if self.chainCC:
raise
self.sniffer = AsyncSniffer()
self.sniffer._run(
prn=self._process_packet,
timeout=None if self.threaded else self.timeout,
store=False,
opened_socket=self.rcv_pks,
session=self.session,
stop_filter=self.stop_filter,
started_callback=callback,
chainCC=True,
)


def sndrcv(*args, **kwargs):
Expand Down

0 comments on commit ecfeb14

Please sign in to comment.