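SOFABolt basic communication model
Blocking vs. synchronous, revisited
The four-thread model (simple, but essential):
- Sometimes there is no need to label a call as strictly synchronous or asynchronous.
- On the client side, the distinction is called blocking vs. non-blocking; on the server side, it is called synchronous vs. asynchronous.
- If any single hop on the call path is asynchronous, we call the whole call path asynchronous.
- Split the client into a client thread and a client connection thread, and the server into a server thread and a server connection thread.
- If the client thread waits for the connection thread to return the result, the call is blocking; if it does not wait, the call is non-blocking.
- If the server connection thread waits for the server thread's result, processing is synchronous; if it does not wait for the server processing thread, processing is asynchronous.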
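The four roles above can be sketched with plain java.util.concurrent. This is only an illustration of the model, not SOFABolt code: the class FourThreadModelSketch, its methods, and the echo handler are all hypothetical names, and two single-thread executors stand in for the server connection thread and the server processing thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of the four-thread model; not SOFABolt code. */
final class FourThreadModelSketch {
    // Stand-ins for the server connection (I/O) thread and the server processing thread.
    private final ExecutorService serverConnThread = Executors.newSingleThreadExecutor();
    private final ExecutorService serverBizThread = Executors.newSingleThreadExecutor();

    /** Synchronous server: the connection thread runs the handler itself and waits for its result. */
    CompletableFuture<String> callSyncServer(String request) {
        return CompletableFuture.supplyAsync(() -> handle(request), serverConnThread);
    }

    /** Asynchronous server: the connection thread hands the request off and is free again at once. */
    CompletableFuture<String> callAsyncServer(String request) {
        CompletableFuture<String> response = new CompletableFuture<>();
        serverConnThread.execute(
            () -> serverBizThread.execute(() -> response.complete(handle(request))));
        return response;
    }

    /** Hypothetical business logic. */
    private static String handle(String request) {
        return "echo:" + request;
    }

    /** Blocking client: the calling thread waits for the response or for the timeout. */
    static String blockingCall(CompletableFuture<String> response, long timeoutMs) throws Exception {
        return response.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        serverConnThread.shutdown();
        serverBizThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        FourThreadModelSketch sketch = new FourThreadModelSketch();
        try {
            // Blocking client + synchronous server.
            System.out.println(blockingCall(sketch.callSyncServer("a"), 1000));
            // Non-blocking client + asynchronous server: register a callback and move on.
            CompletableFuture<Void> done =
                sketch.callAsyncServer("b").thenAccept(System.out::println);
            done.get(1, TimeUnit.SECONDS); // only so the demo does not exit before the callback runs
        } finally {
            sketch.shutdown();
        }
    }
}
```

Whether the client blocks and whether the server hands off are independent choices, which is why the two sides get separate names.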
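Four client models
Four communication models are provided. All four are client-side invocation modes.
- Oneway invocation (client non-blocking)
The calling thread sends the request and does not care about the result. There is no timeout control; the call is complete as soon as the request has been sent. Note that a Oneway invocation is not guaranteed to succeed, and the caller has no way to learn the outcome. It is therefore typically used for retryable operations or periodic notifications. The request may fail because of network problems, machine failures, and so on, so use this mode only when the business scenario can tolerate such failures. See testOneway in the BasicUsageDemoByJunit listing.
- Sync invocation (client blocking)
The calling thread sends the request and must receive the response within the specified timeout to complete the call. If no result arrives within the timeout, a timeout exception is thrown. This is the most commonly used mode. Set the timeout sensibly, based on the processing capacity of the remote peer. See testSync in the BasicUsageDemoByJunit listing.
- Future invocation (client half-blocking)
The calling thread sends the request and gets back an RpcResponseFuture object, then can go on to issue the next call. At any later point it can call get() on the RpcResponseFuture to obtain the result: if the response has already arrived, the result is returned immediately; otherwise the calling thread blocks until the response arrives or the timeout expires. See testFuture in the BasicUsageDemoByJunit listing.
- Callback invocation (client non-blocking)
The calling thread sends the request and the call returns at once, so the next call can be issued immediately. A callback must be registered when the call is made, and the callback must be given an asynchronous thread pool. When the response arrives, the callback logic runs in that thread pool. See testCallback in the BasicUsageDemoByJunit listing.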
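To make the difference between the four modes concrete without a running server, here is a minimal sketch built only on java.util.concurrent. It is not the SOFABolt API: InvokeStylesSketch, send, and the "reply:" prefix are hypothetical, and a single-thread executor stands in for the network plus the remote server.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Hypothetical stand-in for the four invocation modes; not SOFABolt code. */
final class InvokeStylesSketch {
    private final ExecutorService remote = Executors.newSingleThreadExecutor();
    private final ExecutorService callbackPool = Executors.newCachedThreadPool();

    // Simulates sending a request; the "server" answers after serverDelayMs.
    private CompletableFuture<String> send(String req, long serverDelayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(serverDelayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "reply:" + req;
        }, remote);
    }

    /** Oneway: send and forget; the result is never observed. */
    void oneway(String req) {
        send(req, 0);
    }

    /** Sync: block until the response arrives, or throw TimeoutException. */
    String sync(String req, long serverDelayMs, long timeoutMs) throws Exception {
        return send(req, serverDelayMs).get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Future: return at once; the caller decides when to block on get(). */
    CompletableFuture<String> future(String req) {
        return send(req, 0);
    }

    /** Callback: return at once; the callback runs in its own thread pool. */
    void callback(String req, Consumer<String> onResponse) {
        send(req, 0).thenAcceptAsync(onResponse, callbackPool);
    }

    void shutdown() {
        remote.shutdown();
        callbackPool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        InvokeStylesSketch s = new InvokeStylesSketch();
        try {
            s.oneway("ping");
            System.out.println(s.sync("a", 0, 1000));
            System.out.println(s.future("b").get(1, TimeUnit.SECONDS));
            CountDownLatch latch = new CountDownLatch(1);
            s.callback("c", r -> {
                System.out.println(r);
                latch.countDown();
            });
            latch.await(1, TimeUnit.SECONDS);
            try {
                s.sync("d", 500, 50); // server slower than the timeout
            } catch (TimeoutException e) {
                System.out.println("timed out");
            }
        } finally {
            s.shutdown();
        }
    }
}
```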
package com.alipay.remoting.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.InvokeCallback;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.RpcResponseFuture;
import com.alipay.remoting.rpc.common.BoltServer;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
import com.alipay.remoting.rpc.common.PortScan;
import com.alipay.remoting.rpc.common.RequestBody;
import com.alipay.remoting.rpc.common.SimpleClientUserProcessor;
import com.alipay.remoting.rpc.common.SimpleServerUserProcessor;
import com.alipay.remoting.util.RemotingUtil;
/**
* basic usage demo
*
* basic usage of rpc client and rpc server
*
* @author xiaomin.cxm
* @version $Id: BasicUsageDemo.java, v 0.1 Apr 6, 2016 8:58:36 PM xiaomin.cxm Exp $
*/
public class BasicUsageDemoByJunit {
static Logger logger = LoggerFactory
.getLogger(BasicUsageDemoByJunit.class);
BoltServer server;
RpcClient client;
int port = PortScan.select();
String ip = "127.0.0.1";
String addr = "127.0.0.1:" + port;
int invokeTimes = 5;
SimpleServerUserProcessor serverUserProcessor = new SimpleServerUserProcessor();
SimpleClientUserProcessor clientUserProcessor = new SimpleClientUserProcessor();
CONNECTEventProcessor clientConnectProcessor = new CONNECTEventProcessor();
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor clientDisConnectProcessor = new DISCONNECTEventProcessor();
DISCONNECTEventProcessor serverDisConnectProcessor = new DISCONNECTEventProcessor();
@Before
public void init() {
server = new BoltServer(port, true);
server.start();
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
server.registerUserProcessor(serverUserProcessor);
client = new RpcClient();
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
client.registerUserProcessor(clientUserProcessor);
client.init();
}
@After
public void stop() {
try {
server.stop();
Thread.sleep(100);
} catch (InterruptedException e) {
logger.error("Stop server failed!", e);
}
}
@Test
public void testOneway() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world oneway");
for (int i = 0; i < invokeTimes; i++) {
try {
client.oneway(addr, req);
Thread.sleep(100);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
@Test
public void testSync() throws InterruptedException {
RequestBody req = new RequestBody(1, "hello world sync");
for (int i = 0; i < invokeTimes; i++) {
try {
String res = (String) client.invokeSync(addr, req, 3000);
logger.warn("Result received in sync: " + res);
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
@Test
public void testFuture() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world future");
for (int i = 0; i < invokeTimes; i++) {
try {
RpcResponseFuture future = client.invokeWithFuture(addr, req, 3000);
String res = (String) future.get();
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, res);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in future!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in future!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
@Test
public void testCallback() throws InterruptedException {
RequestBody req = new RequestBody(1, "hello world callback");
final List<String> rets = new ArrayList<String>(1);
for (int i = 0; i < invokeTimes; i++) {
final CountDownLatch latch = new CountDownLatch(1);
try {
client.invokeWithCallback(addr, req, new InvokeCallback() {
Executor executor = Executors.newCachedThreadPool();
@Override
public void onResponse(Object result) {
logger.warn("Result received in callback: " + result);
rets.add((String) result);
latch.countDown();
}
@Override
public void onException(Throwable e) {
logger.error("Process exception in callback.", e);
latch.countDown();
}
@Override
public Executor getExecutor() {
return executor;
}
}, 1000);
} catch (RemotingException e) {
latch.countDown();
String errMsg = "RemotingException caught in callback!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
try {
latch.await();
} catch (InterruptedException e) {
String errMsg = "InterruptedException caught in callback!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
if (rets.size() == 0) {
Assert.fail("No result! Maybe exception caught!");
}
Assert.assertEquals(RequestBody.DEFAULT_SERVER_RETURN_STR, rets.get(0));
rets.clear();
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
}
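Two server models
Two models are provided on the server side.
- Synchronous processor: handles the request and returns the result synchronously.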
package com.alipay.remoting.rpc.common;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.InvokeContext;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.rpc.protocol.SyncUserProcessor;
/**
* a demo user processor for rpc server
*
* @author xiaomin.cxm
* @version $Id: SimpleServerUserProcessor.java, v 0.1 Jan 7, 2016 3:01:49 PM xiaomin.cxm Exp $
*/
public class SimpleServerUserProcessor extends SyncUserProcessor<RequestBody> {
/** logger */
private static final Logger logger = LoggerFactory
.getLogger(SimpleServerUserProcessor.class);
/** delay milliseconds */
private long delayMs;
/** whether delay or not */
private boolean delaySwitch;
/** executor */
private ThreadPoolExecutor executor;
/** default is true */
private boolean timeoutDiscard = true;
private AtomicInteger invokeTimes = new AtomicInteger();
private AtomicInteger onewayTimes = new AtomicInteger();
private AtomicInteger syncTimes = new AtomicInteger();
private AtomicInteger futureTimes = new AtomicInteger();
private AtomicInteger callbackTimes = new AtomicInteger();
private String remoteAddr;
private CountDownLatch latch = new CountDownLatch(1);
public SimpleServerUserProcessor() {
this.delaySwitch = false;
this.delayMs = 0;
this.executor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), new NamedThreadFactory("Request-process-pool"));
}
public SimpleServerUserProcessor(long delay) {
this();
if (delay < 0) {
throw new IllegalArgumentException("delay time illegal!");
}
this.delaySwitch = true;
this.delayMs = delay;
}
public SimpleServerUserProcessor(long delay, int core, int max, int keepaliveSeconds,
int workQueue) {
this(delay);
this.executor = new ThreadPoolExecutor(core, max, keepaliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(workQueue), new NamedThreadFactory(
"Request-process-pool"));
}
// ~~~ override methods
@Override
public Object handleRequest(BizContext bizCtx, RequestBody request) throws Exception {
logger.warn("Request received:" + request + ", timeout:" + bizCtx.getClientTimeout()
+ ", arriveTimestamp:" + bizCtx.getArriveTimestamp());
if (bizCtx.isRequestTimeout()) {
String errMsg = "Stop process in server biz thread, already timeout!";
processTimes(request);
logger.warn(errMsg);
throw new Exception(errMsg);
}
this.remoteAddr = bizCtx.getRemoteAddress();
//test biz context get connection
Assert.assertNotNull(bizCtx.getConnection());
Assert.assertTrue(bizCtx.getConnection().isFine());
Long waittime = (Long) bizCtx.getInvokeContext().get(InvokeContext.BOLT_PROCESS_WAIT_TIME);
Assert.assertNotNull(waittime);
if (logger.isInfoEnabled()) {
logger.info("Server User processor process wait time {}", waittime);
}
latch.countDown();
logger.warn("Server User processor say, remote address is [" + this.remoteAddr + "].");
Assert.assertEquals(RequestBody.class, request.getClass());
processTimes(request);
if (!delaySwitch) {
return RequestBody.DEFAULT_SERVER_RETURN_STR;
}
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
return RequestBody.DEFAULT_SERVER_RETURN_STR;
}
@Override
public String interest() {
return RequestBody.class.getName();
}
@Override
public Executor getExecutor() {
return executor;
}
@Override
public boolean timeoutDiscard() {
return this.timeoutDiscard;
}
// ~~~ public methods
public int getInvokeTimes() {
return this.invokeTimes.get();
}
public int getInvokeTimesEachCallType(RequestBody.InvokeType type) {
return new int[] { this.onewayTimes.get(), this.syncTimes.get(), this.futureTimes.get(),
this.callbackTimes.get() }[type.ordinal()];
}
public String getRemoteAddr() throws InterruptedException {
latch.await(100, TimeUnit.MILLISECONDS);
return this.remoteAddr;
}
// ~~~ private methods
private void processTimes(RequestBody req) {
this.invokeTimes.incrementAndGet();
if (req.getMsg().equals(RequestBody.DEFAULT_ONEWAY_STR)) {
this.onewayTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_SYNC_STR)) {
this.syncTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_FUTURE_STR)) {
this.futureTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_CALLBACK_STR)) {
this.callbackTimes.incrementAndGet();
}
}
// ~~~ getters and setters
/**
* Getter method for property <tt>timeoutDiscard</tt>.
*
* @return property value of timeoutDiscard
*/
public boolean isTimeoutDiscard() {
return timeoutDiscard;
}
/**
* Setter method for property <tt>timeoutDiscard</tt>.
*
* @param timeoutDiscard value to be assigned to property timeoutDiscard
*/
public void setTimeoutDiscard(boolean timeoutDiscard) {
this.timeoutDiscard = timeoutDiscard;
}
}
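- Asynchronous processor: handleRequest returns immediately and the request is processed asynchronously. In essence this is only a question of whether the server starts an asynchronous thread, and it has little to do with the SOFABolt framework itself.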
package com.alipay.remoting.rpc.common;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.rpc.protocol.AsyncUserProcessor;
/**
* a demo async user processor for rpc server
*
* @author xiaomin.cxm
* @version $Id: SimpleServerUserProcessor.java, v 0.1 Jan 7, 2016 3:01:49 PM xiaomin.cxm Exp $
*/
public class AsyncServerUserProcessor extends AsyncUserProcessor<RequestBody> {
/** logger */
private static final Logger logger = LoggerFactory
.getLogger(AsyncServerUserProcessor.class);
/** delay milliseconds */
private long delayMs;
/** whether delay or not */
private boolean delaySwitch;
/** whether exception */
private boolean isException;
/** whether null */
private boolean isNull;
/** executor */
private ThreadPoolExecutor executor;
private ThreadPoolExecutor asyncExecutor;
private AtomicInteger invokeTimes = new AtomicInteger();
private AtomicInteger onewayTimes = new AtomicInteger();
private AtomicInteger syncTimes = new AtomicInteger();
private AtomicInteger futureTimes = new AtomicInteger();
private AtomicInteger callbackTimes = new AtomicInteger();
private String remoteAddr;
private CountDownLatch latch = new CountDownLatch(1);
public AsyncServerUserProcessor() {
this.delaySwitch = false;
this.isException = false;
this.delayMs = 0;
this.executor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), new NamedThreadFactory("Request-process-pool"));
this.asyncExecutor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), new NamedThreadFactory(
"Another-aysnc-process-pool"));
}
public AsyncServerUserProcessor(boolean isException, boolean isNull) {
this();
this.isException = isException;
this.isNull = isNull;
}
public AsyncServerUserProcessor(long delay) {
this();
if (delay < 0) {
throw new IllegalArgumentException("delay time illegal!");
}
this.delaySwitch = true;
this.delayMs = delay;
}
public AsyncServerUserProcessor(long delay, int core, int max, int keepaliveSeconds,
int workQueue) {
this(delay);
this.executor = new ThreadPoolExecutor(core, max, keepaliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(workQueue), new NamedThreadFactory(
"Request-process-pool"));
}
@Override
public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, RequestBody request) {
this.asyncExecutor.execute(new InnerTask(bizCtx, asyncCtx, request));
}
class InnerTask implements Runnable {
private BizContext bizCtx;
private AsyncContext asyncCtx;
private RequestBody request;
public InnerTask(BizContext bizCtx, AsyncContext asyncCtx, RequestBody request) {
this.bizCtx = bizCtx;
this.asyncCtx = asyncCtx;
this.request = request;
}
public void run() {
logger.warn("Request received:" + request);
remoteAddr = bizCtx.getRemoteAddress();
latch.countDown();
logger.warn("Server User processor say, remote address is [" + remoteAddr + "].");
Assert.assertEquals(RequestBody.class, request.getClass());
processTimes(request);
if (isException) {
this.asyncCtx.sendResponse(new IllegalArgumentException("Exception test"));
} else if (isNull) {
this.asyncCtx.sendResponse(null);
} else {
if (!delaySwitch) {
this.asyncCtx.sendResponse(RequestBody.DEFAULT_SERVER_RETURN_STR);
return;
}
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.asyncCtx.sendResponse(RequestBody.DEFAULT_SERVER_RETURN_STR);
}
}
}
private void processTimes(RequestBody req) {
this.invokeTimes.incrementAndGet();
if (req.getMsg().equals(RequestBody.DEFAULT_ONEWAY_STR)) {
this.onewayTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_SYNC_STR)) {
this.syncTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_FUTURE_STR)) {
this.futureTimes.incrementAndGet();
} else if (req.getMsg().equals(RequestBody.DEFAULT_CALLBACK_STR)) {
this.callbackTimes.incrementAndGet();
}
}
@Override
public String interest() {
return RequestBody.class.getName();
}
@Override
public Executor getExecutor() {
return executor;
}
public int getInvokeTimes() {
return this.invokeTimes.get();
}
public int getInvokeTimesEachCallType(RequestBody.InvokeType type) {
return new int[] { this.onewayTimes.get(), this.syncTimes.get(), this.futureTimes.get(),
this.callbackTimes.get() }[type.ordinal()];
}
public String getRemoteAddr() throws InterruptedException {
latch.await(100, TimeUnit.MILLISECONDS);
return this.remoteAddr;
}
}
Client and server initialization
- Client initialization
package com.alipay.remoting.demo;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
import com.alipay.remoting.rpc.common.RequestBody;
import com.alipay.remoting.rpc.common.SimpleClientUserProcessor;
/**
* a demo for rpc client, you can just run the main method after started rpc server of {@link RpcServerDemoByMain}
*
* @author tsui
* @version $Id: RpcClientDemoByMain.java, v 0.1 2018-04-10 10:39 tsui Exp $
*/
public class RpcClientDemoByMain {
static Logger logger = LoggerFactory
.getLogger(RpcClientDemoByMain.class);
static RpcClient client;
static String addr = "127.0.0.1:8999";
SimpleClientUserProcessor clientUserProcessor = new SimpleClientUserProcessor();
CONNECTEventProcessor clientConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor clientDisConnectProcessor = new DISCONNECTEventProcessor();
public RpcClientDemoByMain() {
// 1. create a rpc client
client = new RpcClient();
// 2. add processor for connect and close event if you need
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
// 3. do init
client.init();
}
public static void main(String[] args) {
new RpcClientDemoByMain();
RequestBody req = new RequestBody(2, "hello world sync");
try {
String res = (String) client.invokeSync(addr, req, 3000);
System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
String errMsg = "RemotingException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
logger.error("interrupted!");
}
client.shutdown();
}
}
- Server initialization
package com.alipay.remoting.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.rpc.common.BoltServer;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import com.alipay.remoting.rpc.common.DISCONNECTEventProcessor;
import com.alipay.remoting.rpc.common.SimpleServerUserProcessor;
/**
* a demo for rpc server, you can just run the main method to start a server
*
* @author tsui
* @version $Id: RpcServerDemoByMain.java, v 0.1 2018-04-10 10:37 tsui Exp $
*/
public class RpcServerDemoByMain {
static Logger logger = LoggerFactory
.getLogger(RpcServerDemoByMain.class);
BoltServer server;
int port = 8999;
SimpleServerUserProcessor serverUserProcessor = new SimpleServerUserProcessor();
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();
DISCONNECTEventProcessor serverDisConnectProcessor = new DISCONNECTEventProcessor();
public RpcServerDemoByMain() {
// 1. create a Rpc server with port assigned
server = new BoltServer(port);
// 2. add processor for connect and close event if you need
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
server.addConnectionEventProcessor(ConnectionEventType.CLOSE, serverDisConnectProcessor);
// 3. register user processor for client request
server.registerUserProcessor(serverUserProcessor);
// 4. server start
if (server.start()) {
System.out.println("server start ok!");
} else {
System.out.println("server start failed!");
}
// server.getRpcServer().stop();
}
public static void main(String[] args) {
new RpcServerDemoByMain();
}
}
2 Advanced features
Request context
public void testOneway() throws InterruptedException {
RequestBody req = new RequestBody(2, "hello world oneway");
for (int i = 0; i < invokeTimes; i++) {
try {
InvokeContext invokeContext = new InvokeContext();
client.oneway(addr, req, invokeContext);
Assert.assertEquals("127.0.0.1", invokeContext.get(InvokeContext.CLIENT_LOCAL_IP));
Assert.assertEquals("127.0.0.1", invokeContext.get(InvokeContext.CLIENT_REMOTE_IP));
Assert.assertNotNull(invokeContext.get(InvokeContext.CLIENT_LOCAL_PORT));
Assert.assertNotNull(invokeContext.get(InvokeContext.CLIENT_REMOTE_PORT));
Assert.assertNotNull(invokeContext.get(InvokeContext.CLIENT_CONN_CREATETIME));
logger.warn("CLIENT_CONN_CREATETIME:"
+ invokeContext.get(InvokeContext.CLIENT_CONN_CREATETIME));
Thread.sleep(100);
} catch (RemotingException e) {
String errMsg = "RemotingException caught in oneway!";
logger.error(errMsg, e);
Assert.fail(errMsg);
}
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
Duplex communication
@Test
public void testServerSyncUsingConnection1() throws Exception {
for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(addr, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
Assert.assertNotNull(serverConnectProcessor.getConnection());
Connection serverConn = serverConnectProcessor.getConnection();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
@Test
public void testServerSyncUsingConnection() throws Exception {
Connection clientConn = client.createStandaloneConnection(ip, port, 1000);
for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(clientConn, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
Assert.assertNotNull(serverConnectProcessor.getConnection());
Connection serverConn = serverConnectProcessor.getConnection();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(serverConn, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
@Test
public void testServerSyncUsingAddress() throws Exception {
Connection clientConn = client.createStandaloneConnection(ip, port, 1000);
String remote = RemotingUtil.parseRemoteAddress(clientConn.getChannel());
String local = RemotingUtil.parseLocalAddress(clientConn.getChannel());
logger.warn("Client say local:" + local);
logger.warn("Client say remote:" + remote);
for (int i = 0; i < invokeTimes; i++) {
RequestBody req1 = new RequestBody(1, RequestBody.DEFAULT_CLIENT_STR);
String serverres = (String) client.invokeSync(clientConn, req1, 1000);
Assert.assertEquals(serverres, RequestBody.DEFAULT_SERVER_RETURN_STR);
Assert.assertNotNull(serverConnectProcessor.getConnection());
// only when client invoked, the remote address can be get by UserProcessor
// otherwise, please use ConnectionEventProcessor
String remoteAddr = serverUserProcessor.getRemoteAddr();
RequestBody req = new RequestBody(1, RequestBody.DEFAULT_SERVER_STR);
String clientres = (String) server.getRpcServer().invokeSync(remoteAddr, req, 1000);
Assert.assertEquals(clientres, RequestBody.DEFAULT_CLIENT_RETURN_STR);
}
Assert.assertTrue(serverConnectProcessor.isConnected());
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
Assert.assertEquals(invokeTimes, serverUserProcessor.getInvokeTimes());
}
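Multiple connections and connection warm-up
For point-to-point direct communication between a client and a server, one connection object per IP is usually enough. Its throughput and concurrency are sufficient for ordinary business traffic. In some scenarios, however, the communication is not point-to-point: the connection goes through an LVS VIP or an F5 device. In that case, multiple connections are established for one URL address for the sake of load balancing and fault tolerance. Multiple connections are requested by appending parameters to the URL passed in when making the call, for example 127.0.0.1:12200?_CONNECTIONNUM=30&_CONNECTIONWARMUP=true, which means that 30 connections should be established for this IP address and that they should be warmed up. The difference between warming up and not warming up is:
- With warm-up: all 30 connections are created on the first call (for example, a Sync invocation).
- Without warm-up: each call creates one connection, until 30 connections exist.
- A client can create multiple Connection objects and keep them in a connection pool.
- Both the client and the server can register multiple Processors.
Connection and Processor have a many-to-many relationship.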
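The warm-up rule can be illustrated with a small simulation. This is a sketch of the semantics described above, not SOFABolt's connection pool or address parser: ConnectionWarmupSketch, parseQuery, and invoke are hypothetical, and the defaults used when a parameter is missing (one connection, no warm-up) are assumptions made only for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simulation of the warm-up semantics; not SOFABolt's connection pool. */
final class ConnectionWarmupSketch {
    final int connectionNum;
    final boolean warmup;
    private int created = 0;

    ConnectionWarmupSketch(String url) {
        Map<String, String> params = parseQuery(url);
        // Defaults are assumptions for this sketch only.
        this.connectionNum = Integer.parseInt(params.getOrDefault("_CONNECTIONNUM", "1"));
        this.warmup = Boolean.parseBoolean(params.getOrDefault("_CONNECTIONWARMUP", "false"));
    }

    /** Splits the part after '?' into key/value pairs. */
    static Map<String, String> parseQuery(String url) {
        Map<String, String> params = new HashMap<>();
        int q = url.indexOf('?');
        if (q < 0) {
            return params;
        }
        for (String pair : url.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Simulates one invocation and returns how many connections exist afterwards. */
    int invoke() {
        if (created < connectionNum) {
            // Warm-up fills the pool on the first call; otherwise one connection is added per call.
            created = warmup ? connectionNum : created + 1;
        }
        return created;
    }

    public static void main(String[] args) {
        ConnectionWarmupSketch warm =
            new ConnectionWarmupSketch("127.0.0.1:12200?_CONNECTIONNUM=30&_CONNECTIONWARMUP=true");
        System.out.println(warm.invoke()); // 30

        ConnectionWarmupSketch lazy =
            new ConnectionWarmupSketch("127.0.0.1:12200?_CONNECTIONNUM=30&_CONNECTIONWARMUP=false");
        System.out.println(lazy.invoke()); // 1
        System.out.println(lazy.invoke()); // 2
    }
}
```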