Spring Cloud Hystrix执行流程源码解析

Hystrix执行时的8大流程及原理图

1 创建HystrixCommand/HystrixObservableCommand

一个HystrixCommandHystrixObservableCommand对象,代表对某个依赖服务发起的一次请求或者调用
构造的时候,可在构造函数中传入任何需要的参数

  • HystrixCommand仅仅会返回一个结果的调用
  • HystrixObservableCommand可能会返回多条结果的调用


继承HystrixCommand实现run,getFallback,getCacheKey等方法

package com.netflix.hystrix.examples.demo;

import java.net.HttpCookie;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

/**
 * 样本HystrixCommand模拟了一个从远程服务或数据库中获取UserAccount对象.
 * 这使用了请求缓存和fallback行为.
 */
public class GetUserAccountCommand extends HystrixCommand<UserAccount> {

    private final HttpCookie httpCookie;
    private final UserCookie userCookie;

    /**
     * 
     * @param cookie
     * @throws IllegalArgumentException
     *             如果cookie无效,则表示用户未通过身份验证
     */
    public GetUserAccountCommand(HttpCookie cookie) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("User")));
        this.httpCookie = cookie;
        /* 解析或抛出IllegalArgumentException */
        this.userCookie = UserCookie.parseCookie(httpCookie);
    }

    @Override
    protected UserAccount run() {
        /* 模拟执行网络调用以检索用户信息simulate performing network call to retrieve user information */
        try {
            Thread.sleep((int) (Math.random() * 10) + 2);
        } catch (InterruptedException e) {
            // 无所事事
        }

        /* 5%的时间失败以显示fallback的工作原理 fail 5% of the time to show how fallback works */
        if (Math.random() > 0.95) {
            throw new RuntimeException("random failure processing UserAccount network response");
        }

        /* 延迟会增加5%的时间,因此有时会触发超时 */
        if (Math.random() > 0.95) {
            // 随机等待时间尖峰
            try {
                Thread.sleep((int) (Math.random() * 300) + 25);
            } catch (InterruptedException e) {
                // do nothing
            }
        }

        /* 成功...使用远程服务响应的数据创建UserAccount */
        return new UserAccount(86975, "John James", 2, true, false, true);
    }

    /**
     * Use the HttpCookie value as the cacheKey so multiple executions
     * in the same HystrixRequestContext will respond from cache.
     */
    @Override
    protected String getCacheKey() {
        return httpCookie.getValue();
    }

    /**
     * Fallback that will use data from the UserCookie and stubbed defaults
     * to create a UserAccount if the network call failed.
     */
    @Override
    protected UserAccount getFallback() {
        /*
         * 前 3 从 HttpCookie 获取
         * 后 3 从存根的默认值
         */
        return new UserAccount(userCookie.userId, userCookie.name, userCookie.accountType, true, true, true);
    }

    /**
     * Represents values containing in the cookie.
     * <p>
     * A real version of this could handle decrypting a secure HTTPS cookie.
     */
    private static class UserCookie {

        private static UserCookie parseCookie(HttpCookie cookie) {
            /* 实际代码会在这里解析cookie */
            if (Math.random() < 0.998) {
                /* 有效cookie */
                return new UserCookie(12345, "Henry Peter", 1);
            } else {
                /* 无效 cookie */
                throw new IllegalArgumentException();
            }
        }

        public UserCookie(int userId, String name, int accountType) {
            this.userId = userId;
            this.name = name;
            this.accountType = accountType;
        }

        private final int userId;
        private final String name;
        private final int accountType;
    }
}

2 调用command的执行方法

执行Command就可以发起一次对依赖服务的调用

要执行Command,需要在4个方法中选择其中的一个

  • 前两种是HystrixCommand独有的哦

    2.1 execute()

      /**
       * 用于同步执行 command.
       * 
       * @return R
       *         如果command由于任何原因失败,则执行 #run 或从 #getFallback() fallback的结果.
       *
       * @throws HystrixRuntimeException
       *             如果发生故障并且无法检索fallback
       * @throws HystrixBadRequestException
       *             如果使用了无效的参数或状态来表示用户故障,而不是系统故障
       *
       * @throws IllegalStateException
       *             如果多次调用
       */
      public R execute() {
          try {
              return queue().get();
          } catch (Exception e) {
              throw Exceptions.sneakyThrow(decomposeException(e));
          }
      }

