sofastack / sofa-rpc

SOFARPC is a high-performance, high-extensibility, production-level Java RPC framework.
https://www.sofastack.tech/sofa-rpc/docs/Home
Apache License 2.0

Does generic invocation support circuit breaking and fallback? #1148

Closed wangchen111 closed 2 years ago

wangchen111 commented 2 years ago

Your question

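I read on the official site that SOFARPC integrates Hystrix and supports circuit breaking and fallback. Following the documentation, I built a demo and verified that circuit breaking and fallback work. However, after I switched the call to generic invocation, errors were thrown. I am not sure whether my implementation is wrong or generic invocation simply does not support circuit breaking and fallback. Here is my demo code: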

        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("192.168.2.138:9603");
        registryConfig.setProtocol("sofa");
        ConsumerConfig<GenericService> consumerConfig = new ConsumerConfig<GenericService>()
                .setInterfaceId(HelloWorldService.class.getName())
                .setProtocol("bolt")
                .setRegistry(registryConfig)
                .setRegister(true)
                .setSubscribe(true)
                .setGeneric(true)
                .setTimeout(10000)
                .setParameter(HystrixConstants.SOFA_HYSTRIX_ENABLED, String.valueOf(true));

        SofaHystrixConfig.registerFallbackFactory(consumerConfig, new GenericFallbackFactory());

        GenericService helloService = consumerConfig.refer();

        String result = (String) helloService.$invoke("sayHello",new String[]{},new Object[]{});
        System.out.println(result);
        Assert.assertEquals(
                "fallback from server! error: com.netflix.hystrix.exception.HystrixTimeoutException",
                result);

Here is my FallbackFactory implementation:

public class GenericFallbackFactory implements FallbackFactory<GenericService> {
    @Override
    public GenericService create(FallbackContext context) {
        return new GenericService() {
            @Override
            public Object $invoke(String s, String[] strings, Object[] objects) {
                return "fallback from server! error: " +
                        context.getException().getClass().getName();
            }

            @Override
            public Object $genericInvoke(String s, String[] strings, Object[] objects) {
                return null;
            }

            @Override
            public <T> T $genericInvoke(String s, String[] strings, Object[] objects, Class<T> aClass) {
                return null;
            }

            @Override
            public Object $genericInvoke(String s, String[] strings, Object[] objects, GenericContext genericContext) {
                return null;
            }

            @Override
            public <T> T $genericInvoke(String s, String[] strings, Object[] objects, Class<T> aClass, GenericContext genericContext) {
                return null;
            }
        };
    }
}

This is the error I ran into:

com.alipay.sofa.rpc.core.exception.SofaRpcException: service.HelloWorldService#$invoke(String,String[],Object[]) timed-out and fallback failed.

    at com.alipay.sofa.rpc.filter.ConsumerGenericFilter.invoke(ConsumerGenericFilter.java:101)
    at com.alipay.sofa.rpc.filter.FilterInvoker.invoke(FilterInvoker.java:100)
    at com.alipay.sofa.rpc.filter.RpcReferenceContextFilter.invoke(RpcReferenceContextFilter.java:80)
    at com.alipay.sofa.rpc.filter.FilterInvoker.invoke(FilterInvoker.java:100)
    at com.alipay.sofa.rpc.filter.ConsumerExceptionFilter.invoke(ConsumerExceptionFilter.java:37)
    at com.alipay.sofa.rpc.filter.FilterInvoker.invoke(FilterInvoker.java:100)
    at com.alipay.sofa.rpc.filter.FilterChain.invoke(FilterChain.java:268)
    at com.alipay.sofa.rpc.client.AbstractCluster.filterChain(AbstractCluster.java:545)
    at com.alipay.sofa.rpc.client.FailoverCluster.doInvoke(FailoverCluster.java:67)
    at com.alipay.sofa.rpc.client.AbstractCluster.invoke(AbstractCluster.java:297)
    at com.alipay.sofa.rpc.client.ClientProxyInvoker.invoke(ClientProxyInvoker.java:83)
    at com.alipay.sofa.rpc.api.GenericService_proxy_0.$invoke(GenericService_proxy_0.java)
    at com.wangchen.QuickStartClient.testGenericFallBack(QuickStartClient.java:235)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: service.HelloWorldService#$invoke(String,String[],Object[]) timed-out and fallback failed.
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:832)
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.observers.Subscribers$5.onError(Subscribers.java:230)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
    at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$1.run(AbstractCommand.java:1142)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:41)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:37)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable.run(HystrixContextRunnable.java:57)
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$2.tick(AbstractCommand.java:1159)
    at com.netflix.hystrix.util.HystrixTimer$1.run(HystrixTimer.java:99)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
    at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997)
    at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60)
    at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609)
    at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
    ... 15 more

Your scenes

describe your use scenes (why need this feature)

Your advice

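I would like to know whether generic invocation supports circuit breaking and fallback and, if it does, whether you could provide a usage example.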

Environment

EvenLjj commented 2 years ago

@wangchen111 I reproduced the case. SofaHystrixCommand obtains the custom fallback and then delegates the call to it, as follows:

protected SofaResponse getFallback(SofaResponse response, Throwable t) {
        FallbackFactory fallbackFactory = SofaHystrixConfig.loadFallbackFactory((ConsumerConfig) invoker.getConfig());
        if (fallbackFactory == null) {
            return super.getFallback();
        }
        Object fallback = fallbackFactory.create(new FallbackContext(invoker, request, response, t));
        if (fallback == null) {
            return super.getFallback();
        }
        try {
            Object fallbackResult = request.getMethod().invoke(fallback, request.getMethodArgs());
            SofaResponse actualResponse = new SofaResponse();
            actualResponse.setAppResponse(fallbackResult);
            return actualResponse;
        } catch (IllegalAccessException e) {
            throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_HYSTRIX_FALLBACK_FAIL), e);
        } catch (InvocationTargetException e) {
            throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_HYSTRIX_FALLBACK_FAIL),
                e.getTargetException());
        }
    }

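However, in the line Object fallbackResult = request.getMethod().invoke(fallback, request.getMethodArgs()); the method is the generic-invocation method, and the arguments returned by request.getMethodArgs() do not match that method's actual parameters, which causes this problem. As things stand, generic invocation does not yet support circuit breaking and fallback. If you are interested, we can look into how to fix this together.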
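The mismatch described above can be reproduced outside SOFARPC with plain reflection. The sketch below is illustrative only: GenericLike is a hypothetical stand-in for GenericService (only $invoke is modeled), and it assumes that by the time the fallback runs the request holds the target method's own arguments (an empty array for sayHello in the demo above) rather than the three arguments that $invoke declares. Under that assumption, Method.invoke throws IllegalArgumentException, which the getFallback code quoted above does not catch.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class GenericFallbackMismatchDemo {

    /** Hypothetical stand-in for SOFARPC's GenericService; only $invoke is modeled. */
    interface GenericLike {
        Object $invoke(String methodName, String[] argTypes, Object[] args);
    }

    /**
     * Mirrors the reflective call request.getMethod().invoke(fallback, request.getMethodArgs()).
     * Returns the fallback result, or a description of the exception that was raised.
     */
    static String invokeFallback(Object[] methodArgs) {
        GenericLike fallback = (name, types, args) -> "fallback for " + name;
        try {
            // The Method object still describes the three-parameter $invoke.
            Method method = GenericLike.class.getMethod(
                    "$invoke", String.class, String[].class, Object[].class);
            return (String) method.invoke(fallback, methodArgs);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            // Checked reflection failures; the quoted getFallback handles two of these types.
            return "checked: " + e.getClass().getName();
        } catch (IllegalArgumentException e) {
            // Raised when the argument array does not fit the method's parameter list.
            return "mismatch: " + e.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // Assumed state of the request: the target method's (empty) argument list.
        System.out.println(invokeFallback(new Object[] {}));
        // The three arguments that $invoke actually declares.
        System.out.println(invokeFallback(
                new Object[] {"sayHello", new String[] {}, new Object[] {}}));
    }
}
```

The second call shows that the same fallback object works once it receives the method name, argument types, and argument values in the shape $invoke expects. Whether rebuilding that triple inside getFallback is the right fix for SOFARPC is an open question that this sketch does not settle.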

wangchen111 commented 2 years ago

@EvenLjj Thanks for your reply. I'll take a look.