13.调用端如何同步等待网络返回结果以及如何实现调用端超时

rpc的调用不同于我们那种简单的在本地的同步调用,当一个调用发出去之后,要经过编解码,发送到provider端,等provider端处理完回来再接收到响应结果。这个过程是”异步的“,那么我们再调用时如何让异步转同步,能等到结果的?

在前面讲 发现到服务之后如何玩转调用的最简过程 部分时,当consumer端发送调用请求时要过HeaderExchangeChannel.request(Object, int) line: 112 。我们来看下这部分的代码:

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
109        DefaultFuture future = new DefaultFuture(channel, req, timeout);
110        try{
112            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

我们看到109行构建了DefaultFuture对象,并将其作为返回值返回,DefaultFuture是ResponseFuture子类。

那么,我们就发现了,同步等待结果是靠dubbo自己又做DefaultFuture的包装完成的,背后是靠ReentrantLock的Condition对象的await完成等待,并实现客户端调用超时机制。

get的时候用上面的condition进行等待。

接收到结果的时候,对上面的condition进行signal完成等待。至于 Condition对象为什么能完成异步转同步的等待,已经不在本系列文章的讲解范围了。在JDK并发库源码阅读中讲。

附上部分代码:

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
public Object get(int timeout) throws RemotingException {
     if (timeout <= 0) {
         timeout = Constants.DEFAULT_TIMEOUT;
     }
     if (! isDone()) {
         long start = System.currentTimeMillis();
         lock.lock();
         try {
             while (! isDone()) {
                 done.await(timeout, TimeUnit.MILLISECONDS);
                 if (isDone() || System.currentTimeMillis() - start > timeout) {
                     break;
                 }
             }
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         } finally {
             lock.unlock();
         }
         if (! isDone()) {
             throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
         }
     }
     return returnFromResponse();
 }

这个方式是基于有前面的代码阅读基础的方式发现对应的代码逻辑在这里。如果没有前面的代码阅读基础,还有个办法就是,在跑case的时候把provider端用debug的方式挂起。 然后用jstack看consumer的堆栈也能发现,等待的代码逻辑在这里。