调用后直接阻塞,直到依赖服务返回单条结果,或抛异常

2.2 queue()

    /**
     * 用于异步执行命令 command.
     *
     * 这将使该command在线程池上排队,并在完成后返回一个 Future 以获取结果.
     * 注意:如果配置为不在单独的线程中运行,则其效果与 #execute() 相同,并会阻塞.
     * 不会抛出异常,而只是切换为同步执行,因此无需更改代码即可 将command从运行在单独的线程切换到调用线程.
     * (switch a command from running on a separate thread to the calling thread.)
     * 
     * @return {@code Future <R>}执行 #run() 的结果,或者如果command由于任何原因失败,则返回 #getFallback() 的结果.
     * @throws HystrixRuntimeException
     *             如果不存在fallback
     *             如果通过 ExecutionException#getCause() 中的{@code Future.get(), 如果不存在失败发生的话
     *             或者如果无法将命令排队(如,短路,线程池/信号被拒绝),则立即返回
     * @throws HystrixBadRequestException
     *         通过 ExecutionException#getCause() 中的 Future.get() 如果使用了无效的参数或状态来表示用户故障而不是系统故障
     * @throws IllegalStateException
     *             如果多次调用
     */
    public Future<R> queue() {
        /*
         * 当Future.cancel(boolean)的“ mayInterrupt”标志设为true时
         * 由Observable.toBlocking().toFuture() 返回的Future不实现执行线程的中断
         * 因此,为了遵守Future的约定,我们必须围绕它.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();

        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }

                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                     * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                     * than that interruption request cannot be taken back.
                     * 这里唯一有效的转换是false -> true.
                     * 如果存在由该命令创建(这很奇怪,但从未禁止过)的两个futures,例如f1和f2,
                     * 并且对f1.cancel(true)和f2.cancel(false)的调用是由不同的线程发起,
                     * 尚不清楚在检查mayInterruptOnCancel时将使用什么值.
                     * 处理这种情况的最一致的方法是说,如果在中断的情况下调用了任何cancellation,则无法撤回该中断请求.
                     */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }

                final boolean res = delegate.cancel(interruptOnFutureCancel.get());

                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }

        };

        /* 对立即抛出的错误状态的特殊处理 */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                    ca***MAND_EXCEPTION:
                    case TIMEOUT:
                        // 不会仅从 queue().get() 中将这些类型从 queue() 中抛出, 因为它们是执行错误
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        // 这些是从 queue() 抛出的错误,因为它们是拒绝类型错误
                        throw hre;
                    }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }

调用,返回一个Future,后面可以通过Future获取单条结果

2.3 observe()

订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的Observable对象的拷贝对象

  • toObservable()
    返回一个Observable对象,如果我们订阅这个对象,就会执行command并且获取返回结果

其中execute()和queue()仅对HystrixCommand适用

K             value   = command.execute();
Future<K>     fValue  = command.queue();
Observable<K> ohValue = command.observe();         
Observable<K> ocValue = command.toObservable();    
  • execute()实际上会调用queue().get()
  • 在 queue() 方法中,会调用toObservable().toBlocking().toFuture()

即,无论是哪种执行command的方式,最终都是依赖toObservable()
也就是说同步的HystrixCommand最终都会依赖Observable,尽管HystrixCommand是用来发射单个事件的

3 检查是否开启缓存

如果这个command开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果
否则,继续往后

        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                 /* 这是一个有状态的对象,因此只能使用一次 */
                if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                    IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                    // TODO 为此创建新的错误类型
                    throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                }

                commandStartTimestamp = System.currentTimeMillis();

                if (properties.requestLogEnabled().get()) {
                    // 记录此命令执行,无论发生什么情况
                    if (currentRequestLog != null) {
                        currentRequestLog.addExecutedCommand(_cmd);
                    }
                }

                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                /* 首先尝试从缓存 */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantic***ap(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放入缓存
                if (requestCacheEnabled && cacheKey != null) {
                    // 包装以缓存
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // 另一个线程击败了我们,因此使用缓存值
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // 我们刚刚创建了一个ObservableCommand,所以我们进行了强制转换并返回了它
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // 进行一次清理(在正常终端状态(此行)或退订(下一行))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // 进行一次清理
                        .doOnCompleted(fireOnCompletedHook);
            }
        });

