Skip to content

Commit

Permalink
session: allow polling on a provided fd
Browse files Browse the repository at this point in the history
  • Loading branch information
bfredl committed Aug 28, 2016
1 parent b8bb7c2 commit f5f4e62
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 11 deletions.
47 changes: 38 additions & 9 deletions neovim/api/nvim.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,20 @@ def new_highlight_source(self):
"""Return new src_id for use with Buffer.add_highlight."""
return self.current.buffer.add_highlight("", 0, src_id=0)

def _error_wrapper(self, fn, call_point, *args, **kwargs):
if fn is None:
return None
def handler():
try:
fn(*args, **kwargs)
except Exception as err:
msg = ("error caught while executing async callback:\n"
"{0!r}\n{1}\n \nthe call was requested at\n{2}"
.format(err, format_exc_skip(1, 5), call_point))
self._err_cb(msg)
raise
return handler

def async_call(self, fn, *args, **kwargs):
"""Schedule `fn` to be called by the event loop soon.
Expand All @@ -333,18 +347,33 @@ def async_call(self, fn, *args, **kwargs):
that shouldn't block neovim.
"""
call_point = ''.join(format_stack(None, 5)[:-1])
handler = self._error_wrapper(fn, call_point, *args, **kwargs)

def handler():
try:
fn(*args, **kwargs)
except Exception as err:
msg = ("error caught while executing async callback:\n"
"{0!r}\n{1}\n \nthe call was requested at\n{2}"
.format(err, format_exc_skip(1, 5), call_point))
self._err_cb(msg)
raise
self._session.threadsafe_call(handler)

def poll_fd(self, fd, on_readable=None, on_writable=None, greenlet=True):
"""
Invoke callbacks when the fd is ready for reading and/or writing. if
`on_readable` is not None, it should be callback, which will be invoked
(with no arguments) when the fd is ready for writing. Similarily if
`on_writable` is not None it will be invoked when the fd is ready for
writing.
Only one callback (of each kind) can be registered on the same fd at a
time. If both readability and writability should be monitored, both
callbacks must be registered by the same `poll_fd` call.
By default, the function is invoked in a greenlet, just like a callback
scheduled by async_call.
Returns a function that deactivates the callback(s).
"""
call_point = ''.join(format_stack(None, 5)[:-1])
on_readable = self._error_wrapper(on_readable, call_point)
on_writable = self._error_wrapper(on_writable, call_point)
return self._session.poll_fd(fd, on_readable, on_writable, greenlet)



class Buffers(object):

Expand Down
4 changes: 4 additions & 0 deletions neovim/msgpack_rpc/async_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def threadsafe_call(self, fn):
"""Wrapper around `MsgpackStream.threadsafe_call`."""
self._msgpack_stream.threadsafe_call(fn)

def poll_fd(self, fd, on_readable, on_writable):
"""Wrapper around `BaseEventLoop.poll_fd`."""
return self._msgpack_stream.poll_fd(fd, on_readable, on_writable)

def request(self, method, args, response_cb):
"""Send a msgpack-rpc request to Nvim.
Expand Down
12 changes: 12 additions & 0 deletions neovim/msgpack_rpc/event_loop/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ def _stop(self):
def _threadsafe_call(self, fn):
self._loop.call_soon_threadsafe(fn)

def _poll_fd(self, fd, on_readable, on_writable):
if on_readable is not None:
self._loop.add_reader(fd, on_readable)
if on_writable is not None:
self._loop.add_writer(fd, on_writable)
def cancel():
if on_readable is not None:
self._loop.remove_reader(fd)
if on_writable is not None:
self._loop.remove_writer(fd)
return cancel

def _setup_signals(self, signals):
if os.name == 'nt':
# add_signal_handler is not supported in win32
Expand Down
18 changes: 18 additions & 0 deletions neovim/msgpack_rpc/event_loop/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ def threadsafe_call(self, fn):
"""
self._threadsafe_call(fn)

def poll_fd(self, fd, on_readable=None, on_writable=None):
"""
Invoke callbacks when the fd is ready for reading and/or writing. if
`on_readable` is not None, it should be callback, which will be invoked
(with no arguments) when the fd is ready for writing. Similarily if
`on_writable` is not None it will be invoked when the fd is ready for
writing.
Only one callback (of each kind) can be registered on the same fd at a
time. If both readability and writability should be monitored, both
callbacks must be registered by the same `poll_fd` call.
Returns a function that deactivates the callback(s).
"""
if on_readable is None and on_writable is None:
raise ValueError("poll_fd: At least one of `on_readable` and `on_writable` must be present")
return self._poll_fd(fd, on_readable, on_writable)

def run(self, data_cb):
"""Run the event loop."""
if self._error:
Expand Down
16 changes: 16 additions & 0 deletions neovim/msgpack_rpc/event_loop/uv.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,22 @@ def _on_async(self, handle):
while self._callbacks:
self._callbacks.popleft()()

def _poll_fd(self, fd, on_readable, on_writable):
poll = pyuv.Poll(self._loop, fd)
events = 0
if on_readable is not None:
events |= pyuv.UV_READABLE
if on_writable is not None:
events |= pyuv.UV_WRITABLE
def callback(poll_handle, evts, errorno):
if evts & pyuv.UV_READABLE:
on_readable()
if evts & pyuv.UV_WRITABLE:
on_writable()

poll.start(events, callback)
return poll.stop

def _setup_signals(self, signals):
self._signal_handles = []

Expand Down
4 changes: 4 additions & 0 deletions neovim/msgpack_rpc/msgpack_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def threadsafe_call(self, fn):
"""Wrapper around `BaseEventLoop.threadsafe_call`."""
self._event_loop.threadsafe_call(fn)

def poll_fd(self, fd, on_readable, on_writable):
"""Wrapper around `BaseEventLoop.poll_fd`."""
return self._event_loop.poll_fd(fd, on_readable, on_writable)

def send(self, msg):
"""Queue `msg` for sending to Nvim."""
debug('sent %s', msg)
Expand Down
21 changes: 19 additions & 2 deletions neovim/msgpack_rpc/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ def __init__(self, async_session):
self._is_running = False
self._setup_exception = None

def threadsafe_call(self, fn, *args, **kwargs):
"""Wrapper around `AsyncSession.threadsafe_call`."""
def _wrap_greenlet(self, fn, *args, **kwargs):
if fn is None:
return None

def handler():
try:
fn(*args, **kwargs)
Expand All @@ -41,8 +43,23 @@ def greenlet_wrapper():
gr = greenlet.greenlet(handler)
gr.switch()

return greenlet_wrapper

def threadsafe_call(self, fn, *args, **kwargs):
"""Wrapper around `AsyncSession.threadsafe_call`."""

greenlet_wrapper = self._wrap_greenlet(fn, *args, **kwargs)
self._async_session.threadsafe_call(greenlet_wrapper)

def poll_fd(self, fd, on_readable, on_writable, greenlet=True):
"""Wrapper around `AsyncSession.threadsafe_call`."""
if greenlet:
on_readable = self._wrap_greenlet(on_readable)
on_writable = self._wrap_greenlet(on_writable)

return self._async_session.poll_fd(fd, on_readable, on_writable)


def next_message(self):
"""Block until a message(request or notification) is available.
Expand Down

0 comments on commit f5f4e62

Please sign in to comment.