forked from ipython/ipykernel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket.py
41 lines (28 loc) · 1.29 KB
/
socket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
""" Defines a dummy socket implementing (part of) the zmq.Socket interface. """
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
from queue import Queue
import zmq
from traitlets import HasTraits, Instance, Int
# -----------------------------------------------------------------------------
# Dummy socket class
# -----------------------------------------------------------------------------
class DummySocket(HasTraits):
    """In-memory stand-in for a small part of the ``zmq.Socket`` interface.

    Messages handed to :meth:`send_multipart` are stored on an internal
    ``queue.Queue`` and returned, oldest first, by :meth:`recv_multipart`.
    """

    # Buffer holding sent multipart messages until they are received.
    queue = Instance(Queue, ())
    message_sent = Int(0)  # Should be an Event
    context = Instance(zmq.Context)

    def _context_default(self):
        """Return a new ``zmq.Context`` used as the default for ``context``."""
        return zmq.Context()

    # -------------------------------------------------------------------------
    # Socket interface
    # -------------------------------------------------------------------------

    def recv_multipart(self, flags=0, copy=True, track=False):
        """Pop and return the oldest queued multipart message without blocking.

        ``flags``, ``copy`` and ``track`` are accepted only for signature
        compatibility and are not used. ``queue.Empty`` propagates when no
        message is waiting.
        """
        return self.queue.get_nowait()

    def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        """Queue a multipart message and bump the ``message_sent`` counter.

        Every element of ``msg_parts`` is wrapped in ``zmq.Message`` before
        the resulting list is placed on the queue. ``flags``, ``copy`` and
        ``track`` are accepted only for signature compatibility and are not
        used.
        """
        frames = [zmq.Message(part) for part in msg_parts]
        self.queue.put_nowait(frames)
        self.message_sent += 1

    def flush(self, timeout=1.0):
        """Do nothing; present only to comply with the stream API."""