上述代码中有个判断final boolean requestCacheEnabled = isRequestCachingEnabled();可以看到如果我们的Command实现了getCacheKey方法,并且requestCacheEnabled(这个属性默认是true,可以通过调用HystrixCommand的构造方法传入一个setter对象修改默认属性)这样就不会执行后续的run方法,就会直接返回一个缓存的Observable。(上一篇文章我们已经提到,必须是同一个request context里面的两个command才能用到缓存)

4 检查是否开启短路器

检查这个command对应的依赖服务是否开启短路器

如果断路器被打开了,那么hystrix就不会执行这个command,而是直接执行fallback降级

5 检查线程池/队列/semaphore是否已满

如果command对应的线程池/队列/semaphore已满,那么也不会执行command,而是直接去调用fallback降级机制,同时发送 reject 信息给断路器统计

6 执行command

调用HystrixObservableCommand.construct()或HystrixCommand.run()来实际执行这个command

  • HystrixCommand.run()
    返回一个单条结果,或者抛出一个异常
  • HystrixObservableCommand.construct()
    返回一个Observable对象,可以获取多条结果

如果HystrixCommand.run()或HystrixObservableCommand.construct()的执行,超过了timeout时长的话,那么command所在的线程就会抛出一个TimeoutException
如果timeout了,也会去执行fallback降级机制,而且就不会管run()或construct()返回的值

我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个TimeoutException,但是还是可能会因为严重延迟的调用线程占满整个线程池的
即使这个时候新来的流量都被限流了。。。

如果没有timeout的话,那么就会拿到一些调用依赖服务获取到的结果,然后hystrix会做一些logging记录和metric统计

7 短路健康检查

Hystrix会将每一个依赖服务的调用成功,失败,拒绝,超时,等事件,都会发送给circuit breaker断路器

短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计

短路器会根据这些统计次数来决定,是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路,然后如果在之后第一次检查发现调用成功了,就关闭断路器

8 调用fallback降级机制

在以下几种情况中,hystrix会调用fallback降级机制

  • run()或construct()抛出一个异常
  • 短路器打开
  • 线程池/队列/semaphore满了
  • command执行超时了

一般在降级机制中,都建议给出一些默认的返回值,比如静态的一些代码逻辑,或者从内存中的缓存中提取一些数据,尽量在这里不要再进行网络请求了
即使在降级中,一定要进行网络调用,也应该将那个调用放在一个HystrixCommand中,进行隔离

  • 在HystrixCommand中,实现getFallback()方法,可以提供降级机制
  • 在HystirxObservableCommand中,实现一个resumeWithFallback()方法,返回一个Observable对象,可以提供降级结果

如果fallback返回了结果,那么hystrix就会返回这个结果

  • 对于HystrixCommand,会返回一个Observable对象,其中会发返回对应的结果
  • 对于HystrixObservableCommand,会返回一个原始的Observable对象

如果没有实现fallback,或者是fallback抛出了异常,Hystrix会返回一个Observable,但是不会返回任何数据

不同的command执行方式,其fallback为空或者异常时的返回结果不同

  • 对于execute(),直接抛出异常
  • 对于queue(),返回一个Future,调用get()时抛出异常
  • 对于observe(),返回一个Observable对象,但是调用subscribe()方法订阅它时,理解抛出调用者的onError方法
  • 对于toObservable(),返回一个Observable对象,但是调用subscribe()方法订阅它时,理解抛出调用者的onError方法

9 不同的执行方式

  • execute(),获取一个Future.get(),然后拿到单个结果
  • queue(),返回一个Future
  • observer(),立即订阅Observable,然后启动8大执行步骤,返回一个拷贝的Observable,订阅时理解回调给你结果
  • toObservable(),返回一个原始的Observable,必须手动订阅才会去执行8大步骤

参考

  • 《Java工程师面试突击第1季-中华石杉老师》
#Spring#
全部评论

相关推荐

千千倩倩:简历问题有点多,加v细聊
点赞 评论 收藏
分享
评论
点赞
2
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务