diff --git a/example/src/test/java/com/alipay/sofa/rpc/serviceasync/HelloService.java b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/HelloService.java new file mode 100644 index 000000000..1b7041b07 --- /dev/null +++ b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/HelloService.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.serviceasync; + +public interface HelloService { + String sayHello(String string); +} diff --git a/example/src/test/java/com/alipay/sofa/rpc/serviceasync/HelloServiceImpl.java b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/HelloServiceImpl.java new file mode 100644 index 000000000..56b50ffbc --- /dev/null +++ b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/HelloServiceImpl.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.serviceasync; + +import com.alipay.sofa.rpc.message.bolt.BoltAsyncContext; + +public class HelloServiceImpl implements HelloService { + @Override + public String sayHello(String name) { + BoltAsyncContext boltAsyncContext = new BoltAsyncContext(); + new Thread(() -> { + // 如果需要在新线程中使用调用上下文,需要调用signalContextSwitch方法 + boltAsyncContext.signalContextSwitch(); + + boltAsyncContext.write("Hello " + name); + + boltAsyncContext.resetContext(); + }).start(); + return null; + } +} diff --git a/example/src/test/java/com/alipay/sofa/rpc/serviceasync/start/Client.java b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/start/Client.java new file mode 100644 index 000000000..3c1b63b91 --- /dev/null +++ b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/start/Client.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.serviceasync.start; + +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.serviceasync.HelloService; + +public class Client { + private final static Logger LOGGER = LoggerFactory.getLogger(Client.class); + + public static void main(String[] args) { + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) // 指定接口 + .setProtocol("bolt") // 指定协议 + .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址 + .setConnectTimeout(10 * 1000); + + HelloService helloService = consumerConfig.refer(); + + while (true) { + try { + LOGGER.info(helloService.sayHello("world")); + } catch (Exception e) { + e.printStackTrace(); + } + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + } +} diff --git a/example/src/test/java/com/alipay/sofa/rpc/serviceasync/start/Service.java b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/start/Service.java new file mode 100644 index 000000000..de755f6e0 --- /dev/null +++ b/example/src/test/java/com/alipay/sofa/rpc/serviceasync/start/Service.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.serviceasync.start; + +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.serviceasync.HelloService; +import com.alipay.sofa.rpc.serviceasync.HelloServiceImpl; + +public class Service { + public static void main(String[] args) { + ServerConfig serverConfig = new ServerConfig() + .setProtocol("bolt") // 设置一个协议,默认bolt + .setPort(12200) // 设置一个端口,默认12200 + .setDaemon(false); // 非守护线程 + + ProviderConfig providerConfig = new ProviderConfig() + .setInterfaceId(HelloService.class.getName()) // 指定接口 + .setRef(new HelloServiceImpl()) // 指定实现 + .setServer(serverConfig); // 指定服务端 + + providerConfig.export(); // 发布服务 + } +} diff --git a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltAsyncContext.java b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltAsyncContext.java new file mode 100644 index 000000000..dcf2d2427 --- /dev/null +++ b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/message/bolt/BoltAsyncContext.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.message.bolt; + +import com.alipay.remoting.AsyncContext; +import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.event.EventBus; +import com.alipay.sofa.rpc.event.ServerEndHandleEvent; +import com.alipay.sofa.rpc.event.ServerSendEvent; + +public class BoltAsyncContext { + private volatile boolean sent = false; + + private final AsyncContext asyncContext; + private final SofaRequest sofaRequest; + + private final RpcInternalContext internalContext; + private final RpcInvokeContext invokeCtx; + private final ClassLoader restoreClassLoader; + private volatile ClassLoader stagedClassLoader; + + public BoltAsyncContext() { + internalContext = RpcInternalContext.getContext(); + asyncContext = (AsyncContext) internalContext.getAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT); + sofaRequest = (SofaRequest) internalContext.getAttachment(RpcConstants.HIDDEN_KEY_ASYNC_REQUEST); + invokeCtx = RpcInvokeContext.getContext(); + restoreClassLoader = Thread.currentThread().getContextClassLoader(); + invokeCtx.put(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN, true); + } + + public synchronized void signalContextSwitch() { + RpcInvokeContext.setContext(invokeCtx); + RpcInternalContext.setContext(internalContext); + if (restoreClassLoader != null) { + Thread.currentThread().setContextClassLoader(restoreClassLoader); + stagedClassLoader = Thread.currentThread().getContextClassLoader(); + } + } + + public synchronized void resetContext() { + RpcInvokeContext.removeContext(); + RpcInternalContext.removeContext(); + // 修复 ClassLoader 恢复逻辑 + if (stagedClassLoader != null) { + Thread.currentThread().setContextClassLoader(stagedClassLoader); + } + } + + private synchronized void checkState() { + if (sent) { + throw new IllegalStateException("Current async context has already sent response"); + } + sent = true; + } + + public void write(Object result) { + checkState(); + try { + SofaResponse response = new SofaResponse(); + response.setAppResponse(result); + sendResponse(response, null); + } catch (Exception e) { + throw new RuntimeException("Failed to send async response", e); + } + } + + public void writeException(Throwable throwable) { + checkState(); + try { + SofaResponse response = new SofaResponse(); + if (throwable instanceof SofaRpcException) { + SofaRpcException sofaRpcException = (SofaRpcException) throwable; + response.setErrorMsg(sofaRpcException.getMessage()); + sendResponse(response, sofaRpcException); + } else { + response.setAppResponse(throwable); + sendResponse(response, null); + } + } catch (Exception e) { + throw new RuntimeException("Failed to send async exception response", e); + } + } + + private void sendResponse(SofaResponse response, SofaRpcException sofaRpcException) { + try { + asyncContext.sendResponse(response); + } finally { + if (EventBus.isEnable(ServerSendEvent.class)) { + EventBus.post(new ServerSendEvent(sofaRequest, response, sofaRpcException)); + } + if (EventBus.isEnable(ServerEndHandleEvent.class)) { + EventBus.post(new ServerEndHandleEvent()); + } + } + } +} diff --git a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java index 3972797d9..e0bfebc8f 100644 --- a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java +++ b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java @@ -110,11 +110,13 @@ public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest // 是否链路异步化中 boolean isAsyncChain = false; + try { // 这个 try-finally 为了保证Context一定被清理 processingCount.incrementAndGet(); // 统计值加1 context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 远程地址 context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 远程返回的通道 + context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_REQUEST, request); InvokeContext boltInvokeCtx = bizCtx.getInvokeContext(); if (RpcInternalContext.isAttachmentEnable()) { @@ -192,7 +194,7 @@ public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest RpcInvokeContext invokeContext = RpcInvokeContext.peekContext(); isAsyncChain = CommonUtils.isTrue(invokeContext != null ? (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null); - // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的 + // 如果是服务端异步代理模式或服务端模式,特殊处理,因为该模式是在业务代码自主异步返回的 if (!isAsyncChain) { // 其它正常请求 try { // 这个try-catch 保证一定要记录tracer