use std::cell::RefCell;
use std::fmt;
use std::io;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use tokio;
use tokio::executor::current_thread::{CurrentThread, TaskExecutor};
use tokio_executor;
use tokio_executor::park::{Park, Unpark, ParkThread, UnparkThread};
use tokio_timer::timer::{self, Timer};
use futures::{Future, IntoFuture, Async};
use futures::future::{self, Executor, ExecuteError};
use futures::executor::{self, Spawn, Notify};
use futures::sync::mpsc;
use mio;
mod poll_evented;
mod poll_evented2;
mod timeout;
mod interval;
pub use self::poll_evented::PollEvented;
pub(crate) use self::poll_evented2::PollEvented as PollEvented2;
pub use self::timeout::Timeout;
pub use self::interval::Interval;
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
scoped_thread_local!(static CURRENT_LOOP: Core);
pub struct Core {
id: usize,
rt: tokio::runtime::Runtime,
executor: RefCell<CurrentThread<Timer<ParkThread>>>,
timer_handle: timer::Handle,
notify_future: Arc<MyNotify>,
notify_rx: Arc<MyNotify>,
tx: mpsc::UnboundedSender<Message>,
rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
inner: Rc<RefCell<Inner>>,
}
struct Inner {
pending_spawn: Vec<Box<Future<Item = (), Error = ()>>>,
}
#[derive(Clone,Copy,Eq,PartialEq,Hash,Debug)]
pub struct CoreId(usize);
#[derive(Clone)]
pub struct Remote {
id: usize,
tx: mpsc::UnboundedSender<Message>,
new_handle: tokio::reactor::Handle,
timer_handle: timer::Handle,
}
#[derive(Clone)]
pub struct Handle {
remote: Remote,
inner: Weak<RefCell<Inner>>,
thread_pool: ::tokio::runtime::TaskExecutor,
}
enum Message {
Run(Box<FnBox>),
}
impl Core {
pub fn new() -> io::Result<Core> {
let timer = Timer::new(ParkThread::new());
let notify_future = Arc::new(MyNotify::new(timer.unpark()));
let notify_rx = Arc::new(MyNotify::new(timer.unpark()));
let rt = tokio::runtime::Runtime::new()?;
let timer_handle = timer.handle();
let executor = RefCell::new(CurrentThread::new_with_park(timer));
let (tx, rx) = mpsc::unbounded();
let rx = RefCell::new(executor::spawn(rx));
let id = NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed);
Ok(Core {
id,
rt,
notify_future,
notify_rx,
tx,
rx,
executor,
timer_handle,
inner: Rc::new(RefCell::new(Inner {
pending_spawn: vec![],
})),
})
}
pub fn handle(&self) -> Handle {
Handle {
remote: self.remote(),
inner: Rc::downgrade(&self.inner),
thread_pool: self.rt.executor().clone(),
}
}
pub fn runtime(&self) -> &tokio::runtime::Runtime {
&self.rt
}
pub fn remote(&self) -> Remote {
Remote {
id: self.id,
tx: self.tx.clone(),
new_handle: self.rt.reactor().clone(),
timer_handle: self.timer_handle.clone()
}
}
pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
where F: Future,
{
let mut task = executor::spawn(f);
let handle1 = self.rt.reactor().clone();
let handle2 = self.rt.reactor().clone();
let mut executor1 = self.rt.executor().clone();
let mut executor2 = self.rt.executor().clone();
let timer_handle = self.timer_handle.clone();
self.notify_future.notify(0);
loop {
if self.notify_future.take() {
let mut enter = tokio_executor::enter()
.ok().expect("cannot recursively call into `Core`");
let notify = &self.notify_future;
let mut current_thread = self.executor.borrow_mut();
let res = try!(CURRENT_LOOP.set(self, || {
::tokio_reactor::with_default(&handle1, &mut enter, |enter| {
tokio_executor::with_default(&mut executor1, enter, |enter| {
timer::with_default(&timer_handle, enter, |enter| {
current_thread.enter(enter)
.block_on(future::lazy(|| {
Ok::<_, ()>(task.poll_future_notify(notify, 0))
})).unwrap()
})
})
})
}));
if let Async::Ready(e) = res {
return Ok(e)
}
}
self.poll(None, &handle2, &mut executor2);
}
}
pub fn turn(&mut self, max_wait: Option<Duration>) {
let handle = self.rt.reactor().clone();
let mut executor = self.rt.executor().clone();
self.poll(max_wait, &handle, &mut executor);
}
fn poll(&mut self, max_wait: Option<Duration>,
handle: &tokio::reactor::Handle,
sender: &mut tokio::runtime::TaskExecutor) {
let mut enter = tokio_executor::enter()
.ok().expect("cannot recursively call into `Core`");
let timer_handle = self.timer_handle.clone();
::tokio_reactor::with_default(handle, &mut enter, |enter| {
tokio_executor::with_default(sender, enter, |enter| {
timer::with_default(&timer_handle, enter, |enter| {
let start = Instant::now();
if self.notify_rx.take() {
CURRENT_LOOP.set(self, || self.consume_queue());
}
{
let mut e = self.executor.borrow_mut();
let mut i = self.inner.borrow_mut();
for f in i.pending_spawn.drain(..) {
e.enter(enter).block_on(future::lazy(|| {
TaskExecutor::current().spawn_local(f).unwrap();
Ok::<_, ()>(())
})).unwrap();
}
}
CURRENT_LOOP.set(self, || {
self.executor.borrow_mut()
.enter(enter)
.turn(max_wait)
.ok().expect("error in `CurrentThread::turn`");
});
let after_poll = Instant::now();
debug!("loop poll - {:?}", after_poll - start);
debug!("loop time - {:?}", after_poll);
debug!("loop process, {:?}", after_poll.elapsed());
})
});
});
}
fn consume_queue(&self) {
debug!("consuming notification queue");
loop {
let msg = self.rx.borrow_mut().poll_stream_notify(&self.notify_rx, 0).unwrap();
match msg {
Async::Ready(Some(msg)) => self.notify(msg),
Async::NotReady |
Async::Ready(None) => break,
}
}
}
fn notify(&self, msg: Message) {
let Message::Run(r) = msg;
r.call_box(self);
}
pub fn id(&self) -> CoreId {
CoreId(self.id)
}
}
impl<F> Executor<F> for Core
where F: Future<Item = (), Error = ()> + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.handle().execute(future)
}
}
impl fmt::Debug for Core {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Core")
.field("id", &self.id())
.finish()
}
}
impl Remote {
fn send(&self, msg: Message) {
self.with_loop(|lp| {
match lp {
Some(lp) => {
if lp.notify_rx.take() {
lp.consume_queue();
}
lp.notify(msg);
}
None => {
match self.tx.unbounded_send(msg) {
Ok(()) => {}
Err(e) => drop(e),
}
}
}
})
}
fn with_loop<F, R>(&self, f: F) -> R
where F: FnOnce(Option<&Core>) -> R
{
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
let same = lp.id == self.id;
if same {
f(Some(lp))
} else {
f(None)
}
})
} else {
f(None)
}
}
pub fn spawn<F, R>(&self, f: F)
where F: FnOnce(&Handle) -> R + Send + 'static,
R: IntoFuture<Item=(), Error=()>,
R::Future: 'static,
{
self.send(Message::Run(Box::new(|lp: &Core| {
let f = f(&lp.handle());
lp.handle().spawn(f.into_future());
})));
}
pub fn id(&self) -> CoreId {
CoreId(self.id)
}
pub fn handle(&self) -> Option<Handle> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
let same = lp.id == self.id;
if same {
Some(lp.handle())
} else {
None
}
})
} else {
None
}
}
}
impl<F> Executor<F> for Remote
where F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.spawn(|_| future);
Ok(())
}
}
impl fmt::Debug for Remote {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Remote")
.field("id", &self.id())
.finish()
}
}
impl Handle {
pub fn new_tokio_handle(&self) -> &::tokio::reactor::Handle {
&self.remote.new_handle
}
pub fn remote(&self) -> &Remote {
&self.remote
}
pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static,
{
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => {
return;
}
};
if let Ok(mut inner) = inner.try_borrow_mut() {
inner.pending_spawn.push(Box::new(f));
return;
}
let _ = TaskExecutor::current().spawn_local(Box::new(f));
}
pub fn spawn_send<F>(&self, f: F)
where F: Future<Item=(), Error=()> + Send + 'static,
{
self.thread_pool.spawn(f);
}
pub fn spawn_fn<F, R>(&self, f: F)
where F: FnOnce() -> R + 'static,
R: IntoFuture<Item=(), Error=()> + 'static,
{
self.spawn(future::lazy(f))
}
pub fn id(&self) -> CoreId {
self.remote.id()
}
}
impl<F> Executor<F> for Handle
where F: Future<Item = (), Error = ()> + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.spawn(future);
Ok(())
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Handle")
.field("id", &self.id())
.finish()
}
}
struct MyNotify {
unpark: UnparkThread,
notified: AtomicBool,
}
impl MyNotify {
fn new(unpark: UnparkThread) -> Self {
MyNotify {
unpark,
notified: AtomicBool::new(true),
}
}
fn take(&self) -> bool {
self.notified.swap(false, Ordering::SeqCst)
}
}
impl Notify for MyNotify {
fn notify(&self, _: usize) {
self.notified.store(true, Ordering::SeqCst);
self.unpark.unpark();
}
}
trait FnBox: Send + 'static {
fn call_box(self: Box<Self>, lp: &Core);
}
impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
fn call_box(self: Box<Self>, lp: &Core) {
(*self)(lp)
}
}
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
fn ready2usize(ready: mio::Ready) -> usize {
let mut bits = 0;
if ready.is_readable() {
bits |= READ;
}
if ready.is_writable() {
bits |= WRITE;
}
bits | platform::ready2usize(ready)
}
fn usize2ready(bits: usize) -> mio::Ready {
let mut ready = mio::Ready::empty();
if bits & READ != 0 {
ready.insert(mio::Ready::readable());
}
if bits & WRITE != 0 {
ready.insert(mio::Ready::writable());
}
ready | platform::usize2ready(bits)
}
#[cfg(all(unix, not(target_os = "fuchsia")))]
mod platform {
use mio::Ready;
use mio::unix::UnixReady;
const HUP: usize = 1 << 2;
const ERROR: usize = 1 << 3;
const AIO: usize = 1 << 4;
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
fn is_aio(ready: &Ready) -> bool {
UnixReady::from(*ready).is_aio()
}
#[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
fn is_aio(_ready: &Ready) -> bool {
false
}
pub fn ready2usize(ready: Ready) -> usize {
let ready = UnixReady::from(ready);
let mut bits = 0;
if is_aio(&ready) {
bits |= AIO;
}
if ready.is_error() {
bits |= ERROR;
}
if ready.is_hup() {
bits |= HUP;
}
bits
}
#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
target_os = "macos"))]
fn usize2ready_aio(ready: &mut UnixReady) {
ready.insert(UnixReady::aio());
}
#[cfg(not(any(target_os = "dragonfly",
target_os = "freebsd", target_os = "ios", target_os = "macos")))]
fn usize2ready_aio(_ready: &mut UnixReady) {
}
pub fn usize2ready(bits: usize) -> Ready {
let mut ready = UnixReady::from(Ready::empty());
if bits & AIO != 0 {
usize2ready_aio(&mut ready);
}
if bits & HUP != 0 {
ready.insert(UnixReady::hup());
}
if bits & ERROR != 0 {
ready.insert(UnixReady::error());
}
ready.into()
}
}
#[cfg(any(windows, target_os = "fuchsia"))]
mod platform {
use mio::Ready;
pub fn ready2usize(_r: Ready) -> usize {
0
}
pub fn usize2ready(_r: usize) -> Ready {
Ready::empty()
}
}