Hystrix执行流程分析
执行流程
下图展示了Hystrix的执行过程图:
流程:
构造一个
HystrixCommand
或HystrixObservableCommand
对象,HystrixCommand
用户返回单个请求,HystrixObservableCommand
用户返回一个可观察的对象。执行命令
如果该命令启用了请求缓存,并且请求的响应在缓存中可用,那么这个缓存的响应将立即以Observable的形式返回
当执行这个命令时,
Hystrix
会检查断路器,看开关是否打开。如果电路被打开,Hystrix
将不会执行,而是将流路由到回滚函数。如果开关被关闭,那么继续线程池和信号量是否满足要求。判断执行是否超时,如果超时路由到回滚函数。
Hystrix
向断路器报告成功、失败、拒绝和超时,断路器维护一组计算统计数据的滚动计数器。它使用这些统计数据来确定何时开关,这时它将短路任何后续请求,直到恢复周期结束,在恢复周期结束后,它将在首先检查某些健康检查后再次关闭。源码流程
Hystrix 基于AOP 和 rxjava(响应式编程)实现,切面类为
HystrixCommandAspect
,会对加了@HystrixCommand
和@HystrixCollapser
进行拦截,入口代码如下:@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); MetaHolder metaHolder = metaHolderFactory.create(joinPoint); HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; }
进入到上面流程图所示的核心逻辑,进入AbstractCommand类,代码片段如下:
/* this is a stateful object so can only be used once */ // 检查当前命令状态,如果当前状态不是未开始,说明已经执行,抛出异常 // 一共有 NOT_STARTED(未开始), OBSERVABLE_CHAIN_CREATED(执行链已经创建), USER_CODE_EXECUTED(用户代码已执行), UNSUBSCRIBED(未被订阅), // TERMINAL(结束) 5种状态 if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); // 从缓存中获取数据并返回 /* try from cache first */ if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // put in cache if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } });
如果不走缓存,将会调用下面方法,逻辑如下:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ // 判断开关是否打开 if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
获取一个信号量实例。如果当前隔离模式是一个信号量,则根据
commandKey
获取信号量,如果信号量不存在,则初始化并缓存它;如果当前隔离模式是线程池,则使用默认信号量TryableSemaphoreNoOp
。默认情况下,所有请求都可以被传递。如果是线程池模式运行,private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { // 线程池隔离模式 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } // 统计信息 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); // 判断是否超时 if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } // 更新线程状态为已开始 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ //执行hook,若异常,则直接抛出异常 try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }) ....
断路器
断路器核心类为: HystrixCircuitBreaker
,对应类方法及执行流程如下:
public interface HystrixCircuitBreaker {
// 是否允许执行
boolean allowRequest();
// 断路器是否处在打开状态
boolean isOpen();
// 在半打开状态,表示测试请求调用成功
void markSuccess();
// 在半打开状态,表示测试请求调用失败
void markNonSuccess();
// 在 Command 开始执行时调用
boolean attemptExecution();
}
断路器有打开(断路器打开,一定时间内请求不可通过)、关闭(请求正常通过)、半开(打开一段时间后,放行了一个请求到下游,待结果返回)三种状态, 当请求流量大于HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
所设定的值或失败率大于HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
设定的值时断路器打开,那么,断路器如何在不同的状态间进行调整呢?后面介绍线程池的时候,会提到过基于滑动窗口和桶的监控信息统计类 HystrixThreadPoolMetrics
,其实 Command 也有相似的统计类 HystrixCommandMetrics
,它们都是 HystrixMetrics
的实现类,机制非常相似。断路器会订阅 HystrixCommandMetrics
,在滑动窗口发生滚动的时候根据最新窗口内的请求量和成功率判断是否要将断路器的状态从关闭改为打开。
断路器从关毕到打开判断代码如下:
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold
// // 是否到达判断是否断路的最低请求量,否则跳过,因为请求量少的时候通过成功百分比来判断不准确
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}
// 错误率没有达到阈值,不需要处理
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// 失败率太高,打开
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}
半开状态判断逻辑的关键代码如下:
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 断路开关打开 & 休眠时间超过sleepWindow 允许请求通过
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
}
}
return false;
}
隔离级别
Hystrix 为不同的应用场景提供两种隔离级别:Thread
和 Semaphore
。
线程隔离
线程隔离就是将调用放在另外的线程执行,
- 对委托线程来说,能够随时在出现超时调用时 walk away,执行 fallback 的逻辑,不会阻塞到连接超时从而拖累服务的响应时间。
- 对隔离效果来说,当下游服务出现超时故障时,仅仅该线程池会爆满,对使用其它线程池的不相关服务以及服务本身没有任何影响。当下游恢复健康后,线程池会再次变得可用,恢复往常状态。
- 对监控来说,由于线程池有多种监控数据,例如占用线程数、排队请求数、执行任务数等,当我们错误地配置了客户端或是下游服务出现性能变化我们都能够第一时间感知到并做出应对。
大多数场景下,默认的 10 个线程就能足够了。如果想要进一步调整的话,官方给出了一条简单有效的公式:
requests per second at peak when healthy × 99th percentile latency in seconds + some breathing room
峰值 qps * P99 响应时间 + 适当数量的额外缓冲线程
线程隔离也有缺点。业务线程将具体调用提交到线程池到执行完成,就需要付出任务排队、线程池调度、上下文切换的开销。Netflix 也考虑到这一点,并做了对应的测试。对于一个每秒被请求 60 次的接口,使用线程隔离在 P50、P90、P99 的开销分别为 0ms、3ms 和 9ms。
信号量隔离
但是,如果你的接口响应时间非常小,无法接受线程隔离带来的开销,且信任该接口能够很快返回的话,则可以使用 Semaphore
隔离级别。原因是使用信号量隔离自然就无法像线程隔离一样在出现超时的时候直接返回,而是需要等待客户端的阻塞结束。
在 Hystrix 中,command 的执行以及 Fallback 都支持使用 Semaphore
。将 execution.isolation.strategy
配置为 SEMAPHORE
即可将默认的 THREAD
隔离级别改为信号量隔离。根据接口的响应时间以及单位时间内的调用次数,你可以根据和计算线程数相似的方式计算出可允许并发执行的数量。
总结
本文从整体执行流程、断路器、隔离机制方面对进行了说明,还有一些其他方面如指标采集不再过多阐述。